1 #include <linux/ceph/ceph_debug.h> 2 3 #include <linux/module.h> 4 #include <linux/err.h> 5 #include <linux/highmem.h> 6 #include <linux/mm.h> 7 #include <linux/pagemap.h> 8 #include <linux/slab.h> 9 #include <linux/uaccess.h> 10 #ifdef CONFIG_BLOCK 11 #include <linux/bio.h> 12 #endif 13 14 #include <linux/ceph/libceph.h> 15 #include <linux/ceph/osd_client.h> 16 #include <linux/ceph/messenger.h> 17 #include <linux/ceph/decode.h> 18 #include <linux/ceph/auth.h> 19 #include <linux/ceph/pagelist.h> 20 21 #define OSD_OP_FRONT_LEN 4096 22 #define OSD_OPREPLY_FRONT_LEN 512 23 24 static const struct ceph_connection_operations osd_con_ops; 25 26 static void __send_queued(struct ceph_osd_client *osdc); 27 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); 28 static void __register_request(struct ceph_osd_client *osdc, 29 struct ceph_osd_request *req); 30 static void __unregister_linger_request(struct ceph_osd_client *osdc, 31 struct ceph_osd_request *req); 32 static void __send_request(struct ceph_osd_client *osdc, 33 struct ceph_osd_request *req); 34 35 /* 36 * Implement client access to distributed object storage cluster. 37 * 38 * All data objects are stored within a cluster/cloud of OSDs, or 39 * "object storage devices." (Note that Ceph OSDs have _nothing_ to 40 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply 41 * remote daemons serving up and coordinating consistent and safe 42 * access to storage. 43 * 44 * Cluster membership and the mapping of data objects onto storage devices 45 * are described by the osd map. 46 * 47 * We keep track of pending OSD requests (read, write), resubmit 48 * requests to different OSDs when the cluster topology/data layout 49 * change, or retry the affected requests when the communications 50 * channel with an OSD is reset. 51 */ 52 53 /* 54 * calculate the mapping of a file extent onto an object, and fill out the 55 * request accordingly. shorten extent as necessary if it crosses an 56 * object boundary. 57 * 58 * fill osd op in request message. 59 */ 60 static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen, 61 u64 *objnum, u64 *objoff, u64 *objlen) 62 { 63 u64 orig_len = *plen; 64 int r; 65 66 /* object extent? */ 67 r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum, 68 objoff, objlen); 69 if (r < 0) 70 return r; 71 if (*objlen < orig_len) { 72 *plen = *objlen; 73 dout(" skipping last %llu, final file extent %llu~%llu\n", 74 orig_len - *plen, off, *plen); 75 } 76 77 dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen); 78 79 return 0; 80 } 81 82 /* 83 * requests 84 */ 85 void ceph_osdc_release_request(struct kref *kref) 86 { 87 int num_pages; 88 struct ceph_osd_request *req = container_of(kref, 89 struct ceph_osd_request, 90 r_kref); 91 92 if (req->r_request) 93 ceph_msg_put(req->r_request); 94 if (req->r_con_filling_msg) { 95 dout("%s revoking msg %p from con %p\n", __func__, 96 req->r_reply, req->r_con_filling_msg); 97 ceph_msg_revoke_incoming(req->r_reply); 98 req->r_con_filling_msg->ops->put(req->r_con_filling_msg); 99 req->r_con_filling_msg = NULL; 100 } 101 if (req->r_reply) 102 ceph_msg_put(req->r_reply); 103 104 if (req->r_data_in.type == CEPH_OSD_DATA_TYPE_PAGES && 105 req->r_data_in.own_pages) { 106 num_pages = calc_pages_for((u64)req->r_data_in.alignment, 107 (u64)req->r_data_in.length); 108 ceph_release_page_vector(req->r_data_in.pages, num_pages); 109 } 110 if (req->r_data_out.type == CEPH_OSD_DATA_TYPE_PAGES && 111 req->r_data_out.own_pages) { 112 num_pages = calc_pages_for((u64)req->r_data_out.alignment, 113 (u64)req->r_data_out.length); 114 ceph_release_page_vector(req->r_data_out.pages, num_pages); 115 } 116 117 ceph_put_snap_context(req->r_snapc); 118 if (req->r_mempool) 119 mempool_free(req, req->r_osdc->req_mempool); 120 else 121 kfree(req); 122 } 123 EXPORT_SYMBOL(ceph_osdc_release_request); 124 125 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 126 struct ceph_snap_context *snapc, 127 unsigned int num_ops, 128 bool use_mempool, 129 gfp_t gfp_flags) 130 { 131 struct ceph_osd_request *req; 132 struct ceph_msg *msg; 133 size_t msg_size; 134 135 msg_size = 4 + 4 + 8 + 8 + 4+8; 136 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ 137 msg_size += 1 + 8 + 4 + 4; /* pg_t */ 138 msg_size += 4 + MAX_OBJ_NAME_SIZE; 139 msg_size += 2 + num_ops*sizeof(struct ceph_osd_op); 140 msg_size += 8; /* snapid */ 141 msg_size += 8; /* snap_seq */ 142 msg_size += 8 * (snapc ? snapc->num_snaps : 0); /* snaps */ 143 msg_size += 4; 144 145 if (use_mempool) { 146 req = mempool_alloc(osdc->req_mempool, gfp_flags); 147 memset(req, 0, sizeof(*req)); 148 } else { 149 req = kzalloc(sizeof(*req), gfp_flags); 150 } 151 if (req == NULL) 152 return NULL; 153 154 req->r_osdc = osdc; 155 req->r_mempool = use_mempool; 156 157 kref_init(&req->r_kref); 158 init_completion(&req->r_completion); 159 init_completion(&req->r_safe_completion); 160 RB_CLEAR_NODE(&req->r_node); 161 INIT_LIST_HEAD(&req->r_unsafe_item); 162 INIT_LIST_HEAD(&req->r_linger_item); 163 INIT_LIST_HEAD(&req->r_linger_osd); 164 INIT_LIST_HEAD(&req->r_req_lru_item); 165 INIT_LIST_HEAD(&req->r_osd_item); 166 167 /* create reply message */ 168 if (use_mempool) 169 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 170 else 171 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, 172 OSD_OPREPLY_FRONT_LEN, gfp_flags, true); 173 if (!msg) { 174 ceph_osdc_put_request(req); 175 return NULL; 176 } 177 req->r_reply = msg; 178 179 req->r_data_in.type = CEPH_OSD_DATA_TYPE_NONE; 180 req->r_data_out.type = CEPH_OSD_DATA_TYPE_NONE; 181 182 /* create request message; allow space for oid */ 183 if (use_mempool) 184 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 185 else 186 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true); 187 if (!msg) { 188 ceph_osdc_put_request(req); 189 return NULL; 190 } 191 192 memset(msg->front.iov_base, 0, msg->front.iov_len); 193 194 req->r_request = msg; 195 196 return req; 197 } 198 EXPORT_SYMBOL(ceph_osdc_alloc_request); 199 200 static bool osd_req_opcode_valid(u16 opcode) 201 { 202 switch (opcode) { 203 case CEPH_OSD_OP_READ: 204 case CEPH_OSD_OP_STAT: 205 case CEPH_OSD_OP_MAPEXT: 206 case CEPH_OSD_OP_MASKTRUNC: 207 case CEPH_OSD_OP_SPARSE_READ: 208 case CEPH_OSD_OP_NOTIFY: 209 case CEPH_OSD_OP_NOTIFY_ACK: 210 case CEPH_OSD_OP_ASSERT_VER: 211 case CEPH_OSD_OP_WRITE: 212 case CEPH_OSD_OP_WRITEFULL: 213 case CEPH_OSD_OP_TRUNCATE: 214 case CEPH_OSD_OP_ZERO: 215 case CEPH_OSD_OP_DELETE: 216 case CEPH_OSD_OP_APPEND: 217 case CEPH_OSD_OP_STARTSYNC: 218 case CEPH_OSD_OP_SETTRUNC: 219 case CEPH_OSD_OP_TRIMTRUNC: 220 case CEPH_OSD_OP_TMAPUP: 221 case CEPH_OSD_OP_TMAPPUT: 222 case CEPH_OSD_OP_TMAPGET: 223 case CEPH_OSD_OP_CREATE: 224 case CEPH_OSD_OP_ROLLBACK: 225 case CEPH_OSD_OP_WATCH: 226 case CEPH_OSD_OP_OMAPGETKEYS: 227 case CEPH_OSD_OP_OMAPGETVALS: 228 case CEPH_OSD_OP_OMAPGETHEADER: 229 case CEPH_OSD_OP_OMAPGETVALSBYKEYS: 230 case CEPH_OSD_OP_OMAPSETVALS: 231 case CEPH_OSD_OP_OMAPSETHEADER: 232 case CEPH_OSD_OP_OMAPCLEAR: 233 case CEPH_OSD_OP_OMAPRMKEYS: 234 case CEPH_OSD_OP_OMAP_CMP: 235 case CEPH_OSD_OP_CLONERANGE: 236 case CEPH_OSD_OP_ASSERT_SRC_VERSION: 237 case CEPH_OSD_OP_SRC_CMPXATTR: 238 case CEPH_OSD_OP_GETXATTR: 239 case CEPH_OSD_OP_GETXATTRS: 240 case CEPH_OSD_OP_CMPXATTR: 241 case CEPH_OSD_OP_SETXATTR: 242 case CEPH_OSD_OP_SETXATTRS: 243 case CEPH_OSD_OP_RESETXATTRS: 244 case CEPH_OSD_OP_RMXATTR: 245 case CEPH_OSD_OP_PULL: 246 case CEPH_OSD_OP_PUSH: 247 case CEPH_OSD_OP_BALANCEREADS: 248 case CEPH_OSD_OP_UNBALANCEREADS: 249 case CEPH_OSD_OP_SCRUB: 250 case CEPH_OSD_OP_SCRUB_RESERVE: 251 case CEPH_OSD_OP_SCRUB_UNRESERVE: 252 case CEPH_OSD_OP_SCRUB_STOP: 253 case CEPH_OSD_OP_SCRUB_MAP: 254 case CEPH_OSD_OP_WRLOCK: 255 case CEPH_OSD_OP_WRUNLOCK: 256 case CEPH_OSD_OP_RDLOCK: 257 case CEPH_OSD_OP_RDUNLOCK: 258 case CEPH_OSD_OP_UPLOCK: 259 case CEPH_OSD_OP_DNLOCK: 260 case CEPH_OSD_OP_CALL: 261 case CEPH_OSD_OP_PGLS: 262 case CEPH_OSD_OP_PGLS_FILTER: 263 return true; 264 default: 265 return false; 266 } 267 } 268 269 /* 270 * This is an osd op init function for opcodes that have no data or 271 * other information associated with them. It also serves as a 272 * common init routine for all the other init functions, below. 273 */ 274 void osd_req_op_init(struct ceph_osd_req_op *op, u16 opcode) 275 { 276 BUG_ON(!osd_req_opcode_valid(opcode)); 277 278 memset(op, 0, sizeof (*op)); 279 280 op->op = opcode; 281 } 282 283 void osd_req_op_extent_init(struct ceph_osd_req_op *op, u16 opcode, 284 u64 offset, u64 length, 285 u64 truncate_size, u32 truncate_seq) 286 { 287 size_t payload_len = 0; 288 289 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE); 290 291 osd_req_op_init(op, opcode); 292 293 op->extent.offset = offset; 294 op->extent.length = length; 295 op->extent.truncate_size = truncate_size; 296 op->extent.truncate_seq = truncate_seq; 297 if (opcode == CEPH_OSD_OP_WRITE) 298 payload_len += length; 299 300 op->payload_len = payload_len; 301 } 302 EXPORT_SYMBOL(osd_req_op_extent_init); 303 304 void osd_req_op_cls_init(struct ceph_osd_req_op *op, u16 opcode, 305 const char *class, const char *method, 306 const void *request_data, size_t request_data_size) 307 { 308 size_t payload_len = 0; 309 size_t size; 310 311 BUG_ON(opcode != CEPH_OSD_OP_CALL); 312 313 osd_req_op_init(op, opcode); 314 315 op->cls.class_name = class; 316 size = strlen(class); 317 BUG_ON(size > (size_t) U8_MAX); 318 op->cls.class_len = size; 319 payload_len += size; 320 321 op->cls.method_name = method; 322 size = strlen(method); 323 BUG_ON(size > (size_t) U8_MAX); 324 op->cls.method_len = size; 325 payload_len += size; 326 327 op->cls.indata = request_data; 328 BUG_ON(request_data_size > (size_t) U32_MAX); 329 op->cls.indata_len = (u32) request_data_size; 330 payload_len += request_data_size; 331 332 op->cls.argc = 0; /* currently unused */ 333 334 op->payload_len = payload_len; 335 } 336 EXPORT_SYMBOL(osd_req_op_cls_init); 337 338 void osd_req_op_watch_init(struct ceph_osd_req_op *op, u16 opcode, 339 u64 cookie, u64 version, int flag) 340 { 341 BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH); 342 343 osd_req_op_init(op, opcode); 344 345 op->watch.cookie = cookie; 346 /* op->watch.ver = version; */ /* XXX 3847 */ 347 op->watch.ver = cpu_to_le64(version); 348 if (opcode == CEPH_OSD_OP_WATCH && flag) 349 op->watch.flag = (u8) 1; 350 } 351 EXPORT_SYMBOL(osd_req_op_watch_init); 352 353 static u64 osd_req_encode_op(struct ceph_osd_request *req, 354 struct ceph_osd_op *dst, 355 struct ceph_osd_req_op *src) 356 { 357 u64 out_data_len = 0; 358 struct ceph_pagelist *pagelist; 359 360 if (WARN_ON(!osd_req_opcode_valid(src->op))) { 361 pr_err("unrecognized osd opcode %d\n", src->op); 362 363 return 0; 364 } 365 366 switch (src->op) { 367 case CEPH_OSD_OP_STAT: 368 break; 369 case CEPH_OSD_OP_READ: 370 case CEPH_OSD_OP_WRITE: 371 if (src->op == CEPH_OSD_OP_WRITE) 372 out_data_len = src->extent.length; 373 dst->extent.offset = cpu_to_le64(src->extent.offset); 374 dst->extent.length = cpu_to_le64(src->extent.length); 375 dst->extent.truncate_size = 376 cpu_to_le64(src->extent.truncate_size); 377 dst->extent.truncate_seq = 378 cpu_to_le32(src->extent.truncate_seq); 379 break; 380 case CEPH_OSD_OP_CALL: 381 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); 382 BUG_ON(!pagelist); 383 ceph_pagelist_init(pagelist); 384 385 dst->cls.class_len = src->cls.class_len; 386 dst->cls.method_len = src->cls.method_len; 387 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len); 388 ceph_pagelist_append(pagelist, src->cls.class_name, 389 src->cls.class_len); 390 ceph_pagelist_append(pagelist, src->cls.method_name, 391 src->cls.method_len); 392 ceph_pagelist_append(pagelist, src->cls.indata, 393 src->cls.indata_len); 394 395 req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGELIST; 396 req->r_data_out.pagelist = pagelist; 397 out_data_len = pagelist->length; 398 break; 399 case CEPH_OSD_OP_STARTSYNC: 400 break; 401 case CEPH_OSD_OP_NOTIFY_ACK: 402 case CEPH_OSD_OP_WATCH: 403 dst->watch.cookie = cpu_to_le64(src->watch.cookie); 404 dst->watch.ver = cpu_to_le64(src->watch.ver); 405 dst->watch.flag = src->watch.flag; 406 break; 407 default: 408 pr_err("unsupported osd opcode %s\n", 409 ceph_osd_op_name(src->op)); 410 WARN_ON(1); 411 412 return 0; 413 } 414 dst->op = cpu_to_le16(src->op); 415 dst->payload_len = cpu_to_le32(src->payload_len); 416 417 return out_data_len; 418 } 419 420 /* 421 * build new request AND message 422 * 423 */ 424 void ceph_osdc_build_request(struct ceph_osd_request *req, 425 u64 off, unsigned int num_ops, 426 struct ceph_osd_req_op *src_ops, 427 struct ceph_snap_context *snapc, u64 snap_id, 428 struct timespec *mtime) 429 { 430 struct ceph_msg *msg = req->r_request; 431 struct ceph_osd_req_op *src_op; 432 void *p; 433 size_t msg_size; 434 int flags = req->r_flags; 435 u64 data_len; 436 int i; 437 438 req->r_num_ops = num_ops; 439 req->r_snapid = snap_id; 440 req->r_snapc = ceph_get_snap_context(snapc); 441 442 /* encode request */ 443 msg->hdr.version = cpu_to_le16(4); 444 445 p = msg->front.iov_base; 446 ceph_encode_32(&p, 1); /* client_inc is always 1 */ 447 req->r_request_osdmap_epoch = p; 448 p += 4; 449 req->r_request_flags = p; 450 p += 4; 451 if (req->r_flags & CEPH_OSD_FLAG_WRITE) 452 ceph_encode_timespec(p, mtime); 453 p += sizeof(struct ceph_timespec); 454 req->r_request_reassert_version = p; 455 p += sizeof(struct ceph_eversion); /* will get filled in */ 456 457 /* oloc */ 458 ceph_encode_8(&p, 4); 459 ceph_encode_8(&p, 4); 460 ceph_encode_32(&p, 8 + 4 + 4); 461 req->r_request_pool = p; 462 p += 8; 463 ceph_encode_32(&p, -1); /* preferred */ 464 ceph_encode_32(&p, 0); /* key len */ 465 466 ceph_encode_8(&p, 1); 467 req->r_request_pgid = p; 468 p += 8 + 4; 469 ceph_encode_32(&p, -1); /* preferred */ 470 471 /* oid */ 472 ceph_encode_32(&p, req->r_oid_len); 473 memcpy(p, req->r_oid, req->r_oid_len); 474 dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len); 475 p += req->r_oid_len; 476 477 /* ops--can imply data */ 478 ceph_encode_16(&p, num_ops); 479 src_op = src_ops; 480 req->r_request_ops = p; 481 data_len = 0; 482 for (i = 0; i < num_ops; i++, src_op++) { 483 data_len += osd_req_encode_op(req, p, src_op); 484 p += sizeof(struct ceph_osd_op); 485 } 486 487 /* snaps */ 488 ceph_encode_64(&p, req->r_snapid); 489 ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0); 490 ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0); 491 if (req->r_snapc) { 492 for (i = 0; i < snapc->num_snaps; i++) { 493 ceph_encode_64(&p, req->r_snapc->snaps[i]); 494 } 495 } 496 497 req->r_request_attempts = p; 498 p += 4; 499 500 /* data */ 501 if (flags & CEPH_OSD_FLAG_WRITE) { 502 u16 data_off; 503 504 /* 505 * The header "data_off" is a hint to the receiver 506 * allowing it to align received data into its 507 * buffers such that there's no need to re-copy 508 * it before writing it to disk (direct I/O). 509 */ 510 data_off = (u16) (off & 0xffff); 511 req->r_request->hdr.data_off = cpu_to_le16(data_off); 512 } 513 req->r_request->hdr.data_len = cpu_to_le32(data_len); 514 515 BUG_ON(p > msg->front.iov_base + msg->front.iov_len); 516 msg_size = p - msg->front.iov_base; 517 msg->front.iov_len = msg_size; 518 msg->hdr.front_len = cpu_to_le32(msg_size); 519 520 dout("build_request msg_size was %d num_ops %d\n", (int)msg_size, 521 num_ops); 522 return; 523 } 524 EXPORT_SYMBOL(ceph_osdc_build_request); 525 526 /* 527 * build new request AND message, calculate layout, and adjust file 528 * extent as needed. 529 * 530 * if the file was recently truncated, we include information about its 531 * old and new size so that the object can be updated appropriately. (we 532 * avoid synchronously deleting truncated objects because it's slow.) 533 * 534 * if @do_sync, include a 'startsync' command so that the osd will flush 535 * data quickly. 536 */ 537 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, 538 struct ceph_file_layout *layout, 539 struct ceph_vino vino, 540 u64 off, u64 *plen, 541 int opcode, int flags, 542 struct ceph_snap_context *snapc, 543 int do_sync, 544 u32 truncate_seq, 545 u64 truncate_size, 546 struct timespec *mtime, 547 bool use_mempool) 548 { 549 struct ceph_osd_req_op ops[2]; 550 struct ceph_osd_request *req; 551 unsigned int num_op = do_sync ? 2 : 1; 552 u64 objnum = 0; 553 u64 objoff = 0; 554 u64 objlen = 0; 555 u32 object_size; 556 u64 object_base; 557 int r; 558 559 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE); 560 561 req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool, 562 GFP_NOFS); 563 if (!req) 564 return ERR_PTR(-ENOMEM); 565 req->r_flags = flags; 566 567 /* calculate max write size */ 568 r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen); 569 if (r < 0) { 570 ceph_osdc_put_request(req); 571 return ERR_PTR(r); 572 } 573 574 object_size = le32_to_cpu(layout->fl_object_size); 575 object_base = off - objoff; 576 if (truncate_size <= object_base) { 577 truncate_size = 0; 578 } else { 579 truncate_size -= object_base; 580 if (truncate_size > object_size) 581 truncate_size = object_size; 582 } 583 584 osd_req_op_extent_init(&ops[0], opcode, objoff, objlen, 585 truncate_size, truncate_seq); 586 if (do_sync) 587 osd_req_op_init(&ops[1], CEPH_OSD_OP_STARTSYNC); 588 589 req->r_file_layout = *layout; /* keep a copy */ 590 591 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", 592 vino.ino, objnum); 593 req->r_oid_len = strlen(req->r_oid); 594 595 ceph_osdc_build_request(req, off, num_op, ops, 596 snapc, vino.snap, mtime); 597 598 return req; 599 } 600 EXPORT_SYMBOL(ceph_osdc_new_request); 601 602 /* 603 * We keep osd requests in an rbtree, sorted by ->r_tid. 604 */ 605 static void __insert_request(struct ceph_osd_client *osdc, 606 struct ceph_osd_request *new) 607 { 608 struct rb_node **p = &osdc->requests.rb_node; 609 struct rb_node *parent = NULL; 610 struct ceph_osd_request *req = NULL; 611 612 while (*p) { 613 parent = *p; 614 req = rb_entry(parent, struct ceph_osd_request, r_node); 615 if (new->r_tid < req->r_tid) 616 p = &(*p)->rb_left; 617 else if (new->r_tid > req->r_tid) 618 p = &(*p)->rb_right; 619 else 620 BUG(); 621 } 622 623 rb_link_node(&new->r_node, parent, p); 624 rb_insert_color(&new->r_node, &osdc->requests); 625 } 626 627 static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc, 628 u64 tid) 629 { 630 struct ceph_osd_request *req; 631 struct rb_node *n = osdc->requests.rb_node; 632 633 while (n) { 634 req = rb_entry(n, struct ceph_osd_request, r_node); 635 if (tid < req->r_tid) 636 n = n->rb_left; 637 else if (tid > req->r_tid) 638 n = n->rb_right; 639 else 640 return req; 641 } 642 return NULL; 643 } 644 645 static struct ceph_osd_request * 646 __lookup_request_ge(struct ceph_osd_client *osdc, 647 u64 tid) 648 { 649 struct ceph_osd_request *req; 650 struct rb_node *n = osdc->requests.rb_node; 651 652 while (n) { 653 req = rb_entry(n, struct ceph_osd_request, r_node); 654 if (tid < req->r_tid) { 655 if (!n->rb_left) 656 return req; 657 n = n->rb_left; 658 } else if (tid > req->r_tid) { 659 n = n->rb_right; 660 } else { 661 return req; 662 } 663 } 664 return NULL; 665 } 666 667 /* 668 * Resubmit requests pending on the given osd. 669 */ 670 static void __kick_osd_requests(struct ceph_osd_client *osdc, 671 struct ceph_osd *osd) 672 { 673 struct ceph_osd_request *req, *nreq; 674 LIST_HEAD(resend); 675 int err; 676 677 dout("__kick_osd_requests osd%d\n", osd->o_osd); 678 err = __reset_osd(osdc, osd); 679 if (err) 680 return; 681 /* 682 * Build up a list of requests to resend by traversing the 683 * osd's list of requests. Requests for a given object are 684 * sent in tid order, and that is also the order they're 685 * kept on this list. Therefore all requests that are in 686 * flight will be found first, followed by all requests that 687 * have not yet been sent. And to resend requests while 688 * preserving this order we will want to put any sent 689 * requests back on the front of the osd client's unsent 690 * list. 691 * 692 * So we build a separate ordered list of already-sent 693 * requests for the affected osd and splice it onto the 694 * front of the osd client's unsent list. Once we've seen a 695 * request that has not yet been sent we're done. Those 696 * requests are already sitting right where they belong. 697 */ 698 list_for_each_entry(req, &osd->o_requests, r_osd_item) { 699 if (!req->r_sent) 700 break; 701 list_move_tail(&req->r_req_lru_item, &resend); 702 dout("requeueing %p tid %llu osd%d\n", req, req->r_tid, 703 osd->o_osd); 704 if (!req->r_linger) 705 req->r_flags |= CEPH_OSD_FLAG_RETRY; 706 } 707 list_splice(&resend, &osdc->req_unsent); 708 709 /* 710 * Linger requests are re-registered before sending, which 711 * sets up a new tid for each. We add them to the unsent 712 * list at the end to keep things in tid order. 713 */ 714 list_for_each_entry_safe(req, nreq, &osd->o_linger_requests, 715 r_linger_osd) { 716 /* 717 * reregister request prior to unregistering linger so 718 * that r_osd is preserved. 719 */ 720 BUG_ON(!list_empty(&req->r_req_lru_item)); 721 __register_request(osdc, req); 722 list_add_tail(&req->r_req_lru_item, &osdc->req_unsent); 723 list_add_tail(&req->r_osd_item, &req->r_osd->o_requests); 724 __unregister_linger_request(osdc, req); 725 dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid, 726 osd->o_osd); 727 } 728 } 729 730 /* 731 * If the osd connection drops, we need to resubmit all requests. 732 */ 733 static void osd_reset(struct ceph_connection *con) 734 { 735 struct ceph_osd *osd = con->private; 736 struct ceph_osd_client *osdc; 737 738 if (!osd) 739 return; 740 dout("osd_reset osd%d\n", osd->o_osd); 741 osdc = osd->o_osdc; 742 down_read(&osdc->map_sem); 743 mutex_lock(&osdc->request_mutex); 744 __kick_osd_requests(osdc, osd); 745 __send_queued(osdc); 746 mutex_unlock(&osdc->request_mutex); 747 up_read(&osdc->map_sem); 748 } 749 750 /* 751 * Track open sessions with osds. 752 */ 753 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum) 754 { 755 struct ceph_osd *osd; 756 757 osd = kzalloc(sizeof(*osd), GFP_NOFS); 758 if (!osd) 759 return NULL; 760 761 atomic_set(&osd->o_ref, 1); 762 osd->o_osdc = osdc; 763 osd->o_osd = onum; 764 RB_CLEAR_NODE(&osd->o_node); 765 INIT_LIST_HEAD(&osd->o_requests); 766 INIT_LIST_HEAD(&osd->o_linger_requests); 767 INIT_LIST_HEAD(&osd->o_osd_lru); 768 osd->o_incarnation = 1; 769 770 ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr); 771 772 INIT_LIST_HEAD(&osd->o_keepalive_item); 773 return osd; 774 } 775 776 static struct ceph_osd *get_osd(struct ceph_osd *osd) 777 { 778 if (atomic_inc_not_zero(&osd->o_ref)) { 779 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1, 780 atomic_read(&osd->o_ref)); 781 return osd; 782 } else { 783 dout("get_osd %p FAIL\n", osd); 784 return NULL; 785 } 786 } 787 788 static void put_osd(struct ceph_osd *osd) 789 { 790 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), 791 atomic_read(&osd->o_ref) - 1); 792 if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) { 793 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth; 794 795 ceph_auth_destroy_authorizer(ac, osd->o_auth.authorizer); 796 kfree(osd); 797 } 798 } 799 800 /* 801 * remove an osd from our map 802 */ 803 static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 804 { 805 dout("__remove_osd %p\n", osd); 806 BUG_ON(!list_empty(&osd->o_requests)); 807 rb_erase(&osd->o_node, &osdc->osds); 808 list_del_init(&osd->o_osd_lru); 809 ceph_con_close(&osd->o_con); 810 put_osd(osd); 811 } 812 813 static void remove_all_osds(struct ceph_osd_client *osdc) 814 { 815 dout("%s %p\n", __func__, osdc); 816 mutex_lock(&osdc->request_mutex); 817 while (!RB_EMPTY_ROOT(&osdc->osds)) { 818 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), 819 struct ceph_osd, o_node); 820 __remove_osd(osdc, osd); 821 } 822 mutex_unlock(&osdc->request_mutex); 823 } 824 825 static void __move_osd_to_lru(struct ceph_osd_client *osdc, 826 struct ceph_osd *osd) 827 { 828 dout("__move_osd_to_lru %p\n", osd); 829 BUG_ON(!list_empty(&osd->o_osd_lru)); 830 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru); 831 osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ; 832 } 833 834 static void __remove_osd_from_lru(struct ceph_osd *osd) 835 { 836 dout("__remove_osd_from_lru %p\n", osd); 837 if (!list_empty(&osd->o_osd_lru)) 838 list_del_init(&osd->o_osd_lru); 839 } 840 841 static void remove_old_osds(struct ceph_osd_client *osdc) 842 { 843 struct ceph_osd *osd, *nosd; 844 845 dout("__remove_old_osds %p\n", osdc); 846 mutex_lock(&osdc->request_mutex); 847 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) { 848 if (time_before(jiffies, osd->lru_ttl)) 849 break; 850 __remove_osd(osdc, osd); 851 } 852 mutex_unlock(&osdc->request_mutex); 853 } 854 855 /* 856 * reset osd connect 857 */ 858 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 859 { 860 struct ceph_entity_addr *peer_addr; 861 862 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 863 if (list_empty(&osd->o_requests) && 864 list_empty(&osd->o_linger_requests)) { 865 __remove_osd(osdc, osd); 866 867 return -ENODEV; 868 } 869 870 peer_addr = &osdc->osdmap->osd_addr[osd->o_osd]; 871 if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) && 872 !ceph_con_opened(&osd->o_con)) { 873 struct ceph_osd_request *req; 874 875 dout(" osd addr hasn't changed and connection never opened," 876 " letting msgr retry"); 877 /* touch each r_stamp for handle_timeout()'s benfit */ 878 list_for_each_entry(req, &osd->o_requests, r_osd_item) 879 req->r_stamp = jiffies; 880 881 return -EAGAIN; 882 } 883 884 ceph_con_close(&osd->o_con); 885 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr); 886 osd->o_incarnation++; 887 888 return 0; 889 } 890 891 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new) 892 { 893 struct rb_node **p = &osdc->osds.rb_node; 894 struct rb_node *parent = NULL; 895 struct ceph_osd *osd = NULL; 896 897 dout("__insert_osd %p osd%d\n", new, new->o_osd); 898 while (*p) { 899 parent = *p; 900 osd = rb_entry(parent, struct ceph_osd, o_node); 901 if (new->o_osd < osd->o_osd) 902 p = &(*p)->rb_left; 903 else if (new->o_osd > osd->o_osd) 904 p = &(*p)->rb_right; 905 else 906 BUG(); 907 } 908 909 rb_link_node(&new->o_node, parent, p); 910 rb_insert_color(&new->o_node, &osdc->osds); 911 } 912 913 static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o) 914 { 915 struct ceph_osd *osd; 916 struct rb_node *n = osdc->osds.rb_node; 917 918 while (n) { 919 osd = rb_entry(n, struct ceph_osd, o_node); 920 if (o < osd->o_osd) 921 n = n->rb_left; 922 else if (o > osd->o_osd) 923 n = n->rb_right; 924 else 925 return osd; 926 } 927 return NULL; 928 } 929 930 static void __schedule_osd_timeout(struct ceph_osd_client *osdc) 931 { 932 schedule_delayed_work(&osdc->timeout_work, 933 osdc->client->options->osd_keepalive_timeout * HZ); 934 } 935 936 static void __cancel_osd_timeout(struct ceph_osd_client *osdc) 937 { 938 cancel_delayed_work(&osdc->timeout_work); 939 } 940 941 /* 942 * Register request, assign tid. If this is the first request, set up 943 * the timeout event. 944 */ 945 static void __register_request(struct ceph_osd_client *osdc, 946 struct ceph_osd_request *req) 947 { 948 req->r_tid = ++osdc->last_tid; 949 req->r_request->hdr.tid = cpu_to_le64(req->r_tid); 950 dout("__register_request %p tid %lld\n", req, req->r_tid); 951 __insert_request(osdc, req); 952 ceph_osdc_get_request(req); 953 osdc->num_requests++; 954 if (osdc->num_requests == 1) { 955 dout(" first request, scheduling timeout\n"); 956 __schedule_osd_timeout(osdc); 957 } 958 } 959 960 /* 961 * called under osdc->request_mutex 962 */ 963 static void __unregister_request(struct ceph_osd_client *osdc, 964 struct ceph_osd_request *req) 965 { 966 if (RB_EMPTY_NODE(&req->r_node)) { 967 dout("__unregister_request %p tid %lld not registered\n", 968 req, req->r_tid); 969 return; 970 } 971 972 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 973 rb_erase(&req->r_node, &osdc->requests); 974 osdc->num_requests--; 975 976 if (req->r_osd) { 977 /* make sure the original request isn't in flight. */ 978 ceph_msg_revoke(req->r_request); 979 980 list_del_init(&req->r_osd_item); 981 if (list_empty(&req->r_osd->o_requests) && 982 list_empty(&req->r_osd->o_linger_requests)) { 983 dout("moving osd to %p lru\n", req->r_osd); 984 __move_osd_to_lru(osdc, req->r_osd); 985 } 986 if (list_empty(&req->r_linger_item)) 987 req->r_osd = NULL; 988 } 989 990 list_del_init(&req->r_req_lru_item); 991 ceph_osdc_put_request(req); 992 993 if (osdc->num_requests == 0) { 994 dout(" no requests, canceling timeout\n"); 995 __cancel_osd_timeout(osdc); 996 } 997 } 998 999 /* 1000 * Cancel a previously queued request message 1001 */ 1002 static void __cancel_request(struct ceph_osd_request *req) 1003 { 1004 if (req->r_sent && req->r_osd) { 1005 ceph_msg_revoke(req->r_request); 1006 req->r_sent = 0; 1007 } 1008 } 1009 1010 static void __register_linger_request(struct ceph_osd_client *osdc, 1011 struct ceph_osd_request *req) 1012 { 1013 dout("__register_linger_request %p\n", req); 1014 list_add_tail(&req->r_linger_item, &osdc->req_linger); 1015 if (req->r_osd) 1016 list_add_tail(&req->r_linger_osd, 1017 &req->r_osd->o_linger_requests); 1018 } 1019 1020 static void __unregister_linger_request(struct ceph_osd_client *osdc, 1021 struct ceph_osd_request *req) 1022 { 1023 dout("__unregister_linger_request %p\n", req); 1024 list_del_init(&req->r_linger_item); 1025 if (req->r_osd) { 1026 list_del_init(&req->r_linger_osd); 1027 1028 if (list_empty(&req->r_osd->o_requests) && 1029 list_empty(&req->r_osd->o_linger_requests)) { 1030 dout("moving osd to %p lru\n", req->r_osd); 1031 __move_osd_to_lru(osdc, req->r_osd); 1032 } 1033 if (list_empty(&req->r_osd_item)) 1034 req->r_osd = NULL; 1035 } 1036 } 1037 1038 void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc, 1039 struct ceph_osd_request *req) 1040 { 1041 mutex_lock(&osdc->request_mutex); 1042 if (req->r_linger) { 1043 __unregister_linger_request(osdc, req); 1044 ceph_osdc_put_request(req); 1045 } 1046 mutex_unlock(&osdc->request_mutex); 1047 } 1048 EXPORT_SYMBOL(ceph_osdc_unregister_linger_request); 1049 1050 void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, 1051 struct ceph_osd_request *req) 1052 { 1053 if (!req->r_linger) { 1054 dout("set_request_linger %p\n", req); 1055 req->r_linger = 1; 1056 /* 1057 * caller is now responsible for calling 1058 * unregister_linger_request 1059 */ 1060 ceph_osdc_get_request(req); 1061 } 1062 } 1063 EXPORT_SYMBOL(ceph_osdc_set_request_linger); 1064 1065 /* 1066 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct 1067 * (as needed), and set the request r_osd appropriately. If there is 1068 * no up osd, set r_osd to NULL. Move the request to the appropriate list 1069 * (unsent, homeless) or leave on in-flight lru. 1070 * 1071 * Return 0 if unchanged, 1 if changed, or negative on error. 1072 * 1073 * Caller should hold map_sem for read and request_mutex. 1074 */ 1075 static int __map_request(struct ceph_osd_client *osdc, 1076 struct ceph_osd_request *req, int force_resend) 1077 { 1078 struct ceph_pg pgid; 1079 int acting[CEPH_PG_MAX_SIZE]; 1080 int o = -1, num = 0; 1081 int err; 1082 1083 dout("map_request %p tid %lld\n", req, req->r_tid); 1084 err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap, 1085 ceph_file_layout_pg_pool(req->r_file_layout)); 1086 if (err) { 1087 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1088 return err; 1089 } 1090 req->r_pgid = pgid; 1091 1092 err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); 1093 if (err > 0) { 1094 o = acting[0]; 1095 num = err; 1096 } 1097 1098 if ((!force_resend && 1099 req->r_osd && req->r_osd->o_osd == o && 1100 req->r_sent >= req->r_osd->o_incarnation && 1101 req->r_num_pg_osds == num && 1102 memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || 1103 (req->r_osd == NULL && o == -1)) 1104 return 0; /* no change */ 1105 1106 dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n", 1107 req->r_tid, pgid.pool, pgid.seed, o, 1108 req->r_osd ? req->r_osd->o_osd : -1); 1109 1110 /* record full pg acting set */ 1111 memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num); 1112 req->r_num_pg_osds = num; 1113 1114 if (req->r_osd) { 1115 __cancel_request(req); 1116 list_del_init(&req->r_osd_item); 1117 req->r_osd = NULL; 1118 } 1119 1120 req->r_osd = __lookup_osd(osdc, o); 1121 if (!req->r_osd && o >= 0) { 1122 err = -ENOMEM; 1123 req->r_osd = create_osd(osdc, o); 1124 if (!req->r_osd) { 1125 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1126 goto out; 1127 } 1128 1129 dout("map_request osd %p is osd%d\n", req->r_osd, o); 1130 __insert_osd(osdc, req->r_osd); 1131 1132 ceph_con_open(&req->r_osd->o_con, 1133 CEPH_ENTITY_TYPE_OSD, o, 1134 &osdc->osdmap->osd_addr[o]); 1135 } 1136 1137 if (req->r_osd) { 1138 __remove_osd_from_lru(req->r_osd); 1139 list_add_tail(&req->r_osd_item, &req->r_osd->o_requests); 1140 list_move_tail(&req->r_req_lru_item, &osdc->req_unsent); 1141 } else { 1142 list_move_tail(&req->r_req_lru_item, &osdc->req_notarget); 1143 } 1144 err = 1; /* osd or pg changed */ 1145 1146 out: 1147 return err; 1148 } 1149 1150 /* 1151 * caller should hold map_sem (for read) and request_mutex 1152 */ 1153 static void __send_request(struct ceph_osd_client *osdc, 1154 struct ceph_osd_request *req) 1155 { 1156 void *p; 1157 1158 dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n", 1159 req, req->r_tid, req->r_osd->o_osd, req->r_flags, 1160 (unsigned long long)req->r_pgid.pool, req->r_pgid.seed); 1161 1162 /* fill in message content that changes each time we send it */ 1163 put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); 1164 put_unaligned_le32(req->r_flags, req->r_request_flags); 1165 put_unaligned_le64(req->r_pgid.pool, req->r_request_pool); 1166 p = req->r_request_pgid; 1167 ceph_encode_64(&p, req->r_pgid.pool); 1168 ceph_encode_32(&p, req->r_pgid.seed); 1169 put_unaligned_le64(1, req->r_request_attempts); /* FIXME */ 1170 memcpy(req->r_request_reassert_version, &req->r_reassert_version, 1171 sizeof(req->r_reassert_version)); 1172 1173 req->r_stamp = jiffies; 1174 list_move_tail(&req->r_req_lru_item, &osdc->req_lru); 1175 1176 ceph_msg_get(req->r_request); /* send consumes a ref */ 1177 ceph_con_send(&req->r_osd->o_con, req->r_request); 1178 req->r_sent = req->r_osd->o_incarnation; 1179 } 1180 1181 /* 1182 * Send any requests in the queue (req_unsent). 1183 */ 1184 static void __send_queued(struct ceph_osd_client *osdc) 1185 { 1186 struct ceph_osd_request *req, *tmp; 1187 1188 dout("__send_queued\n"); 1189 list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) 1190 __send_request(osdc, req); 1191 } 1192 1193 /* 1194 * Timeout callback, called every N seconds when 1 or more osd 1195 * requests has been active for more than N seconds. When this 1196 * happens, we ping all OSDs with requests who have timed out to 1197 * ensure any communications channel reset is detected. Reset the 1198 * request timeouts another N seconds in the future as we go. 1199 * Reschedule the timeout event another N seconds in future (unless 1200 * there are no open requests). 1201 */ 1202 static void handle_timeout(struct work_struct *work) 1203 { 1204 struct ceph_osd_client *osdc = 1205 container_of(work, struct ceph_osd_client, timeout_work.work); 1206 struct ceph_osd_request *req; 1207 struct ceph_osd *osd; 1208 unsigned long keepalive = 1209 osdc->client->options->osd_keepalive_timeout * HZ; 1210 struct list_head slow_osds; 1211 dout("timeout\n"); 1212 down_read(&osdc->map_sem); 1213 1214 ceph_monc_request_next_osdmap(&osdc->client->monc); 1215 1216 mutex_lock(&osdc->request_mutex); 1217 1218 /* 1219 * ping osds that are a bit slow. this ensures that if there 1220 * is a break in the TCP connection we will notice, and reopen 1221 * a connection with that osd (from the fault callback). 1222 */ 1223 INIT_LIST_HEAD(&slow_osds); 1224 list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) { 1225 if (time_before(jiffies, req->r_stamp + keepalive)) 1226 break; 1227 1228 osd = req->r_osd; 1229 BUG_ON(!osd); 1230 dout(" tid %llu is slow, will send keepalive on osd%d\n", 1231 req->r_tid, osd->o_osd); 1232 list_move_tail(&osd->o_keepalive_item, &slow_osds); 1233 } 1234 while (!list_empty(&slow_osds)) { 1235 osd = list_entry(slow_osds.next, struct ceph_osd, 1236 o_keepalive_item); 1237 list_del_init(&osd->o_keepalive_item); 1238 ceph_con_keepalive(&osd->o_con); 1239 } 1240 1241 __schedule_osd_timeout(osdc); 1242 __send_queued(osdc); 1243 mutex_unlock(&osdc->request_mutex); 1244 up_read(&osdc->map_sem); 1245 } 1246 1247 static void handle_osds_timeout(struct work_struct *work) 1248 { 1249 struct ceph_osd_client *osdc = 1250 container_of(work, struct ceph_osd_client, 1251 osds_timeout_work.work); 1252 unsigned long delay = 1253 osdc->client->options->osd_idle_ttl * HZ >> 2; 1254 1255 dout("osds timeout\n"); 1256 down_read(&osdc->map_sem); 1257 remove_old_osds(osdc); 1258 up_read(&osdc->map_sem); 1259 1260 schedule_delayed_work(&osdc->osds_timeout_work, 1261 round_jiffies_relative(delay)); 1262 } 1263 1264 static void complete_request(struct ceph_osd_request *req) 1265 { 1266 if (req->r_safe_callback) 1267 req->r_safe_callback(req, NULL); 1268 complete_all(&req->r_safe_completion); /* fsync waiter */ 1269 } 1270 1271 static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid) 1272 { 1273 __u8 v; 1274 1275 ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad); 1276 v = ceph_decode_8(p); 1277 if (v > 1) { 1278 pr_warning("do not understand pg encoding %d > 1", v); 1279 return -EINVAL; 1280 } 1281 pgid->pool = ceph_decode_64(p); 1282 pgid->seed = ceph_decode_32(p); 1283 *p += 4; 1284 return 0; 1285 1286 bad: 1287 pr_warning("incomplete pg encoding"); 1288 return -EINVAL; 1289 } 1290 1291 /* 1292 * handle osd op reply. either call the callback if it is specified, 1293 * or do the completion to wake up the waiting thread. 1294 */ 1295 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, 1296 struct ceph_connection *con) 1297 { 1298 void *p, *end; 1299 struct ceph_osd_request *req; 1300 u64 tid; 1301 int object_len; 1302 int numops, payload_len, flags; 1303 s32 result; 1304 s32 retry_attempt; 1305 struct ceph_pg pg; 1306 int err; 1307 u32 reassert_epoch; 1308 u64 reassert_version; 1309 u32 osdmap_epoch; 1310 int already_completed; 1311 int i; 1312 1313 tid = le64_to_cpu(msg->hdr.tid); 1314 dout("handle_reply %p tid %llu\n", msg, tid); 1315 1316 p = msg->front.iov_base; 1317 end = p + msg->front.iov_len; 1318 1319 ceph_decode_need(&p, end, 4, bad); 1320 object_len = ceph_decode_32(&p); 1321 ceph_decode_need(&p, end, object_len, bad); 1322 p += object_len; 1323 1324 err = __decode_pgid(&p, end, &pg); 1325 if (err) 1326 goto bad; 1327 1328 ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad); 1329 flags = ceph_decode_64(&p); 1330 result = ceph_decode_32(&p); 1331 reassert_epoch = ceph_decode_32(&p); 1332 reassert_version = ceph_decode_64(&p); 1333 osdmap_epoch = ceph_decode_32(&p); 1334 1335 /* lookup */ 1336 mutex_lock(&osdc->request_mutex); 1337 req = __lookup_request(osdc, tid); 1338 if (req == NULL) { 1339 dout("handle_reply tid %llu dne\n", tid); 1340 goto bad_mutex; 1341 } 1342 ceph_osdc_get_request(req); 1343 1344 dout("handle_reply %p tid %llu req %p result %d\n", msg, tid, 1345 req, result); 1346 1347 ceph_decode_need(&p, end, 4, bad); 1348 numops = ceph_decode_32(&p); 1349 if (numops > CEPH_OSD_MAX_OP) 1350 goto bad_put; 1351 if (numops != req->r_num_ops) 1352 goto bad_put; 1353 payload_len = 0; 1354 ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad); 1355 for (i = 0; i < numops; i++) { 1356 struct ceph_osd_op *op = p; 1357 int len; 1358 1359 len = le32_to_cpu(op->payload_len); 1360 req->r_reply_op_len[i] = len; 1361 dout(" op %d has %d bytes\n", i, len); 1362 payload_len += len; 1363 p += sizeof(*op); 1364 } 1365 if (payload_len != le32_to_cpu(msg->hdr.data_len)) { 1366 pr_warning("sum of op payload lens %d != data_len %d", 1367 payload_len, le32_to_cpu(msg->hdr.data_len)); 1368 goto bad_put; 1369 } 1370 1371 ceph_decode_need(&p, end, 4 + numops * 4, bad); 1372 retry_attempt = ceph_decode_32(&p); 1373 for (i = 0; i < numops; i++) 1374 req->r_reply_op_result[i] = ceph_decode_32(&p); 1375 1376 /* 1377 * if this connection filled our message, drop our reference now, to 1378 * avoid a (safe but slower) revoke later. 1379 */ 1380 if (req->r_con_filling_msg == con && req->r_reply == msg) { 1381 dout(" dropping con_filling_msg ref %p\n", con); 1382 req->r_con_filling_msg = NULL; 1383 con->ops->put(con); 1384 } 1385 1386 if (!req->r_got_reply) { 1387 unsigned int bytes; 1388 1389 req->r_result = result; 1390 bytes = le32_to_cpu(msg->hdr.data_len); 1391 dout("handle_reply result %d bytes %d\n", req->r_result, 1392 bytes); 1393 if (req->r_result == 0) 1394 req->r_result = bytes; 1395 1396 /* in case this is a write and we need to replay, */ 1397 req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch); 1398 req->r_reassert_version.version = cpu_to_le64(reassert_version); 1399 1400 req->r_got_reply = 1; 1401 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { 1402 dout("handle_reply tid %llu dup ack\n", tid); 1403 mutex_unlock(&osdc->request_mutex); 1404 goto done; 1405 } 1406 1407 dout("handle_reply tid %llu flags %d\n", tid, flags); 1408 1409 if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK)) 1410 __register_linger_request(osdc, req); 1411 1412 /* either this is a read, or we got the safe response */ 1413 if (result < 0 || 1414 (flags & CEPH_OSD_FLAG_ONDISK) || 1415 ((flags & CEPH_OSD_FLAG_WRITE) == 0)) 1416 __unregister_request(osdc, req); 1417 1418 already_completed = req->r_completed; 1419 req->r_completed = 1; 1420 mutex_unlock(&osdc->request_mutex); 1421 if (already_completed) 1422 goto done; 1423 1424 if (req->r_callback) 1425 req->r_callback(req, msg); 1426 else 1427 complete_all(&req->r_completion); 1428 1429 if (flags & CEPH_OSD_FLAG_ONDISK) 1430 complete_request(req); 1431 1432 done: 1433 dout("req=%p req->r_linger=%d\n", req, req->r_linger); 1434 ceph_osdc_put_request(req); 1435 return; 1436 1437 bad_put: 1438 ceph_osdc_put_request(req); 1439 bad_mutex: 1440 mutex_unlock(&osdc->request_mutex); 1441 bad: 1442 pr_err("corrupt osd_op_reply got %d %d\n", 1443 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len)); 1444 ceph_msg_dump(msg); 1445 } 1446 1447 static void reset_changed_osds(struct ceph_osd_client *osdc) 1448 { 1449 struct rb_node *p, *n; 1450 1451 for (p = rb_first(&osdc->osds); p; p = n) { 1452 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node); 1453 1454 n = rb_next(p); 1455 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || 1456 memcmp(&osd->o_con.peer_addr, 1457 ceph_osd_addr(osdc->osdmap, 1458 osd->o_osd), 1459 sizeof(struct ceph_entity_addr)) != 0) 1460 __reset_osd(osdc, osd); 1461 } 1462 } 1463 1464 /* 1465 * Requeue requests whose mapping to an OSD has changed. If requests map to 1466 * no osd, request a new map. 1467 * 1468 * Caller should hold map_sem for read. 1469 */ 1470 static void kick_requests(struct ceph_osd_client *osdc, int force_resend) 1471 { 1472 struct ceph_osd_request *req, *nreq; 1473 struct rb_node *p; 1474 int needmap = 0; 1475 int err; 1476 1477 dout("kick_requests %s\n", force_resend ? " (force resend)" : ""); 1478 mutex_lock(&osdc->request_mutex); 1479 for (p = rb_first(&osdc->requests); p; ) { 1480 req = rb_entry(p, struct ceph_osd_request, r_node); 1481 p = rb_next(p); 1482 1483 /* 1484 * For linger requests that have not yet been 1485 * registered, move them to the linger list; they'll 1486 * be sent to the osd in the loop below. Unregister 1487 * the request before re-registering it as a linger 1488 * request to ensure the __map_request() below 1489 * will decide it needs to be sent. 1490 */ 1491 if (req->r_linger && list_empty(&req->r_linger_item)) { 1492 dout("%p tid %llu restart on osd%d\n", 1493 req, req->r_tid, 1494 req->r_osd ? req->r_osd->o_osd : -1); 1495 __unregister_request(osdc, req); 1496 __register_linger_request(osdc, req); 1497 continue; 1498 } 1499 1500 err = __map_request(osdc, req, force_resend); 1501 if (err < 0) 1502 continue; /* error */ 1503 if (req->r_osd == NULL) { 1504 dout("%p tid %llu maps to no osd\n", req, req->r_tid); 1505 needmap++; /* request a newer map */ 1506 } else if (err > 0) { 1507 if (!req->r_linger) { 1508 dout("%p tid %llu requeued on osd%d\n", req, 1509 req->r_tid, 1510 req->r_osd ? req->r_osd->o_osd : -1); 1511 req->r_flags |= CEPH_OSD_FLAG_RETRY; 1512 } 1513 } 1514 } 1515 1516 list_for_each_entry_safe(req, nreq, &osdc->req_linger, 1517 r_linger_item) { 1518 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); 1519 1520 err = __map_request(osdc, req, force_resend); 1521 dout("__map_request returned %d\n", err); 1522 if (err == 0) 1523 continue; /* no change and no osd was specified */ 1524 if (err < 0) 1525 continue; /* hrm! */ 1526 if (req->r_osd == NULL) { 1527 dout("tid %llu maps to no valid osd\n", req->r_tid); 1528 needmap++; /* request a newer map */ 1529 continue; 1530 } 1531 1532 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid, 1533 req->r_osd ? req->r_osd->o_osd : -1); 1534 __register_request(osdc, req); 1535 __unregister_linger_request(osdc, req); 1536 } 1537 mutex_unlock(&osdc->request_mutex); 1538 1539 if (needmap) { 1540 dout("%d requests for down osds, need new map\n", needmap); 1541 ceph_monc_request_next_osdmap(&osdc->client->monc); 1542 } 1543 reset_changed_osds(osdc); 1544 } 1545 1546 1547 /* 1548 * Process updated osd map. 1549 * 1550 * The message contains any number of incremental and full maps, normally 1551 * indicating some sort of topology change in the cluster. Kick requests 1552 * off to different OSDs as needed. 1553 */ 1554 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) 1555 { 1556 void *p, *end, *next; 1557 u32 nr_maps, maplen; 1558 u32 epoch; 1559 struct ceph_osdmap *newmap = NULL, *oldmap; 1560 int err; 1561 struct ceph_fsid fsid; 1562 1563 dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0); 1564 p = msg->front.iov_base; 1565 end = p + msg->front.iov_len; 1566 1567 /* verify fsid */ 1568 ceph_decode_need(&p, end, sizeof(fsid), bad); 1569 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 1570 if (ceph_check_fsid(osdc->client, &fsid) < 0) 1571 return; 1572 1573 down_write(&osdc->map_sem); 1574 1575 /* incremental maps */ 1576 ceph_decode_32_safe(&p, end, nr_maps, bad); 1577 dout(" %d inc maps\n", nr_maps); 1578 while (nr_maps > 0) { 1579 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 1580 epoch = ceph_decode_32(&p); 1581 maplen = ceph_decode_32(&p); 1582 ceph_decode_need(&p, end, maplen, bad); 1583 next = p + maplen; 1584 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) { 1585 dout("applying incremental map %u len %d\n", 1586 epoch, maplen); 1587 newmap = osdmap_apply_incremental(&p, next, 1588 osdc->osdmap, 1589 &osdc->client->msgr); 1590 if (IS_ERR(newmap)) { 1591 err = PTR_ERR(newmap); 1592 goto bad; 1593 } 1594 BUG_ON(!newmap); 1595 if (newmap != osdc->osdmap) { 1596 ceph_osdmap_destroy(osdc->osdmap); 1597 osdc->osdmap = newmap; 1598 } 1599 kick_requests(osdc, 0); 1600 } else { 1601 dout("ignoring incremental map %u len %d\n", 1602 epoch, maplen); 1603 } 1604 p = next; 1605 nr_maps--; 1606 } 1607 if (newmap) 1608 goto done; 1609 1610 /* full maps */ 1611 ceph_decode_32_safe(&p, end, nr_maps, bad); 1612 dout(" %d full maps\n", nr_maps); 1613 while (nr_maps) { 1614 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 1615 epoch = ceph_decode_32(&p); 1616 maplen = ceph_decode_32(&p); 1617 ceph_decode_need(&p, end, maplen, bad); 1618 if (nr_maps > 1) { 1619 dout("skipping non-latest full map %u len %d\n", 1620 epoch, maplen); 1621 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) { 1622 dout("skipping full map %u len %d, " 1623 "older than our %u\n", epoch, maplen, 1624 osdc->osdmap->epoch); 1625 } else { 1626 int skipped_map = 0; 1627 1628 dout("taking full map %u len %d\n", epoch, maplen); 1629 newmap = osdmap_decode(&p, p+maplen); 1630 if (IS_ERR(newmap)) { 1631 err = PTR_ERR(newmap); 1632 goto bad; 1633 } 1634 BUG_ON(!newmap); 1635 oldmap = osdc->osdmap; 1636 osdc->osdmap = newmap; 1637 if (oldmap) { 1638 if (oldmap->epoch + 1 < newmap->epoch) 1639 skipped_map = 1; 1640 ceph_osdmap_destroy(oldmap); 1641 } 1642 kick_requests(osdc, skipped_map); 1643 } 1644 p += maplen; 1645 nr_maps--; 1646 } 1647 1648 done: 1649 downgrade_write(&osdc->map_sem); 1650 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); 1651 1652 /* 1653 * subscribe to subsequent osdmap updates if full to ensure 1654 * we find out when we are no longer full and stop returning 1655 * ENOSPC. 1656 */ 1657 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) 1658 ceph_monc_request_next_osdmap(&osdc->client->monc); 1659 1660 mutex_lock(&osdc->request_mutex); 1661 __send_queued(osdc); 1662 mutex_unlock(&osdc->request_mutex); 1663 up_read(&osdc->map_sem); 1664 wake_up_all(&osdc->client->auth_wq); 1665 return; 1666 1667 bad: 1668 pr_err("osdc handle_map corrupt msg\n"); 1669 ceph_msg_dump(msg); 1670 up_write(&osdc->map_sem); 1671 return; 1672 } 1673 1674 /* 1675 * watch/notify callback event infrastructure 1676 * 1677 * These callbacks are used both for watch and notify operations. 1678 */ 1679 static void __release_event(struct kref *kref) 1680 { 1681 struct ceph_osd_event *event = 1682 container_of(kref, struct ceph_osd_event, kref); 1683 1684 dout("__release_event %p\n", event); 1685 kfree(event); 1686 } 1687 1688 static void get_event(struct ceph_osd_event *event) 1689 { 1690 kref_get(&event->kref); 1691 } 1692 1693 void ceph_osdc_put_event(struct ceph_osd_event *event) 1694 { 1695 kref_put(&event->kref, __release_event); 1696 } 1697 EXPORT_SYMBOL(ceph_osdc_put_event); 1698 1699 static void __insert_event(struct ceph_osd_client *osdc, 1700 struct ceph_osd_event *new) 1701 { 1702 struct rb_node **p = &osdc->event_tree.rb_node; 1703 struct rb_node *parent = NULL; 1704 struct ceph_osd_event *event = NULL; 1705 1706 while (*p) { 1707 parent = *p; 1708 event = rb_entry(parent, struct ceph_osd_event, node); 1709 if (new->cookie < event->cookie) 1710 p = &(*p)->rb_left; 1711 else if (new->cookie > event->cookie) 1712 p = &(*p)->rb_right; 1713 else 1714 BUG(); 1715 } 1716 1717 rb_link_node(&new->node, parent, p); 1718 rb_insert_color(&new->node, &osdc->event_tree); 1719 } 1720 1721 static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc, 1722 u64 cookie) 1723 { 1724 struct rb_node **p = &osdc->event_tree.rb_node; 1725 struct rb_node *parent = NULL; 1726 struct ceph_osd_event *event = NULL; 1727 1728 while (*p) { 1729 parent = *p; 1730 event = rb_entry(parent, struct ceph_osd_event, node); 1731 if (cookie < event->cookie) 1732 p = &(*p)->rb_left; 1733 else if (cookie > event->cookie) 1734 p = &(*p)->rb_right; 1735 else 1736 return event; 1737 } 1738 return NULL; 1739 } 1740 1741 static void __remove_event(struct ceph_osd_event *event) 1742 { 1743 struct ceph_osd_client *osdc = event->osdc; 1744 1745 if (!RB_EMPTY_NODE(&event->node)) { 1746 dout("__remove_event removed %p\n", event); 1747 rb_erase(&event->node, &osdc->event_tree); 1748 ceph_osdc_put_event(event); 1749 } else { 1750 dout("__remove_event didn't remove %p\n", event); 1751 } 1752 } 1753 1754 int ceph_osdc_create_event(struct ceph_osd_client *osdc, 1755 void (*event_cb)(u64, u64, u8, void *), 1756 void *data, struct ceph_osd_event **pevent) 1757 { 1758 struct ceph_osd_event *event; 1759 1760 event = kmalloc(sizeof(*event), GFP_NOIO); 1761 if (!event) 1762 return -ENOMEM; 1763 1764 dout("create_event %p\n", event); 1765 event->cb = event_cb; 1766 event->one_shot = 0; 1767 event->data = data; 1768 event->osdc = osdc; 1769 INIT_LIST_HEAD(&event->osd_node); 1770 RB_CLEAR_NODE(&event->node); 1771 kref_init(&event->kref); /* one ref for us */ 1772 kref_get(&event->kref); /* one ref for the caller */ 1773 1774 spin_lock(&osdc->event_lock); 1775 event->cookie = ++osdc->event_count; 1776 __insert_event(osdc, event); 1777 spin_unlock(&osdc->event_lock); 1778 1779 *pevent = event; 1780 return 0; 1781 } 1782 EXPORT_SYMBOL(ceph_osdc_create_event); 1783 1784 void ceph_osdc_cancel_event(struct ceph_osd_event *event) 1785 { 1786 struct ceph_osd_client *osdc = event->osdc; 1787 1788 dout("cancel_event %p\n", event); 1789 spin_lock(&osdc->event_lock); 1790 __remove_event(event); 1791 spin_unlock(&osdc->event_lock); 1792 ceph_osdc_put_event(event); /* caller's */ 1793 } 1794 EXPORT_SYMBOL(ceph_osdc_cancel_event); 1795 1796 1797 static void do_event_work(struct work_struct *work) 1798 { 1799 struct ceph_osd_event_work *event_work = 1800 container_of(work, struct ceph_osd_event_work, work); 1801 struct ceph_osd_event *event = event_work->event; 1802 u64 ver = event_work->ver; 1803 u64 notify_id = event_work->notify_id; 1804 u8 opcode = event_work->opcode; 1805 1806 dout("do_event_work completing %p\n", event); 1807 event->cb(ver, notify_id, opcode, event->data); 1808 dout("do_event_work completed %p\n", event); 1809 ceph_osdc_put_event(event); 1810 kfree(event_work); 1811 } 1812 1813 1814 /* 1815 * Process osd watch notifications 1816 */ 1817 static void handle_watch_notify(struct ceph_osd_client *osdc, 1818 struct ceph_msg *msg) 1819 { 1820 void *p, *end; 1821 u8 proto_ver; 1822 u64 cookie, ver, notify_id; 1823 u8 opcode; 1824 struct ceph_osd_event *event; 1825 struct ceph_osd_event_work *event_work; 1826 1827 p = msg->front.iov_base; 1828 end = p + msg->front.iov_len; 1829 1830 ceph_decode_8_safe(&p, end, proto_ver, bad); 1831 ceph_decode_8_safe(&p, end, opcode, bad); 1832 ceph_decode_64_safe(&p, end, cookie, bad); 1833 ceph_decode_64_safe(&p, end, ver, bad); 1834 ceph_decode_64_safe(&p, end, notify_id, bad); 1835 1836 spin_lock(&osdc->event_lock); 1837 event = __find_event(osdc, cookie); 1838 if (event) { 1839 BUG_ON(event->one_shot); 1840 get_event(event); 1841 } 1842 spin_unlock(&osdc->event_lock); 1843 dout("handle_watch_notify cookie %lld ver %lld event %p\n", 1844 cookie, ver, event); 1845 if (event) { 1846 event_work = kmalloc(sizeof(*event_work), GFP_NOIO); 1847 if (!event_work) { 1848 dout("ERROR: could not allocate event_work\n"); 1849 goto done_err; 1850 } 1851 INIT_WORK(&event_work->work, do_event_work); 1852 event_work->event = event; 1853 event_work->ver = ver; 1854 event_work->notify_id = notify_id; 1855 event_work->opcode = opcode; 1856 if (!queue_work(osdc->notify_wq, &event_work->work)) { 1857 dout("WARNING: failed to queue notify event work\n"); 1858 goto done_err; 1859 } 1860 } 1861 1862 return; 1863 1864 done_err: 1865 ceph_osdc_put_event(event); 1866 return; 1867 1868 bad: 1869 pr_err("osdc handle_watch_notify corrupt msg\n"); 1870 return; 1871 } 1872 1873 static void ceph_osdc_msg_data_set(struct ceph_msg *msg, 1874 struct ceph_osd_data *osd_data) 1875 { 1876 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { 1877 BUG_ON(osd_data->length > (u64) SIZE_MAX); 1878 if (osd_data->length) 1879 ceph_msg_data_set_pages(msg, osd_data->pages, 1880 osd_data->length, osd_data->alignment); 1881 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) { 1882 BUG_ON(!osd_data->pagelist->length); 1883 ceph_msg_data_set_pagelist(msg, osd_data->pagelist); 1884 #ifdef CONFIG_BLOCK 1885 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) { 1886 ceph_msg_data_set_bio(msg, osd_data->bio); 1887 #endif 1888 } else { 1889 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE); 1890 } 1891 } 1892 1893 /* 1894 * Register request, send initial attempt. 1895 */ 1896 int ceph_osdc_start_request(struct ceph_osd_client *osdc, 1897 struct ceph_osd_request *req, 1898 bool nofail) 1899 { 1900 int rc = 0; 1901 1902 /* Set up response incoming data and request outgoing data fields */ 1903 1904 ceph_osdc_msg_data_set(req->r_reply, &req->r_data_in); 1905 ceph_osdc_msg_data_set(req->r_request, &req->r_data_out); 1906 1907 down_read(&osdc->map_sem); 1908 mutex_lock(&osdc->request_mutex); 1909 __register_request(osdc, req); 1910 WARN_ON(req->r_sent); 1911 rc = __map_request(osdc, req, 0); 1912 if (rc < 0) { 1913 if (nofail) { 1914 dout("osdc_start_request failed map, " 1915 " will retry %lld\n", req->r_tid); 1916 rc = 0; 1917 } 1918 goto out_unlock; 1919 } 1920 if (req->r_osd == NULL) { 1921 dout("send_request %p no up osds in pg\n", req); 1922 ceph_monc_request_next_osdmap(&osdc->client->monc); 1923 } else { 1924 __send_queued(osdc); 1925 } 1926 rc = 0; 1927 out_unlock: 1928 mutex_unlock(&osdc->request_mutex); 1929 up_read(&osdc->map_sem); 1930 return rc; 1931 } 1932 EXPORT_SYMBOL(ceph_osdc_start_request); 1933 1934 /* 1935 * wait for a request to complete 1936 */ 1937 int ceph_osdc_wait_request(struct ceph_osd_client *osdc, 1938 struct ceph_osd_request *req) 1939 { 1940 int rc; 1941 1942 rc = wait_for_completion_interruptible(&req->r_completion); 1943 if (rc < 0) { 1944 mutex_lock(&osdc->request_mutex); 1945 __cancel_request(req); 1946 __unregister_request(osdc, req); 1947 mutex_unlock(&osdc->request_mutex); 1948 complete_request(req); 1949 dout("wait_request tid %llu canceled/timed out\n", req->r_tid); 1950 return rc; 1951 } 1952 1953 dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result); 1954 return req->r_result; 1955 } 1956 EXPORT_SYMBOL(ceph_osdc_wait_request); 1957 1958 /* 1959 * sync - wait for all in-flight requests to flush. avoid starvation. 1960 */ 1961 void ceph_osdc_sync(struct ceph_osd_client *osdc) 1962 { 1963 struct ceph_osd_request *req; 1964 u64 last_tid, next_tid = 0; 1965 1966 mutex_lock(&osdc->request_mutex); 1967 last_tid = osdc->last_tid; 1968 while (1) { 1969 req = __lookup_request_ge(osdc, next_tid); 1970 if (!req) 1971 break; 1972 if (req->r_tid > last_tid) 1973 break; 1974 1975 next_tid = req->r_tid + 1; 1976 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0) 1977 continue; 1978 1979 ceph_osdc_get_request(req); 1980 mutex_unlock(&osdc->request_mutex); 1981 dout("sync waiting on tid %llu (last is %llu)\n", 1982 req->r_tid, last_tid); 1983 wait_for_completion(&req->r_safe_completion); 1984 mutex_lock(&osdc->request_mutex); 1985 ceph_osdc_put_request(req); 1986 } 1987 mutex_unlock(&osdc->request_mutex); 1988 dout("sync done (thru tid %llu)\n", last_tid); 1989 } 1990 EXPORT_SYMBOL(ceph_osdc_sync); 1991 1992 /* 1993 * init, shutdown 1994 */ 1995 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) 1996 { 1997 int err; 1998 1999 dout("init\n"); 2000 osdc->client = client; 2001 osdc->osdmap = NULL; 2002 init_rwsem(&osdc->map_sem); 2003 init_completion(&osdc->map_waiters); 2004 osdc->last_requested_map = 0; 2005 mutex_init(&osdc->request_mutex); 2006 osdc->last_tid = 0; 2007 osdc->osds = RB_ROOT; 2008 INIT_LIST_HEAD(&osdc->osd_lru); 2009 osdc->requests = RB_ROOT; 2010 INIT_LIST_HEAD(&osdc->req_lru); 2011 INIT_LIST_HEAD(&osdc->req_unsent); 2012 INIT_LIST_HEAD(&osdc->req_notarget); 2013 INIT_LIST_HEAD(&osdc->req_linger); 2014 osdc->num_requests = 0; 2015 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 2016 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 2017 spin_lock_init(&osdc->event_lock); 2018 osdc->event_tree = RB_ROOT; 2019 osdc->event_count = 0; 2020 2021 schedule_delayed_work(&osdc->osds_timeout_work, 2022 round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ)); 2023 2024 err = -ENOMEM; 2025 osdc->req_mempool = mempool_create_kmalloc_pool(10, 2026 sizeof(struct ceph_osd_request)); 2027 if (!osdc->req_mempool) 2028 goto out; 2029 2030 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, 2031 OSD_OP_FRONT_LEN, 10, true, 2032 "osd_op"); 2033 if (err < 0) 2034 goto out_mempool; 2035 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, 2036 OSD_OPREPLY_FRONT_LEN, 10, true, 2037 "osd_op_reply"); 2038 if (err < 0) 2039 goto out_msgpool; 2040 2041 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); 2042 if (IS_ERR(osdc->notify_wq)) { 2043 err = PTR_ERR(osdc->notify_wq); 2044 osdc->notify_wq = NULL; 2045 goto out_msgpool; 2046 } 2047 return 0; 2048 2049 out_msgpool: 2050 ceph_msgpool_destroy(&osdc->msgpool_op); 2051 out_mempool: 2052 mempool_destroy(osdc->req_mempool); 2053 out: 2054 return err; 2055 } 2056 2057 void ceph_osdc_stop(struct ceph_osd_client *osdc) 2058 { 2059 flush_workqueue(osdc->notify_wq); 2060 destroy_workqueue(osdc->notify_wq); 2061 cancel_delayed_work_sync(&osdc->timeout_work); 2062 cancel_delayed_work_sync(&osdc->osds_timeout_work); 2063 if (osdc->osdmap) { 2064 ceph_osdmap_destroy(osdc->osdmap); 2065 osdc->osdmap = NULL; 2066 } 2067 remove_all_osds(osdc); 2068 mempool_destroy(osdc->req_mempool); 2069 ceph_msgpool_destroy(&osdc->msgpool_op); 2070 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 2071 } 2072 2073 /* 2074 * Read some contiguous pages. If we cross a stripe boundary, shorten 2075 * *plen. Return number of bytes read, or error. 2076 */ 2077 int ceph_osdc_readpages(struct ceph_osd_client *osdc, 2078 struct ceph_vino vino, struct ceph_file_layout *layout, 2079 u64 off, u64 *plen, 2080 u32 truncate_seq, u64 truncate_size, 2081 struct page **pages, int num_pages, int page_align) 2082 { 2083 struct ceph_osd_request *req; 2084 struct ceph_osd_data *osd_data; 2085 int rc = 0; 2086 2087 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, 2088 vino.snap, off, *plen); 2089 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 2090 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 2091 NULL, 0, truncate_seq, truncate_size, NULL, 2092 false); 2093 if (IS_ERR(req)) 2094 return PTR_ERR(req); 2095 2096 /* it may be a short read due to an object boundary */ 2097 2098 osd_data = &req->r_data_in; 2099 osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; 2100 osd_data->pages = pages; 2101 osd_data->length = *plen; 2102 osd_data->alignment = page_align; 2103 2104 dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", 2105 off, *plen, osd_data->length, page_align); 2106 2107 rc = ceph_osdc_start_request(osdc, req, false); 2108 if (!rc) 2109 rc = ceph_osdc_wait_request(osdc, req); 2110 2111 ceph_osdc_put_request(req); 2112 dout("readpages result %d\n", rc); 2113 return rc; 2114 } 2115 EXPORT_SYMBOL(ceph_osdc_readpages); 2116 2117 /* 2118 * do a synchronous write on N pages 2119 */ 2120 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, 2121 struct ceph_file_layout *layout, 2122 struct ceph_snap_context *snapc, 2123 u64 off, u64 len, 2124 u32 truncate_seq, u64 truncate_size, 2125 struct timespec *mtime, 2126 struct page **pages, int num_pages) 2127 { 2128 struct ceph_osd_request *req; 2129 struct ceph_osd_data *osd_data; 2130 int rc = 0; 2131 int page_align = off & ~PAGE_MASK; 2132 2133 BUG_ON(vino.snap != CEPH_NOSNAP); 2134 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 2135 CEPH_OSD_OP_WRITE, 2136 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, 2137 snapc, 0, 2138 truncate_seq, truncate_size, mtime, 2139 true); 2140 if (IS_ERR(req)) 2141 return PTR_ERR(req); 2142 2143 /* it may be a short write due to an object boundary */ 2144 osd_data = &req->r_data_out; 2145 osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; 2146 osd_data->pages = pages; 2147 osd_data->length = len; 2148 osd_data->alignment = page_align; 2149 dout("writepages %llu~%llu (%llu bytes)\n", off, len, osd_data->length); 2150 2151 rc = ceph_osdc_start_request(osdc, req, true); 2152 if (!rc) 2153 rc = ceph_osdc_wait_request(osdc, req); 2154 2155 ceph_osdc_put_request(req); 2156 if (rc == 0) 2157 rc = len; 2158 dout("writepages result %d\n", rc); 2159 return rc; 2160 } 2161 EXPORT_SYMBOL(ceph_osdc_writepages); 2162 2163 /* 2164 * handle incoming message 2165 */ 2166 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 2167 { 2168 struct ceph_osd *osd = con->private; 2169 struct ceph_osd_client *osdc; 2170 int type = le16_to_cpu(msg->hdr.type); 2171 2172 if (!osd) 2173 goto out; 2174 osdc = osd->o_osdc; 2175 2176 switch (type) { 2177 case CEPH_MSG_OSD_MAP: 2178 ceph_osdc_handle_map(osdc, msg); 2179 break; 2180 case CEPH_MSG_OSD_OPREPLY: 2181 handle_reply(osdc, msg, con); 2182 break; 2183 case CEPH_MSG_WATCH_NOTIFY: 2184 handle_watch_notify(osdc, msg); 2185 break; 2186 2187 default: 2188 pr_err("received unknown message type %d %s\n", type, 2189 ceph_msg_type_name(type)); 2190 } 2191 out: 2192 ceph_msg_put(msg); 2193 } 2194 2195 /* 2196 * lookup and return message for incoming reply. set up reply message 2197 * pages. 2198 */ 2199 static struct ceph_msg *get_reply(struct ceph_connection *con, 2200 struct ceph_msg_header *hdr, 2201 int *skip) 2202 { 2203 struct ceph_osd *osd = con->private; 2204 struct ceph_osd_client *osdc = osd->o_osdc; 2205 struct ceph_msg *m; 2206 struct ceph_osd_request *req; 2207 int front = le32_to_cpu(hdr->front_len); 2208 int data_len = le32_to_cpu(hdr->data_len); 2209 u64 tid; 2210 2211 tid = le64_to_cpu(hdr->tid); 2212 mutex_lock(&osdc->request_mutex); 2213 req = __lookup_request(osdc, tid); 2214 if (!req) { 2215 *skip = 1; 2216 m = NULL; 2217 dout("get_reply unknown tid %llu from osd%d\n", tid, 2218 osd->o_osd); 2219 goto out; 2220 } 2221 2222 if (req->r_con_filling_msg) { 2223 dout("%s revoking msg %p from old con %p\n", __func__, 2224 req->r_reply, req->r_con_filling_msg); 2225 ceph_msg_revoke_incoming(req->r_reply); 2226 req->r_con_filling_msg->ops->put(req->r_con_filling_msg); 2227 req->r_con_filling_msg = NULL; 2228 } 2229 2230 if (front > req->r_reply->front.iov_len) { 2231 pr_warning("get_reply front %d > preallocated %d\n", 2232 front, (int)req->r_reply->front.iov_len); 2233 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false); 2234 if (!m) 2235 goto out; 2236 ceph_msg_put(req->r_reply); 2237 req->r_reply = m; 2238 } 2239 m = ceph_msg_get(req->r_reply); 2240 2241 if (data_len > 0) { 2242 struct ceph_osd_data *osd_data = &req->r_data_in; 2243 2244 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { 2245 if (osd_data->pages && 2246 unlikely(osd_data->length < data_len)) { 2247 2248 pr_warning("tid %lld reply has %d bytes " 2249 "we had only %llu bytes ready\n", 2250 tid, data_len, osd_data->length); 2251 *skip = 1; 2252 ceph_msg_put(m); 2253 m = NULL; 2254 goto out; 2255 } 2256 } 2257 } 2258 *skip = 0; 2259 req->r_con_filling_msg = con->ops->get(con); 2260 dout("get_reply tid %lld %p\n", tid, m); 2261 2262 out: 2263 mutex_unlock(&osdc->request_mutex); 2264 return m; 2265 2266 } 2267 2268 static struct ceph_msg *alloc_msg(struct ceph_connection *con, 2269 struct ceph_msg_header *hdr, 2270 int *skip) 2271 { 2272 struct ceph_osd *osd = con->private; 2273 int type = le16_to_cpu(hdr->type); 2274 int front = le32_to_cpu(hdr->front_len); 2275 2276 *skip = 0; 2277 switch (type) { 2278 case CEPH_MSG_OSD_MAP: 2279 case CEPH_MSG_WATCH_NOTIFY: 2280 return ceph_msg_new(type, front, GFP_NOFS, false); 2281 case CEPH_MSG_OSD_OPREPLY: 2282 return get_reply(con, hdr, skip); 2283 default: 2284 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type, 2285 osd->o_osd); 2286 *skip = 1; 2287 return NULL; 2288 } 2289 } 2290 2291 /* 2292 * Wrappers to refcount containing ceph_osd struct 2293 */ 2294 static struct ceph_connection *get_osd_con(struct ceph_connection *con) 2295 { 2296 struct ceph_osd *osd = con->private; 2297 if (get_osd(osd)) 2298 return con; 2299 return NULL; 2300 } 2301 2302 static void put_osd_con(struct ceph_connection *con) 2303 { 2304 struct ceph_osd *osd = con->private; 2305 put_osd(osd); 2306 } 2307 2308 /* 2309 * authentication 2310 */ 2311 /* 2312 * Note: returned pointer is the address of a structure that's 2313 * managed separately. Caller must *not* attempt to free it. 2314 */ 2315 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 2316 int *proto, int force_new) 2317 { 2318 struct ceph_osd *o = con->private; 2319 struct ceph_osd_client *osdc = o->o_osdc; 2320 struct ceph_auth_client *ac = osdc->client->monc.auth; 2321 struct ceph_auth_handshake *auth = &o->o_auth; 2322 2323 if (force_new && auth->authorizer) { 2324 ceph_auth_destroy_authorizer(ac, auth->authorizer); 2325 auth->authorizer = NULL; 2326 } 2327 if (!auth->authorizer) { 2328 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 2329 auth); 2330 if (ret) 2331 return ERR_PTR(ret); 2332 } else { 2333 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 2334 auth); 2335 if (ret) 2336 return ERR_PTR(ret); 2337 } 2338 *proto = ac->protocol; 2339 2340 return auth; 2341 } 2342 2343 2344 static int verify_authorizer_reply(struct ceph_connection *con, int len) 2345 { 2346 struct ceph_osd *o = con->private; 2347 struct ceph_osd_client *osdc = o->o_osdc; 2348 struct ceph_auth_client *ac = osdc->client->monc.auth; 2349 2350 return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len); 2351 } 2352 2353 static int invalidate_authorizer(struct ceph_connection *con) 2354 { 2355 struct ceph_osd *o = con->private; 2356 struct ceph_osd_client *osdc = o->o_osdc; 2357 struct ceph_auth_client *ac = osdc->client->monc.auth; 2358 2359 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); 2360 return ceph_monc_validate_auth(&osdc->client->monc); 2361 } 2362 2363 static const struct ceph_connection_operations osd_con_ops = { 2364 .get = get_osd_con, 2365 .put = put_osd_con, 2366 .dispatch = dispatch, 2367 .get_authorizer = get_authorizer, 2368 .verify_authorizer_reply = verify_authorizer_reply, 2369 .invalidate_authorizer = invalidate_authorizer, 2370 .alloc_msg = alloc_msg, 2371 .fault = osd_reset, 2372 }; 2373