/******************************************************************************
*******************************************************************************
**
**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "member.h"
#include "lock.h"
#include "dir.h"
#include "config.h"
#include "requestqueue.h"

struct rq_entry {
	struct list_head list;
	int nodeid;
	struct dlm_message request;
};

/*
 * Requests received while the lockspace is in recovery get added to the
 * request queue and processed when recovery is complete.  This happens when
 * the lockspace is suspended on some nodes before it is on others, or the
 * lockspace is enabled on some while still suspended on others.
 */

void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms)
{
	struct rq_entry *e;
	int length = ms->m_header.h_length - sizeof(struct dlm_message);

	e = kmalloc(sizeof(struct rq_entry) + length, GFP_NOFS);
	if (!e) {
		log_print("dlm_add_requestqueue: out of memory len %d", length);
		return;
	}

	e->nodeid = nodeid;
	memcpy(&e->request, ms, ms->m_header.h_length);

	mutex_lock(&ls->ls_requestqueue_mutex);
	list_add_tail(&e->list, &ls->ls_requestqueue);
	mutex_unlock(&ls->ls_requestqueue_mutex);
}

/*
 * Called by dlm_recoverd to process normal messages saved while recovery was
 * happening.  Normal locking has been enabled before this is called.
 * dlm_recv, upon receiving a message, will wait for all saved messages to be
 * drained here before processing the message it got.  If a new dlm_ls_stop()
 * arrives while we're processing these saved messages, it may block trying to
 * suspend dlm_recv if dlm_recv is waiting for us in dlm_wait_requestqueue.
 * In that case, we don't abort since locking_stopped is still 0.  If dlm_recv
 * is not waiting for us, then this processing may be aborted due to
 * locking_stopped.
 */

int dlm_process_requestqueue(struct dlm_ls *ls)
{
	struct rq_entry *e;
	int error = 0;

	mutex_lock(&ls->ls_requestqueue_mutex);

	for (;;) {
		if (list_empty(&ls->ls_requestqueue)) {
			mutex_unlock(&ls->ls_requestqueue_mutex);
			error = 0;
			break;
		}
		e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
		mutex_unlock(&ls->ls_requestqueue_mutex);

		dlm_receive_message_saved(ls, &e->request);

		mutex_lock(&ls->ls_requestqueue_mutex);
		list_del(&e->list);
		kfree(e);

		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "process_requestqueue abort running");
			mutex_unlock(&ls->ls_requestqueue_mutex);
			error = -EINTR;
			break;
		}
		schedule();
	}

	return error;
}
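/*
 * Illustrative sketch (not part of the original file): roughly how the
 * recovery daemon could drive the drain above once locking has been
 * re-enabled, per the comment before dlm_process_requestqueue().  The
 * name example_recoverd_resume() and its error handling are hypothetical;
 * the real call site lives in dlm_recoverd's recovery path.  Guarded with
 * #if 0 so it cannot be mistaken for live code.
 */
#if 0
static int example_recoverd_resume(struct dlm_ls *ls)
{
	int error;

	/* replay everything dlm_recv queued while recovery was running */
	error = dlm_process_requestqueue(ls);

	/* -EINTR means another recovery began mid-drain; the next recovery
	   cycle will purge stale entries and drain the queue again */
	if (error == -EINTR)
		return error;

	return 0;
}
#endif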
/*
 * After recovery is done, locking is resumed and dlm_recoverd takes all the
 * saved requests and processes them as they would have been by dlm_recv.  At
 * the same time, dlm_recv will start receiving new requests from remote
 * nodes.  We want to delay dlm_recv processing new requests until
 * dlm_recoverd has finished processing the old saved requests.  We don't
 * check for locking stopped here because dlm_ls_stop won't stop locking
 * until it has suspended us (dlm_recv).
 */

void dlm_wait_requestqueue(struct dlm_ls *ls)
{
	for (;;) {
		mutex_lock(&ls->ls_requestqueue_mutex);
		if (list_empty(&ls->ls_requestqueue))
			break;
		mutex_unlock(&ls->ls_requestqueue_mutex);
		schedule();
	}
	mutex_unlock(&ls->ls_requestqueue_mutex);
}

static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
{
	uint32_t type = ms->m_type;

	/* the ls is being cleaned up and freed by release_lockspace */
	if (!ls->ls_count)
		return 1;

	if (dlm_is_removed(ls, nodeid))
		return 1;

	/* directory operations are always purged because the directory is
	   always rebuilt during recovery and the lookups resent */

	if (type == DLM_MSG_REMOVE ||
	    type == DLM_MSG_LOOKUP ||
	    type == DLM_MSG_LOOKUP_REPLY)
		return 1;

	if (!dlm_no_directory(ls))
		return 0;

	/* with no directory, the master is likely to change as a part of
	   recovery; requests to/from the defunct master need to be purged */

	switch (type) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_CANCEL:
		/* we're no longer the master of this resource, the sender
		   will resend to the new master (see waiter_needs_recovery) */

		if (dlm_hash2nodeid(ls, ms->m_hash) != dlm_our_nodeid())
			return 1;
		break;

	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_UNLOCK_REPLY:
	case DLM_MSG_CANCEL_REPLY:
	case DLM_MSG_GRANT:
		/* this reply is from the former master of the resource,
		   we'll resend to the new master if needed */

		if (dlm_hash2nodeid(ls, ms->m_hash) != nodeid)
			return 1;
		break;
	}

	return 0;
}

void dlm_purge_requestqueue(struct dlm_ls *ls)
{
	struct dlm_message *ms;
	struct rq_entry *e, *safe;

	mutex_lock(&ls->ls_requestqueue_mutex);
	list_for_each_entry_safe(e, safe, &ls->ls_requestqueue, list) {
		ms = &e->request;

		if (purge_request(ls, ms, e->nodeid)) {
			list_del(&e->list);
			kfree(e);
		}
	}
	mutex_unlock(&ls->ls_requestqueue_mutex);
}
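/*
 * Illustrative sketch (not part of the original file): the receive-side
 * dispatch that feeds this queue.  While locking is stopped for recovery,
 * incoming messages are saved with dlm_add_requestqueue(); otherwise they
 * are processed immediately.  example_receive() and the direct call to
 * dlm_receive_message_saved() are simplifications of the real dispatch in
 * lock.c, shown here only to place this file in context.  Guarded with
 * #if 0 so it cannot be mistaken for live code.
 */
#if 0
static void example_receive(struct dlm_ls *ls, struct dlm_message *ms,
			    int nodeid)
{
	if (dlm_locking_stopped(ls))
		/* recovery in progress: queue for dlm_process_requestqueue */
		dlm_add_requestqueue(ls, nodeid, ms);
	else
		dlm_receive_message_saved(ls, ms);
}
#endif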