#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>

#define OSD_OP_FRONT_LEN	4096
#define OSD_OPREPLY_FRONT_LEN	512

static const struct ceph_connection_operations osd_con_ops;
static int __kick_requests(struct ceph_osd_client *osdc,
			   struct ceph_osd *kickosd);

static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);

static int op_needs_trail(int op)
{
	switch (op) {
	case CEPH_OSD_OP_GETXATTR:
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
	case CEPH_OSD_OP_CALL:
		return 1;
	default:
		return 0;
	}
}

static int op_has_extent(int op)
{
	return (op == CEPH_OSD_OP_READ ||
		op == CEPH_OSD_OP_WRITE);
}

void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
			  struct ceph_file_layout *layout,
			  u64 snapid,
			  u64 off, u64 *plen, u64 *bno,
			  struct ceph_osd_request *req,
			  struct ceph_osd_req_op *op)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	u64 orig_len = *plen;
	u64 objoff, objlen;    /* extent in object */

	reqhead->snapid = cpu_to_le64(snapid);

	/* object extent? */
	ceph_calc_file_object_mapping(layout, off, plen, bno,
				      &objoff, &objlen);
	if (*plen < orig_len)
		dout(" skipping last %llu, final file extent %llu~%llu\n",
		     orig_len - *plen, off, *plen);

	if (op_has_extent(op->op)) {
		op->extent.offset = objoff;
		op->extent.length = objlen;
	}
	req->r_num_pages = calc_pages_for(off, *plen);
	req->r_page_alignment = off & ~PAGE_MASK;
	if (op->op == CEPH_OSD_OP_WRITE)
		op->payload_len = *plen;

	dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
	     *bno, objoff, objlen, req->r_num_pages);
}
EXPORT_SYMBOL(ceph_calc_raw_layout);

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */
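/*
 * For illustration: calc_layout() below names objects "<ino>.<objnum>"
 * (both in hex), so object number 5 of the file with inode number
 * 0x1234 gets the oid "1234.00000005".  If a file extent crosses an
 * object boundary, *plen is shortened so that each request stays
 * within a single object.
 */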
/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */
static void calc_layout(struct ceph_osd_client *osdc,
			struct ceph_vino vino,
			struct ceph_file_layout *layout,
			u64 off, u64 *plen,
			struct ceph_osd_request *req,
			struct ceph_osd_req_op *op)
{
	u64 bno;

	ceph_calc_raw_layout(osdc, layout, vino.snap, off,
			     plen, &bno, req, op);

	sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
	req->r_oid_len = strlen(req->r_oid);
}

/*
 * requests
 */
void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req = container_of(kref,
						    struct ceph_osd_request,
						    r_kref);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_con_filling_msg) {
		dout("release_request revoking pages %p from con %p\n",
		     req->r_pages, req->r_con_filling_msg);
		ceph_con_revoke_message(req->r_con_filling_msg,
					req->r_reply);
		ceph_con_put(req->r_con_filling_msg);
	}
	if (req->r_own_pages)
		ceph_release_page_vector(req->r_pages,
					 req->r_num_pages);
#ifdef CONFIG_BLOCK
	if (req->r_bio)
		bio_put(req->r_bio);
#endif
	ceph_put_snap_context(req->r_snapc);
	if (req->r_trail) {
		ceph_pagelist_release(req->r_trail);
		kfree(req->r_trail);
	}
	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else
		kfree(req);
}
EXPORT_SYMBOL(ceph_osdc_release_request);

static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
{
	int i = 0;

	if (needs_trail)
		*needs_trail = 0;
	while (ops[i].op) {
		if (needs_trail && op_needs_trail(ops[i].op))
			*needs_trail = 1;
		i++;
	}

	return i;
}
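/*
 * Note that an ops array is terminated by an entry whose ->op is 0,
 * as in this sketch (mirroring ceph_osdc_new_request() below):
 *
 *	struct ceph_osd_req_op ops[2];
 *
 *	ops[0].op = CEPH_OSD_OP_READ;
 *	ops[0].payload_len = 0;
 *	ops[1].op = 0;		<-- terminator
 *
 * get_num_ops() above relies on this convention.
 */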
struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
						 int flags,
						 struct ceph_snap_context *snapc,
						 struct ceph_osd_req_op *ops,
						 bool use_mempool,
						 gfp_t gfp_flags,
						 struct page **pages,
						 struct bio *bio)
{
	struct ceph_osd_request *req;
	struct ceph_msg *msg;
	int needs_trail;
	int num_op = get_num_ops(ops, &needs_trail);
	size_t msg_size = sizeof(struct ceph_osd_request_head);

	msg_size += num_op*sizeof(struct ceph_osd_op);

	if (use_mempool) {
		req = mempool_alloc(osdc->req_mempool, gfp_flags);
		memset(req, 0, sizeof(*req));
	} else {
		req = kzalloc(sizeof(*req), gfp_flags);
	}
	if (req == NULL)
		return NULL;

	req->r_osdc = osdc;
	req->r_mempool = use_mempool;

	kref_init(&req->r_kref);
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	INIT_LIST_HEAD(&req->r_unsafe_item);
	req->r_flags = flags;

	WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);

	/* create reply message */
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
				   OSD_OPREPLY_FRONT_LEN, gfp_flags);
	if (!msg) {
		ceph_osdc_put_request(req);
		return NULL;
	}
	req->r_reply = msg;

	/* allocate space for the trailing data */
	if (needs_trail) {
		req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
		if (!req->r_trail) {
			ceph_osdc_put_request(req);
			return NULL;
		}
		ceph_pagelist_init(req->r_trail);
	}

	/* create request message; allow space for oid */
	msg_size += 40;
	if (snapc)
		msg_size += sizeof(u64) * snapc->num_snaps;
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
	if (!msg) {
		ceph_osdc_put_request(req);
		return NULL;
	}

	msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
	memset(msg->front.iov_base, 0, msg->front.iov_len);

	req->r_request = msg;
	req->r_pages = pages;
#ifdef CONFIG_BLOCK
	if (bio) {
		req->r_bio = bio;
		bio_get(req->r_bio);
	}
#endif

	return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);

static void osd_req_encode_op(struct ceph_osd_request *req,
			      struct ceph_osd_op *dst,
			      struct ceph_osd_req_op *src)
{
	dst->op = cpu_to_le16(src->op);

	switch (dst->op) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
		dst->extent.offset =
			cpu_to_le64(src->extent.offset);
		dst->extent.length =
			cpu_to_le64(src->extent.length);
		dst->extent.truncate_size =
			cpu_to_le64(src->extent.truncate_size);
		dst->extent.truncate_seq =
			cpu_to_le32(src->extent.truncate_seq);
		break;

	case CEPH_OSD_OP_GETXATTR:
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		BUG_ON(!req->r_trail);

		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
		dst->xattr.cmp_op = src->xattr.cmp_op;
		dst->xattr.cmp_mode = src->xattr.cmp_mode;
		ceph_pagelist_append(req->r_trail, src->xattr.name,
				     src->xattr.name_len);
		ceph_pagelist_append(req->r_trail, src->xattr.val,
				     src->xattr.value_len);
		break;
	case CEPH_OSD_OP_CALL:
		BUG_ON(!req->r_trail);

		dst->cls.class_len = src->cls.class_len;
		dst->cls.method_len = src->cls.method_len;
		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);

		ceph_pagelist_append(req->r_trail, src->cls.class_name,
				     src->cls.class_len);
		ceph_pagelist_append(req->r_trail, src->cls.method_name,
				     src->cls.method_len);
		ceph_pagelist_append(req->r_trail, src->cls.indata,
				     src->cls.indata_len);
		break;
	case CEPH_OSD_OP_ROLLBACK:
		dst->snap.snapid = cpu_to_le64(src->snap.snapid);
		break;
	case CEPH_OSD_OP_STARTSYNC:
		break;
	default:
		pr_err("unrecognized osd opcode %d\n", dst->op);
		WARN_ON(1);
		break;
	}
	dst->payload_len = cpu_to_le32(src->payload_len);
}
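/*
 * All multi-byte fields above are converted to little-endian before
 * they go over the wire; the variable-length payloads (xattr names and
 * values, class call data) are appended to the request's trailing
 * pagelist rather than the fixed-size op array.  A sketch of a class
 * call op, for illustration only (the class and method names here are
 * made up):
 *
 *	op.op = CEPH_OSD_OP_CALL;
 *	op.cls.class_name = "foo";
 *	op.cls.class_len = 3;
 *	op.cls.method_name = "bar";
 *	op.cls.method_len = 3;
 *	op.cls.indata = NULL;
 *	op.cls.indata_len = 0;
 */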
/*
 * build new request AND message
 */
void ceph_osdc_build_request(struct ceph_osd_request *req,
			     u64 off, u64 *plen,
			     struct ceph_osd_req_op *src_ops,
			     struct ceph_snap_context *snapc,
			     struct timespec *mtime,
			     const char *oid,
			     int oid_len)
{
	struct ceph_msg *msg = req->r_request;
	struct ceph_osd_request_head *head;
	struct ceph_osd_req_op *src_op;
	struct ceph_osd_op *op;
	void *p;
	int num_op = get_num_ops(src_ops, NULL);
	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
	int flags = req->r_flags;
	u64 data_len = 0;
	int i;

	head = msg->front.iov_base;
	op = (void *)(head + 1);
	p = (void *)(op + num_op);

	req->r_snapc = ceph_get_snap_context(snapc);

	head->client_inc = cpu_to_le32(1); /* always, for now. */
	head->flags = cpu_to_le32(flags);
	if (flags & CEPH_OSD_FLAG_WRITE)
		ceph_encode_timespec(&head->mtime, mtime);
	head->num_ops = cpu_to_le16(num_op);

	/* fill in oid */
	head->object_len = cpu_to_le32(oid_len);
	memcpy(p, oid, oid_len);
	p += oid_len;

	src_op = src_ops;
	while (src_op->op) {
		osd_req_encode_op(req, op, src_op);
		src_op++;
		op++;
	}

	if (req->r_trail)
		data_len += req->r_trail->length;

	if (snapc) {
		head->snap_seq = cpu_to_le64(snapc->seq);
		head->num_snaps = cpu_to_le32(snapc->num_snaps);
		for (i = 0; i < snapc->num_snaps; i++) {
			put_unaligned_le64(snapc->snaps[i], p);
			p += sizeof(u64);
		}
	}

	if (flags & CEPH_OSD_FLAG_WRITE) {
		req->r_request->hdr.data_off = cpu_to_le16(off);
		req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
	} else if (data_len) {
		req->r_request->hdr.data_off = 0;
		req->r_request->hdr.data_len = cpu_to_le32(data_len);
	}

	req->r_request->page_alignment = req->r_page_alignment;

	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
	msg_size = p - msg->front.iov_base;
	msg->front.iov_len = msg_size;
	msg->hdr.front_len = cpu_to_le32(msg_size);
	return;
}
EXPORT_SYMBOL(ceph_osdc_build_request);

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
 *
 * if @do_sync, include a 'startsync' command so that the osd will flush
 * data quickly.
 */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
					       struct ceph_file_layout *layout,
					       struct ceph_vino vino,
					       u64 off, u64 *plen,
					       int opcode, int flags,
					       struct ceph_snap_context *snapc,
					       int do_sync,
					       u32 truncate_seq,
					       u64 truncate_size,
					       struct timespec *mtime,
					       bool use_mempool, int num_reply,
					       int page_align)
{
	struct ceph_osd_req_op ops[3];
	struct ceph_osd_request *req;

	ops[0].op = opcode;
	ops[0].extent.truncate_seq = truncate_seq;
	ops[0].extent.truncate_size = truncate_size;
	ops[0].payload_len = 0;

	if (do_sync) {
		ops[1].op = CEPH_OSD_OP_STARTSYNC;
		ops[1].payload_len = 0;
		ops[2].op = 0;
	} else
		ops[1].op = 0;

	req = ceph_osdc_alloc_request(osdc, flags,
				      snapc, ops,
				      use_mempool,
				      GFP_NOFS, NULL, NULL);
	if (!req)
		return NULL;	/* alloc_request returns NULL, not ERR_PTR */

	/* calculate max write size */
	calc_layout(osdc, vino, layout, off, plen, req, ops);
	req->r_file_layout = *layout;  /* keep a copy */

	/* in case it differs from natural alignment that calc_layout
	   filled in for us */
	req->r_page_alignment = page_align;

	ceph_osdc_build_request(req, off, plen, ops,
				snapc,
				mtime,
				req->r_oid, req->r_oid_len);

	return req;
}
EXPORT_SYMBOL(ceph_osdc_new_request);
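/*
 * Typical synchronous use of the request API, as in
 * ceph_osdc_readpages() and ceph_osdc_writepages() at the bottom of
 * this file:
 *
 *	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, ...);
 *	if (!req)
 *		return -ENOMEM;
 *	rc = ceph_osdc_start_request(osdc, req, false);
 *	if (!rc)
 *		rc = ceph_osdc_wait_request(osdc, req);
 *	ceph_osdc_put_request(req);
 */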
/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */
static void __insert_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *new)
{
	struct rb_node **p = &osdc->requests.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_request *req = NULL;

	while (*p) {
		parent = *p;
		req = rb_entry(parent, struct ceph_osd_request, r_node);
		if (new->r_tid < req->r_tid)
			p = &(*p)->rb_left;
		else if (new->r_tid > req->r_tid)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->r_node, parent, p);
	rb_insert_color(&new->r_node, &osdc->requests);
}

static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
						 u64 tid)
{
	struct ceph_osd_request *req;
	struct rb_node *n = osdc->requests.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_osd_request, r_node);
		if (tid < req->r_tid)
			n = n->rb_left;
		else if (tid > req->r_tid)
			n = n->rb_right;
		else
			return req;
	}
	return NULL;
}

/* find the first request with a tid >= @tid, if any */
static struct ceph_osd_request *
__lookup_request_ge(struct ceph_osd_client *osdc,
		    u64 tid)
{
	struct ceph_osd_request *req;
	struct rb_node *n = osdc->requests.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_osd_request, r_node);
		if (tid < req->r_tid) {
			if (!n->rb_left)
				return req;
			n = n->rb_left;
		} else if (tid > req->r_tid) {
			n = n->rb_right;
		} else {
			return req;
		}
	}
	return NULL;
}

/*
 * If the osd connection drops, we need to resubmit all requests.
 */
static void osd_reset(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;

	if (!osd)
		return;
	dout("osd_reset osd%d\n", osd->o_osd);
	osdc = osd->o_osdc;
	down_read(&osdc->map_sem);
	kick_requests(osdc, osd);
	up_read(&osdc->map_sem);
}
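/*
 * osd_reset() is installed as the connection ->fault handler (see
 * osd_con_ops at the bottom of this file), so any messenger-level
 * fault on an osd connection funnels through kick_requests().
 */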
/*
 * Track open sessions with osds.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
{
	struct ceph_osd *osd;

	osd = kzalloc(sizeof(*osd), GFP_NOFS);
	if (!osd)
		return NULL;

	atomic_set(&osd->o_ref, 1);
	osd->o_osdc = osdc;
	INIT_LIST_HEAD(&osd->o_requests);
	INIT_LIST_HEAD(&osd->o_osd_lru);
	osd->o_incarnation = 1;

	ceph_con_init(osdc->client->msgr, &osd->o_con);
	osd->o_con.private = osd;
	osd->o_con.ops = &osd_con_ops;
	osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;

	INIT_LIST_HEAD(&osd->o_keepalive_item);
	return osd;
}

static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
	if (atomic_inc_not_zero(&osd->o_ref)) {
		dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
		     atomic_read(&osd->o_ref));
		return osd;
	} else {
		dout("get_osd %p FAIL\n", osd);
		return NULL;
	}
}

static void put_osd(struct ceph_osd *osd)
{
	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
	     atomic_read(&osd->o_ref) - 1);
	if (atomic_dec_and_test(&osd->o_ref)) {
		struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;

		if (osd->o_authorizer)
			ac->ops->destroy_authorizer(ac, osd->o_authorizer);
		kfree(osd);
	}
}

/*
 * remove an osd from our map
 */
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	dout("__remove_osd %p\n", osd);
	BUG_ON(!list_empty(&osd->o_requests));
	rb_erase(&osd->o_node, &osdc->osds);
	list_del_init(&osd->o_osd_lru);
	ceph_con_close(&osd->o_con);
	put_osd(osd);
}

static void __move_osd_to_lru(struct ceph_osd_client *osdc,
			      struct ceph_osd *osd)
{
	dout("__move_osd_to_lru %p\n", osd);
	BUG_ON(!list_empty(&osd->o_osd_lru));
	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
}

static void __remove_osd_from_lru(struct ceph_osd *osd)
{
	dout("__remove_osd_from_lru %p\n", osd);
	if (!list_empty(&osd->o_osd_lru))
		list_del_init(&osd->o_osd_lru);
}

static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
{
	struct ceph_osd *osd, *nosd;

	dout("remove_old_osds %p\n", osdc);
	mutex_lock(&osdc->request_mutex);
	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
		if (!remove_all && time_before(jiffies, osd->lru_ttl))
			break;
		__remove_osd(osdc, osd);
	}
	mutex_unlock(&osdc->request_mutex);
}

/*
 * reset osd connect
 */
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	struct ceph_osd_request *req;
	int ret = 0;

	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
	if (list_empty(&osd->o_requests)) {
		__remove_osd(osdc, osd);
	} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
			  &osd->o_con.peer_addr,
			  sizeof(osd->o_con.peer_addr)) == 0 &&
		   !ceph_con_opened(&osd->o_con)) {
		dout(" osd addr hasn't changed and connection never opened,"
		     " letting msgr retry\n");
		/* touch each r_stamp for handle_timeout()'s benefit */
		list_for_each_entry(req, &osd->o_requests, r_osd_item)
			req->r_stamp = jiffies;
		ret = -EAGAIN;
	} else {
		ceph_con_close(&osd->o_con);
		ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
		osd->o_incarnation++;
	}
	return ret;
}
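/*
 * Note on reference counting: get_osd() uses atomic_inc_not_zero(), so
 * an osd whose refcount has already dropped to zero (and is being torn
 * down in put_osd()) cannot be resurrected; callers must handle a NULL
 * return from get_osd().
 */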
static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
{
	struct rb_node **p = &osdc->osds.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd *osd = NULL;

	while (*p) {
		parent = *p;
		osd = rb_entry(parent, struct ceph_osd, o_node);
		if (new->o_osd < osd->o_osd)
			p = &(*p)->rb_left;
		else if (new->o_osd > osd->o_osd)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->o_node, parent, p);
	rb_insert_color(&new->o_node, &osdc->osds);
}

static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
{
	struct ceph_osd *osd;
	struct rb_node *n = osdc->osds.rb_node;

	while (n) {
		osd = rb_entry(n, struct ceph_osd, o_node);
		if (o < osd->o_osd)
			n = n->rb_left;
		else if (o > osd->o_osd)
			n = n->rb_right;
		else
			return osd;
	}
	return NULL;
}

static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
{
	schedule_delayed_work(&osdc->timeout_work,
			osdc->client->options->osd_keepalive_timeout * HZ);
}

static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
{
	cancel_delayed_work(&osdc->timeout_work);
}

/*
 * Register request, assign tid.  If this is the first request, set up
 * the timeout event.
 */
static void register_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *req)
{
	mutex_lock(&osdc->request_mutex);
	req->r_tid = ++osdc->last_tid;
	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
	INIT_LIST_HEAD(&req->r_req_lru_item);

	dout("register_request %p tid %lld\n", req, req->r_tid);
	__insert_request(osdc, req);
	ceph_osdc_get_request(req);
	osdc->num_requests++;

	if (osdc->num_requests == 1) {
		dout(" first request, scheduling timeout\n");
		__schedule_osd_timeout(osdc);
	}
	mutex_unlock(&osdc->request_mutex);
}

/*
 * called under osdc->request_mutex
 */
static void __unregister_request(struct ceph_osd_client *osdc,
				 struct ceph_osd_request *req)
{
	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
	rb_erase(&req->r_node, &osdc->requests);
	osdc->num_requests--;

	if (req->r_osd) {
		/* make sure the original request isn't in flight. */
		ceph_con_revoke(&req->r_osd->o_con, req->r_request);

		list_del_init(&req->r_osd_item);
		if (list_empty(&req->r_osd->o_requests))
			__move_osd_to_lru(osdc, req->r_osd);
		req->r_osd = NULL;
	}

	ceph_osdc_put_request(req);

	list_del_init(&req->r_req_lru_item);
	if (osdc->num_requests == 0) {
		dout(" no requests, canceling timeout\n");
		__cancel_osd_timeout(osdc);
	}
}

/*
 * Cancel a previously queued request message
 */
static void __cancel_request(struct ceph_osd_request *req)
{
	if (req->r_sent && req->r_osd) {
		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
		req->r_sent = 0;
	}
	list_del_init(&req->r_req_lru_item);
}
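/*
 * Tids are assigned from osdc->last_tid under request_mutex, so they
 * are unique and monotonically increasing.  ceph_osdc_sync() below
 * depends on this when it walks requests in tid order via
 * __lookup_request_ge().
 */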
/*
 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
 * (as needed), and set the request r_osd appropriately.  If there is
 * no up osd, set r_osd to NULL.
 *
 * Return 0 if unchanged, 1 if changed, or negative on error.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static int __map_osds(struct ceph_osd_client *osdc,
		      struct ceph_osd_request *req)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	struct ceph_pg pgid;
	int acting[CEPH_PG_MAX_SIZE];
	int o = -1, num = 0;
	int err;

	dout("map_osds %p tid %lld\n", req, req->r_tid);
	err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
				      &req->r_file_layout, osdc->osdmap);
	if (err)
		return err;
	pgid = reqhead->layout.ol_pgid;
	req->r_pgid = pgid;

	err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
	if (err > 0) {
		o = acting[0];
		num = err;
	}

	if ((req->r_osd && req->r_osd->o_osd == o &&
	     req->r_sent >= req->r_osd->o_incarnation &&
	     req->r_num_pg_osds == num &&
	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
	    (req->r_osd == NULL && o == -1))
		return 0;  /* no change */

	dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n",
	     req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
	     req->r_osd ? req->r_osd->o_osd : -1);

	/* record full pg acting set */
	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
	req->r_num_pg_osds = num;

	if (req->r_osd) {
		__cancel_request(req);
		list_del_init(&req->r_osd_item);
		req->r_osd = NULL;
	}

	req->r_osd = __lookup_osd(osdc, o);
	if (!req->r_osd && o >= 0) {
		err = -ENOMEM;
		req->r_osd = create_osd(osdc);
		if (!req->r_osd)
			goto out;

		dout("map_osds osd %p is osd%d\n", req->r_osd, o);
		req->r_osd->o_osd = o;
		req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
		__insert_osd(osdc, req->r_osd);

		ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
	}

	if (req->r_osd) {
		__remove_osd_from_lru(req->r_osd);
		list_add(&req->r_osd_item, &req->r_osd->o_requests);
	}
	err = 1;  /* osd or pg changed */

out:
	return err;
}

/*
 * caller should hold map_sem (for read) and request_mutex
 */
static int __send_request(struct ceph_osd_client *osdc,
			  struct ceph_osd_request *req)
{
	struct ceph_osd_request_head *reqhead;
	int err;

	err = __map_osds(osdc, req);
	if (err < 0)
		return err;
	if (req->r_osd == NULL) {
		dout("send_request %p no up osds in pg\n", req);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
		return 0;
	}

	dout("send_request %p tid %llu to osd%d flags %d\n",
	     req, req->r_tid, req->r_osd->o_osd, req->r_flags);

	reqhead = req->r_request->front.iov_base;
	reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
	reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
	reqhead->reassert_version = req->r_reassert_version;

	req->r_stamp = jiffies;
	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);

	ceph_msg_get(req->r_request);  /* send consumes a ref */
	ceph_con_send(&req->r_osd->o_con, req->r_request);
	req->r_sent = req->r_osd->o_incarnation;
	return 0;
}
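/*
 * Note the reassert_version round trip above: handle_reply() records
 * the version reported by the osd, and a resend of the same write
 * carries it back, presumably so the osd can order the replayed
 * operation consistently.
 */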
/*
 * Timeout callback, called every N seconds when 1 or more osd
 * requests have been active for more than N seconds.  When this
 * happens, we ping all OSDs whose requests have timed out to
 * ensure any communications channel reset is detected.  Reset the
 * request timeouts another N seconds in the future as we go.
 * Reschedule the timeout event another N seconds in the future (unless
 * there are no open requests).
 */
static void handle_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client, timeout_work.work);
	struct ceph_osd_request *req, *last_req = NULL;
	struct ceph_osd *osd;
	unsigned long timeout = osdc->client->options->osd_timeout * HZ;
	unsigned long keepalive =
		osdc->client->options->osd_keepalive_timeout * HZ;
	unsigned long last_stamp = 0;
	struct rb_node *p;
	struct list_head slow_osds;

	dout("timeout\n");
	down_read(&osdc->map_sem);

	ceph_monc_request_next_osdmap(&osdc->client->monc);

	mutex_lock(&osdc->request_mutex);
	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_osd_request, r_node);

		if (req->r_resend) {
			int err;

			dout("osdc resending prev failed %lld\n", req->r_tid);
			err = __send_request(osdc, req);
			if (err)
				dout("osdc failed again on %lld\n", req->r_tid);
			else
				req->r_resend = false;
			continue;
		}
	}

	/*
	 * reset osds that appear to be _really_ unresponsive.  this
	 * is a failsafe measure.. we really shouldn't be getting to
	 * this point if the system is working properly.  the monitors
	 * should mark the osd as failed and we should find out about
	 * it from an updated osd map.
	 */
	while (timeout && !list_empty(&osdc->req_lru)) {
		req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
				 r_req_lru_item);

		if (time_before(jiffies, req->r_stamp + timeout))
			break;

		BUG_ON(req == last_req && req->r_stamp == last_stamp);
		last_req = req;
		last_stamp = req->r_stamp;

		osd = req->r_osd;
		BUG_ON(!osd);
		pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
			   req->r_tid, osd->o_osd);
		__kick_requests(osdc, osd);
	}

	/*
	 * ping osds that are a bit slow.  this ensures that if there
	 * is a break in the TCP connection we will notice, and reopen
	 * a connection with that osd (from the fault callback).
	 */
	INIT_LIST_HEAD(&slow_osds);
	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
		if (time_before(jiffies, req->r_stamp + keepalive))
			break;

		osd = req->r_osd;
		BUG_ON(!osd);
		dout(" tid %llu is slow, will send keepalive on osd%d\n",
		     req->r_tid, osd->o_osd);
		list_move_tail(&osd->o_keepalive_item, &slow_osds);
	}
	while (!list_empty(&slow_osds)) {
		osd = list_entry(slow_osds.next, struct ceph_osd,
				 o_keepalive_item);
		list_del_init(&osd->o_keepalive_item);
		ceph_con_keepalive(&osd->o_con);
	}

	__schedule_osd_timeout(osdc);
	mutex_unlock(&osdc->request_mutex);

	up_read(&osdc->map_sem);
}

static void handle_osds_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client,
			     osds_timeout_work.work);
	unsigned long delay =
		osdc->client->options->osd_idle_ttl * HZ >> 2;

	dout("osds timeout\n");
	down_read(&osdc->map_sem);
	remove_old_osds(osdc, 0);
	up_read(&osdc->map_sem);

	schedule_delayed_work(&osdc->osds_timeout_work,
			      round_jiffies_relative(delay));
}
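/*
 * To summarize the two delayed works: timeout_work (handle_timeout)
 * runs while requests are outstanding and keepalives or resets slow
 * osds; osds_timeout_work (handle_osds_timeout) trims idle osds from
 * the LRU every osd_idle_ttl/4 seconds.
 */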
/*
 * handle osd op reply.  either call the callback if it is specified,
 * or do the completion to wake up the waiting thread.
 */
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
			 struct ceph_connection *con)
{
	struct ceph_osd_reply_head *rhead = msg->front.iov_base;
	struct ceph_osd_request *req;
	u64 tid;
	int numops, object_len, flags;
	s32 result;

	tid = le64_to_cpu(msg->hdr.tid);
	if (msg->front.iov_len < sizeof(*rhead))
		goto bad;
	numops = le32_to_cpu(rhead->num_ops);
	object_len = le32_to_cpu(rhead->object_len);
	result = le32_to_cpu(rhead->result);
	if (msg->front.iov_len != sizeof(*rhead) + object_len +
	    numops * sizeof(struct ceph_osd_op))
		goto bad;
	dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);

	/* lookup */
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (req == NULL) {
		dout("handle_reply tid %llu dne\n", tid);
		mutex_unlock(&osdc->request_mutex);
		return;
	}
	ceph_osdc_get_request(req);
	flags = le32_to_cpu(rhead->flags);

	/*
	 * if this connection filled our message, drop our reference now, to
	 * avoid a (safe but slower) revoke later.
	 */
	if (req->r_con_filling_msg == con && req->r_reply == msg) {
		dout(" dropping con_filling_msg ref %p\n", con);
		req->r_con_filling_msg = NULL;
		ceph_con_put(con);
	}

	if (!req->r_got_reply) {
		unsigned bytes;

		req->r_result = le32_to_cpu(rhead->result);
		bytes = le32_to_cpu(msg->hdr.data_len);
		dout("handle_reply result %d bytes %d\n", req->r_result,
		     bytes);
		if (req->r_result == 0)
			req->r_result = bytes;

		/* in case this is a write and we need to replay, */
		req->r_reassert_version = rhead->reassert_version;

		req->r_got_reply = 1;
	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
		dout("handle_reply tid %llu dup ack\n", tid);
		mutex_unlock(&osdc->request_mutex);
		goto done;
	}

	dout("handle_reply tid %llu flags %d\n", tid, flags);

	/* either this is a read, or we got the safe response */
	if (result < 0 ||
	    (flags & CEPH_OSD_FLAG_ONDISK) ||
	    ((flags & CEPH_OSD_FLAG_WRITE) == 0))
		__unregister_request(osdc, req);

	mutex_unlock(&osdc->request_mutex);

	if (req->r_callback)
		req->r_callback(req, msg);
	else
		complete_all(&req->r_completion);

	if (flags & CEPH_OSD_FLAG_ONDISK) {
		if (req->r_safe_callback)
			req->r_safe_callback(req, msg);
		complete_all(&req->r_safe_completion);  /* fsync waiter */
	}

done:
	ceph_osdc_put_request(req);
	return;

bad:
	pr_err("corrupt osd_op_reply got %d %d expected %d\n",
	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
	       (int)sizeof(*rhead));
	ceph_msg_dump(msg);
}
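/*
 * A write typically generates two replies: an initial "ack", and a
 * second reply flagged CEPH_OSD_FLAG_ONDISK once the data is safely
 * committed.  Only the latter completes r_safe_completion, which is
 * what fsync (via ceph_osdc_sync() below) waits on.
 */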
static int __kick_requests(struct ceph_osd_client *osdc,
			   struct ceph_osd *kickosd)
{
	struct ceph_osd_request *req;
	struct rb_node *p, *n;
	int needmap = 0;
	int err;

	dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
	if (kickosd) {
		err = __reset_osd(osdc, kickosd);
		if (err == -EAGAIN)
			return 1;
	} else {
		for (p = rb_first(&osdc->osds); p; p = n) {
			struct ceph_osd *osd =
				rb_entry(p, struct ceph_osd, o_node);

			n = rb_next(p);
			if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
			    memcmp(&osd->o_con.peer_addr,
				   ceph_osd_addr(osdc->osdmap,
						 osd->o_osd),
				   sizeof(struct ceph_entity_addr)) != 0)
				__reset_osd(osdc, osd);
		}
	}

	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_osd_request, r_node);

		if (req->r_resend) {
			dout(" r_resend set on tid %llu\n", req->r_tid);
			__cancel_request(req);
			goto kick;
		}
		if (req->r_osd && kickosd == req->r_osd) {
			__cancel_request(req);
			goto kick;
		}

		err = __map_osds(osdc, req);
		if (err == 0)
			continue;  /* no change */
		if (err < 0) {
			/*
			 * FIXME: really, we should set the request
			 * error and fail if this isn't a 'nofail'
			 * request, but that's a fair bit more
			 * complicated to do.  So retry!
			 */
			dout(" setting r_resend on %llu\n", req->r_tid);
			req->r_resend = true;
			continue;
		}
		if (req->r_osd == NULL) {
			dout("tid %llu maps to no valid osd\n", req->r_tid);
			needmap++;  /* request a newer map */
			continue;
		}

kick:
		dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
		     req->r_osd ? req->r_osd->o_osd : -1);
		req->r_flags |= CEPH_OSD_FLAG_RETRY;
		err = __send_request(osdc, req);
		if (err) {
			dout(" setting r_resend on %llu\n", req->r_tid);
			req->r_resend = true;
		}
	}

	return needmap;
}

/*
 * Resubmit osd requests whose osd or osd address has changed.  Request
 * a new osd map if osds are down, or we are otherwise unable to determine
 * how to direct a request.
 *
 * Close connections to down osds.
 *
 * If @kickosd is specified, resubmit requests for that specific osd.
 *
 * Caller should hold map_sem for read; request_mutex is taken here.
 */
static void kick_requests(struct ceph_osd_client *osdc,
			  struct ceph_osd *kickosd)
{
	int needmap;

	mutex_lock(&osdc->request_mutex);
	needmap = __kick_requests(osdc, kickosd);
	mutex_unlock(&osdc->request_mutex);

	if (needmap) {
		dout("%d requests for down osds, need new map\n", needmap);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
	}
}
/*
 * Process updated osd map.
 *
 * The message contains any number of incremental and full maps, normally
 * indicating some sort of topology change in the cluster.  Kick requests
 * off to different OSDs as needed.
 */
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
	void *p, *end, *next;
	u32 nr_maps, maplen;
	u32 epoch;
	struct ceph_osdmap *newmap = NULL, *oldmap;
	int err;
	struct ceph_fsid fsid;

	dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	/* verify fsid */
	ceph_decode_need(&p, end, sizeof(fsid), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(osdc->client, &fsid) < 0)
		return;

	down_write(&osdc->map_sem);

	/* incremental maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d inc maps\n", nr_maps);
	while (nr_maps > 0) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		next = p + maplen;
		if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
			dout("applying incremental map %u len %d\n",
			     epoch, maplen);
			newmap = osdmap_apply_incremental(&p, next,
							  osdc->osdmap,
							  osdc->client->msgr);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			if (newmap != osdc->osdmap) {
				ceph_osdmap_destroy(osdc->osdmap);
				osdc->osdmap = newmap;
			}
		} else {
			dout("ignoring incremental map %u len %d\n",
			     epoch, maplen);
		}
		p = next;
		nr_maps--;
	}
	if (newmap)
		goto done;

	/* full maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d full maps\n", nr_maps);
	while (nr_maps) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		if (nr_maps > 1) {
			dout("skipping non-latest full map %u len %d\n",
			     epoch, maplen);
		} else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
			dout("skipping full map %u len %d, "
			     "older than our %u\n", epoch, maplen,
			     osdc->osdmap->epoch);
		} else {
			dout("taking full map %u len %d\n", epoch, maplen);
			newmap = osdmap_decode(&p, p+maplen);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			oldmap = osdc->osdmap;
			osdc->osdmap = newmap;
			if (oldmap)
				ceph_osdmap_destroy(oldmap);
		}
		p += maplen;
		nr_maps--;
	}

done:
	downgrade_write(&osdc->map_sem);
	ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
	if (newmap)
		kick_requests(osdc, NULL);
	up_read(&osdc->map_sem);
	wake_up_all(&osdc->client->auth_wq);
	return;

bad:
	pr_err("osdc handle_map corrupt msg\n");
	ceph_msg_dump(msg);
	up_write(&osdc->map_sem);
	return;
}
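/*
 * Note that ceph_osdc_handle_map() downgrades map_sem from write to
 * read before kicking requests, so new requests can start mapping
 * themselves against the updated osdmap while the kick is still in
 * progress.
 */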
/*
 * Register request, send initial attempt.
 */
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
			    struct ceph_osd_request *req,
			    bool nofail)
{
	int rc = 0;

	req->r_request->pages = req->r_pages;
	req->r_request->nr_pages = req->r_num_pages;
#ifdef CONFIG_BLOCK
	req->r_request->bio = req->r_bio;
#endif
	req->r_request->trail = req->r_trail;

	register_request(osdc, req);

	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);
	/*
	 * a racing kick_requests() may have sent the message for us
	 * while we dropped request_mutex above, so only send now if
	 * the request hasn't been touched yet.
	 */
	if (req->r_sent == 0) {
		rc = __send_request(osdc, req);
		if (rc) {
			if (nofail) {
				dout("osdc_start_request failed send, "
				     " marking %lld\n", req->r_tid);
				req->r_resend = true;
				rc = 0;
			} else {
				__unregister_request(osdc, req);
			}
		}
	}
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_start_request);

/*
 * wait for a request to complete
 */
int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	int rc;

	rc = wait_for_completion_interruptible(&req->r_completion);
	if (rc < 0) {
		mutex_lock(&osdc->request_mutex);
		__cancel_request(req);
		__unregister_request(osdc, req);
		mutex_unlock(&osdc->request_mutex);
		dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
		return rc;
	}

	dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
	return req->r_result;
}
EXPORT_SYMBOL(ceph_osdc_wait_request);

/*
 * sync - wait for all in-flight requests to flush.  avoid starvation.
 */
void ceph_osdc_sync(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req;
	u64 last_tid, next_tid = 0;

	mutex_lock(&osdc->request_mutex);
	last_tid = osdc->last_tid;
	while (1) {
		req = __lookup_request_ge(osdc, next_tid);
		if (!req)
			break;
		if (req->r_tid > last_tid)
			break;

		next_tid = req->r_tid + 1;
		if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
			continue;

		ceph_osdc_get_request(req);
		mutex_unlock(&osdc->request_mutex);
		dout("sync waiting on tid %llu (last is %llu)\n",
		     req->r_tid, last_tid);
		wait_for_completion(&req->r_safe_completion);
		mutex_lock(&osdc->request_mutex);
		ceph_osdc_put_request(req);
	}
	mutex_unlock(&osdc->request_mutex);
	dout("sync done (thru tid %llu)\n", last_tid);
}
EXPORT_SYMBOL(ceph_osdc_sync);

/*
 * init, shutdown
 */
int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
{
	int err;

	dout("init\n");
	osdc->client = client;
	osdc->osdmap = NULL;
	init_rwsem(&osdc->map_sem);
	init_completion(&osdc->map_waiters);
	osdc->last_requested_map = 0;
	mutex_init(&osdc->request_mutex);
	osdc->last_tid = 0;
	osdc->osds = RB_ROOT;
	INIT_LIST_HEAD(&osdc->osd_lru);
	osdc->requests = RB_ROOT;
	INIT_LIST_HEAD(&osdc->req_lru);
	osdc->num_requests = 0;
	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);

	schedule_delayed_work(&osdc->osds_timeout_work,
	    round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));

	err = -ENOMEM;
	osdc->req_mempool = mempool_create_kmalloc_pool(10,
					sizeof(struct ceph_osd_request));
	if (!osdc->req_mempool)
		goto out;

	err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
				"osd_op");
	if (err < 0)
		goto out_mempool;
	err = ceph_msgpool_init(&osdc->msgpool_op_reply,
				OSD_OPREPLY_FRONT_LEN, 10, true,
				"osd_op_reply");
	if (err < 0)
		goto out_msgpool;
	return 0;

out_msgpool:
	ceph_msgpool_destroy(&osdc->msgpool_op);
out_mempool:
	mempool_destroy(osdc->req_mempool);
out:
	return err;
}
EXPORT_SYMBOL(ceph_osdc_init);
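/*
 * The pools created above (10 preallocated requests and 10 op/op-reply
 * messages) are what allow "nofail" writers to make forward progress
 * under memory pressure; see the use_mempool path in
 * ceph_osdc_alloc_request() and the nofail argument to
 * ceph_osdc_start_request().
 */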
void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
	cancel_delayed_work_sync(&osdc->timeout_work);
	cancel_delayed_work_sync(&osdc->osds_timeout_work);
	if (osdc->osdmap) {
		ceph_osdmap_destroy(osdc->osdmap);
		osdc->osdmap = NULL;
	}
	remove_old_osds(osdc, 1);
	mempool_destroy(osdc->req_mempool);
	ceph_msgpool_destroy(&osdc->msgpool_op);
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}
EXPORT_SYMBOL(ceph_osdc_stop);

/*
 * Read some contiguous pages.  If we cross a stripe boundary, shorten
 * *plen.  Return number of bytes read, or error.
 */
int ceph_osdc_readpages(struct ceph_osd_client *osdc,
			struct ceph_vino vino, struct ceph_file_layout *layout,
			u64 off, u64 *plen,
			u32 truncate_seq, u64 truncate_size,
			struct page **pages, int num_pages, int page_align)
{
	struct ceph_osd_request *req;
	int rc = 0;

	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
	     vino.snap, off, *plen);
	req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
				    NULL, 0, truncate_seq, truncate_size, NULL,
				    false, 1, page_align);
	if (!req)
		return -ENOMEM;

	/* it may be a short read due to an object boundary */
	req->r_pages = pages;

	dout("readpages final extent is %llu~%llu (%d pages align %d)\n",
	     off, *plen, req->r_num_pages, page_align);

	rc = ceph_osdc_start_request(osdc, req, false);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	dout("readpages result %d\n", rc);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_readpages);

/*
 * do a synchronous write on N pages
 */
int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
			 struct ceph_file_layout *layout,
			 struct ceph_snap_context *snapc,
			 u64 off, u64 len,
			 u32 truncate_seq, u64 truncate_size,
			 struct timespec *mtime,
			 struct page **pages, int num_pages,
			 int flags, int do_sync, bool nofail)
{
	struct ceph_osd_request *req;
	int rc = 0;
	int page_align = off & ~PAGE_MASK;

	BUG_ON(vino.snap != CEPH_NOSNAP);
	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
				    CEPH_OSD_OP_WRITE,
				    flags | CEPH_OSD_FLAG_ONDISK |
					    CEPH_OSD_FLAG_WRITE,
				    snapc, do_sync,
				    truncate_seq, truncate_size, mtime,
				    nofail, 1, page_align);
	if (!req)
		return -ENOMEM;

	/* it may be a short write due to an object boundary */
	req->r_pages = pages;
	dout("writepages %llu~%llu (%d pages)\n", off, len,
	     req->r_num_pages);

	rc = ceph_osdc_start_request(osdc, req, nofail);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	if (rc == 0)
		rc = len;
	dout("writepages result %d\n", rc);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_writepages);
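/*
 * Both helpers above may be shortened at an object boundary: *plen
 * (for reads) or the returned byte count (for writes) tells the caller
 * how much was actually covered, and issuing a follow-up request for
 * the remainder is the caller's responsibility.
 */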
/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;
	int type = le16_to_cpu(msg->hdr.type);

	if (!osd)
		goto out;
	osdc = osd->o_osdc;

	switch (type) {
	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(osdc, msg);
		break;
	case CEPH_MSG_OSD_OPREPLY:
		handle_reply(osdc, msg, con);
		break;

	default:
		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
out:
	ceph_msg_put(msg);
}

/*
 * lookup and return message for incoming reply.  set up reply message
 * pages.
 */
static struct ceph_msg *get_reply(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct ceph_msg *m;
	struct ceph_osd_request *req;
	int front = le32_to_cpu(hdr->front_len);
	int data_len = le32_to_cpu(hdr->data_len);
	u64 tid;

	tid = le64_to_cpu(hdr->tid);
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (!req) {
		*skip = 1;
		m = NULL;
		pr_info("get_reply unknown tid %llu from osd%d\n", tid,
			osd->o_osd);
		goto out;
	}

	if (req->r_con_filling_msg) {
		dout("get_reply revoking msg %p from old con %p\n",
		     req->r_reply, req->r_con_filling_msg);
		ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
		ceph_con_put(req->r_con_filling_msg);
		req->r_con_filling_msg = NULL;
	}

	if (front > req->r_reply->front.iov_len) {
		pr_warning("get_reply front %d > preallocated %d\n",
			   front, (int)req->r_reply->front.iov_len);
		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS);
		if (!m)
			goto out;
		ceph_msg_put(req->r_reply);
		req->r_reply = m;
	}
	m = ceph_msg_get(req->r_reply);

	if (data_len > 0) {
		int want = calc_pages_for(req->r_page_alignment, data_len);

		if (unlikely(req->r_num_pages < want)) {
			pr_warning("tid %lld reply %d > expected %d pages\n",
				   tid, want, req->r_num_pages);
			*skip = 1;
			ceph_msg_put(m);
			m = NULL;
			goto out;
		}
		m->pages = req->r_pages;
		m->nr_pages = req->r_num_pages;
		m->page_alignment = req->r_page_alignment;
#ifdef CONFIG_BLOCK
		m->bio = req->r_bio;
#endif
	}
	*skip = 0;
	req->r_con_filling_msg = ceph_con_get(con);
	dout("get_reply tid %lld %p\n", tid, m);

out:
	mutex_unlock(&osdc->request_mutex);
	return m;
}

static struct ceph_msg *alloc_msg(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	int type = le16_to_cpu(hdr->type);
	int front = le32_to_cpu(hdr->front_len);

	switch (type) {
	case CEPH_MSG_OSD_MAP:
		return ceph_msg_new(type, front, GFP_NOFS);
	case CEPH_MSG_OSD_OPREPLY:
		return get_reply(con, hdr, skip);
	default:
		pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
			osd->o_osd);
		*skip = 1;
		return NULL;
	}
}

/*
 * Wrappers to refcount containing ceph_osd struct
 */
static struct ceph_connection *get_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	if (get_osd(osd))
		return con;
	return NULL;
}

static void put_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	put_osd(osd);
}
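/*
 * r_con_filling_msg, set in get_reply() above, records which connection
 * is currently filling r_reply, so that an in-flight reply can be
 * revoked if the request goes away or the reply must be read on a
 * different connection; handle_reply() drops the ref once the message
 * has arrived intact.
 */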
/*
 * authentication
 */
static int get_authorizer(struct ceph_connection *con,
			  void **buf, int *len, int *proto,
			  void **reply_buf, int *reply_len, int force_new)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;
	int ret = 0;

	if (force_new && o->o_authorizer) {
		ac->ops->destroy_authorizer(ac, o->o_authorizer);
		o->o_authorizer = NULL;
	}
	if (o->o_authorizer == NULL) {
		ret = ac->ops->create_authorizer(
			ac, CEPH_ENTITY_TYPE_OSD,
			&o->o_authorizer,
			&o->o_authorizer_buf,
			&o->o_authorizer_buf_len,
			&o->o_authorizer_reply_buf,
			&o->o_authorizer_reply_buf_len);
		if (ret)
			return ret;
	}

	*proto = ac->protocol;
	*buf = o->o_authorizer_buf;
	*len = o->o_authorizer_buf_len;
	*reply_buf = o->o_authorizer_reply_buf;
	*reply_len = o->o_authorizer_reply_buf_len;
	return 0;
}

static int verify_authorizer_reply(struct ceph_connection *con, int len)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
}

static int invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	if (ac->ops->invalidate_authorizer)
		ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);

	return ceph_monc_validate_auth(&osdc->client->monc);
}

static const struct ceph_connection_operations osd_con_ops = {
	.get = get_osd_con,
	.put = put_osd_con,
	.dispatch = dispatch,
	.get_authorizer = get_authorizer,
	.verify_authorizer_reply = verify_authorizer_reply,
	.invalidate_authorizer = invalidate_authorizer,
	.alloc_msg = alloc_msg,
	.fault = osd_reset,
};