1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* RxRPC Tx data buffering. 3 * 4 * Copyright (C) 2022 Red Hat, Inc. All Rights Reserved. 5 * Written by David Howells (dhowells@redhat.com) 6 */ 7 8 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt 9 10 #include <linux/slab.h> 11 #include "ar-internal.h" 12 13 static atomic_t rxrpc_txbuf_debug_ids; 14 atomic_t rxrpc_nr_txbuf; 15 16 /* 17 * Allocate and partially initialise an I/O request structure. 18 */ 19 struct rxrpc_txbuf *rxrpc_alloc_txbuf(struct rxrpc_call *call, u8 packet_type, 20 gfp_t gfp) 21 { 22 struct rxrpc_txbuf *txb; 23 24 txb = kmalloc(sizeof(*txb), gfp); 25 if (txb) { 26 INIT_LIST_HEAD(&txb->call_link); 27 INIT_LIST_HEAD(&txb->tx_link); 28 refcount_set(&txb->ref, 1); 29 txb->call_debug_id = call->debug_id; 30 txb->debug_id = atomic_inc_return(&rxrpc_txbuf_debug_ids); 31 txb->space = sizeof(txb->data); 32 txb->len = 0; 33 txb->offset = 0; 34 txb->flags = 0; 35 txb->ack_why = 0; 36 txb->seq = call->tx_prepared + 1; 37 txb->wire.epoch = htonl(call->conn->proto.epoch); 38 txb->wire.cid = htonl(call->cid); 39 txb->wire.callNumber = htonl(call->call_id); 40 txb->wire.seq = htonl(txb->seq); 41 txb->wire.type = packet_type; 42 txb->wire.flags = call->conn->out_clientflag; 43 txb->wire.userStatus = 0; 44 txb->wire.securityIndex = call->security_ix; 45 txb->wire._rsvd = 0; 46 txb->wire.serviceId = htons(call->dest_srx.srx_service); 47 48 trace_rxrpc_txbuf(txb->debug_id, 49 txb->call_debug_id, txb->seq, 1, 50 packet_type == RXRPC_PACKET_TYPE_DATA ? 51 rxrpc_txbuf_alloc_data : 52 rxrpc_txbuf_alloc_ack); 53 atomic_inc(&rxrpc_nr_txbuf); 54 } 55 56 return txb; 57 } 58 59 void rxrpc_get_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what) 60 { 61 int r; 62 63 __refcount_inc(&txb->ref, &r); 64 trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, r + 1, what); 65 } 66 67 void rxrpc_see_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what) 68 { 69 int r = refcount_read(&txb->ref); 70 71 trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, r, what); 72 } 73 74 static void rxrpc_free_txbuf(struct rcu_head *rcu) 75 { 76 struct rxrpc_txbuf *txb = container_of(rcu, struct rxrpc_txbuf, rcu); 77 78 trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, 0, 79 rxrpc_txbuf_free); 80 kfree(txb); 81 atomic_dec(&rxrpc_nr_txbuf); 82 } 83 84 void rxrpc_put_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what) 85 { 86 unsigned int debug_id, call_debug_id; 87 rxrpc_seq_t seq; 88 bool dead; 89 int r; 90 91 if (txb) { 92 debug_id = txb->debug_id; 93 call_debug_id = txb->call_debug_id; 94 seq = txb->seq; 95 dead = __refcount_dec_and_test(&txb->ref, &r); 96 trace_rxrpc_txbuf(debug_id, call_debug_id, seq, r - 1, what); 97 if (dead) 98 call_rcu(&txb->rcu, rxrpc_free_txbuf); 99 } 100 } 101 102 /* 103 * Shrink the transmit buffer. 104 */ 105 void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *call) 106 { 107 struct rxrpc_txbuf *txb; 108 rxrpc_seq_t hard_ack = smp_load_acquire(&call->acks_hard_ack); 109 bool wake = false; 110 111 _enter("%x/%x/%x", call->tx_bottom, call->acks_hard_ack, call->tx_top); 112 113 for (;;) { 114 spin_lock(&call->tx_lock); 115 txb = list_first_entry_or_null(&call->tx_buffer, 116 struct rxrpc_txbuf, call_link); 117 if (!txb) 118 break; 119 hard_ack = smp_load_acquire(&call->acks_hard_ack); 120 if (before(hard_ack, txb->seq)) 121 break; 122 123 if (txb->seq != call->tx_bottom + 1) 124 rxrpc_see_txbuf(txb, rxrpc_txbuf_see_out_of_step); 125 ASSERTCMP(txb->seq, ==, call->tx_bottom + 1); 126 smp_store_release(&call->tx_bottom, call->tx_bottom + 1); 127 list_del_rcu(&txb->call_link); 128 129 trace_rxrpc_txqueue(call, rxrpc_txqueue_dequeue); 130 131 spin_unlock(&call->tx_lock); 132 133 rxrpc_put_txbuf(txb, rxrpc_txbuf_put_rotated); 134 if (after(call->acks_hard_ack, call->tx_bottom + 128)) 135 wake = true; 136 } 137 138 spin_unlock(&call->tx_lock); 139 140 if (wake) 141 wake_up(&call->waitq); 142 } 143