// SPDX-License-Identifier: GPL-2.0-only
/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2010 Red Hat, Inc.  All rights reserved.
**
**
*******************************************************************************
******************************************************************************/

#include "dlm_internal.h"
#include "lock.h"
#include "user.h"
#include "ast.h"

static uint64_t dlm_cb_seq;
static DEFINE_SPINLOCK(dlm_cb_seq_spin);

static void dlm_dump_lkb_callbacks(struct dlm_lkb *lkb)
{
	int i;

	log_print("last_bast %x %llu flags %x mode %d sb %d %x",
		  lkb->lkb_id,
		  (unsigned long long)lkb->lkb_last_bast.seq,
		  lkb->lkb_last_bast.flags,
		  lkb->lkb_last_bast.mode,
		  lkb->lkb_last_bast.sb_status,
		  lkb->lkb_last_bast.sb_flags);

	log_print("last_cast %x %llu flags %x mode %d sb %d %x",
		  lkb->lkb_id,
		  (unsigned long long)lkb->lkb_last_cast.seq,
		  lkb->lkb_last_cast.flags,
		  lkb->lkb_last_cast.mode,
		  lkb->lkb_last_cast.sb_status,
		  lkb->lkb_last_cast.sb_flags);

	for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
		log_print("cb %x %llu flags %x mode %d sb %d %x",
			  lkb->lkb_id,
			  (unsigned long long)lkb->lkb_callbacks[i].seq,
			  lkb->lkb_callbacks[i].flags,
			  lkb->lkb_callbacks[i].mode,
			  lkb->lkb_callbacks[i].sb_status,
			  lkb->lkb_callbacks[i].sb_flags);
	}
}

int dlm_add_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
			 int status, uint32_t sbflags, uint64_t seq)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	uint64_t prev_seq;
	int prev_mode;
	int i, rv;

	for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
		if (lkb->lkb_callbacks[i].seq)
			continue;

		/*
		 * Suppress some redundant basts here, do more on removal.
		 * Don't even add a bast if the callback just before it
		 * is a bast for the same mode or a more restrictive mode.
		 * (the additional > PR check is needed for PR/CW inversion)
		 */

		if ((i > 0) && (flags & DLM_CB_BAST) &&
		    (lkb->lkb_callbacks[i-1].flags & DLM_CB_BAST)) {

			prev_seq = lkb->lkb_callbacks[i-1].seq;
			prev_mode = lkb->lkb_callbacks[i-1].mode;

			if ((prev_mode == mode) ||
			    (prev_mode > mode && prev_mode > DLM_LOCK_PR)) {

				log_debug(ls, "skip %x add bast %llu mode %d "
					  "for bast %llu mode %d",
					  lkb->lkb_id,
					  (unsigned long long)seq,
					  mode,
					  (unsigned long long)prev_seq,
					  prev_mode);
				rv = 0;
				goto out;
			}
		}

		lkb->lkb_callbacks[i].seq = seq;
		lkb->lkb_callbacks[i].flags = flags;
		lkb->lkb_callbacks[i].mode = mode;
		lkb->lkb_callbacks[i].sb_status = status;
		lkb->lkb_callbacks[i].sb_flags = (sbflags & 0x000000FF);
		rv = 0;
		break;
	}

	if (i == DLM_CALLBACKS_SIZE) {
		log_error(ls, "no callbacks %x %llu flags %x mode %d sb %d %x",
			  lkb->lkb_id, (unsigned long long)seq,
			  flags, mode, status, sbflags);
		dlm_dump_lkb_callbacks(lkb);
		rv = -1;
		goto out;
	}
 out:
	return rv;
}
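/*
 * Worked example of the bast suppression rule above (illustrative only;
 * mode values are from linux/dlmconstants.h: NL=0 CR=1 CW=2 PR=3 PW=4 EX=5):
 *
 *	prev bast EX (5), new bast PR (3): 5 > 3 and 5 > DLM_LOCK_PR, so
 *	the new bast is dropped -- downconverting far enough to satisfy
 *	the waiting EX lock (to NL) also satisfies the waiting PR lock.
 *
 *	prev bast PR (3), new bast CW (2): 3 > 2 but 3 is not > DLM_LOCK_PR,
 *	so the new bast is kept -- PR and CW are not ordered by severity
 *	(CW blocks readers, PR blocks writers, and neither compat set
 *	contains the other), which is the PR/CW inversion the comment
 *	refers to.
 */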
int dlm_rem_lkb_callback(struct dlm_ls *ls, struct dlm_lkb *lkb,
			 struct dlm_callback *cb, int *resid)
{
	int i, rv;

	*resid = 0;

	if (!lkb->lkb_callbacks[0].seq) {
		rv = -ENOENT;
		goto out;
	}

	/* oldest undelivered cb is callbacks[0] */

	memcpy(cb, &lkb->lkb_callbacks[0], sizeof(struct dlm_callback));
	memset(&lkb->lkb_callbacks[0], 0, sizeof(struct dlm_callback));

	/* shift others down */

	for (i = 1; i < DLM_CALLBACKS_SIZE; i++) {
		if (!lkb->lkb_callbacks[i].seq)
			break;
		memcpy(&lkb->lkb_callbacks[i-1], &lkb->lkb_callbacks[i],
		       sizeof(struct dlm_callback));
		memset(&lkb->lkb_callbacks[i], 0, sizeof(struct dlm_callback));
		(*resid)++;
	}

	/* if cb is a bast, it should be skipped if the blocking mode is
	   compatible with the last granted mode */

	if ((cb->flags & DLM_CB_BAST) && lkb->lkb_last_cast.seq) {
		if (dlm_modes_compat(cb->mode, lkb->lkb_last_cast.mode)) {
			cb->flags |= DLM_CB_SKIP;

			log_debug(ls, "skip %x bast %llu mode %d "
				  "for cast %llu mode %d",
				  lkb->lkb_id,
				  (unsigned long long)cb->seq,
				  cb->mode,
				  (unsigned long long)lkb->lkb_last_cast.seq,
				  lkb->lkb_last_cast.mode);
			rv = 0;
			goto out;
		}
	}

	if (cb->flags & DLM_CB_CAST) {
		memcpy(&lkb->lkb_last_cast, cb, sizeof(struct dlm_callback));
		lkb->lkb_last_cast_time = ktime_get();
	}

	if (cb->flags & DLM_CB_BAST) {
		memcpy(&lkb->lkb_last_bast, cb, sizeof(struct dlm_callback));
		lkb->lkb_last_bast_time = ktime_get();
	}
	rv = 0;
 out:
	return rv;
}

void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
		uint32_t sbflags)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	uint64_t new_seq, prev_seq;
	int rv;

	spin_lock(&dlm_cb_seq_spin);
	new_seq = ++dlm_cb_seq;
	if (!dlm_cb_seq)
		new_seq = ++dlm_cb_seq;
	spin_unlock(&dlm_cb_seq_spin);

	if (lkb->lkb_flags & DLM_IFL_USER) {
		dlm_user_add_ast(lkb, flags, mode, status, sbflags, new_seq);
		return;
	}

	mutex_lock(&lkb->lkb_cb_mutex);
	prev_seq = lkb->lkb_callbacks[0].seq;

	rv = dlm_add_lkb_callback(lkb, flags, mode, status, sbflags, new_seq);
	if (rv < 0)
		goto out;

	if (!prev_seq) {
		kref_get(&lkb->lkb_ref);

		if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
			mutex_lock(&ls->ls_cb_mutex);
			list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
			mutex_unlock(&ls->ls_cb_mutex);
		} else {
			queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
		}
	}
 out:
	mutex_unlock(&lkb->lkb_cb_mutex);
}
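/*
 * For context, a rough sketch of the kernel-space caller whose callbacks
 * travel through this path (illustrative only; error handling omitted,
 * and my_ast/my_bast/my_lock are hypothetical names).  The lockast and
 * bast pointers passed to dlm_lock() become the castfn/bastfn that
 * dlm_callback_work() below eventually invokes:
 *
 *	static void my_ast(void *astarg)
 *	{
 *		struct my_lock *ml = astarg;
 *		// ml->lksb.sb_status was filled in just before this call
 *		complete(&ml->granted);
 *	}
 *
 *	static void my_bast(void *astarg, int mode)
 *	{
 *		// a lock of "mode" is blocked on ours; arrange to release
 *	}
 *
 *	dlm_lock(ls, DLM_LOCK_EX, &ml->lksb, 0, "myres", 5, 0,
 *		 my_ast, ml, my_bast);
 */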
void dlm_callback_work(struct work_struct *work)
{
	struct dlm_lkb *lkb = container_of(work, struct dlm_lkb, lkb_cb_work);
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	void (*castfn) (void *astparam);
	void (*bastfn) (void *astparam, int mode);
	struct dlm_callback callbacks[DLM_CALLBACKS_SIZE];
	int i, rv, resid;

	memset(&callbacks, 0, sizeof(callbacks));

	mutex_lock(&lkb->lkb_cb_mutex);
	if (!lkb->lkb_callbacks[0].seq) {
		/* no callback work exists, shouldn't happen */
		log_error(ls, "dlm_callback_work %x no work", lkb->lkb_id);
		dlm_print_lkb(lkb);
		dlm_dump_lkb_callbacks(lkb);
	}

	for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
		rv = dlm_rem_lkb_callback(ls, lkb, &callbacks[i], &resid);
		if (rv < 0)
			break;
	}

	if (resid) {
		/* cbs remain, loop should have removed all, shouldn't happen */
		log_error(ls, "dlm_callback_work %x resid %d", lkb->lkb_id,
			  resid);
		dlm_print_lkb(lkb);
		dlm_dump_lkb_callbacks(lkb);
	}
	mutex_unlock(&lkb->lkb_cb_mutex);

	castfn = lkb->lkb_astfn;
	bastfn = lkb->lkb_bastfn;

	for (i = 0; i < DLM_CALLBACKS_SIZE; i++) {
		if (!callbacks[i].seq)
			break;
		if (callbacks[i].flags & DLM_CB_SKIP) {
			continue;
		} else if (callbacks[i].flags & DLM_CB_BAST) {
			bastfn(lkb->lkb_astparam, callbacks[i].mode);
		} else if (callbacks[i].flags & DLM_CB_CAST) {
			lkb->lkb_lksb->sb_status = callbacks[i].sb_status;
			lkb->lkb_lksb->sb_flags = callbacks[i].sb_flags;
			castfn(lkb->lkb_astparam);
		}
	}

	/* undo kref_get from dlm_add_cb, may cause lkb to be freed */
	dlm_put_lkb(lkb);
}

int dlm_callback_start(struct dlm_ls *ls)
{
	ls->ls_callback_wq = alloc_workqueue("dlm_callback",
					     WQ_HIGHPRI | WQ_MEM_RECLAIM, 0);
	if (!ls->ls_callback_wq) {
		log_print("can't start dlm_callback workqueue");
		return -ENOMEM;
	}
	return 0;
}

void dlm_callback_stop(struct dlm_ls *ls)
{
	if (ls->ls_callback_wq)
		destroy_workqueue(ls->ls_callback_wq);
}

void dlm_callback_suspend(struct dlm_ls *ls)
{
	set_bit(LSFL_CB_DELAY, &ls->ls_flags);

	if (ls->ls_callback_wq)
		flush_workqueue(ls->ls_callback_wq);
}

#define MAX_CB_QUEUE 25

void dlm_callback_resume(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb, *safe;
	int count = 0;

	clear_bit(LSFL_CB_DELAY, &ls->ls_flags);

	if (!ls->ls_callback_wq)
		return;

more:
	mutex_lock(&ls->ls_cb_mutex);
	list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
		list_del_init(&lkb->lkb_cb_list);
		queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
		count++;
		if (count == MAX_CB_QUEUE)
			break;
	}
	mutex_unlock(&ls->ls_cb_mutex);

	if (count)
		log_rinfo(ls, "dlm_callback_resume %d", count);
	if (count == MAX_CB_QUEUE) {
		count = 0;
		cond_resched();
		goto more;
	}
}
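/*
 * Rough lifecycle of the callback-delay machinery above during lockspace
 * recovery (illustrative only, drawn from the functions in this file):
 *
 *	dlm_callback_suspend(ls);   // set LSFL_CB_DELAY, drain the wq;
 *				    // dlm_add_cb() now parks lkbs on
 *				    // ls_cb_delay instead of queueing work
 *	... recovery runs ...
 *	dlm_callback_resume(ls);    // clear LSFL_CB_DELAY, then requeue
 *				    // the parked lkbs in batches of
 *				    // MAX_CB_QUEUE, calling cond_resched()
 *				    // between batches so a long delay list
 *				    // does not hog the CPU
 */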