1 /****************************************************************************** 2 ******************************************************************************* 3 ** 4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. 5 ** Copyright (C) 2005-2008 Red Hat, Inc. All rights reserved. 6 ** 7 ** This copyrighted material is made available to anyone wishing to use, 8 ** modify, copy, or redistribute it subject to the terms and conditions 9 ** of the GNU General Public License v.2. 10 ** 11 ******************************************************************************* 12 ******************************************************************************/ 13 14 #include "dlm_internal.h" 15 #include "lockspace.h" 16 #include "member.h" 17 #include "lowcomms.h" 18 #include "midcomms.h" 19 #include "rcom.h" 20 #include "recover.h" 21 #include "dir.h" 22 #include "config.h" 23 #include "memory.h" 24 #include "lock.h" 25 #include "util.h" 26 27 28 static int rcom_response(struct dlm_ls *ls) 29 { 30 return test_bit(LSFL_RCOM_READY, &ls->ls_flags); 31 } 32 33 static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len, 34 struct dlm_rcom **rc_ret, struct dlm_mhandle **mh_ret) 35 { 36 struct dlm_rcom *rc; 37 struct dlm_mhandle *mh; 38 char *mb; 39 int mb_len = sizeof(struct dlm_rcom) + len; 40 41 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb); 42 if (!mh) { 43 log_print("create_rcom to %d type %d len %d ENOBUFS", 44 to_nodeid, type, len); 45 return -ENOBUFS; 46 } 47 memset(mb, 0, mb_len); 48 49 rc = (struct dlm_rcom *) mb; 50 51 rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR); 52 rc->rc_header.h_lockspace = ls->ls_global_id; 53 rc->rc_header.h_nodeid = dlm_our_nodeid(); 54 rc->rc_header.h_length = mb_len; 55 rc->rc_header.h_cmd = DLM_RCOM; 56 57 rc->rc_type = type; 58 59 spin_lock(&ls->ls_recover_lock); 60 rc->rc_seq = ls->ls_recover_seq; 61 spin_unlock(&ls->ls_recover_lock); 62 63 *mh_ret = mh; 64 *rc_ret = rc; 65 return 0; 66 } 67 68 static void send_rcom(struct dlm_ls *ls, struct dlm_mhandle *mh, 69 struct dlm_rcom *rc) 70 { 71 dlm_rcom_out(rc); 72 dlm_lowcomms_commit_buffer(mh); 73 } 74 75 /* When replying to a status request, a node also sends back its 76 configuration values. The requesting node then checks that the remote 77 node is configured the same way as itself. */ 78 79 static void make_config(struct dlm_ls *ls, struct rcom_config *rf) 80 { 81 rf->rf_lvblen = ls->ls_lvblen; 82 rf->rf_lsflags = ls->ls_exflags; 83 } 84 85 static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) 86 { 87 struct rcom_config *rf = (struct rcom_config *) rc->rc_buf; 88 89 if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) { 90 log_error(ls, "version mismatch: %x nodeid %d: %x", 91 DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid, 92 rc->rc_header.h_version); 93 return -EPROTO; 94 } 95 96 if (rf->rf_lvblen != ls->ls_lvblen || 97 rf->rf_lsflags != ls->ls_exflags) { 98 log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x", 99 ls->ls_lvblen, ls->ls_exflags, 100 nodeid, rf->rf_lvblen, rf->rf_lsflags); 101 return -EPROTO; 102 } 103 return 0; 104 } 105 106 static void allow_sync_reply(struct dlm_ls *ls, uint64_t *new_seq) 107 { 108 spin_lock(&ls->ls_rcom_spin); 109 *new_seq = ++ls->ls_rcom_seq; 110 set_bit(LSFL_RCOM_WAIT, &ls->ls_flags); 111 spin_unlock(&ls->ls_rcom_spin); 112 } 113 114 static void disallow_sync_reply(struct dlm_ls *ls) 115 { 116 spin_lock(&ls->ls_rcom_spin); 117 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags); 118 clear_bit(LSFL_RCOM_READY, &ls->ls_flags); 119 spin_unlock(&ls->ls_rcom_spin); 120 } 121 122 int dlm_rcom_status(struct dlm_ls *ls, int nodeid) 123 { 124 struct dlm_rcom *rc; 125 struct dlm_mhandle *mh; 126 int error = 0; 127 128 ls->ls_recover_nodeid = nodeid; 129 130 if (nodeid == dlm_our_nodeid()) { 131 rc = (struct dlm_rcom *) ls->ls_recover_buf; 132 rc->rc_result = dlm_recover_status(ls); 133 goto out; 134 } 135 136 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS, 0, &rc, &mh); 137 if (error) 138 goto out; 139 140 allow_sync_reply(ls, &rc->rc_id); 141 memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size); 142 143 send_rcom(ls, mh, rc); 144 145 error = dlm_wait_function(ls, &rcom_response); 146 disallow_sync_reply(ls); 147 if (error) 148 goto out; 149 150 rc = (struct dlm_rcom *) ls->ls_recover_buf; 151 152 if (rc->rc_result == -ESRCH) { 153 /* we pretend the remote lockspace exists with 0 status */ 154 log_debug(ls, "remote node %d not ready", nodeid); 155 rc->rc_result = 0; 156 } else 157 error = check_config(ls, rc, nodeid); 158 /* the caller looks at rc_result for the remote recovery status */ 159 out: 160 return error; 161 } 162 163 static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in) 164 { 165 struct dlm_rcom *rc; 166 struct dlm_mhandle *mh; 167 int error, nodeid = rc_in->rc_header.h_nodeid; 168 169 error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY, 170 sizeof(struct rcom_config), &rc, &mh); 171 if (error) 172 return; 173 rc->rc_id = rc_in->rc_id; 174 rc->rc_seq_reply = rc_in->rc_seq; 175 rc->rc_result = dlm_recover_status(ls); 176 make_config(ls, (struct rcom_config *) rc->rc_buf); 177 178 send_rcom(ls, mh, rc); 179 } 180 181 static void receive_sync_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) 182 { 183 spin_lock(&ls->ls_rcom_spin); 184 if (!test_bit(LSFL_RCOM_WAIT, &ls->ls_flags) || 185 rc_in->rc_id != ls->ls_rcom_seq) { 186 log_debug(ls, "reject reply %d from %d seq %llx expect %llx", 187 rc_in->rc_type, rc_in->rc_header.h_nodeid, 188 (unsigned long long)rc_in->rc_id, 189 (unsigned long long)ls->ls_rcom_seq); 190 goto out; 191 } 192 memcpy(ls->ls_recover_buf, rc_in, rc_in->rc_header.h_length); 193 set_bit(LSFL_RCOM_READY, &ls->ls_flags); 194 clear_bit(LSFL_RCOM_WAIT, &ls->ls_flags); 195 wake_up(&ls->ls_wait_general); 196 out: 197 spin_unlock(&ls->ls_rcom_spin); 198 } 199 200 int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len) 201 { 202 struct dlm_rcom *rc; 203 struct dlm_mhandle *mh; 204 int error = 0, len = sizeof(struct dlm_rcom); 205 206 ls->ls_recover_nodeid = nodeid; 207 208 if (nodeid == dlm_our_nodeid()) { 209 dlm_copy_master_names(ls, last_name, last_len, 210 ls->ls_recover_buf + len, 211 dlm_config.ci_buffer_size - len, nodeid); 212 goto out; 213 } 214 215 error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh); 216 if (error) 217 goto out; 218 memcpy(rc->rc_buf, last_name, last_len); 219 220 allow_sync_reply(ls, &rc->rc_id); 221 memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size); 222 223 send_rcom(ls, mh, rc); 224 225 error = dlm_wait_function(ls, &rcom_response); 226 disallow_sync_reply(ls); 227 out: 228 return error; 229 } 230 231 static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in) 232 { 233 struct dlm_rcom *rc; 234 struct dlm_mhandle *mh; 235 int error, inlen, outlen, nodeid; 236 237 nodeid = rc_in->rc_header.h_nodeid; 238 inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom); 239 outlen = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom); 240 241 error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh); 242 if (error) 243 return; 244 rc->rc_id = rc_in->rc_id; 245 rc->rc_seq_reply = rc_in->rc_seq; 246 247 dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen, 248 nodeid); 249 send_rcom(ls, mh, rc); 250 } 251 252 int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid) 253 { 254 struct dlm_rcom *rc; 255 struct dlm_mhandle *mh; 256 struct dlm_ls *ls = r->res_ls; 257 int error; 258 259 error = create_rcom(ls, dir_nodeid, DLM_RCOM_LOOKUP, r->res_length, 260 &rc, &mh); 261 if (error) 262 goto out; 263 memcpy(rc->rc_buf, r->res_name, r->res_length); 264 rc->rc_id = (unsigned long) r; 265 266 send_rcom(ls, mh, rc); 267 out: 268 return error; 269 } 270 271 static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in) 272 { 273 struct dlm_rcom *rc; 274 struct dlm_mhandle *mh; 275 int error, ret_nodeid, nodeid = rc_in->rc_header.h_nodeid; 276 int len = rc_in->rc_header.h_length - sizeof(struct dlm_rcom); 277 278 error = create_rcom(ls, nodeid, DLM_RCOM_LOOKUP_REPLY, 0, &rc, &mh); 279 if (error) 280 return; 281 282 error = dlm_dir_lookup(ls, nodeid, rc_in->rc_buf, len, &ret_nodeid); 283 if (error) 284 ret_nodeid = error; 285 rc->rc_result = ret_nodeid; 286 rc->rc_id = rc_in->rc_id; 287 rc->rc_seq_reply = rc_in->rc_seq; 288 289 send_rcom(ls, mh, rc); 290 } 291 292 static void receive_rcom_lookup_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in) 293 { 294 dlm_recover_master_reply(ls, rc_in); 295 } 296 297 static void pack_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb, 298 struct rcom_lock *rl) 299 { 300 memset(rl, 0, sizeof(*rl)); 301 302 rl->rl_ownpid = lkb->lkb_ownpid; 303 rl->rl_lkid = lkb->lkb_id; 304 rl->rl_exflags = lkb->lkb_exflags; 305 rl->rl_flags = lkb->lkb_flags; 306 rl->rl_lvbseq = lkb->lkb_lvbseq; 307 rl->rl_rqmode = lkb->lkb_rqmode; 308 rl->rl_grmode = lkb->lkb_grmode; 309 rl->rl_status = lkb->lkb_status; 310 rl->rl_wait_type = lkb->lkb_wait_type; 311 312 if (lkb->lkb_bastaddr) 313 rl->rl_asts |= AST_BAST; 314 if (lkb->lkb_astaddr) 315 rl->rl_asts |= AST_COMP; 316 317 rl->rl_namelen = r->res_length; 318 memcpy(rl->rl_name, r->res_name, r->res_length); 319 320 /* FIXME: might we have an lvb without DLM_LKF_VALBLK set ? 321 If so, receive_rcom_lock_args() won't take this copy. */ 322 323 if (lkb->lkb_lvbptr) 324 memcpy(rl->rl_lvb, lkb->lkb_lvbptr, r->res_ls->ls_lvblen); 325 } 326 327 int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) 328 { 329 struct dlm_ls *ls = r->res_ls; 330 struct dlm_rcom *rc; 331 struct dlm_mhandle *mh; 332 struct rcom_lock *rl; 333 int error, len = sizeof(struct rcom_lock); 334 335 if (lkb->lkb_lvbptr) 336 len += ls->ls_lvblen; 337 338 error = create_rcom(ls, r->res_nodeid, DLM_RCOM_LOCK, len, &rc, &mh); 339 if (error) 340 goto out; 341 342 rl = (struct rcom_lock *) rc->rc_buf; 343 pack_rcom_lock(r, lkb, rl); 344 rc->rc_id = (unsigned long) r; 345 346 send_rcom(ls, mh, rc); 347 out: 348 return error; 349 } 350 351 static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in) 352 { 353 struct dlm_rcom *rc; 354 struct dlm_mhandle *mh; 355 int error, nodeid = rc_in->rc_header.h_nodeid; 356 357 dlm_recover_master_copy(ls, rc_in); 358 359 error = create_rcom(ls, nodeid, DLM_RCOM_LOCK_REPLY, 360 sizeof(struct rcom_lock), &rc, &mh); 361 if (error) 362 return; 363 364 /* We send back the same rcom_lock struct we received, but 365 dlm_recover_master_copy() has filled in rl_remid and rl_result */ 366 367 memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock)); 368 rc->rc_id = rc_in->rc_id; 369 rc->rc_seq_reply = rc_in->rc_seq; 370 371 send_rcom(ls, mh, rc); 372 } 373 374 /* If the lockspace doesn't exist then still send a status message 375 back; it's possible that it just doesn't have its global_id yet. */ 376 377 int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) 378 { 379 struct dlm_rcom *rc; 380 struct rcom_config *rf; 381 struct dlm_mhandle *mh; 382 char *mb; 383 int mb_len = sizeof(struct dlm_rcom) + sizeof(struct rcom_config); 384 385 mh = dlm_lowcomms_get_buffer(nodeid, mb_len, GFP_NOFS, &mb); 386 if (!mh) 387 return -ENOBUFS; 388 memset(mb, 0, mb_len); 389 390 rc = (struct dlm_rcom *) mb; 391 392 rc->rc_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR); 393 rc->rc_header.h_lockspace = rc_in->rc_header.h_lockspace; 394 rc->rc_header.h_nodeid = dlm_our_nodeid(); 395 rc->rc_header.h_length = mb_len; 396 rc->rc_header.h_cmd = DLM_RCOM; 397 398 rc->rc_type = DLM_RCOM_STATUS_REPLY; 399 rc->rc_id = rc_in->rc_id; 400 rc->rc_seq_reply = rc_in->rc_seq; 401 rc->rc_result = -ESRCH; 402 403 rf = (struct rcom_config *) rc->rc_buf; 404 rf->rf_lvblen = -1; 405 406 dlm_rcom_out(rc); 407 dlm_lowcomms_commit_buffer(mh); 408 409 return 0; 410 } 411 412 static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc) 413 { 414 uint64_t seq; 415 int rv = 0; 416 417 switch (rc->rc_type) { 418 case DLM_RCOM_STATUS_REPLY: 419 case DLM_RCOM_NAMES_REPLY: 420 case DLM_RCOM_LOOKUP_REPLY: 421 case DLM_RCOM_LOCK_REPLY: 422 spin_lock(&ls->ls_recover_lock); 423 seq = ls->ls_recover_seq; 424 spin_unlock(&ls->ls_recover_lock); 425 if (rc->rc_seq_reply != seq) { 426 log_debug(ls, "ignoring old reply %x from %d " 427 "seq_reply %llx expect %llx", 428 rc->rc_type, rc->rc_header.h_nodeid, 429 (unsigned long long)rc->rc_seq_reply, 430 (unsigned long long)seq); 431 rv = 1; 432 } 433 } 434 return rv; 435 } 436 437 /* Called by dlm_recv; corresponds to dlm_receive_message() but special 438 recovery-only comms are sent through here. */ 439 440 void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) 441 { 442 if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) { 443 log_debug(ls, "ignoring recovery message %x from %d", 444 rc->rc_type, nodeid); 445 goto out; 446 } 447 448 if (is_old_reply(ls, rc)) 449 goto out; 450 451 switch (rc->rc_type) { 452 case DLM_RCOM_STATUS: 453 receive_rcom_status(ls, rc); 454 break; 455 456 case DLM_RCOM_NAMES: 457 receive_rcom_names(ls, rc); 458 break; 459 460 case DLM_RCOM_LOOKUP: 461 receive_rcom_lookup(ls, rc); 462 break; 463 464 case DLM_RCOM_LOCK: 465 receive_rcom_lock(ls, rc); 466 break; 467 468 case DLM_RCOM_STATUS_REPLY: 469 receive_sync_reply(ls, rc); 470 break; 471 472 case DLM_RCOM_NAMES_REPLY: 473 receive_sync_reply(ls, rc); 474 break; 475 476 case DLM_RCOM_LOOKUP_REPLY: 477 receive_rcom_lookup_reply(ls, rc); 478 break; 479 480 case DLM_RCOM_LOCK_REPLY: 481 dlm_recover_process_copy(ls, rc); 482 break; 483 484 default: 485 log_error(ls, "receive_rcom bad type %d", rc->rc_type); 486 } 487 out: 488 return; 489 } 490 491