Lines Matching refs:con
133 struct connection *con; member
163 int (*connect)(struct connection *con, struct socket *sock,
212 static void lowcomms_queue_swork(struct connection *con) in lowcomms_queue_swork() argument
214 assert_spin_locked(&con->writequeue_lock); in lowcomms_queue_swork()
216 if (!test_bit(CF_IO_STOP, &con->flags) && in lowcomms_queue_swork()
217 !test_bit(CF_APP_LIMITED, &con->flags) && in lowcomms_queue_swork()
218 !test_and_set_bit(CF_SEND_PENDING, &con->flags)) in lowcomms_queue_swork()
219 queue_work(io_workqueue, &con->swork); in lowcomms_queue_swork()
222 static void lowcomms_queue_rwork(struct connection *con) in lowcomms_queue_rwork() argument
225 WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk)); in lowcomms_queue_rwork()
228 if (!test_bit(CF_IO_STOP, &con->flags) && in lowcomms_queue_rwork()
229 !test_and_set_bit(CF_RECV_PENDING, &con->flags)) in lowcomms_queue_rwork()
230 queue_work(io_workqueue, &con->rwork); in lowcomms_queue_rwork()
252 static struct writequeue_entry *con_next_wq(struct connection *con) in con_next_wq() argument
256 e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry, in con_next_wq()
269 struct connection *con; in __find_con() local
271 hlist_for_each_entry_rcu(con, &connection_hash[r], list) { in __find_con()
272 if (con->nodeid == nodeid) in __find_con()
273 return con; in __find_con()
279 static void dlm_con_init(struct connection *con, int nodeid) in dlm_con_init() argument
281 con->nodeid = nodeid; in dlm_con_init()
282 init_rwsem(&con->sock_lock); in dlm_con_init()
283 INIT_LIST_HEAD(&con->writequeue); in dlm_con_init()
284 spin_lock_init(&con->writequeue_lock); in dlm_con_init()
285 INIT_WORK(&con->swork, process_send_sockets); in dlm_con_init()
286 INIT_WORK(&con->rwork, process_recv_sockets); in dlm_con_init()
287 spin_lock_init(&con->addrs_lock); in dlm_con_init()
288 init_waitqueue_head(&con->shutdown_wait); in dlm_con_init()
297 struct connection *con, *tmp; in nodeid2con() local
301 con = __find_con(nodeid, r); in nodeid2con()
302 if (con || !alloc) in nodeid2con()
303 return con; in nodeid2con()
305 con = kzalloc(sizeof(*con), alloc); in nodeid2con()
306 if (!con) in nodeid2con()
309 dlm_con_init(con, nodeid); in nodeid2con()
321 kfree(con); in nodeid2con()
325 hlist_add_head_rcu(&con->list, &connection_hash[r]); in nodeid2con()
328 return con; in nodeid2con()
364 struct connection *con; in nodeid_to_addr() local
371 con = nodeid2con(nodeid, 0); in nodeid_to_addr()
372 if (!con) { in nodeid_to_addr()
377 spin_lock(&con->addrs_lock); in nodeid_to_addr()
378 if (!con->addr_count) { in nodeid_to_addr()
379 spin_unlock(&con->addrs_lock); in nodeid_to_addr()
384 memcpy(&sas, &con->addr[con->curr_addr_index], in nodeid_to_addr()
388 con->curr_addr_index++; in nodeid_to_addr()
389 if (con->curr_addr_index == con->addr_count) in nodeid_to_addr()
390 con->curr_addr_index = 0; in nodeid_to_addr()
393 *mark = con->mark; in nodeid_to_addr()
394 spin_unlock(&con->addrs_lock); in nodeid_to_addr()
421 struct connection *con; in addr_to_nodeid() local
426 hlist_for_each_entry_rcu(con, &connection_hash[i], list) { in addr_to_nodeid()
427 WARN_ON_ONCE(!con->addr_count); in addr_to_nodeid()
429 spin_lock(&con->addrs_lock); in addr_to_nodeid()
430 for (addr_i = 0; addr_i < con->addr_count; addr_i++) { in addr_to_nodeid()
431 if (addr_compare(&con->addr[addr_i], addr)) { in addr_to_nodeid()
432 *nodeid = con->nodeid; in addr_to_nodeid()
433 *mark = con->mark; in addr_to_nodeid()
434 spin_unlock(&con->addrs_lock); in addr_to_nodeid()
439 spin_unlock(&con->addrs_lock); in addr_to_nodeid()
447 static bool dlm_lowcomms_con_has_addr(const struct connection *con, in dlm_lowcomms_con_has_addr() argument
452 for (i = 0; i < con->addr_count; i++) { in dlm_lowcomms_con_has_addr()
453 if (addr_compare(&con->addr[i], addr)) in dlm_lowcomms_con_has_addr()
462 struct connection *con; in dlm_lowcomms_addr() local
467 con = nodeid2con(nodeid, GFP_NOFS); in dlm_lowcomms_addr()
468 if (!con) { in dlm_lowcomms_addr()
473 spin_lock(&con->addrs_lock); in dlm_lowcomms_addr()
474 if (!con->addr_count) { in dlm_lowcomms_addr()
475 memcpy(&con->addr[0], addr, sizeof(*addr)); in dlm_lowcomms_addr()
476 con->addr_count = 1; in dlm_lowcomms_addr()
477 con->mark = dlm_config.ci_mark; in dlm_lowcomms_addr()
478 spin_unlock(&con->addrs_lock); in dlm_lowcomms_addr()
483 ret = dlm_lowcomms_con_has_addr(con, addr); in dlm_lowcomms_addr()
485 spin_unlock(&con->addrs_lock); in dlm_lowcomms_addr()
490 if (con->addr_count >= DLM_MAX_ADDR_COUNT) { in dlm_lowcomms_addr()
491 spin_unlock(&con->addrs_lock); in dlm_lowcomms_addr()
496 memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr)); in dlm_lowcomms_addr()
498 spin_unlock(&con->addrs_lock); in dlm_lowcomms_addr()
505 struct connection *con = sock2con(sk); in lowcomms_data_ready() local
509 set_bit(CF_RECV_INTR, &con->flags); in lowcomms_data_ready()
510 lowcomms_queue_rwork(con); in lowcomms_data_ready()
515 struct connection *con = sock2con(sk); in lowcomms_write_space() local
517 clear_bit(SOCK_NOSPACE, &con->sock->flags); in lowcomms_write_space()
519 spin_lock_bh(&con->writequeue_lock); in lowcomms_write_space()
520 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { in lowcomms_write_space()
521 con->sock->sk->sk_write_pending--; in lowcomms_write_space()
522 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags); in lowcomms_write_space()
525 lowcomms_queue_swork(con); in lowcomms_write_space()
526 spin_unlock_bh(&con->writequeue_lock); in lowcomms_write_space()
547 struct connection *con; in dlm_lowcomms_connect_node() local
551 con = nodeid2con(nodeid, 0); in dlm_lowcomms_connect_node()
552 if (WARN_ON_ONCE(!con)) { in dlm_lowcomms_connect_node()
557 down_read(&con->sock_lock); in dlm_lowcomms_connect_node()
558 if (!con->sock) { in dlm_lowcomms_connect_node()
559 spin_lock_bh(&con->writequeue_lock); in dlm_lowcomms_connect_node()
560 lowcomms_queue_swork(con); in dlm_lowcomms_connect_node()
561 spin_unlock_bh(&con->writequeue_lock); in dlm_lowcomms_connect_node()
563 up_read(&con->sock_lock); in dlm_lowcomms_connect_node()
572 struct connection *con; in dlm_lowcomms_nodes_set_mark() local
576 con = nodeid2con(nodeid, 0); in dlm_lowcomms_nodes_set_mark()
577 if (!con) { in dlm_lowcomms_nodes_set_mark()
582 spin_lock(&con->addrs_lock); in dlm_lowcomms_nodes_set_mark()
583 con->mark = mark; in dlm_lowcomms_nodes_set_mark()
584 spin_unlock(&con->addrs_lock); in dlm_lowcomms_nodes_set_mark()
591 struct connection *con = sock2con(sk); in lowcomms_error_report() local
600 con->nodeid, &inet->inet_daddr, in lowcomms_error_report()
609 con->nodeid, &sk->sk_v6_daddr, in lowcomms_error_report()
623 dlm_midcomms_unack_msg_resend(con->nodeid); in lowcomms_error_report()
642 static void add_sock(struct socket *sock, struct connection *con) in add_sock() argument
647 con->sock = sock; in add_sock()
649 sk->sk_user_data = con; in add_sock()
724 static void allow_connection_io(struct connection *con) in allow_connection_io() argument
726 if (con->othercon) in allow_connection_io()
727 clear_bit(CF_IO_STOP, &con->othercon->flags); in allow_connection_io()
728 clear_bit(CF_IO_STOP, &con->flags); in allow_connection_io()
731 static void stop_connection_io(struct connection *con) in stop_connection_io() argument
733 if (con->othercon) in stop_connection_io()
734 stop_connection_io(con->othercon); in stop_connection_io()
736 spin_lock_bh(&con->writequeue_lock); in stop_connection_io()
737 set_bit(CF_IO_STOP, &con->flags); in stop_connection_io()
738 spin_unlock_bh(&con->writequeue_lock); in stop_connection_io()
740 down_write(&con->sock_lock); in stop_connection_io()
741 if (con->sock) { in stop_connection_io()
742 lock_sock(con->sock->sk); in stop_connection_io()
743 restore_callbacks(con->sock->sk); in stop_connection_io()
744 release_sock(con->sock->sk); in stop_connection_io()
746 up_write(&con->sock_lock); in stop_connection_io()
748 cancel_work_sync(&con->swork); in stop_connection_io()
749 cancel_work_sync(&con->rwork); in stop_connection_io()
753 static void close_connection(struct connection *con, bool and_other) in close_connection() argument
757 if (con->othercon && and_other) in close_connection()
758 close_connection(con->othercon, false); in close_connection()
760 down_write(&con->sock_lock); in close_connection()
761 if (!con->sock) { in close_connection()
762 up_write(&con->sock_lock); in close_connection()
766 dlm_close_sock(&con->sock); in close_connection()
779 spin_lock_bh(&con->writequeue_lock); in close_connection()
780 if (!list_empty(&con->writequeue)) { in close_connection()
781 e = list_first_entry(&con->writequeue, struct writequeue_entry, in close_connection()
786 spin_unlock_bh(&con->writequeue_lock); in close_connection()
788 con->rx_leftover = 0; in close_connection()
789 con->retries = 0; in close_connection()
790 clear_bit(CF_APP_LIMITED, &con->flags); in close_connection()
791 clear_bit(CF_RECV_PENDING, &con->flags); in close_connection()
792 clear_bit(CF_SEND_PENDING, &con->flags); in close_connection()
793 up_write(&con->sock_lock); in close_connection()
796 static void shutdown_connection(struct connection *con, bool and_other) in shutdown_connection() argument
800 if (con->othercon && and_other) in shutdown_connection()
801 shutdown_connection(con->othercon, false); in shutdown_connection()
804 down_read(&con->sock_lock); in shutdown_connection()
806 if (!con->sock) { in shutdown_connection()
807 up_read(&con->sock_lock); in shutdown_connection()
811 ret = kernel_sock_shutdown(con->sock, SHUT_WR); in shutdown_connection()
812 up_read(&con->sock_lock); in shutdown_connection()
815 con, ret); in shutdown_connection()
818 ret = wait_event_timeout(con->shutdown_wait, !con->sock, in shutdown_connection()
822 con); in shutdown_connection()
830 close_connection(con, false); in shutdown_connection()
900 static int receive_from_sock(struct connection *con, int buflen) in receive_from_sock() argument
907 pentry = new_processqueue_entry(con->nodeid, buflen); in receive_from_sock()
911 memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover); in receive_from_sock()
916 iov.iov_base = pentry->buf + con->rx_leftover; in receive_from_sock()
917 iov.iov_len = buflen - con->rx_leftover; in receive_from_sock()
921 clear_bit(CF_RECV_INTR, &con->flags); in receive_from_sock()
923 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, in receive_from_sock()
925 trace_dlm_recv(con->nodeid, ret); in receive_from_sock()
927 lock_sock(con->sock->sk); in receive_from_sock()
928 if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) { in receive_from_sock()
929 release_sock(con->sock->sk); in receive_from_sock()
933 clear_bit(CF_RECV_PENDING, &con->flags); in receive_from_sock()
934 release_sock(con->sock->sk); in receive_from_sock()
947 buflen_real = ret + con->rx_leftover; in receive_from_sock()
948 ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf, in receive_from_sock()
961 con->rx_leftover = buflen_real - ret; in receive_from_sock()
962 memmove(con->rx_leftover_buf, pentry->buf + ret, in receive_from_sock()
963 con->rx_leftover); in receive_from_sock()
1160 static struct writequeue_entry *new_writequeue_entry(struct connection *con) in new_writequeue_entry() argument
1178 entry->con = con; in new_writequeue_entry()
1184 static struct writequeue_entry *new_wq_entry(struct connection *con, int len, in new_wq_entry() argument
1190 spin_lock_bh(&con->writequeue_lock); in new_wq_entry()
1191 if (!list_empty(&con->writequeue)) { in new_wq_entry()
1192 e = list_last_entry(&con->writequeue, struct writequeue_entry, list); in new_wq_entry()
1206 e = new_writequeue_entry(con); in new_wq_entry()
1216 list_add_tail(&e->list, &con->writequeue); in new_wq_entry()
1219 spin_unlock_bh(&con->writequeue_lock); in new_wq_entry()
1223 static struct dlm_msg *dlm_lowcomms_new_msg_con(struct connection *con, int len, in dlm_lowcomms_new_msg_con() argument
1237 e = new_wq_entry(con, len, ppc, cb, data); in dlm_lowcomms_new_msg_con()
1260 struct connection *con; in dlm_lowcomms_new_msg() local
1273 con = nodeid2con(nodeid, 0); in dlm_lowcomms_new_msg()
1274 if (WARN_ON_ONCE(!con)) { in dlm_lowcomms_new_msg()
1279 msg = dlm_lowcomms_new_msg_con(con, len, allocation, ppc, cb, data); in dlm_lowcomms_new_msg()
1296 struct connection *con = e->con; in _dlm_lowcomms_commit_msg() local
1299 spin_lock_bh(&con->writequeue_lock); in _dlm_lowcomms_commit_msg()
1309 lowcomms_queue_swork(con); in _dlm_lowcomms_commit_msg()
1312 spin_unlock_bh(&con->writequeue_lock); in _dlm_lowcomms_commit_msg()
1343 msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, in dlm_lowcomms_resend_msg()
1360 static int send_to_sock(struct connection *con) in send_to_sock() argument
1369 spin_lock_bh(&con->writequeue_lock); in send_to_sock()
1370 e = con_next_wq(con); in send_to_sock()
1372 clear_bit(CF_SEND_PENDING, &con->flags); in send_to_sock()
1373 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1380 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1384 ret = sock_sendmsg(con->sock, &msg); in send_to_sock()
1385 trace_dlm_send(con->nodeid, ret); in send_to_sock()
1387 lock_sock(con->sock->sk); in send_to_sock()
1388 spin_lock_bh(&con->writequeue_lock); in send_to_sock()
1389 if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && in send_to_sock()
1390 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { in send_to_sock()
1394 set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags); in send_to_sock()
1395 con->sock->sk->sk_write_pending++; in send_to_sock()
1397 clear_bit(CF_SEND_PENDING, &con->flags); in send_to_sock()
1398 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1399 release_sock(con->sock->sk); in send_to_sock()
1404 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1405 release_sock(con->sock->sk); in send_to_sock()
1412 spin_lock_bh(&con->writequeue_lock); in send_to_sock()
1414 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1419 static void clean_one_writequeue(struct connection *con) in clean_one_writequeue() argument
1423 spin_lock_bh(&con->writequeue_lock); in clean_one_writequeue()
1424 list_for_each_entry_safe(e, safe, &con->writequeue, list) { in clean_one_writequeue()
1427 spin_unlock_bh(&con->writequeue_lock); in clean_one_writequeue()
1432 struct connection *con = container_of(rcu, struct connection, rcu); in connection_release() local
1434 WARN_ON_ONCE(!list_empty(&con->writequeue)); in connection_release()
1435 WARN_ON_ONCE(con->sock); in connection_release()
1436 kfree(con); in connection_release()
1443 struct connection *con; in dlm_lowcomms_close() local
1449 con = nodeid2con(nodeid, 0); in dlm_lowcomms_close()
1450 if (WARN_ON_ONCE(!con)) { in dlm_lowcomms_close()
1455 stop_connection_io(con); in dlm_lowcomms_close()
1457 close_connection(con, true); in dlm_lowcomms_close()
1460 hlist_del_rcu(&con->list); in dlm_lowcomms_close()
1463 clean_one_writequeue(con); in dlm_lowcomms_close()
1464 call_srcu(&connections_srcu, &con->rcu, connection_release); in dlm_lowcomms_close()
1465 if (con->othercon) { in dlm_lowcomms_close()
1466 clean_one_writequeue(con->othercon); in dlm_lowcomms_close()
1467 call_srcu(&connections_srcu, &con->othercon->rcu, connection_release); in dlm_lowcomms_close()
1483 struct connection *con = container_of(work, struct connection, rwork); in process_recv_sockets() local
1486 down_read(&con->sock_lock); in process_recv_sockets()
1487 if (!con->sock) { in process_recv_sockets()
1488 up_read(&con->sock_lock); in process_recv_sockets()
1494 ret = receive_from_sock(con, buflen); in process_recv_sockets()
1496 up_read(&con->sock_lock); in process_recv_sockets()
1503 close_connection(con, false); in process_recv_sockets()
1504 wake_up(&con->shutdown_wait); in process_recv_sockets()
1509 queue_work(io_workqueue, &con->rwork); in process_recv_sockets()
1514 if (test_bit(CF_IS_OTHERCON, &con->flags)) { in process_recv_sockets()
1515 close_connection(con, false); in process_recv_sockets()
1517 spin_lock_bh(&con->writequeue_lock); in process_recv_sockets()
1518 lowcomms_queue_swork(con); in process_recv_sockets()
1519 spin_unlock_bh(&con->writequeue_lock); in process_recv_sockets()
1549 static int dlm_connect(struct connection *con) in dlm_connect() argument
1557 result = nodeid_to_addr(con->nodeid, &addr, NULL, in dlm_connect()
1560 log_print("no address for nodeid %d", con->nodeid); in dlm_connect()
1579 add_sock(sock, con); in dlm_connect()
1581 log_print_ratelimited("connecting to %d", con->nodeid); in dlm_connect()
1583 result = dlm_proto_ops->connect(con, sock, (struct sockaddr *)&addr, in dlm_connect()
1593 dlm_close_sock(&con->sock); in dlm_connect()
1604 struct connection *con = container_of(work, struct connection, swork); in process_send_sockets() local
1607 WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags)); in process_send_sockets()
1609 down_read(&con->sock_lock); in process_send_sockets()
1610 if (!con->sock) { in process_send_sockets()
1611 up_read(&con->sock_lock); in process_send_sockets()
1612 down_write(&con->sock_lock); in process_send_sockets()
1613 if (!con->sock) { in process_send_sockets()
1614 ret = dlm_connect(con); in process_send_sockets()
1627 up_write(&con->sock_lock); in process_send_sockets()
1629 con->nodeid, con->retries++, ret); in process_send_sockets()
1636 queue_work(io_workqueue, &con->swork); in process_send_sockets()
1640 downgrade_write(&con->sock_lock); in process_send_sockets()
1644 ret = send_to_sock(con); in process_send_sockets()
1646 up_read(&con->sock_lock); in process_send_sockets()
1655 queue_work(io_workqueue, &con->swork); in process_send_sockets()
1659 close_connection(con, false); in process_send_sockets()
1662 spin_lock_bh(&con->writequeue_lock); in process_send_sockets()
1663 lowcomms_queue_swork(con); in process_send_sockets()
1664 spin_unlock_bh(&con->writequeue_lock); in process_send_sockets()
1712 struct connection *con; in dlm_lowcomms_shutdown() local
1725 hlist_for_each_entry_rcu(con, &connection_hash[i], list) { in dlm_lowcomms_shutdown()
1726 shutdown_connection(con, true); in dlm_lowcomms_shutdown()
1727 stop_connection_io(con); in dlm_lowcomms_shutdown()
1729 close_connection(con, true); in dlm_lowcomms_shutdown()
1731 clean_one_writequeue(con); in dlm_lowcomms_shutdown()
1732 if (con->othercon) in dlm_lowcomms_shutdown()
1733 clean_one_writequeue(con->othercon); in dlm_lowcomms_shutdown()
1734 allow_connection_io(con); in dlm_lowcomms_shutdown()
1819 static int dlm_tcp_connect(struct connection *con, struct socket *sock, in dlm_tcp_connect() argument
1874 static int dlm_sctp_connect(struct connection *con, struct socket *sock, in dlm_sctp_connect() argument
1981 struct connection *con; in dlm_lowcomms_exit() local
1986 hlist_for_each_entry_rcu(con, &connection_hash[i], list) { in dlm_lowcomms_exit()
1988 hlist_del_rcu(&con->list); in dlm_lowcomms_exit()
1991 if (con->othercon) in dlm_lowcomms_exit()
1992 call_srcu(&connections_srcu, &con->othercon->rcu, in dlm_lowcomms_exit()
1994 call_srcu(&connections_srcu, &con->rcu, connection_release); in dlm_lowcomms_exit()