1 /****************************************************************************** 2 ******************************************************************************* 3 ** 4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 5 ** Copyright (C) 2004-2010 Red Hat, Inc. All rights reserved. 6 ** 7 ** This copyrighted material is made available to anyone wishing to use, 8 ** modify, copy, or redistribute it subject to the terms and conditions 9 ** of the GNU General Public License v.2. 10 ** 11 ******************************************************************************* 12 ******************************************************************************/ 13 14 #include "dlm_internal.h" 15 #include "lock.h" 16 #include "user.h" 17 18 static uint64_t dlm_cb_seq; 19 static spinlock_t dlm_cb_seq_spin; 20 21 static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb) 22 { 23 int i; 24 25 log_print("last_bast %x %llu flags %x mode %d sb %d %x", 26 lkb->lkb_id, 27 (unsigned long long)lkb->lkb_last_bast.seq, 28 lkb->lkb_last_bast.flags, 29 lkb->lkb_last_bast.mode, 30 lkb->lkb_last_bast.sb_status, 31 lkb->lkb_last_bast.sb_flags); 32 33 log_print("last_cast %x %llu flags %x mode %d sb %d %x", 34 lkb->lkb_id, 35 (unsigned long long)lkb->lkb_last_cast.seq, 36 lkb->lkb_last_cast.flags, 37 lkb->lkb_last_cast.mode, 38 lkb->lkb_last_cast.sb_status, 39 lkb->lkb_last_cast.sb_flags); 40 41 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) { 42 log_print("cb %x %llu flags %x mode %d sb %d %x", 43 lkb->lkb_id, 44 (unsigned long long)lkb->lkb_callbacks[i].seq, 45 lkb->lkb_callbacks[i].flags, 46 lkb->lkb_callbacks[i].mode, 47 lkb->lkb_callbacks[i].sb_status, 48 lkb->lkb_callbacks[i].sb_flags); 49 } 50 } 51 52 int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode, 53 int status, uint32_t sbflags, uint64_t seq) 54 { 55 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 56 uint64_t prev_seq; 57 int prev_mode; 58 int i, rv; 59 60 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) { 61 if (lkb->lkb_callbacks[i].seq) 62 continue; 63 64 /* 65 * Suppress some redundant basts here, do more on removal. 66 * Don't even add a bast if the callback just before it 67 * is a bast for the same mode or a more restrictive mode. 68 * (the addional > PR check is needed for PR/CW inversion) 69 */ 70 71 if ((i > 0) && (flags & DLM_CB_BAST) && 72 (lkb->lkb_callbacks[i-1].flags & DLM_CB_BAST)) { 73 74 prev_seq = lkb->lkb_callbacks[i-1].seq; 75 prev_mode = lkb->lkb_callbacks[i-1].mode; 76 77 if ((prev_mode == mode) || 78 (prev_mode > mode && prev_mode > DLM_LOCK_PR)) { 79 80 log_debug(ls, "skip %x add bast %llu mode %d " 81 "for bast %llu mode %d", 82 lkb->lkb_id, 83 (unsigned long long)seq, 84 mode, 85 (unsigned long long)prev_seq, 86 prev_mode); 87 rv = 0; 88 goto out; 89 } 90 } 91 92 lkb->lkb_callbacks[i].seq = seq; 93 lkb->lkb_callbacks[i].flags = flags; 94 lkb->lkb_callbacks[i].mode = mode; 95 lkb->lkb_callbacks[i].sb_status = status; 96 lkb->lkb_callbacks[i].sb_flags = (sbflags & 0x000000FF); 97 rv = 0; 98 break; 99 } 100 101 if (i == DLM_CALLBACKS_SIZE) { 102 log_error(ls, "no callbacks %x %llu flags %x mode %d sb %d %x", 103 lkb->lkb_id, (unsigned long long)seq, 104 flags, mode, status, sbflags); 105 dlm_dump_lkb_callbacks(lkb); 106 rv = -1; 107 goto out; 108 } 109 out: 110 return rv; 111 } 112 113 int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb, 114 struct dlm_callback *cb, int *resid) 115 { 116 int i, rv; 117 118 *resid = 0; 119 120 if (!lkb->lkb_callbacks[0].seq) { 121 rv = -ENOENT; 122 goto out; 123 } 124 125 /* oldest undelivered cb is callbacks[0] */ 126 127 memcpy(cb, &lkb->lkb_callbacks[0], sizeof(struct dlm_callback)); 128 memset(&lkb->lkb_callbacks[0], 0, sizeof(struct dlm_callback)); 129 130 /* shift others down */ 131 132 for (i = 1; i < DLM_CALLBACKS_SIZE; i++) { 133 if (!lkb->lkb_callbacks[i].seq) 134 break; 135 memcpy(&lkb->lkb_callbacks[i-1], &lkb->lkb_callbacks[i], 136 sizeof(struct dlm_callback)); 137 memset(&lkb->lkb_callbacks[i], 0, sizeof(struct dlm_callback)); 138 (*resid)++; 139 } 140 141 /* if cb is a bast, it should be skipped if the blocking mode is 142 compatible with the last granted mode */ 143 144 if ((cb->flags & DLM_CB_BAST) && lkb->lkb_last_cast.seq) { 145 if (dlm_modes_compat(cb->mode, lkb->lkb_last_cast.mode)) { 146 cb->flags |= DLM_CB_SKIP; 147 148 log_debug(ls, "skip %x bast %llu mode %d " 149 "for cast %llu mode %d", 150 lkb->lkb_id, 151 (unsigned long long)cb->seq, 152 cb->mode, 153 (unsigned long long)lkb->lkb_last_cast.seq, 154 lkb->lkb_last_cast.mode); 155 rv = 0; 156 goto out; 157 } 158 } 159 160 if (cb->flags & DLM_CB_CAST) { 161 memcpy(&lkb->lkb_last_cast, cb, sizeof(struct dlm_callback)); 162 lkb->lkb_last_cast_time = ktime_get(); 163 } 164 165 if (cb->flags & DLM_CB_BAST) { 166 memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback)); 167 lkb->lkb_last_bast_time = ktime_get(); 168 } 169 rv = 0; 170 out: 171 return rv; 172 } 173 174 void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status, 175 uint32_t sbflags) 176 { 177 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 178 uint64_t new_seq, prev_seq; 179 int rv; 180 181 spin_lock(&dlm_cb_seq_spin); 182 new_seq = ++dlm_cb_seq; 183 spin_unlock(&dlm_cb_seq_spin); 184 185 if (lkb->lkb_flags & DLM_IFL_USER) { 186 dlm_user_add_ast(lkb, flags, mode, status, sbflags, new_seq); 187 return; 188 } 189 190 mutex_lock(&lkb->lkb_cb_mutex); 191 prev_seq = lkb->lkb_callbacks[0].seq; 192 193 rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, new_seq); 194 if (rv < 0) 195 goto out; 196 197 if (!prev_seq) { 198 kref_get(&lkb->lkb_ref); 199 200 if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) { 201 mutex_lock(&ls->ls_cb_mutex); 202 list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay); 203 mutex_unlock(&ls->ls_cb_mutex); 204 } else { 205 queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work); 206 } 207 } 208 out: 209 mutex_unlock(&lkb->lkb_cb_mutex); 210 } 211 212 void dlm_callback_work(struct work_struct *work) 213 { 214 struct dlm_lkb *lkb = container_of(work, struct dlm_lkb, lkb_cb_work); 215 struct dlm_ls *ls = lkb->lkb_resource->res_ls; 216 void (*castfn) (void *astparam); 217 void (*bastfn) (void *astparam, int mode); 218 struct dlm_callback callbacks[DLM_CALLBACKS_SIZE]; 219 int i, rv, resid; 220 221 memset(&callbacks, 0, sizeof(callbacks)); 222 223 mutex_lock(&lkb->lkb_cb_mutex); 224 if (!lkb->lkb_callbacks[0].seq) { 225 /* no callback work exists, shouldn't happen */ 226 log_error(ls, "dlm_callback_work %x no work", lkb->lkb_id); 227 dlm_print_lkb(lkb); 228 dlm_dump_lkb_callbacks(lkb); 229 } 230 231 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) { 232 rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid); 233 if (rv < 0) 234 break; 235 } 236 237 if (resid) { 238 /* cbs remain, loop should have removed all, shouldn't happen */ 239 log_error(ls, "dlm_callback_work %x resid %d", lkb->lkb_id, 240 resid); 241 dlm_print_lkb(lkb); 242 dlm_dump_lkb_callbacks(lkb); 243 } 244 mutex_unlock(&lkb->lkb_cb_mutex); 245 246 castfn = lkb->lkb_astfn; 247 bastfn = lkb->lkb_bastfn; 248 249 for (i = 0; i < DLM_CALLBACKS_SIZE; i++) { 250 if (!callbacks[i].seq) 251 break; 252 if (callbacks[i].flags & DLM_CB_SKIP) { 253 continue; 254 } else if (callbacks[i].flags & DLM_CB_BAST) { 255 bastfn(lkb->lkb_astparam, callbacks[i].mode); 256 } else if (callbacks[i].flags & DLM_CB_CAST) { 257 lkb->lkb_lksb->sb_status = callbacks[i].sb_status; 258 lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags; 259 castfn(lkb->lkb_astparam); 260 } 261 } 262 263 /* undo kref_get from dlm_add_callback, may cause lkb to be freed */ 264 dlm_put_lkb(lkb); 265 } 266 267 int dlm_callback_start(struct dlm_ls *ls) 268 { 269 ls->ls_callback_wq = alloc_workqueue("dlm_callback", 270 WQ_UNBOUND | 271 WQ_MEM_RECLAIM | 272 WQ_NON_REENTRANT, 273 0); 274 if (!ls->ls_callback_wq) { 275 log_print("can't start dlm_callback workqueue"); 276 return -ENOMEM; 277 } 278 return 0; 279 } 280 281 void dlm_callback_stop(struct dlm_ls *ls) 282 { 283 if (ls->ls_callback_wq) 284 destroy_workqueue(ls->ls_callback_wq); 285 } 286 287 void dlm_callback_suspend(struct dlm_ls *ls) 288 { 289 set_bit(LSFL_CB_DELAY, &ls->ls_flags); 290 291 if (ls->ls_callback_wq) 292 flush_workqueue(ls->ls_callback_wq); 293 } 294 295 void dlm_callback_resume(struct dlm_ls *ls) 296 { 297 struct dlm_lkb *lkb, *safe; 298 int count = 0; 299 300 clear_bit(LSFL_CB_DELAY, &ls->ls_flags); 301 302 if (!ls->ls_callback_wq) 303 return; 304 305 mutex_lock(&ls->ls_cb_mutex); 306 list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) { 307 list_del_init(&lkb->lkb_cb_list); 308 queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work); 309 count++; 310 } 311 mutex_unlock(&ls->ls_cb_mutex); 312 313 if (count) 314 log_debug(ls, "dlm_callback_resume %d", count); 315 } 316 317