1 #include <linux/ceph/ceph_debug.h> 2 3 #include <linux/module.h> 4 #include <linux/err.h> 5 #include <linux/highmem.h> 6 #include <linux/mm.h> 7 #include <linux/pagemap.h> 8 #include <linux/slab.h> 9 #include <linux/uaccess.h> 10 #ifdef CONFIG_BLOCK 11 #include <linux/bio.h> 12 #endif 13 14 #include <linux/ceph/libceph.h> 15 #include <linux/ceph/osd_client.h> 16 #include <linux/ceph/messenger.h> 17 #include <linux/ceph/decode.h> 18 #include <linux/ceph/auth.h> 19 #include <linux/ceph/pagelist.h> 20 21 #define OSD_OP_FRONT_LEN 4096 22 #define OSD_OPREPLY_FRONT_LEN 512 23 24 static const struct ceph_connection_operations osd_con_ops; 25 26 static void __send_queued(struct ceph_osd_client *osdc); 27 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); 28 static void __register_request(struct ceph_osd_client *osdc, 29 struct ceph_osd_request *req); 30 static void __unregister_linger_request(struct ceph_osd_client *osdc, 31 struct ceph_osd_request *req); 32 static void __send_request(struct ceph_osd_client *osdc, 33 struct ceph_osd_request *req); 34 35 /* 36 * Implement client access to distributed object storage cluster. 37 * 38 * All data objects are stored within a cluster/cloud of OSDs, or 39 * "object storage devices." (Note that Ceph OSDs have _nothing_ to 40 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply 41 * remote daemons serving up and coordinating consistent and safe 42 * access to storage. 43 * 44 * Cluster membership and the mapping of data objects onto storage devices 45 * are described by the osd map. 46 * 47 * We keep track of pending OSD requests (read, write), resubmit 48 * requests to different OSDs when the cluster topology/data layout 49 * change, or retry the affected requests when the communications 50 * channel with an OSD is reset. 51 */ 52 53 /* 54 * calculate the mapping of a file extent onto an object, and fill out the 55 * request accordingly. shorten extent as necessary if it crosses an 56 * object boundary. 57 * 58 * fill osd op in request message. 59 */ 60 static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen, 61 u64 *objnum, u64 *objoff, u64 *objlen) 62 { 63 u64 orig_len = *plen; 64 int r; 65 66 /* object extent? */ 67 r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum, 68 objoff, objlen); 69 if (r < 0) 70 return r; 71 if (*objlen < orig_len) { 72 *plen = *objlen; 73 dout(" skipping last %llu, final file extent %llu~%llu\n", 74 orig_len - *plen, off, *plen); 75 } 76 77 dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen); 78 79 return 0; 80 } 81 82 /* 83 * requests 84 */ 85 void ceph_osdc_release_request(struct kref *kref) 86 { 87 int num_pages; 88 struct ceph_osd_request *req = container_of(kref, 89 struct ceph_osd_request, 90 r_kref); 91 92 if (req->r_request) 93 ceph_msg_put(req->r_request); 94 if (req->r_con_filling_msg) { 95 dout("%s revoking msg %p from con %p\n", __func__, 96 req->r_reply, req->r_con_filling_msg); 97 ceph_msg_revoke_incoming(req->r_reply); 98 req->r_con_filling_msg->ops->put(req->r_con_filling_msg); 99 req->r_con_filling_msg = NULL; 100 } 101 if (req->r_reply) 102 ceph_msg_put(req->r_reply); 103 104 if (req->r_data_in.type == CEPH_OSD_DATA_TYPE_PAGES && 105 req->r_data_in.own_pages) { 106 num_pages = calc_pages_for((u64)req->r_data_in.alignment, 107 (u64)req->r_data_in.length); 108 ceph_release_page_vector(req->r_data_in.pages, num_pages); 109 } 110 if (req->r_data_out.type == CEPH_OSD_DATA_TYPE_PAGES && 111 req->r_data_out.own_pages) { 112 num_pages = calc_pages_for((u64)req->r_data_out.alignment, 113 (u64)req->r_data_out.length); 114 ceph_release_page_vector(req->r_data_out.pages, num_pages); 115 } 116 117 ceph_put_snap_context(req->r_snapc); 118 if (req->r_mempool) 119 mempool_free(req, req->r_osdc->req_mempool); 120 else 121 kfree(req); 122 } 123 EXPORT_SYMBOL(ceph_osdc_release_request); 124 125 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 126 struct ceph_snap_context *snapc, 127 unsigned int num_ops, 128 bool use_mempool, 129 gfp_t gfp_flags) 130 { 131 struct ceph_osd_request *req; 132 struct ceph_msg *msg; 133 size_t msg_size; 134 135 msg_size = 4 + 4 + 8 + 8 + 4+8; 136 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ 137 msg_size += 1 + 8 + 4 + 4; /* pg_t */ 138 msg_size += 4 + MAX_OBJ_NAME_SIZE; 139 msg_size += 2 + num_ops*sizeof(struct ceph_osd_op); 140 msg_size += 8; /* snapid */ 141 msg_size += 8; /* snap_seq */ 142 msg_size += 8 * (snapc ? snapc->num_snaps : 0); /* snaps */ 143 msg_size += 4; 144 145 if (use_mempool) { 146 req = mempool_alloc(osdc->req_mempool, gfp_flags); 147 memset(req, 0, sizeof(*req)); 148 } else { 149 req = kzalloc(sizeof(*req), gfp_flags); 150 } 151 if (req == NULL) 152 return NULL; 153 154 req->r_osdc = osdc; 155 req->r_mempool = use_mempool; 156 157 kref_init(&req->r_kref); 158 init_completion(&req->r_completion); 159 init_completion(&req->r_safe_completion); 160 RB_CLEAR_NODE(&req->r_node); 161 INIT_LIST_HEAD(&req->r_unsafe_item); 162 INIT_LIST_HEAD(&req->r_linger_item); 163 INIT_LIST_HEAD(&req->r_linger_osd); 164 INIT_LIST_HEAD(&req->r_req_lru_item); 165 INIT_LIST_HEAD(&req->r_osd_item); 166 167 /* create reply message */ 168 if (use_mempool) 169 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 170 else 171 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, 172 OSD_OPREPLY_FRONT_LEN, gfp_flags, true); 173 if (!msg) { 174 ceph_osdc_put_request(req); 175 return NULL; 176 } 177 req->r_reply = msg; 178 179 req->r_data_in.type = CEPH_OSD_DATA_TYPE_NONE; 180 req->r_data_out.type = CEPH_OSD_DATA_TYPE_NONE; 181 182 /* create request message; allow space for oid */ 183 if (use_mempool) 184 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 185 else 186 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true); 187 if (!msg) { 188 ceph_osdc_put_request(req); 189 return NULL; 190 } 191 192 memset(msg->front.iov_base, 0, msg->front.iov_len); 193 194 req->r_request = msg; 195 196 return req; 197 } 198 EXPORT_SYMBOL(ceph_osdc_alloc_request); 199 200 static bool osd_req_opcode_valid(u16 opcode) 201 { 202 switch (opcode) { 203 case CEPH_OSD_OP_READ: 204 case CEPH_OSD_OP_STAT: 205 case CEPH_OSD_OP_MAPEXT: 206 case CEPH_OSD_OP_MASKTRUNC: 207 case CEPH_OSD_OP_SPARSE_READ: 208 case CEPH_OSD_OP_NOTIFY: 209 case CEPH_OSD_OP_NOTIFY_ACK: 210 case CEPH_OSD_OP_ASSERT_VER: 211 case CEPH_OSD_OP_WRITE: 212 case CEPH_OSD_OP_WRITEFULL: 213 case CEPH_OSD_OP_TRUNCATE: 214 case CEPH_OSD_OP_ZERO: 215 case CEPH_OSD_OP_DELETE: 216 case CEPH_OSD_OP_APPEND: 217 case CEPH_OSD_OP_STARTSYNC: 218 case CEPH_OSD_OP_SETTRUNC: 219 case CEPH_OSD_OP_TRIMTRUNC: 220 case CEPH_OSD_OP_TMAPUP: 221 case CEPH_OSD_OP_TMAPPUT: 222 case CEPH_OSD_OP_TMAPGET: 223 case CEPH_OSD_OP_CREATE: 224 case CEPH_OSD_OP_ROLLBACK: 225 case CEPH_OSD_OP_WATCH: 226 case CEPH_OSD_OP_OMAPGETKEYS: 227 case CEPH_OSD_OP_OMAPGETVALS: 228 case CEPH_OSD_OP_OMAPGETHEADER: 229 case CEPH_OSD_OP_OMAPGETVALSBYKEYS: 230 case CEPH_OSD_OP_OMAPSETVALS: 231 case CEPH_OSD_OP_OMAPSETHEADER: 232 case CEPH_OSD_OP_OMAPCLEAR: 233 case CEPH_OSD_OP_OMAPRMKEYS: 234 case CEPH_OSD_OP_OMAP_CMP: 235 case CEPH_OSD_OP_CLONERANGE: 236 case CEPH_OSD_OP_ASSERT_SRC_VERSION: 237 case CEPH_OSD_OP_SRC_CMPXATTR: 238 case CEPH_OSD_OP_GETXATTR: 239 case CEPH_OSD_OP_GETXATTRS: 240 case CEPH_OSD_OP_CMPXATTR: 241 case CEPH_OSD_OP_SETXATTR: 242 case CEPH_OSD_OP_SETXATTRS: 243 case CEPH_OSD_OP_RESETXATTRS: 244 case CEPH_OSD_OP_RMXATTR: 245 case CEPH_OSD_OP_PULL: 246 case CEPH_OSD_OP_PUSH: 247 case CEPH_OSD_OP_BALANCEREADS: 248 case CEPH_OSD_OP_UNBALANCEREADS: 249 case CEPH_OSD_OP_SCRUB: 250 case CEPH_OSD_OP_SCRUB_RESERVE: 251 case CEPH_OSD_OP_SCRUB_UNRESERVE: 252 case CEPH_OSD_OP_SCRUB_STOP: 253 case CEPH_OSD_OP_SCRUB_MAP: 254 case CEPH_OSD_OP_WRLOCK: 255 case CEPH_OSD_OP_WRUNLOCK: 256 case CEPH_OSD_OP_RDLOCK: 257 case CEPH_OSD_OP_RDUNLOCK: 258 case CEPH_OSD_OP_UPLOCK: 259 case CEPH_OSD_OP_DNLOCK: 260 case CEPH_OSD_OP_CALL: 261 case CEPH_OSD_OP_PGLS: 262 case CEPH_OSD_OP_PGLS_FILTER: 263 return true; 264 default: 265 return false; 266 } 267 } 268 269 /* 270 * This is an osd op init function for opcodes that have no data or 271 * other information associated with them. It also serves as a 272 * common init routine for all the other init functions, below. 273 */ 274 void osd_req_op_init(struct ceph_osd_req_op *op, u16 opcode) 275 { 276 BUG_ON(!osd_req_opcode_valid(opcode)); 277 278 memset(op, 0, sizeof (*op)); 279 280 op->op = opcode; 281 } 282 283 void osd_req_op_extent_init(struct ceph_osd_req_op *op, u16 opcode, 284 u64 offset, u64 length, 285 u64 truncate_size, u32 truncate_seq) 286 { 287 size_t payload_len = 0; 288 289 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE); 290 291 osd_req_op_init(op, opcode); 292 293 op->extent.offset = offset; 294 op->extent.length = length; 295 op->extent.truncate_size = truncate_size; 296 op->extent.truncate_seq = truncate_seq; 297 if (opcode == CEPH_OSD_OP_WRITE) 298 payload_len += length; 299 300 op->payload_len = payload_len; 301 } 302 EXPORT_SYMBOL(osd_req_op_extent_init); 303 304 void osd_req_op_cls_init(struct ceph_osd_req_op *op, u16 opcode, 305 const char *class, const char *method, 306 const void *request_data, size_t request_data_size) 307 { 308 size_t payload_len = 0; 309 size_t size; 310 311 BUG_ON(opcode != CEPH_OSD_OP_CALL); 312 313 osd_req_op_init(op, opcode); 314 315 op->cls.class_name = class; 316 size = strlen(class); 317 BUG_ON(size > (size_t) U8_MAX); 318 op->cls.class_len = size; 319 payload_len += size; 320 321 op->cls.method_name = method; 322 size = strlen(method); 323 BUG_ON(size > (size_t) U8_MAX); 324 op->cls.method_len = size; 325 payload_len += size; 326 327 op->cls.indata = request_data; 328 BUG_ON(request_data_size > (size_t) U32_MAX); 329 op->cls.indata_len = (u32) request_data_size; 330 payload_len += request_data_size; 331 332 op->cls.argc = 0; /* currently unused */ 333 334 op->payload_len = payload_len; 335 } 336 EXPORT_SYMBOL(osd_req_op_cls_init); 337 338 void osd_req_op_watch_init(struct ceph_osd_req_op *op, u16 opcode, 339 u64 cookie, u64 version, int flag) 340 { 341 BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH); 342 343 osd_req_op_init(op, opcode); 344 345 op->watch.cookie = cookie; 346 /* op->watch.ver = version; */ /* XXX 3847 */ 347 op->watch.ver = cpu_to_le64(version); 348 if (opcode == CEPH_OSD_OP_WATCH && flag) 349 op->watch.flag = (u8) 1; 350 } 351 EXPORT_SYMBOL(osd_req_op_watch_init); 352 353 static u64 osd_req_encode_op(struct ceph_osd_request *req, 354 struct ceph_osd_op *dst, 355 struct ceph_osd_req_op *src) 356 { 357 u64 out_data_len = 0; 358 struct ceph_pagelist *pagelist; 359 360 if (WARN_ON(!osd_req_opcode_valid(src->op))) { 361 pr_err("unrecognized osd opcode %d\n", src->op); 362 363 return 0; 364 } 365 366 switch (src->op) { 367 case CEPH_OSD_OP_STAT: 368 break; 369 case CEPH_OSD_OP_READ: 370 case CEPH_OSD_OP_WRITE: 371 if (src->op == CEPH_OSD_OP_WRITE) 372 out_data_len = src->extent.length; 373 dst->extent.offset = cpu_to_le64(src->extent.offset); 374 dst->extent.length = cpu_to_le64(src->extent.length); 375 dst->extent.truncate_size = 376 cpu_to_le64(src->extent.truncate_size); 377 dst->extent.truncate_seq = 378 cpu_to_le32(src->extent.truncate_seq); 379 break; 380 case CEPH_OSD_OP_CALL: 381 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); 382 BUG_ON(!pagelist); 383 ceph_pagelist_init(pagelist); 384 385 dst->cls.class_len = src->cls.class_len; 386 dst->cls.method_len = src->cls.method_len; 387 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len); 388 ceph_pagelist_append(pagelist, src->cls.class_name, 389 src->cls.class_len); 390 ceph_pagelist_append(pagelist, src->cls.method_name, 391 src->cls.method_len); 392 ceph_pagelist_append(pagelist, src->cls.indata, 393 src->cls.indata_len); 394 395 req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGELIST; 396 req->r_data_out.pagelist = pagelist; 397 out_data_len = pagelist->length; 398 break; 399 case CEPH_OSD_OP_STARTSYNC: 400 break; 401 case CEPH_OSD_OP_NOTIFY_ACK: 402 case CEPH_OSD_OP_WATCH: 403 dst->watch.cookie = cpu_to_le64(src->watch.cookie); 404 dst->watch.ver = cpu_to_le64(src->watch.ver); 405 dst->watch.flag = src->watch.flag; 406 break; 407 default: 408 pr_err("unsupported osd opcode %s\n", 409 ceph_osd_op_name(src->op)); 410 WARN_ON(1); 411 412 return 0; 413 } 414 dst->op = cpu_to_le16(src->op); 415 dst->payload_len = cpu_to_le32(src->payload_len); 416 417 return out_data_len; 418 } 419 420 /* 421 * build new request AND message 422 * 423 */ 424 void ceph_osdc_build_request(struct ceph_osd_request *req, 425 u64 off, unsigned int num_ops, 426 struct ceph_osd_req_op *src_ops, 427 struct ceph_snap_context *snapc, u64 snap_id, 428 struct timespec *mtime) 429 { 430 struct ceph_msg *msg = req->r_request; 431 struct ceph_osd_req_op *src_op; 432 void *p; 433 size_t msg_size; 434 int flags = req->r_flags; 435 u64 data_len; 436 int i; 437 438 req->r_num_ops = num_ops; 439 req->r_snapid = snap_id; 440 req->r_snapc = ceph_get_snap_context(snapc); 441 442 /* encode request */ 443 msg->hdr.version = cpu_to_le16(4); 444 445 p = msg->front.iov_base; 446 ceph_encode_32(&p, 1); /* client_inc is always 1 */ 447 req->r_request_osdmap_epoch = p; 448 p += 4; 449 req->r_request_flags = p; 450 p += 4; 451 if (req->r_flags & CEPH_OSD_FLAG_WRITE) 452 ceph_encode_timespec(p, mtime); 453 p += sizeof(struct ceph_timespec); 454 req->r_request_reassert_version = p; 455 p += sizeof(struct ceph_eversion); /* will get filled in */ 456 457 /* oloc */ 458 ceph_encode_8(&p, 4); 459 ceph_encode_8(&p, 4); 460 ceph_encode_32(&p, 8 + 4 + 4); 461 req->r_request_pool = p; 462 p += 8; 463 ceph_encode_32(&p, -1); /* preferred */ 464 ceph_encode_32(&p, 0); /* key len */ 465 466 ceph_encode_8(&p, 1); 467 req->r_request_pgid = p; 468 p += 8 + 4; 469 ceph_encode_32(&p, -1); /* preferred */ 470 471 /* oid */ 472 ceph_encode_32(&p, req->r_oid_len); 473 memcpy(p, req->r_oid, req->r_oid_len); 474 dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len); 475 p += req->r_oid_len; 476 477 /* ops--can imply data */ 478 ceph_encode_16(&p, num_ops); 479 src_op = src_ops; 480 req->r_request_ops = p; 481 data_len = 0; 482 for (i = 0; i < num_ops; i++, src_op++) { 483 data_len += osd_req_encode_op(req, p, src_op); 484 p += sizeof(struct ceph_osd_op); 485 } 486 487 /* snaps */ 488 ceph_encode_64(&p, req->r_snapid); 489 ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0); 490 ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0); 491 if (req->r_snapc) { 492 for (i = 0; i < snapc->num_snaps; i++) { 493 ceph_encode_64(&p, req->r_snapc->snaps[i]); 494 } 495 } 496 497 req->r_request_attempts = p; 498 p += 4; 499 500 /* data */ 501 if (flags & CEPH_OSD_FLAG_WRITE) { 502 u16 data_off; 503 504 /* 505 * The header "data_off" is a hint to the receiver 506 * allowing it to align received data into its 507 * buffers such that there's no need to re-copy 508 * it before writing it to disk (direct I/O). 509 */ 510 data_off = (u16) (off & 0xffff); 511 req->r_request->hdr.data_off = cpu_to_le16(data_off); 512 } 513 req->r_request->hdr.data_len = cpu_to_le32(data_len); 514 515 BUG_ON(p > msg->front.iov_base + msg->front.iov_len); 516 msg_size = p - msg->front.iov_base; 517 msg->front.iov_len = msg_size; 518 msg->hdr.front_len = cpu_to_le32(msg_size); 519 520 dout("build_request msg_size was %d num_ops %d\n", (int)msg_size, 521 num_ops); 522 return; 523 } 524 EXPORT_SYMBOL(ceph_osdc_build_request); 525 526 /* 527 * build new request AND message, calculate layout, and adjust file 528 * extent as needed. 529 * 530 * if the file was recently truncated, we include information about its 531 * old and new size so that the object can be updated appropriately. (we 532 * avoid synchronously deleting truncated objects because it's slow.) 533 * 534 * if @do_sync, include a 'startsync' command so that the osd will flush 535 * data quickly. 536 */ 537 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, 538 struct ceph_file_layout *layout, 539 struct ceph_vino vino, 540 u64 off, u64 *plen, 541 int opcode, int flags, 542 struct ceph_snap_context *snapc, 543 int do_sync, 544 u32 truncate_seq, 545 u64 truncate_size, 546 struct timespec *mtime, 547 bool use_mempool) 548 { 549 struct ceph_osd_req_op ops[2]; 550 struct ceph_osd_request *req; 551 unsigned int num_op = do_sync ? 2 : 1; 552 u64 objnum = 0; 553 u64 objoff = 0; 554 u64 objlen = 0; 555 u32 object_size; 556 u64 object_base; 557 int r; 558 559 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE); 560 561 req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool, 562 GFP_NOFS); 563 if (!req) 564 return ERR_PTR(-ENOMEM); 565 req->r_flags = flags; 566 567 /* calculate max write size */ 568 r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen); 569 if (r < 0) { 570 ceph_osdc_put_request(req); 571 return ERR_PTR(r); 572 } 573 574 object_size = le32_to_cpu(layout->fl_object_size); 575 object_base = off - objoff; 576 if (truncate_size <= object_base) { 577 truncate_size = 0; 578 } else { 579 truncate_size -= object_base; 580 if (truncate_size > object_size) 581 truncate_size = object_size; 582 } 583 584 osd_req_op_extent_init(&ops[0], opcode, objoff, objlen, 585 truncate_size, truncate_seq); 586 if (do_sync) 587 osd_req_op_init(&ops[1], CEPH_OSD_OP_STARTSYNC); 588 589 req->r_file_layout = *layout; /* keep a copy */ 590 591 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", 592 vino.ino, objnum); 593 req->r_oid_len = strlen(req->r_oid); 594 595 ceph_osdc_build_request(req, off, num_op, ops, 596 snapc, vino.snap, mtime); 597 598 return req; 599 } 600 EXPORT_SYMBOL(ceph_osdc_new_request); 601 602 /* 603 * We keep osd requests in an rbtree, sorted by ->r_tid. 604 */ 605 static void __insert_request(struct ceph_osd_client *osdc, 606 struct ceph_osd_request *new) 607 { 608 struct rb_node **p = &osdc->requests.rb_node; 609 struct rb_node *parent = NULL; 610 struct ceph_osd_request *req = NULL; 611 612 while (*p) { 613 parent = *p; 614 req = rb_entry(parent, struct ceph_osd_request, r_node); 615 if (new->r_tid < req->r_tid) 616 p = &(*p)->rb_left; 617 else if (new->r_tid > req->r_tid) 618 p = &(*p)->rb_right; 619 else 620 BUG(); 621 } 622 623 rb_link_node(&new->r_node, parent, p); 624 rb_insert_color(&new->r_node, &osdc->requests); 625 } 626 627 static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc, 628 u64 tid) 629 { 630 struct ceph_osd_request *req; 631 struct rb_node *n = osdc->requests.rb_node; 632 633 while (n) { 634 req = rb_entry(n, struct ceph_osd_request, r_node); 635 if (tid < req->r_tid) 636 n = n->rb_left; 637 else if (tid > req->r_tid) 638 n = n->rb_right; 639 else 640 return req; 641 } 642 return NULL; 643 } 644 645 static struct ceph_osd_request * 646 __lookup_request_ge(struct ceph_osd_client *osdc, 647 u64 tid) 648 { 649 struct ceph_osd_request *req; 650 struct rb_node *n = osdc->requests.rb_node; 651 652 while (n) { 653 req = rb_entry(n, struct ceph_osd_request, r_node); 654 if (tid < req->r_tid) { 655 if (!n->rb_left) 656 return req; 657 n = n->rb_left; 658 } else if (tid > req->r_tid) { 659 n = n->rb_right; 660 } else { 661 return req; 662 } 663 } 664 return NULL; 665 } 666 667 /* 668 * Resubmit requests pending on the given osd. 669 */ 670 static void __kick_osd_requests(struct ceph_osd_client *osdc, 671 struct ceph_osd *osd) 672 { 673 struct ceph_osd_request *req, *nreq; 674 LIST_HEAD(resend); 675 int err; 676 677 dout("__kick_osd_requests osd%d\n", osd->o_osd); 678 err = __reset_osd(osdc, osd); 679 if (err) 680 return; 681 /* 682 * Build up a list of requests to resend by traversing the 683 * osd's list of requests. Requests for a given object are 684 * sent in tid order, and that is also the order they're 685 * kept on this list. Therefore all requests that are in 686 * flight will be found first, followed by all requests that 687 * have not yet been sent. And to resend requests while 688 * preserving this order we will want to put any sent 689 * requests back on the front of the osd client's unsent 690 * list. 691 * 692 * So we build a separate ordered list of already-sent 693 * requests for the affected osd and splice it onto the 694 * front of the osd client's unsent list. Once we've seen a 695 * request that has not yet been sent we're done. Those 696 * requests are already sitting right where they belong. 697 */ 698 list_for_each_entry(req, &osd->o_requests, r_osd_item) { 699 if (!req->r_sent) 700 break; 701 list_move_tail(&req->r_req_lru_item, &resend); 702 dout("requeueing %p tid %llu osd%d\n", req, req->r_tid, 703 osd->o_osd); 704 if (!req->r_linger) 705 req->r_flags |= CEPH_OSD_FLAG_RETRY; 706 } 707 list_splice(&resend, &osdc->req_unsent); 708 709 /* 710 * Linger requests are re-registered before sending, which 711 * sets up a new tid for each. We add them to the unsent 712 * list at the end to keep things in tid order. 713 */ 714 list_for_each_entry_safe(req, nreq, &osd->o_linger_requests, 715 r_linger_osd) { 716 /* 717 * reregister request prior to unregistering linger so 718 * that r_osd is preserved. 719 */ 720 BUG_ON(!list_empty(&req->r_req_lru_item)); 721 __register_request(osdc, req); 722 list_add_tail(&req->r_req_lru_item, &osdc->req_unsent); 723 list_add_tail(&req->r_osd_item, &req->r_osd->o_requests); 724 __unregister_linger_request(osdc, req); 725 dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid, 726 osd->o_osd); 727 } 728 } 729 730 /* 731 * If the osd connection drops, we need to resubmit all requests. 732 */ 733 static void osd_reset(struct ceph_connection *con) 734 { 735 struct ceph_osd *osd = con->private; 736 struct ceph_osd_client *osdc; 737 738 if (!osd) 739 return; 740 dout("osd_reset osd%d\n", osd->o_osd); 741 osdc = osd->o_osdc; 742 down_read(&osdc->map_sem); 743 mutex_lock(&osdc->request_mutex); 744 __kick_osd_requests(osdc, osd); 745 __send_queued(osdc); 746 mutex_unlock(&osdc->request_mutex); 747 up_read(&osdc->map_sem); 748 } 749 750 /* 751 * Track open sessions with osds. 752 */ 753 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum) 754 { 755 struct ceph_osd *osd; 756 757 osd = kzalloc(sizeof(*osd), GFP_NOFS); 758 if (!osd) 759 return NULL; 760 761 atomic_set(&osd->o_ref, 1); 762 osd->o_osdc = osdc; 763 osd->o_osd = onum; 764 RB_CLEAR_NODE(&osd->o_node); 765 INIT_LIST_HEAD(&osd->o_requests); 766 INIT_LIST_HEAD(&osd->o_linger_requests); 767 INIT_LIST_HEAD(&osd->o_osd_lru); 768 osd->o_incarnation = 1; 769 770 ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr); 771 772 INIT_LIST_HEAD(&osd->o_keepalive_item); 773 return osd; 774 } 775 776 static struct ceph_osd *get_osd(struct ceph_osd *osd) 777 { 778 if (atomic_inc_not_zero(&osd->o_ref)) { 779 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1, 780 atomic_read(&osd->o_ref)); 781 return osd; 782 } else { 783 dout("get_osd %p FAIL\n", osd); 784 return NULL; 785 } 786 } 787 788 static void put_osd(struct ceph_osd *osd) 789 { 790 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), 791 atomic_read(&osd->o_ref) - 1); 792 if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) { 793 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth; 794 795 ceph_auth_destroy_authorizer(ac, osd->o_auth.authorizer); 796 kfree(osd); 797 } 798 } 799 800 /* 801 * remove an osd from our map 802 */ 803 static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 804 { 805 dout("__remove_osd %p\n", osd); 806 BUG_ON(!list_empty(&osd->o_requests)); 807 rb_erase(&osd->o_node, &osdc->osds); 808 list_del_init(&osd->o_osd_lru); 809 ceph_con_close(&osd->o_con); 810 put_osd(osd); 811 } 812 813 static void remove_all_osds(struct ceph_osd_client *osdc) 814 { 815 dout("%s %p\n", __func__, osdc); 816 mutex_lock(&osdc->request_mutex); 817 while (!RB_EMPTY_ROOT(&osdc->osds)) { 818 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), 819 struct ceph_osd, o_node); 820 __remove_osd(osdc, osd); 821 } 822 mutex_unlock(&osdc->request_mutex); 823 } 824 825 static void __move_osd_to_lru(struct ceph_osd_client *osdc, 826 struct ceph_osd *osd) 827 { 828 dout("__move_osd_to_lru %p\n", osd); 829 BUG_ON(!list_empty(&osd->o_osd_lru)); 830 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru); 831 osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ; 832 } 833 834 static void __remove_osd_from_lru(struct ceph_osd *osd) 835 { 836 dout("__remove_osd_from_lru %p\n", osd); 837 if (!list_empty(&osd->o_osd_lru)) 838 list_del_init(&osd->o_osd_lru); 839 } 840 841 static void remove_old_osds(struct ceph_osd_client *osdc) 842 { 843 struct ceph_osd *osd, *nosd; 844 845 dout("__remove_old_osds %p\n", osdc); 846 mutex_lock(&osdc->request_mutex); 847 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) { 848 if (time_before(jiffies, osd->lru_ttl)) 849 break; 850 __remove_osd(osdc, osd); 851 } 852 mutex_unlock(&osdc->request_mutex); 853 } 854 855 /* 856 * reset osd connect 857 */ 858 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 859 { 860 struct ceph_entity_addr *peer_addr; 861 862 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 863 if (list_empty(&osd->o_requests) && 864 list_empty(&osd->o_linger_requests)) { 865 __remove_osd(osdc, osd); 866 867 return -ENODEV; 868 } 869 870 peer_addr = &osdc->osdmap->osd_addr[osd->o_osd]; 871 if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) && 872 !ceph_con_opened(&osd->o_con)) { 873 struct ceph_osd_request *req; 874 875 dout(" osd addr hasn't changed and connection never opened," 876 " letting msgr retry"); 877 /* touch each r_stamp for handle_timeout()'s benfit */ 878 list_for_each_entry(req, &osd->o_requests, r_osd_item) 879 req->r_stamp = jiffies; 880 881 return -EAGAIN; 882 } 883 884 ceph_con_close(&osd->o_con); 885 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr); 886 osd->o_incarnation++; 887 888 return 0; 889 } 890 891 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new) 892 { 893 struct rb_node **p = &osdc->osds.rb_node; 894 struct rb_node *parent = NULL; 895 struct ceph_osd *osd = NULL; 896 897 dout("__insert_osd %p osd%d\n", new, new->o_osd); 898 while (*p) { 899 parent = *p; 900 osd = rb_entry(parent, struct ceph_osd, o_node); 901 if (new->o_osd < osd->o_osd) 902 p = &(*p)->rb_left; 903 else if (new->o_osd > osd->o_osd) 904 p = &(*p)->rb_right; 905 else 906 BUG(); 907 } 908 909 rb_link_node(&new->o_node, parent, p); 910 rb_insert_color(&new->o_node, &osdc->osds); 911 } 912 913 static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o) 914 { 915 struct ceph_osd *osd; 916 struct rb_node *n = osdc->osds.rb_node; 917 918 while (n) { 919 osd = rb_entry(n, struct ceph_osd, o_node); 920 if (o < osd->o_osd) 921 n = n->rb_left; 922 else if (o > osd->o_osd) 923 n = n->rb_right; 924 else 925 return osd; 926 } 927 return NULL; 928 } 929 930 static void __schedule_osd_timeout(struct ceph_osd_client *osdc) 931 { 932 schedule_delayed_work(&osdc->timeout_work, 933 osdc->client->options->osd_keepalive_timeout * HZ); 934 } 935 936 static void __cancel_osd_timeout(struct ceph_osd_client *osdc) 937 { 938 cancel_delayed_work(&osdc->timeout_work); 939 } 940 941 /* 942 * Register request, assign tid. If this is the first request, set up 943 * the timeout event. 944 */ 945 static void __register_request(struct ceph_osd_client *osdc, 946 struct ceph_osd_request *req) 947 { 948 req->r_tid = ++osdc->last_tid; 949 req->r_request->hdr.tid = cpu_to_le64(req->r_tid); 950 dout("__register_request %p tid %lld\n", req, req->r_tid); 951 __insert_request(osdc, req); 952 ceph_osdc_get_request(req); 953 osdc->num_requests++; 954 if (osdc->num_requests == 1) { 955 dout(" first request, scheduling timeout\n"); 956 __schedule_osd_timeout(osdc); 957 } 958 } 959 960 /* 961 * called under osdc->request_mutex 962 */ 963 static void __unregister_request(struct ceph_osd_client *osdc, 964 struct ceph_osd_request *req) 965 { 966 if (RB_EMPTY_NODE(&req->r_node)) { 967 dout("__unregister_request %p tid %lld not registered\n", 968 req, req->r_tid); 969 return; 970 } 971 972 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 973 rb_erase(&req->r_node, &osdc->requests); 974 osdc->num_requests--; 975 976 if (req->r_osd) { 977 /* make sure the original request isn't in flight. */ 978 ceph_msg_revoke(req->r_request); 979 980 list_del_init(&req->r_osd_item); 981 if (list_empty(&req->r_osd->o_requests) && 982 list_empty(&req->r_osd->o_linger_requests)) { 983 dout("moving osd to %p lru\n", req->r_osd); 984 __move_osd_to_lru(osdc, req->r_osd); 985 } 986 if (list_empty(&req->r_linger_item)) 987 req->r_osd = NULL; 988 } 989 990 list_del_init(&req->r_req_lru_item); 991 ceph_osdc_put_request(req); 992 993 if (osdc->num_requests == 0) { 994 dout(" no requests, canceling timeout\n"); 995 __cancel_osd_timeout(osdc); 996 } 997 } 998 999 /* 1000 * Cancel a previously queued request message 1001 */ 1002 static void __cancel_request(struct ceph_osd_request *req) 1003 { 1004 if (req->r_sent && req->r_osd) { 1005 ceph_msg_revoke(req->r_request); 1006 req->r_sent = 0; 1007 } 1008 } 1009 1010 static void __register_linger_request(struct ceph_osd_client *osdc, 1011 struct ceph_osd_request *req) 1012 { 1013 dout("__register_linger_request %p\n", req); 1014 list_add_tail(&req->r_linger_item, &osdc->req_linger); 1015 if (req->r_osd) 1016 list_add_tail(&req->r_linger_osd, 1017 &req->r_osd->o_linger_requests); 1018 } 1019 1020 static void __unregister_linger_request(struct ceph_osd_client *osdc, 1021 struct ceph_osd_request *req) 1022 { 1023 dout("__unregister_linger_request %p\n", req); 1024 list_del_init(&req->r_linger_item); 1025 if (req->r_osd) { 1026 list_del_init(&req->r_linger_osd); 1027 1028 if (list_empty(&req->r_osd->o_requests) && 1029 list_empty(&req->r_osd->o_linger_requests)) { 1030 dout("moving osd to %p lru\n", req->r_osd); 1031 __move_osd_to_lru(osdc, req->r_osd); 1032 } 1033 if (list_empty(&req->r_osd_item)) 1034 req->r_osd = NULL; 1035 } 1036 } 1037 1038 void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc, 1039 struct ceph_osd_request *req) 1040 { 1041 mutex_lock(&osdc->request_mutex); 1042 if (req->r_linger) { 1043 __unregister_linger_request(osdc, req); 1044 ceph_osdc_put_request(req); 1045 } 1046 mutex_unlock(&osdc->request_mutex); 1047 } 1048 EXPORT_SYMBOL(ceph_osdc_unregister_linger_request); 1049 1050 void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, 1051 struct ceph_osd_request *req) 1052 { 1053 if (!req->r_linger) { 1054 dout("set_request_linger %p\n", req); 1055 req->r_linger = 1; 1056 /* 1057 * caller is now responsible for calling 1058 * unregister_linger_request 1059 */ 1060 ceph_osdc_get_request(req); 1061 } 1062 } 1063 EXPORT_SYMBOL(ceph_osdc_set_request_linger); 1064 1065 /* 1066 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct 1067 * (as needed), and set the request r_osd appropriately. If there is 1068 * no up osd, set r_osd to NULL. Move the request to the appropriate list 1069 * (unsent, homeless) or leave on in-flight lru. 1070 * 1071 * Return 0 if unchanged, 1 if changed, or negative on error. 1072 * 1073 * Caller should hold map_sem for read and request_mutex. 1074 */ 1075 static int __map_request(struct ceph_osd_client *osdc, 1076 struct ceph_osd_request *req, int force_resend) 1077 { 1078 struct ceph_pg pgid; 1079 int acting[CEPH_PG_MAX_SIZE]; 1080 int o = -1, num = 0; 1081 int err; 1082 1083 dout("map_request %p tid %lld\n", req, req->r_tid); 1084 err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap, 1085 ceph_file_layout_pg_pool(req->r_file_layout)); 1086 if (err) { 1087 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1088 return err; 1089 } 1090 req->r_pgid = pgid; 1091 1092 err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); 1093 if (err > 0) { 1094 o = acting[0]; 1095 num = err; 1096 } 1097 1098 if ((!force_resend && 1099 req->r_osd && req->r_osd->o_osd == o && 1100 req->r_sent >= req->r_osd->o_incarnation && 1101 req->r_num_pg_osds == num && 1102 memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || 1103 (req->r_osd == NULL && o == -1)) 1104 return 0; /* no change */ 1105 1106 dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n", 1107 req->r_tid, pgid.pool, pgid.seed, o, 1108 req->r_osd ? req->r_osd->o_osd : -1); 1109 1110 /* record full pg acting set */ 1111 memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num); 1112 req->r_num_pg_osds = num; 1113 1114 if (req->r_osd) { 1115 __cancel_request(req); 1116 list_del_init(&req->r_osd_item); 1117 req->r_osd = NULL; 1118 } 1119 1120 req->r_osd = __lookup_osd(osdc, o); 1121 if (!req->r_osd && o >= 0) { 1122 err = -ENOMEM; 1123 req->r_osd = create_osd(osdc, o); 1124 if (!req->r_osd) { 1125 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1126 goto out; 1127 } 1128 1129 dout("map_request osd %p is osd%d\n", req->r_osd, o); 1130 __insert_osd(osdc, req->r_osd); 1131 1132 ceph_con_open(&req->r_osd->o_con, 1133 CEPH_ENTITY_TYPE_OSD, o, 1134 &osdc->osdmap->osd_addr[o]); 1135 } 1136 1137 if (req->r_osd) { 1138 __remove_osd_from_lru(req->r_osd); 1139 list_add_tail(&req->r_osd_item, &req->r_osd->o_requests); 1140 list_move_tail(&req->r_req_lru_item, &osdc->req_unsent); 1141 } else { 1142 list_move_tail(&req->r_req_lru_item, &osdc->req_notarget); 1143 } 1144 err = 1; /* osd or pg changed */ 1145 1146 out: 1147 return err; 1148 } 1149 1150 /* 1151 * caller should hold map_sem (for read) and request_mutex 1152 */ 1153 static void __send_request(struct ceph_osd_client *osdc, 1154 struct ceph_osd_request *req) 1155 { 1156 void *p; 1157 1158 dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n", 1159 req, req->r_tid, req->r_osd->o_osd, req->r_flags, 1160 (unsigned long long)req->r_pgid.pool, req->r_pgid.seed); 1161 1162 /* fill in message content that changes each time we send it */ 1163 put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); 1164 put_unaligned_le32(req->r_flags, req->r_request_flags); 1165 put_unaligned_le64(req->r_pgid.pool, req->r_request_pool); 1166 p = req->r_request_pgid; 1167 ceph_encode_64(&p, req->r_pgid.pool); 1168 ceph_encode_32(&p, req->r_pgid.seed); 1169 put_unaligned_le64(1, req->r_request_attempts); /* FIXME */ 1170 memcpy(req->r_request_reassert_version, &req->r_reassert_version, 1171 sizeof(req->r_reassert_version)); 1172 1173 req->r_stamp = jiffies; 1174 list_move_tail(&req->r_req_lru_item, &osdc->req_lru); 1175 1176 ceph_msg_get(req->r_request); /* send consumes a ref */ 1177 ceph_con_send(&req->r_osd->o_con, req->r_request); 1178 req->r_sent = req->r_osd->o_incarnation; 1179 } 1180 1181 /* 1182 * Send any requests in the queue (req_unsent). 1183 */ 1184 static void __send_queued(struct ceph_osd_client *osdc) 1185 { 1186 struct ceph_osd_request *req, *tmp; 1187 1188 dout("__send_queued\n"); 1189 list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) 1190 __send_request(osdc, req); 1191 } 1192 1193 /* 1194 * Timeout callback, called every N seconds when 1 or more osd 1195 * requests has been active for more than N seconds. When this 1196 * happens, we ping all OSDs with requests who have timed out to 1197 * ensure any communications channel reset is detected. Reset the 1198 * request timeouts another N seconds in the future as we go. 1199 * Reschedule the timeout event another N seconds in future (unless 1200 * there are no open requests). 1201 */ 1202 static void handle_timeout(struct work_struct *work) 1203 { 1204 struct ceph_osd_client *osdc = 1205 container_of(work, struct ceph_osd_client, timeout_work.work); 1206 struct ceph_osd_request *req; 1207 struct ceph_osd *osd; 1208 unsigned long keepalive = 1209 osdc->client->options->osd_keepalive_timeout * HZ; 1210 struct list_head slow_osds; 1211 dout("timeout\n"); 1212 down_read(&osdc->map_sem); 1213 1214 ceph_monc_request_next_osdmap(&osdc->client->monc); 1215 1216 mutex_lock(&osdc->request_mutex); 1217 1218 /* 1219 * ping osds that are a bit slow. this ensures that if there 1220 * is a break in the TCP connection we will notice, and reopen 1221 * a connection with that osd (from the fault callback). 1222 */ 1223 INIT_LIST_HEAD(&slow_osds); 1224 list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) { 1225 if (time_before(jiffies, req->r_stamp + keepalive)) 1226 break; 1227 1228 osd = req->r_osd; 1229 BUG_ON(!osd); 1230 dout(" tid %llu is slow, will send keepalive on osd%d\n", 1231 req->r_tid, osd->o_osd); 1232 list_move_tail(&osd->o_keepalive_item, &slow_osds); 1233 } 1234 while (!list_empty(&slow_osds)) { 1235 osd = list_entry(slow_osds.next, struct ceph_osd, 1236 o_keepalive_item); 1237 list_del_init(&osd->o_keepalive_item); 1238 ceph_con_keepalive(&osd->o_con); 1239 } 1240 1241 __schedule_osd_timeout(osdc); 1242 __send_queued(osdc); 1243 mutex_unlock(&osdc->request_mutex); 1244 up_read(&osdc->map_sem); 1245 } 1246 1247 static void handle_osds_timeout(struct work_struct *work) 1248 { 1249 struct ceph_osd_client *osdc = 1250 container_of(work, struct ceph_osd_client, 1251 osds_timeout_work.work); 1252 unsigned long delay = 1253 osdc->client->options->osd_idle_ttl * HZ >> 2; 1254 1255 dout("osds timeout\n"); 1256 down_read(&osdc->map_sem); 1257 remove_old_osds(osdc); 1258 up_read(&osdc->map_sem); 1259 1260 schedule_delayed_work(&osdc->osds_timeout_work, 1261 round_jiffies_relative(delay)); 1262 } 1263 1264 static void complete_request(struct ceph_osd_request *req) 1265 { 1266 if (req->r_safe_callback) 1267 req->r_safe_callback(req, NULL); 1268 complete_all(&req->r_safe_completion); /* fsync waiter */ 1269 } 1270 1271 /* 1272 * handle osd op reply. either call the callback if it is specified, 1273 * or do the completion to wake up the waiting thread. 1274 */ 1275 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, 1276 struct ceph_connection *con) 1277 { 1278 void *p, *end; 1279 struct ceph_osd_request *req; 1280 u64 tid; 1281 int object_len; 1282 int numops, payload_len, flags; 1283 s32 result; 1284 s32 retry_attempt; 1285 struct ceph_pg pg; 1286 int err; 1287 u32 reassert_epoch; 1288 u64 reassert_version; 1289 u32 osdmap_epoch; 1290 int already_completed; 1291 int i; 1292 1293 tid = le64_to_cpu(msg->hdr.tid); 1294 dout("handle_reply %p tid %llu\n", msg, tid); 1295 1296 p = msg->front.iov_base; 1297 end = p + msg->front.iov_len; 1298 1299 ceph_decode_need(&p, end, 4, bad); 1300 object_len = ceph_decode_32(&p); 1301 ceph_decode_need(&p, end, object_len, bad); 1302 p += object_len; 1303 1304 err = ceph_decode_pgid(&p, end, &pg); 1305 if (err) 1306 goto bad; 1307 1308 ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad); 1309 flags = ceph_decode_64(&p); 1310 result = ceph_decode_32(&p); 1311 reassert_epoch = ceph_decode_32(&p); 1312 reassert_version = ceph_decode_64(&p); 1313 osdmap_epoch = ceph_decode_32(&p); 1314 1315 /* lookup */ 1316 mutex_lock(&osdc->request_mutex); 1317 req = __lookup_request(osdc, tid); 1318 if (req == NULL) { 1319 dout("handle_reply tid %llu dne\n", tid); 1320 goto bad_mutex; 1321 } 1322 ceph_osdc_get_request(req); 1323 1324 dout("handle_reply %p tid %llu req %p result %d\n", msg, tid, 1325 req, result); 1326 1327 ceph_decode_need(&p, end, 4, bad); 1328 numops = ceph_decode_32(&p); 1329 if (numops > CEPH_OSD_MAX_OP) 1330 goto bad_put; 1331 if (numops != req->r_num_ops) 1332 goto bad_put; 1333 payload_len = 0; 1334 ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad); 1335 for (i = 0; i < numops; i++) { 1336 struct ceph_osd_op *op = p; 1337 int len; 1338 1339 len = le32_to_cpu(op->payload_len); 1340 req->r_reply_op_len[i] = len; 1341 dout(" op %d has %d bytes\n", i, len); 1342 payload_len += len; 1343 p += sizeof(*op); 1344 } 1345 if (payload_len != le32_to_cpu(msg->hdr.data_len)) { 1346 pr_warning("sum of op payload lens %d != data_len %d", 1347 payload_len, le32_to_cpu(msg->hdr.data_len)); 1348 goto bad_put; 1349 } 1350 1351 ceph_decode_need(&p, end, 4 + numops * 4, bad); 1352 retry_attempt = ceph_decode_32(&p); 1353 for (i = 0; i < numops; i++) 1354 req->r_reply_op_result[i] = ceph_decode_32(&p); 1355 1356 /* 1357 * if this connection filled our message, drop our reference now, to 1358 * avoid a (safe but slower) revoke later. 1359 */ 1360 if (req->r_con_filling_msg == con && req->r_reply == msg) { 1361 dout(" dropping con_filling_msg ref %p\n", con); 1362 req->r_con_filling_msg = NULL; 1363 con->ops->put(con); 1364 } 1365 1366 if (!req->r_got_reply) { 1367 unsigned int bytes; 1368 1369 req->r_result = result; 1370 bytes = le32_to_cpu(msg->hdr.data_len); 1371 dout("handle_reply result %d bytes %d\n", req->r_result, 1372 bytes); 1373 if (req->r_result == 0) 1374 req->r_result = bytes; 1375 1376 /* in case this is a write and we need to replay, */ 1377 req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch); 1378 req->r_reassert_version.version = cpu_to_le64(reassert_version); 1379 1380 req->r_got_reply = 1; 1381 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { 1382 dout("handle_reply tid %llu dup ack\n", tid); 1383 mutex_unlock(&osdc->request_mutex); 1384 goto done; 1385 } 1386 1387 dout("handle_reply tid %llu flags %d\n", tid, flags); 1388 1389 if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK)) 1390 __register_linger_request(osdc, req); 1391 1392 /* either this is a read, or we got the safe response */ 1393 if (result < 0 || 1394 (flags & CEPH_OSD_FLAG_ONDISK) || 1395 ((flags & CEPH_OSD_FLAG_WRITE) == 0)) 1396 __unregister_request(osdc, req); 1397 1398 already_completed = req->r_completed; 1399 req->r_completed = 1; 1400 mutex_unlock(&osdc->request_mutex); 1401 if (already_completed) 1402 goto done; 1403 1404 if (req->r_callback) 1405 req->r_callback(req, msg); 1406 else 1407 complete_all(&req->r_completion); 1408 1409 if (flags & CEPH_OSD_FLAG_ONDISK) 1410 complete_request(req); 1411 1412 done: 1413 dout("req=%p req->r_linger=%d\n", req, req->r_linger); 1414 ceph_osdc_put_request(req); 1415 return; 1416 1417 bad_put: 1418 ceph_osdc_put_request(req); 1419 bad_mutex: 1420 mutex_unlock(&osdc->request_mutex); 1421 bad: 1422 pr_err("corrupt osd_op_reply got %d %d\n", 1423 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len)); 1424 ceph_msg_dump(msg); 1425 } 1426 1427 static void reset_changed_osds(struct ceph_osd_client *osdc) 1428 { 1429 struct rb_node *p, *n; 1430 1431 for (p = rb_first(&osdc->osds); p; p = n) { 1432 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node); 1433 1434 n = rb_next(p); 1435 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || 1436 memcmp(&osd->o_con.peer_addr, 1437 ceph_osd_addr(osdc->osdmap, 1438 osd->o_osd), 1439 sizeof(struct ceph_entity_addr)) != 0) 1440 __reset_osd(osdc, osd); 1441 } 1442 } 1443 1444 /* 1445 * Requeue requests whose mapping to an OSD has changed. If requests map to 1446 * no osd, request a new map. 1447 * 1448 * Caller should hold map_sem for read. 1449 */ 1450 static void kick_requests(struct ceph_osd_client *osdc, int force_resend) 1451 { 1452 struct ceph_osd_request *req, *nreq; 1453 struct rb_node *p; 1454 int needmap = 0; 1455 int err; 1456 1457 dout("kick_requests %s\n", force_resend ? " (force resend)" : ""); 1458 mutex_lock(&osdc->request_mutex); 1459 for (p = rb_first(&osdc->requests); p; ) { 1460 req = rb_entry(p, struct ceph_osd_request, r_node); 1461 p = rb_next(p); 1462 1463 /* 1464 * For linger requests that have not yet been 1465 * registered, move them to the linger list; they'll 1466 * be sent to the osd in the loop below. Unregister 1467 * the request before re-registering it as a linger 1468 * request to ensure the __map_request() below 1469 * will decide it needs to be sent. 1470 */ 1471 if (req->r_linger && list_empty(&req->r_linger_item)) { 1472 dout("%p tid %llu restart on osd%d\n", 1473 req, req->r_tid, 1474 req->r_osd ? req->r_osd->o_osd : -1); 1475 __unregister_request(osdc, req); 1476 __register_linger_request(osdc, req); 1477 continue; 1478 } 1479 1480 err = __map_request(osdc, req, force_resend); 1481 if (err < 0) 1482 continue; /* error */ 1483 if (req->r_osd == NULL) { 1484 dout("%p tid %llu maps to no osd\n", req, req->r_tid); 1485 needmap++; /* request a newer map */ 1486 } else if (err > 0) { 1487 if (!req->r_linger) { 1488 dout("%p tid %llu requeued on osd%d\n", req, 1489 req->r_tid, 1490 req->r_osd ? req->r_osd->o_osd : -1); 1491 req->r_flags |= CEPH_OSD_FLAG_RETRY; 1492 } 1493 } 1494 } 1495 1496 list_for_each_entry_safe(req, nreq, &osdc->req_linger, 1497 r_linger_item) { 1498 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); 1499 1500 err = __map_request(osdc, req, force_resend); 1501 dout("__map_request returned %d\n", err); 1502 if (err == 0) 1503 continue; /* no change and no osd was specified */ 1504 if (err < 0) 1505 continue; /* hrm! */ 1506 if (req->r_osd == NULL) { 1507 dout("tid %llu maps to no valid osd\n", req->r_tid); 1508 needmap++; /* request a newer map */ 1509 continue; 1510 } 1511 1512 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid, 1513 req->r_osd ? req->r_osd->o_osd : -1); 1514 __register_request(osdc, req); 1515 __unregister_linger_request(osdc, req); 1516 } 1517 mutex_unlock(&osdc->request_mutex); 1518 1519 if (needmap) { 1520 dout("%d requests for down osds, need new map\n", needmap); 1521 ceph_monc_request_next_osdmap(&osdc->client->monc); 1522 } 1523 reset_changed_osds(osdc); 1524 } 1525 1526 1527 /* 1528 * Process updated osd map. 1529 * 1530 * The message contains any number of incremental and full maps, normally 1531 * indicating some sort of topology change in the cluster. Kick requests 1532 * off to different OSDs as needed. 1533 */ 1534 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) 1535 { 1536 void *p, *end, *next; 1537 u32 nr_maps, maplen; 1538 u32 epoch; 1539 struct ceph_osdmap *newmap = NULL, *oldmap; 1540 int err; 1541 struct ceph_fsid fsid; 1542 1543 dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0); 1544 p = msg->front.iov_base; 1545 end = p + msg->front.iov_len; 1546 1547 /* verify fsid */ 1548 ceph_decode_need(&p, end, sizeof(fsid), bad); 1549 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 1550 if (ceph_check_fsid(osdc->client, &fsid) < 0) 1551 return; 1552 1553 down_write(&osdc->map_sem); 1554 1555 /* incremental maps */ 1556 ceph_decode_32_safe(&p, end, nr_maps, bad); 1557 dout(" %d inc maps\n", nr_maps); 1558 while (nr_maps > 0) { 1559 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 1560 epoch = ceph_decode_32(&p); 1561 maplen = ceph_decode_32(&p); 1562 ceph_decode_need(&p, end, maplen, bad); 1563 next = p + maplen; 1564 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) { 1565 dout("applying incremental map %u len %d\n", 1566 epoch, maplen); 1567 newmap = osdmap_apply_incremental(&p, next, 1568 osdc->osdmap, 1569 &osdc->client->msgr); 1570 if (IS_ERR(newmap)) { 1571 err = PTR_ERR(newmap); 1572 goto bad; 1573 } 1574 BUG_ON(!newmap); 1575 if (newmap != osdc->osdmap) { 1576 ceph_osdmap_destroy(osdc->osdmap); 1577 osdc->osdmap = newmap; 1578 } 1579 kick_requests(osdc, 0); 1580 } else { 1581 dout("ignoring incremental map %u len %d\n", 1582 epoch, maplen); 1583 } 1584 p = next; 1585 nr_maps--; 1586 } 1587 if (newmap) 1588 goto done; 1589 1590 /* full maps */ 1591 ceph_decode_32_safe(&p, end, nr_maps, bad); 1592 dout(" %d full maps\n", nr_maps); 1593 while (nr_maps) { 1594 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 1595 epoch = ceph_decode_32(&p); 1596 maplen = ceph_decode_32(&p); 1597 ceph_decode_need(&p, end, maplen, bad); 1598 if (nr_maps > 1) { 1599 dout("skipping non-latest full map %u len %d\n", 1600 epoch, maplen); 1601 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) { 1602 dout("skipping full map %u len %d, " 1603 "older than our %u\n", epoch, maplen, 1604 osdc->osdmap->epoch); 1605 } else { 1606 int skipped_map = 0; 1607 1608 dout("taking full map %u len %d\n", epoch, maplen); 1609 newmap = osdmap_decode(&p, p+maplen); 1610 if (IS_ERR(newmap)) { 1611 err = PTR_ERR(newmap); 1612 goto bad; 1613 } 1614 BUG_ON(!newmap); 1615 oldmap = osdc->osdmap; 1616 osdc->osdmap = newmap; 1617 if (oldmap) { 1618 if (oldmap->epoch + 1 < newmap->epoch) 1619 skipped_map = 1; 1620 ceph_osdmap_destroy(oldmap); 1621 } 1622 kick_requests(osdc, skipped_map); 1623 } 1624 p += maplen; 1625 nr_maps--; 1626 } 1627 1628 done: 1629 downgrade_write(&osdc->map_sem); 1630 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); 1631 1632 /* 1633 * subscribe to subsequent osdmap updates if full to ensure 1634 * we find out when we are no longer full and stop returning 1635 * ENOSPC. 1636 */ 1637 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) 1638 ceph_monc_request_next_osdmap(&osdc->client->monc); 1639 1640 mutex_lock(&osdc->request_mutex); 1641 __send_queued(osdc); 1642 mutex_unlock(&osdc->request_mutex); 1643 up_read(&osdc->map_sem); 1644 wake_up_all(&osdc->client->auth_wq); 1645 return; 1646 1647 bad: 1648 pr_err("osdc handle_map corrupt msg\n"); 1649 ceph_msg_dump(msg); 1650 up_write(&osdc->map_sem); 1651 return; 1652 } 1653 1654 /* 1655 * watch/notify callback event infrastructure 1656 * 1657 * These callbacks are used both for watch and notify operations. 1658 */ 1659 static void __release_event(struct kref *kref) 1660 { 1661 struct ceph_osd_event *event = 1662 container_of(kref, struct ceph_osd_event, kref); 1663 1664 dout("__release_event %p\n", event); 1665 kfree(event); 1666 } 1667 1668 static void get_event(struct ceph_osd_event *event) 1669 { 1670 kref_get(&event->kref); 1671 } 1672 1673 void ceph_osdc_put_event(struct ceph_osd_event *event) 1674 { 1675 kref_put(&event->kref, __release_event); 1676 } 1677 EXPORT_SYMBOL(ceph_osdc_put_event); 1678 1679 static void __insert_event(struct ceph_osd_client *osdc, 1680 struct ceph_osd_event *new) 1681 { 1682 struct rb_node **p = &osdc->event_tree.rb_node; 1683 struct rb_node *parent = NULL; 1684 struct ceph_osd_event *event = NULL; 1685 1686 while (*p) { 1687 parent = *p; 1688 event = rb_entry(parent, struct ceph_osd_event, node); 1689 if (new->cookie < event->cookie) 1690 p = &(*p)->rb_left; 1691 else if (new->cookie > event->cookie) 1692 p = &(*p)->rb_right; 1693 else 1694 BUG(); 1695 } 1696 1697 rb_link_node(&new->node, parent, p); 1698 rb_insert_color(&new->node, &osdc->event_tree); 1699 } 1700 1701 static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc, 1702 u64 cookie) 1703 { 1704 struct rb_node **p = &osdc->event_tree.rb_node; 1705 struct rb_node *parent = NULL; 1706 struct ceph_osd_event *event = NULL; 1707 1708 while (*p) { 1709 parent = *p; 1710 event = rb_entry(parent, struct ceph_osd_event, node); 1711 if (cookie < event->cookie) 1712 p = &(*p)->rb_left; 1713 else if (cookie > event->cookie) 1714 p = &(*p)->rb_right; 1715 else 1716 return event; 1717 } 1718 return NULL; 1719 } 1720 1721 static void __remove_event(struct ceph_osd_event *event) 1722 { 1723 struct ceph_osd_client *osdc = event->osdc; 1724 1725 if (!RB_EMPTY_NODE(&event->node)) { 1726 dout("__remove_event removed %p\n", event); 1727 rb_erase(&event->node, &osdc->event_tree); 1728 ceph_osdc_put_event(event); 1729 } else { 1730 dout("__remove_event didn't remove %p\n", event); 1731 } 1732 } 1733 1734 int ceph_osdc_create_event(struct ceph_osd_client *osdc, 1735 void (*event_cb)(u64, u64, u8, void *), 1736 void *data, struct ceph_osd_event **pevent) 1737 { 1738 struct ceph_osd_event *event; 1739 1740 event = kmalloc(sizeof(*event), GFP_NOIO); 1741 if (!event) 1742 return -ENOMEM; 1743 1744 dout("create_event %p\n", event); 1745 event->cb = event_cb; 1746 event->one_shot = 0; 1747 event->data = data; 1748 event->osdc = osdc; 1749 INIT_LIST_HEAD(&event->osd_node); 1750 RB_CLEAR_NODE(&event->node); 1751 kref_init(&event->kref); /* one ref for us */ 1752 kref_get(&event->kref); /* one ref for the caller */ 1753 1754 spin_lock(&osdc->event_lock); 1755 event->cookie = ++osdc->event_count; 1756 __insert_event(osdc, event); 1757 spin_unlock(&osdc->event_lock); 1758 1759 *pevent = event; 1760 return 0; 1761 } 1762 EXPORT_SYMBOL(ceph_osdc_create_event); 1763 1764 void ceph_osdc_cancel_event(struct ceph_osd_event *event) 1765 { 1766 struct ceph_osd_client *osdc = event->osdc; 1767 1768 dout("cancel_event %p\n", event); 1769 spin_lock(&osdc->event_lock); 1770 __remove_event(event); 1771 spin_unlock(&osdc->event_lock); 1772 ceph_osdc_put_event(event); /* caller's */ 1773 } 1774 EXPORT_SYMBOL(ceph_osdc_cancel_event); 1775 1776 1777 static void do_event_work(struct work_struct *work) 1778 { 1779 struct ceph_osd_event_work *event_work = 1780 container_of(work, struct ceph_osd_event_work, work); 1781 struct ceph_osd_event *event = event_work->event; 1782 u64 ver = event_work->ver; 1783 u64 notify_id = event_work->notify_id; 1784 u8 opcode = event_work->opcode; 1785 1786 dout("do_event_work completing %p\n", event); 1787 event->cb(ver, notify_id, opcode, event->data); 1788 dout("do_event_work completed %p\n", event); 1789 ceph_osdc_put_event(event); 1790 kfree(event_work); 1791 } 1792 1793 1794 /* 1795 * Process osd watch notifications 1796 */ 1797 static void handle_watch_notify(struct ceph_osd_client *osdc, 1798 struct ceph_msg *msg) 1799 { 1800 void *p, *end; 1801 u8 proto_ver; 1802 u64 cookie, ver, notify_id; 1803 u8 opcode; 1804 struct ceph_osd_event *event; 1805 struct ceph_osd_event_work *event_work; 1806 1807 p = msg->front.iov_base; 1808 end = p + msg->front.iov_len; 1809 1810 ceph_decode_8_safe(&p, end, proto_ver, bad); 1811 ceph_decode_8_safe(&p, end, opcode, bad); 1812 ceph_decode_64_safe(&p, end, cookie, bad); 1813 ceph_decode_64_safe(&p, end, ver, bad); 1814 ceph_decode_64_safe(&p, end, notify_id, bad); 1815 1816 spin_lock(&osdc->event_lock); 1817 event = __find_event(osdc, cookie); 1818 if (event) { 1819 BUG_ON(event->one_shot); 1820 get_event(event); 1821 } 1822 spin_unlock(&osdc->event_lock); 1823 dout("handle_watch_notify cookie %lld ver %lld event %p\n", 1824 cookie, ver, event); 1825 if (event) { 1826 event_work = kmalloc(sizeof(*event_work), GFP_NOIO); 1827 if (!event_work) { 1828 dout("ERROR: could not allocate event_work\n"); 1829 goto done_err; 1830 } 1831 INIT_WORK(&event_work->work, do_event_work); 1832 event_work->event = event; 1833 event_work->ver = ver; 1834 event_work->notify_id = notify_id; 1835 event_work->opcode = opcode; 1836 if (!queue_work(osdc->notify_wq, &event_work->work)) { 1837 dout("WARNING: failed to queue notify event work\n"); 1838 goto done_err; 1839 } 1840 } 1841 1842 return; 1843 1844 done_err: 1845 ceph_osdc_put_event(event); 1846 return; 1847 1848 bad: 1849 pr_err("osdc handle_watch_notify corrupt msg\n"); 1850 return; 1851 } 1852 1853 static void ceph_osdc_msg_data_set(struct ceph_msg *msg, 1854 struct ceph_osd_data *osd_data) 1855 { 1856 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { 1857 BUG_ON(osd_data->length > (u64) SIZE_MAX); 1858 if (osd_data->length) 1859 ceph_msg_data_set_pages(msg, osd_data->pages, 1860 osd_data->length, osd_data->alignment); 1861 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) { 1862 BUG_ON(!osd_data->pagelist->length); 1863 ceph_msg_data_set_pagelist(msg, osd_data->pagelist); 1864 #ifdef CONFIG_BLOCK 1865 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) { 1866 ceph_msg_data_set_bio(msg, osd_data->bio); 1867 #endif 1868 } else { 1869 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE); 1870 } 1871 } 1872 1873 /* 1874 * Register request, send initial attempt. 1875 */ 1876 int ceph_osdc_start_request(struct ceph_osd_client *osdc, 1877 struct ceph_osd_request *req, 1878 bool nofail) 1879 { 1880 int rc = 0; 1881 1882 /* Set up response incoming data and request outgoing data fields */ 1883 1884 ceph_osdc_msg_data_set(req->r_reply, &req->r_data_in); 1885 ceph_osdc_msg_data_set(req->r_request, &req->r_data_out); 1886 1887 down_read(&osdc->map_sem); 1888 mutex_lock(&osdc->request_mutex); 1889 __register_request(osdc, req); 1890 WARN_ON(req->r_sent); 1891 rc = __map_request(osdc, req, 0); 1892 if (rc < 0) { 1893 if (nofail) { 1894 dout("osdc_start_request failed map, " 1895 " will retry %lld\n", req->r_tid); 1896 rc = 0; 1897 } 1898 goto out_unlock; 1899 } 1900 if (req->r_osd == NULL) { 1901 dout("send_request %p no up osds in pg\n", req); 1902 ceph_monc_request_next_osdmap(&osdc->client->monc); 1903 } else { 1904 __send_queued(osdc); 1905 } 1906 rc = 0; 1907 out_unlock: 1908 mutex_unlock(&osdc->request_mutex); 1909 up_read(&osdc->map_sem); 1910 return rc; 1911 } 1912 EXPORT_SYMBOL(ceph_osdc_start_request); 1913 1914 /* 1915 * wait for a request to complete 1916 */ 1917 int ceph_osdc_wait_request(struct ceph_osd_client *osdc, 1918 struct ceph_osd_request *req) 1919 { 1920 int rc; 1921 1922 rc = wait_for_completion_interruptible(&req->r_completion); 1923 if (rc < 0) { 1924 mutex_lock(&osdc->request_mutex); 1925 __cancel_request(req); 1926 __unregister_request(osdc, req); 1927 mutex_unlock(&osdc->request_mutex); 1928 complete_request(req); 1929 dout("wait_request tid %llu canceled/timed out\n", req->r_tid); 1930 return rc; 1931 } 1932 1933 dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result); 1934 return req->r_result; 1935 } 1936 EXPORT_SYMBOL(ceph_osdc_wait_request); 1937 1938 /* 1939 * sync - wait for all in-flight requests to flush. avoid starvation. 1940 */ 1941 void ceph_osdc_sync(struct ceph_osd_client *osdc) 1942 { 1943 struct ceph_osd_request *req; 1944 u64 last_tid, next_tid = 0; 1945 1946 mutex_lock(&osdc->request_mutex); 1947 last_tid = osdc->last_tid; 1948 while (1) { 1949 req = __lookup_request_ge(osdc, next_tid); 1950 if (!req) 1951 break; 1952 if (req->r_tid > last_tid) 1953 break; 1954 1955 next_tid = req->r_tid + 1; 1956 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0) 1957 continue; 1958 1959 ceph_osdc_get_request(req); 1960 mutex_unlock(&osdc->request_mutex); 1961 dout("sync waiting on tid %llu (last is %llu)\n", 1962 req->r_tid, last_tid); 1963 wait_for_completion(&req->r_safe_completion); 1964 mutex_lock(&osdc->request_mutex); 1965 ceph_osdc_put_request(req); 1966 } 1967 mutex_unlock(&osdc->request_mutex); 1968 dout("sync done (thru tid %llu)\n", last_tid); 1969 } 1970 EXPORT_SYMBOL(ceph_osdc_sync); 1971 1972 /* 1973 * init, shutdown 1974 */ 1975 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) 1976 { 1977 int err; 1978 1979 dout("init\n"); 1980 osdc->client = client; 1981 osdc->osdmap = NULL; 1982 init_rwsem(&osdc->map_sem); 1983 init_completion(&osdc->map_waiters); 1984 osdc->last_requested_map = 0; 1985 mutex_init(&osdc->request_mutex); 1986 osdc->last_tid = 0; 1987 osdc->osds = RB_ROOT; 1988 INIT_LIST_HEAD(&osdc->osd_lru); 1989 osdc->requests = RB_ROOT; 1990 INIT_LIST_HEAD(&osdc->req_lru); 1991 INIT_LIST_HEAD(&osdc->req_unsent); 1992 INIT_LIST_HEAD(&osdc->req_notarget); 1993 INIT_LIST_HEAD(&osdc->req_linger); 1994 osdc->num_requests = 0; 1995 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 1996 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 1997 spin_lock_init(&osdc->event_lock); 1998 osdc->event_tree = RB_ROOT; 1999 osdc->event_count = 0; 2000 2001 schedule_delayed_work(&osdc->osds_timeout_work, 2002 round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ)); 2003 2004 err = -ENOMEM; 2005 osdc->req_mempool = mempool_create_kmalloc_pool(10, 2006 sizeof(struct ceph_osd_request)); 2007 if (!osdc->req_mempool) 2008 goto out; 2009 2010 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, 2011 OSD_OP_FRONT_LEN, 10, true, 2012 "osd_op"); 2013 if (err < 0) 2014 goto out_mempool; 2015 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, 2016 OSD_OPREPLY_FRONT_LEN, 10, true, 2017 "osd_op_reply"); 2018 if (err < 0) 2019 goto out_msgpool; 2020 2021 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); 2022 if (IS_ERR(osdc->notify_wq)) { 2023 err = PTR_ERR(osdc->notify_wq); 2024 osdc->notify_wq = NULL; 2025 goto out_msgpool; 2026 } 2027 return 0; 2028 2029 out_msgpool: 2030 ceph_msgpool_destroy(&osdc->msgpool_op); 2031 out_mempool: 2032 mempool_destroy(osdc->req_mempool); 2033 out: 2034 return err; 2035 } 2036 2037 void ceph_osdc_stop(struct ceph_osd_client *osdc) 2038 { 2039 flush_workqueue(osdc->notify_wq); 2040 destroy_workqueue(osdc->notify_wq); 2041 cancel_delayed_work_sync(&osdc->timeout_work); 2042 cancel_delayed_work_sync(&osdc->osds_timeout_work); 2043 if (osdc->osdmap) { 2044 ceph_osdmap_destroy(osdc->osdmap); 2045 osdc->osdmap = NULL; 2046 } 2047 remove_all_osds(osdc); 2048 mempool_destroy(osdc->req_mempool); 2049 ceph_msgpool_destroy(&osdc->msgpool_op); 2050 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 2051 } 2052 2053 /* 2054 * Read some contiguous pages. If we cross a stripe boundary, shorten 2055 * *plen. Return number of bytes read, or error. 2056 */ 2057 int ceph_osdc_readpages(struct ceph_osd_client *osdc, 2058 struct ceph_vino vino, struct ceph_file_layout *layout, 2059 u64 off, u64 *plen, 2060 u32 truncate_seq, u64 truncate_size, 2061 struct page **pages, int num_pages, int page_align) 2062 { 2063 struct ceph_osd_request *req; 2064 struct ceph_osd_data *osd_data; 2065 int rc = 0; 2066 2067 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, 2068 vino.snap, off, *plen); 2069 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 2070 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 2071 NULL, 0, truncate_seq, truncate_size, NULL, 2072 false); 2073 if (IS_ERR(req)) 2074 return PTR_ERR(req); 2075 2076 /* it may be a short read due to an object boundary */ 2077 2078 osd_data = &req->r_data_in; 2079 osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; 2080 osd_data->pages = pages; 2081 osd_data->length = *plen; 2082 osd_data->alignment = page_align; 2083 2084 dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", 2085 off, *plen, osd_data->length, page_align); 2086 2087 rc = ceph_osdc_start_request(osdc, req, false); 2088 if (!rc) 2089 rc = ceph_osdc_wait_request(osdc, req); 2090 2091 ceph_osdc_put_request(req); 2092 dout("readpages result %d\n", rc); 2093 return rc; 2094 } 2095 EXPORT_SYMBOL(ceph_osdc_readpages); 2096 2097 /* 2098 * do a synchronous write on N pages 2099 */ 2100 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, 2101 struct ceph_file_layout *layout, 2102 struct ceph_snap_context *snapc, 2103 u64 off, u64 len, 2104 u32 truncate_seq, u64 truncate_size, 2105 struct timespec *mtime, 2106 struct page **pages, int num_pages) 2107 { 2108 struct ceph_osd_request *req; 2109 struct ceph_osd_data *osd_data; 2110 int rc = 0; 2111 int page_align = off & ~PAGE_MASK; 2112 2113 BUG_ON(vino.snap != CEPH_NOSNAP); 2114 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 2115 CEPH_OSD_OP_WRITE, 2116 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, 2117 snapc, 0, 2118 truncate_seq, truncate_size, mtime, 2119 true); 2120 if (IS_ERR(req)) 2121 return PTR_ERR(req); 2122 2123 /* it may be a short write due to an object boundary */ 2124 osd_data = &req->r_data_out; 2125 osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; 2126 osd_data->pages = pages; 2127 osd_data->length = len; 2128 osd_data->alignment = page_align; 2129 dout("writepages %llu~%llu (%llu bytes)\n", off, len, osd_data->length); 2130 2131 rc = ceph_osdc_start_request(osdc, req, true); 2132 if (!rc) 2133 rc = ceph_osdc_wait_request(osdc, req); 2134 2135 ceph_osdc_put_request(req); 2136 if (rc == 0) 2137 rc = len; 2138 dout("writepages result %d\n", rc); 2139 return rc; 2140 } 2141 EXPORT_SYMBOL(ceph_osdc_writepages); 2142 2143 /* 2144 * handle incoming message 2145 */ 2146 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 2147 { 2148 struct ceph_osd *osd = con->private; 2149 struct ceph_osd_client *osdc; 2150 int type = le16_to_cpu(msg->hdr.type); 2151 2152 if (!osd) 2153 goto out; 2154 osdc = osd->o_osdc; 2155 2156 switch (type) { 2157 case CEPH_MSG_OSD_MAP: 2158 ceph_osdc_handle_map(osdc, msg); 2159 break; 2160 case CEPH_MSG_OSD_OPREPLY: 2161 handle_reply(osdc, msg, con); 2162 break; 2163 case CEPH_MSG_WATCH_NOTIFY: 2164 handle_watch_notify(osdc, msg); 2165 break; 2166 2167 default: 2168 pr_err("received unknown message type %d %s\n", type, 2169 ceph_msg_type_name(type)); 2170 } 2171 out: 2172 ceph_msg_put(msg); 2173 } 2174 2175 /* 2176 * lookup and return message for incoming reply. set up reply message 2177 * pages. 2178 */ 2179 static struct ceph_msg *get_reply(struct ceph_connection *con, 2180 struct ceph_msg_header *hdr, 2181 int *skip) 2182 { 2183 struct ceph_osd *osd = con->private; 2184 struct ceph_osd_client *osdc = osd->o_osdc; 2185 struct ceph_msg *m; 2186 struct ceph_osd_request *req; 2187 int front = le32_to_cpu(hdr->front_len); 2188 int data_len = le32_to_cpu(hdr->data_len); 2189 u64 tid; 2190 2191 tid = le64_to_cpu(hdr->tid); 2192 mutex_lock(&osdc->request_mutex); 2193 req = __lookup_request(osdc, tid); 2194 if (!req) { 2195 *skip = 1; 2196 m = NULL; 2197 dout("get_reply unknown tid %llu from osd%d\n", tid, 2198 osd->o_osd); 2199 goto out; 2200 } 2201 2202 if (req->r_con_filling_msg) { 2203 dout("%s revoking msg %p from old con %p\n", __func__, 2204 req->r_reply, req->r_con_filling_msg); 2205 ceph_msg_revoke_incoming(req->r_reply); 2206 req->r_con_filling_msg->ops->put(req->r_con_filling_msg); 2207 req->r_con_filling_msg = NULL; 2208 } 2209 2210 if (front > req->r_reply->front.iov_len) { 2211 pr_warning("get_reply front %d > preallocated %d\n", 2212 front, (int)req->r_reply->front.iov_len); 2213 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false); 2214 if (!m) 2215 goto out; 2216 ceph_msg_put(req->r_reply); 2217 req->r_reply = m; 2218 } 2219 m = ceph_msg_get(req->r_reply); 2220 2221 if (data_len > 0) { 2222 struct ceph_osd_data *osd_data = &req->r_data_in; 2223 2224 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { 2225 if (osd_data->pages && 2226 unlikely(osd_data->length < data_len)) { 2227 2228 pr_warning("tid %lld reply has %d bytes " 2229 "we had only %llu bytes ready\n", 2230 tid, data_len, osd_data->length); 2231 *skip = 1; 2232 ceph_msg_put(m); 2233 m = NULL; 2234 goto out; 2235 } 2236 } 2237 } 2238 *skip = 0; 2239 req->r_con_filling_msg = con->ops->get(con); 2240 dout("get_reply tid %lld %p\n", tid, m); 2241 2242 out: 2243 mutex_unlock(&osdc->request_mutex); 2244 return m; 2245 2246 } 2247 2248 static struct ceph_msg *alloc_msg(struct ceph_connection *con, 2249 struct ceph_msg_header *hdr, 2250 int *skip) 2251 { 2252 struct ceph_osd *osd = con->private; 2253 int type = le16_to_cpu(hdr->type); 2254 int front = le32_to_cpu(hdr->front_len); 2255 2256 *skip = 0; 2257 switch (type) { 2258 case CEPH_MSG_OSD_MAP: 2259 case CEPH_MSG_WATCH_NOTIFY: 2260 return ceph_msg_new(type, front, GFP_NOFS, false); 2261 case CEPH_MSG_OSD_OPREPLY: 2262 return get_reply(con, hdr, skip); 2263 default: 2264 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type, 2265 osd->o_osd); 2266 *skip = 1; 2267 return NULL; 2268 } 2269 } 2270 2271 /* 2272 * Wrappers to refcount containing ceph_osd struct 2273 */ 2274 static struct ceph_connection *get_osd_con(struct ceph_connection *con) 2275 { 2276 struct ceph_osd *osd = con->private; 2277 if (get_osd(osd)) 2278 return con; 2279 return NULL; 2280 } 2281 2282 static void put_osd_con(struct ceph_connection *con) 2283 { 2284 struct ceph_osd *osd = con->private; 2285 put_osd(osd); 2286 } 2287 2288 /* 2289 * authentication 2290 */ 2291 /* 2292 * Note: returned pointer is the address of a structure that's 2293 * managed separately. Caller must *not* attempt to free it. 2294 */ 2295 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 2296 int *proto, int force_new) 2297 { 2298 struct ceph_osd *o = con->private; 2299 struct ceph_osd_client *osdc = o->o_osdc; 2300 struct ceph_auth_client *ac = osdc->client->monc.auth; 2301 struct ceph_auth_handshake *auth = &o->o_auth; 2302 2303 if (force_new && auth->authorizer) { 2304 ceph_auth_destroy_authorizer(ac, auth->authorizer); 2305 auth->authorizer = NULL; 2306 } 2307 if (!auth->authorizer) { 2308 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 2309 auth); 2310 if (ret) 2311 return ERR_PTR(ret); 2312 } else { 2313 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 2314 auth); 2315 if (ret) 2316 return ERR_PTR(ret); 2317 } 2318 *proto = ac->protocol; 2319 2320 return auth; 2321 } 2322 2323 2324 static int verify_authorizer_reply(struct ceph_connection *con, int len) 2325 { 2326 struct ceph_osd *o = con->private; 2327 struct ceph_osd_client *osdc = o->o_osdc; 2328 struct ceph_auth_client *ac = osdc->client->monc.auth; 2329 2330 return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len); 2331 } 2332 2333 static int invalidate_authorizer(struct ceph_connection *con) 2334 { 2335 struct ceph_osd *o = con->private; 2336 struct ceph_osd_client *osdc = o->o_osdc; 2337 struct ceph_auth_client *ac = osdc->client->monc.auth; 2338 2339 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); 2340 return ceph_monc_validate_auth(&osdc->client->monc); 2341 } 2342 2343 static const struct ceph_connection_operations osd_con_ops = { 2344 .get = get_osd_con, 2345 .put = put_osd_con, 2346 .dispatch = dispatch, 2347 .get_authorizer = get_authorizer, 2348 .verify_authorizer_reply = verify_authorizer_reply, 2349 .invalidate_authorizer = invalidate_authorizer, 2350 .alloc_msg = alloc_msg, 2351 .fault = osd_reset, 2352 }; 2353