1 #include <linux/ceph/ceph_debug.h> 2 3 #include <linux/module.h> 4 #include <linux/err.h> 5 #include <linux/highmem.h> 6 #include <linux/mm.h> 7 #include <linux/pagemap.h> 8 #include <linux/slab.h> 9 #include <linux/uaccess.h> 10 #ifdef CONFIG_BLOCK 11 #include <linux/bio.h> 12 #endif 13 14 #include <linux/ceph/libceph.h> 15 #include <linux/ceph/osd_client.h> 16 #include <linux/ceph/messenger.h> 17 #include <linux/ceph/decode.h> 18 #include <linux/ceph/auth.h> 19 #include <linux/ceph/pagelist.h> 20 21 #define OSD_OP_FRONT_LEN 4096 22 #define OSD_OPREPLY_FRONT_LEN 512 23 24 static const struct ceph_connection_operations osd_con_ops; 25 26 static void send_queued(struct ceph_osd_client *osdc); 27 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); 28 static void __register_request(struct ceph_osd_client *osdc, 29 struct ceph_osd_request *req); 30 static void __unregister_linger_request(struct ceph_osd_client *osdc, 31 struct ceph_osd_request *req); 32 static void __send_request(struct ceph_osd_client *osdc, 33 struct ceph_osd_request *req); 34 35 static int op_needs_trail(int op) 36 { 37 switch (op) { 38 case CEPH_OSD_OP_GETXATTR: 39 case CEPH_OSD_OP_SETXATTR: 40 case CEPH_OSD_OP_CMPXATTR: 41 case CEPH_OSD_OP_CALL: 42 case CEPH_OSD_OP_NOTIFY: 43 return 1; 44 default: 45 return 0; 46 } 47 } 48 49 static int op_has_extent(int op) 50 { 51 return (op == CEPH_OSD_OP_READ || 52 op == CEPH_OSD_OP_WRITE); 53 } 54 55 void ceph_calc_raw_layout(struct ceph_osd_client *osdc, 56 struct ceph_file_layout *layout, 57 u64 snapid, 58 u64 off, u64 *plen, u64 *bno, 59 struct ceph_osd_request *req, 60 struct ceph_osd_req_op *op) 61 { 62 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; 63 u64 orig_len = *plen; 64 u64 objoff, objlen; /* extent in object */ 65 66 reqhead->snapid = cpu_to_le64(snapid); 67 68 /* object extent? */ 69 ceph_calc_file_object_mapping(layout, off, plen, bno, 70 &objoff, &objlen); 71 if (*plen < orig_len) 72 dout(" skipping last %llu, final file extent %llu~%llu\n", 73 orig_len - *plen, off, *plen); 74 75 if (op_has_extent(op->op)) { 76 op->extent.offset = objoff; 77 op->extent.length = objlen; 78 } 79 req->r_num_pages = calc_pages_for(off, *plen); 80 req->r_page_alignment = off & ~PAGE_MASK; 81 if (op->op == CEPH_OSD_OP_WRITE) 82 op->payload_len = *plen; 83 84 dout("calc_layout bno=%llx %llu~%llu (%d pages)\n", 85 *bno, objoff, objlen, req->r_num_pages); 86 87 } 88 EXPORT_SYMBOL(ceph_calc_raw_layout); 89 90 /* 91 * Implement client access to distributed object storage cluster. 92 * 93 * All data objects are stored within a cluster/cloud of OSDs, or 94 * "object storage devices." (Note that Ceph OSDs have _nothing_ to 95 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply 96 * remote daemons serving up and coordinating consistent and safe 97 * access to storage. 98 * 99 * Cluster membership and the mapping of data objects onto storage devices 100 * are described by the osd map. 101 * 102 * We keep track of pending OSD requests (read, write), resubmit 103 * requests to different OSDs when the cluster topology/data layout 104 * change, or retry the affected requests when the communications 105 * channel with an OSD is reset. 106 */ 107 108 /* 109 * calculate the mapping of a file extent onto an object, and fill out the 110 * request accordingly. shorten extent as necessary if it crosses an 111 * object boundary. 112 * 113 * fill osd op in request message. 114 */ 115 static void calc_layout(struct ceph_osd_client *osdc, 116 struct ceph_vino vino, 117 struct ceph_file_layout *layout, 118 u64 off, u64 *plen, 119 struct ceph_osd_request *req, 120 struct ceph_osd_req_op *op) 121 { 122 u64 bno; 123 124 ceph_calc_raw_layout(osdc, layout, vino.snap, off, 125 plen, &bno, req, op); 126 127 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno); 128 req->r_oid_len = strlen(req->r_oid); 129 } 130 131 /* 132 * requests 133 */ 134 void ceph_osdc_release_request(struct kref *kref) 135 { 136 struct ceph_osd_request *req = container_of(kref, 137 struct ceph_osd_request, 138 r_kref); 139 140 if (req->r_request) 141 ceph_msg_put(req->r_request); 142 if (req->r_con_filling_msg) { 143 dout("%s revoking pages %p from con %p\n", __func__, 144 req->r_pages, req->r_con_filling_msg); 145 ceph_msg_revoke_incoming(req->r_reply); 146 req->r_con_filling_msg->ops->put(req->r_con_filling_msg); 147 } 148 if (req->r_reply) 149 ceph_msg_put(req->r_reply); 150 if (req->r_own_pages) 151 ceph_release_page_vector(req->r_pages, 152 req->r_num_pages); 153 #ifdef CONFIG_BLOCK 154 if (req->r_bio) 155 bio_put(req->r_bio); 156 #endif 157 ceph_put_snap_context(req->r_snapc); 158 if (req->r_trail) { 159 ceph_pagelist_release(req->r_trail); 160 kfree(req->r_trail); 161 } 162 if (req->r_mempool) 163 mempool_free(req, req->r_osdc->req_mempool); 164 else 165 kfree(req); 166 } 167 EXPORT_SYMBOL(ceph_osdc_release_request); 168 169 static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail) 170 { 171 int i = 0; 172 173 if (needs_trail) 174 *needs_trail = 0; 175 while (ops[i].op) { 176 if (needs_trail && op_needs_trail(ops[i].op)) 177 *needs_trail = 1; 178 i++; 179 } 180 181 return i; 182 } 183 184 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 185 int flags, 186 struct ceph_snap_context *snapc, 187 struct ceph_osd_req_op *ops, 188 bool use_mempool, 189 gfp_t gfp_flags, 190 struct page **pages, 191 struct bio *bio) 192 { 193 struct ceph_osd_request *req; 194 struct ceph_msg *msg; 195 int needs_trail; 196 int num_op = get_num_ops(ops, &needs_trail); 197 size_t msg_size = sizeof(struct ceph_osd_request_head); 198 199 msg_size += num_op*sizeof(struct ceph_osd_op); 200 201 if (use_mempool) { 202 req = mempool_alloc(osdc->req_mempool, gfp_flags); 203 memset(req, 0, sizeof(*req)); 204 } else { 205 req = kzalloc(sizeof(*req), gfp_flags); 206 } 207 if (req == NULL) 208 return NULL; 209 210 req->r_osdc = osdc; 211 req->r_mempool = use_mempool; 212 213 kref_init(&req->r_kref); 214 init_completion(&req->r_completion); 215 init_completion(&req->r_safe_completion); 216 rb_init_node(&req->r_node); 217 INIT_LIST_HEAD(&req->r_unsafe_item); 218 INIT_LIST_HEAD(&req->r_linger_item); 219 INIT_LIST_HEAD(&req->r_linger_osd); 220 INIT_LIST_HEAD(&req->r_req_lru_item); 221 INIT_LIST_HEAD(&req->r_osd_item); 222 223 req->r_flags = flags; 224 225 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0); 226 227 /* create reply message */ 228 if (use_mempool) 229 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 230 else 231 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, 232 OSD_OPREPLY_FRONT_LEN, gfp_flags, true); 233 if (!msg) { 234 ceph_osdc_put_request(req); 235 return NULL; 236 } 237 req->r_reply = msg; 238 239 /* allocate space for the trailing data */ 240 if (needs_trail) { 241 req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags); 242 if (!req->r_trail) { 243 ceph_osdc_put_request(req); 244 return NULL; 245 } 246 ceph_pagelist_init(req->r_trail); 247 } 248 249 /* create request message; allow space for oid */ 250 msg_size += MAX_OBJ_NAME_SIZE; 251 if (snapc) 252 msg_size += sizeof(u64) * snapc->num_snaps; 253 if (use_mempool) 254 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 255 else 256 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true); 257 if (!msg) { 258 ceph_osdc_put_request(req); 259 return NULL; 260 } 261 262 memset(msg->front.iov_base, 0, msg->front.iov_len); 263 264 req->r_request = msg; 265 req->r_pages = pages; 266 #ifdef CONFIG_BLOCK 267 if (bio) { 268 req->r_bio = bio; 269 bio_get(req->r_bio); 270 } 271 #endif 272 273 return req; 274 } 275 EXPORT_SYMBOL(ceph_osdc_alloc_request); 276 277 static void osd_req_encode_op(struct ceph_osd_request *req, 278 struct ceph_osd_op *dst, 279 struct ceph_osd_req_op *src) 280 { 281 dst->op = cpu_to_le16(src->op); 282 283 switch (src->op) { 284 case CEPH_OSD_OP_READ: 285 case CEPH_OSD_OP_WRITE: 286 dst->extent.offset = 287 cpu_to_le64(src->extent.offset); 288 dst->extent.length = 289 cpu_to_le64(src->extent.length); 290 dst->extent.truncate_size = 291 cpu_to_le64(src->extent.truncate_size); 292 dst->extent.truncate_seq = 293 cpu_to_le32(src->extent.truncate_seq); 294 break; 295 296 case CEPH_OSD_OP_GETXATTR: 297 case CEPH_OSD_OP_SETXATTR: 298 case CEPH_OSD_OP_CMPXATTR: 299 BUG_ON(!req->r_trail); 300 301 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len); 302 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len); 303 dst->xattr.cmp_op = src->xattr.cmp_op; 304 dst->xattr.cmp_mode = src->xattr.cmp_mode; 305 ceph_pagelist_append(req->r_trail, src->xattr.name, 306 src->xattr.name_len); 307 ceph_pagelist_append(req->r_trail, src->xattr.val, 308 src->xattr.value_len); 309 break; 310 case CEPH_OSD_OP_CALL: 311 BUG_ON(!req->r_trail); 312 313 dst->cls.class_len = src->cls.class_len; 314 dst->cls.method_len = src->cls.method_len; 315 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len); 316 317 ceph_pagelist_append(req->r_trail, src->cls.class_name, 318 src->cls.class_len); 319 ceph_pagelist_append(req->r_trail, src->cls.method_name, 320 src->cls.method_len); 321 ceph_pagelist_append(req->r_trail, src->cls.indata, 322 src->cls.indata_len); 323 break; 324 case CEPH_OSD_OP_ROLLBACK: 325 dst->snap.snapid = cpu_to_le64(src->snap.snapid); 326 break; 327 case CEPH_OSD_OP_STARTSYNC: 328 break; 329 case CEPH_OSD_OP_NOTIFY: 330 { 331 __le32 prot_ver = cpu_to_le32(src->watch.prot_ver); 332 __le32 timeout = cpu_to_le32(src->watch.timeout); 333 334 BUG_ON(!req->r_trail); 335 336 ceph_pagelist_append(req->r_trail, 337 &prot_ver, sizeof(prot_ver)); 338 ceph_pagelist_append(req->r_trail, 339 &timeout, sizeof(timeout)); 340 } 341 case CEPH_OSD_OP_NOTIFY_ACK: 342 case CEPH_OSD_OP_WATCH: 343 dst->watch.cookie = cpu_to_le64(src->watch.cookie); 344 dst->watch.ver = cpu_to_le64(src->watch.ver); 345 dst->watch.flag = src->watch.flag; 346 break; 347 default: 348 pr_err("unrecognized osd opcode %d\n", dst->op); 349 WARN_ON(1); 350 break; 351 } 352 dst->payload_len = cpu_to_le32(src->payload_len); 353 } 354 355 /* 356 * build new request AND message 357 * 358 */ 359 void ceph_osdc_build_request(struct ceph_osd_request *req, 360 u64 off, u64 *plen, 361 struct ceph_osd_req_op *src_ops, 362 struct ceph_snap_context *snapc, 363 struct timespec *mtime, 364 const char *oid, 365 int oid_len) 366 { 367 struct ceph_msg *msg = req->r_request; 368 struct ceph_osd_request_head *head; 369 struct ceph_osd_req_op *src_op; 370 struct ceph_osd_op *op; 371 void *p; 372 int num_op = get_num_ops(src_ops, NULL); 373 size_t msg_size = sizeof(*head) + num_op*sizeof(*op); 374 int flags = req->r_flags; 375 u64 data_len = 0; 376 int i; 377 378 head = msg->front.iov_base; 379 op = (void *)(head + 1); 380 p = (void *)(op + num_op); 381 382 req->r_snapc = ceph_get_snap_context(snapc); 383 384 head->client_inc = cpu_to_le32(1); /* always, for now. */ 385 head->flags = cpu_to_le32(flags); 386 if (flags & CEPH_OSD_FLAG_WRITE) 387 ceph_encode_timespec(&head->mtime, mtime); 388 head->num_ops = cpu_to_le16(num_op); 389 390 391 /* fill in oid */ 392 head->object_len = cpu_to_le32(oid_len); 393 memcpy(p, oid, oid_len); 394 p += oid_len; 395 396 src_op = src_ops; 397 while (src_op->op) { 398 osd_req_encode_op(req, op, src_op); 399 src_op++; 400 op++; 401 } 402 403 if (req->r_trail) 404 data_len += req->r_trail->length; 405 406 if (snapc) { 407 head->snap_seq = cpu_to_le64(snapc->seq); 408 head->num_snaps = cpu_to_le32(snapc->num_snaps); 409 for (i = 0; i < snapc->num_snaps; i++) { 410 put_unaligned_le64(snapc->snaps[i], p); 411 p += sizeof(u64); 412 } 413 } 414 415 if (flags & CEPH_OSD_FLAG_WRITE) { 416 req->r_request->hdr.data_off = cpu_to_le16(off); 417 req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len); 418 } else if (data_len) { 419 req->r_request->hdr.data_off = 0; 420 req->r_request->hdr.data_len = cpu_to_le32(data_len); 421 } 422 423 req->r_request->page_alignment = req->r_page_alignment; 424 425 BUG_ON(p > msg->front.iov_base + msg->front.iov_len); 426 msg_size = p - msg->front.iov_base; 427 msg->front.iov_len = msg_size; 428 msg->hdr.front_len = cpu_to_le32(msg_size); 429 return; 430 } 431 EXPORT_SYMBOL(ceph_osdc_build_request); 432 433 /* 434 * build new request AND message, calculate layout, and adjust file 435 * extent as needed. 436 * 437 * if the file was recently truncated, we include information about its 438 * old and new size so that the object can be updated appropriately. (we 439 * avoid synchronously deleting truncated objects because it's slow.) 440 * 441 * if @do_sync, include a 'startsync' command so that the osd will flush 442 * data quickly. 443 */ 444 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, 445 struct ceph_file_layout *layout, 446 struct ceph_vino vino, 447 u64 off, u64 *plen, 448 int opcode, int flags, 449 struct ceph_snap_context *snapc, 450 int do_sync, 451 u32 truncate_seq, 452 u64 truncate_size, 453 struct timespec *mtime, 454 bool use_mempool, int num_reply, 455 int page_align) 456 { 457 struct ceph_osd_req_op ops[3]; 458 struct ceph_osd_request *req; 459 460 ops[0].op = opcode; 461 ops[0].extent.truncate_seq = truncate_seq; 462 ops[0].extent.truncate_size = truncate_size; 463 ops[0].payload_len = 0; 464 465 if (do_sync) { 466 ops[1].op = CEPH_OSD_OP_STARTSYNC; 467 ops[1].payload_len = 0; 468 ops[2].op = 0; 469 } else 470 ops[1].op = 0; 471 472 req = ceph_osdc_alloc_request(osdc, flags, 473 snapc, ops, 474 use_mempool, 475 GFP_NOFS, NULL, NULL); 476 if (!req) 477 return NULL; 478 479 /* calculate max write size */ 480 calc_layout(osdc, vino, layout, off, plen, req, ops); 481 req->r_file_layout = *layout; /* keep a copy */ 482 483 /* in case it differs from natural (file) alignment that 484 calc_layout filled in for us */ 485 req->r_num_pages = calc_pages_for(page_align, *plen); 486 req->r_page_alignment = page_align; 487 488 ceph_osdc_build_request(req, off, plen, ops, 489 snapc, 490 mtime, 491 req->r_oid, req->r_oid_len); 492 493 return req; 494 } 495 EXPORT_SYMBOL(ceph_osdc_new_request); 496 497 /* 498 * We keep osd requests in an rbtree, sorted by ->r_tid. 499 */ 500 static void __insert_request(struct ceph_osd_client *osdc, 501 struct ceph_osd_request *new) 502 { 503 struct rb_node **p = &osdc->requests.rb_node; 504 struct rb_node *parent = NULL; 505 struct ceph_osd_request *req = NULL; 506 507 while (*p) { 508 parent = *p; 509 req = rb_entry(parent, struct ceph_osd_request, r_node); 510 if (new->r_tid < req->r_tid) 511 p = &(*p)->rb_left; 512 else if (new->r_tid > req->r_tid) 513 p = &(*p)->rb_right; 514 else 515 BUG(); 516 } 517 518 rb_link_node(&new->r_node, parent, p); 519 rb_insert_color(&new->r_node, &osdc->requests); 520 } 521 522 static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc, 523 u64 tid) 524 { 525 struct ceph_osd_request *req; 526 struct rb_node *n = osdc->requests.rb_node; 527 528 while (n) { 529 req = rb_entry(n, struct ceph_osd_request, r_node); 530 if (tid < req->r_tid) 531 n = n->rb_left; 532 else if (tid > req->r_tid) 533 n = n->rb_right; 534 else 535 return req; 536 } 537 return NULL; 538 } 539 540 static struct ceph_osd_request * 541 __lookup_request_ge(struct ceph_osd_client *osdc, 542 u64 tid) 543 { 544 struct ceph_osd_request *req; 545 struct rb_node *n = osdc->requests.rb_node; 546 547 while (n) { 548 req = rb_entry(n, struct ceph_osd_request, r_node); 549 if (tid < req->r_tid) { 550 if (!n->rb_left) 551 return req; 552 n = n->rb_left; 553 } else if (tid > req->r_tid) { 554 n = n->rb_right; 555 } else { 556 return req; 557 } 558 } 559 return NULL; 560 } 561 562 /* 563 * Resubmit requests pending on the given osd. 564 */ 565 static void __kick_osd_requests(struct ceph_osd_client *osdc, 566 struct ceph_osd *osd) 567 { 568 struct ceph_osd_request *req, *nreq; 569 int err; 570 571 dout("__kick_osd_requests osd%d\n", osd->o_osd); 572 err = __reset_osd(osdc, osd); 573 if (err == -EAGAIN) 574 return; 575 576 list_for_each_entry(req, &osd->o_requests, r_osd_item) { 577 list_move(&req->r_req_lru_item, &osdc->req_unsent); 578 dout("requeued %p tid %llu osd%d\n", req, req->r_tid, 579 osd->o_osd); 580 if (!req->r_linger) 581 req->r_flags |= CEPH_OSD_FLAG_RETRY; 582 } 583 584 list_for_each_entry_safe(req, nreq, &osd->o_linger_requests, 585 r_linger_osd) { 586 /* 587 * reregister request prior to unregistering linger so 588 * that r_osd is preserved. 589 */ 590 BUG_ON(!list_empty(&req->r_req_lru_item)); 591 __register_request(osdc, req); 592 list_add(&req->r_req_lru_item, &osdc->req_unsent); 593 list_add(&req->r_osd_item, &req->r_osd->o_requests); 594 __unregister_linger_request(osdc, req); 595 dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid, 596 osd->o_osd); 597 } 598 } 599 600 static void kick_osd_requests(struct ceph_osd_client *osdc, 601 struct ceph_osd *kickosd) 602 { 603 mutex_lock(&osdc->request_mutex); 604 __kick_osd_requests(osdc, kickosd); 605 mutex_unlock(&osdc->request_mutex); 606 } 607 608 /* 609 * If the osd connection drops, we need to resubmit all requests. 610 */ 611 static void osd_reset(struct ceph_connection *con) 612 { 613 struct ceph_osd *osd = con->private; 614 struct ceph_osd_client *osdc; 615 616 if (!osd) 617 return; 618 dout("osd_reset osd%d\n", osd->o_osd); 619 osdc = osd->o_osdc; 620 down_read(&osdc->map_sem); 621 kick_osd_requests(osdc, osd); 622 send_queued(osdc); 623 up_read(&osdc->map_sem); 624 } 625 626 /* 627 * Track open sessions with osds. 628 */ 629 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum) 630 { 631 struct ceph_osd *osd; 632 633 osd = kzalloc(sizeof(*osd), GFP_NOFS); 634 if (!osd) 635 return NULL; 636 637 atomic_set(&osd->o_ref, 1); 638 osd->o_osdc = osdc; 639 osd->o_osd = onum; 640 INIT_LIST_HEAD(&osd->o_requests); 641 INIT_LIST_HEAD(&osd->o_linger_requests); 642 INIT_LIST_HEAD(&osd->o_osd_lru); 643 osd->o_incarnation = 1; 644 645 ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr); 646 647 INIT_LIST_HEAD(&osd->o_keepalive_item); 648 return osd; 649 } 650 651 static struct ceph_osd *get_osd(struct ceph_osd *osd) 652 { 653 if (atomic_inc_not_zero(&osd->o_ref)) { 654 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1, 655 atomic_read(&osd->o_ref)); 656 return osd; 657 } else { 658 dout("get_osd %p FAIL\n", osd); 659 return NULL; 660 } 661 } 662 663 static void put_osd(struct ceph_osd *osd) 664 { 665 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), 666 atomic_read(&osd->o_ref) - 1); 667 if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) { 668 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth; 669 670 if (ac->ops && ac->ops->destroy_authorizer) 671 ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer); 672 kfree(osd); 673 } 674 } 675 676 /* 677 * remove an osd from our map 678 */ 679 static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 680 { 681 dout("__remove_osd %p\n", osd); 682 BUG_ON(!list_empty(&osd->o_requests)); 683 rb_erase(&osd->o_node, &osdc->osds); 684 list_del_init(&osd->o_osd_lru); 685 ceph_con_close(&osd->o_con); 686 put_osd(osd); 687 } 688 689 static void remove_all_osds(struct ceph_osd_client *osdc) 690 { 691 dout("%s %p\n", __func__, osdc); 692 mutex_lock(&osdc->request_mutex); 693 while (!RB_EMPTY_ROOT(&osdc->osds)) { 694 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), 695 struct ceph_osd, o_node); 696 __remove_osd(osdc, osd); 697 } 698 mutex_unlock(&osdc->request_mutex); 699 } 700 701 static void __move_osd_to_lru(struct ceph_osd_client *osdc, 702 struct ceph_osd *osd) 703 { 704 dout("__move_osd_to_lru %p\n", osd); 705 BUG_ON(!list_empty(&osd->o_osd_lru)); 706 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru); 707 osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ; 708 } 709 710 static void __remove_osd_from_lru(struct ceph_osd *osd) 711 { 712 dout("__remove_osd_from_lru %p\n", osd); 713 if (!list_empty(&osd->o_osd_lru)) 714 list_del_init(&osd->o_osd_lru); 715 } 716 717 static void remove_old_osds(struct ceph_osd_client *osdc) 718 { 719 struct ceph_osd *osd, *nosd; 720 721 dout("__remove_old_osds %p\n", osdc); 722 mutex_lock(&osdc->request_mutex); 723 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) { 724 if (time_before(jiffies, osd->lru_ttl)) 725 break; 726 __remove_osd(osdc, osd); 727 } 728 mutex_unlock(&osdc->request_mutex); 729 } 730 731 /* 732 * reset osd connect 733 */ 734 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 735 { 736 struct ceph_osd_request *req; 737 int ret = 0; 738 739 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 740 if (list_empty(&osd->o_requests) && 741 list_empty(&osd->o_linger_requests)) { 742 __remove_osd(osdc, osd); 743 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd], 744 &osd->o_con.peer_addr, 745 sizeof(osd->o_con.peer_addr)) == 0 && 746 !ceph_con_opened(&osd->o_con)) { 747 dout(" osd addr hasn't changed and connection never opened," 748 " letting msgr retry"); 749 /* touch each r_stamp for handle_timeout()'s benfit */ 750 list_for_each_entry(req, &osd->o_requests, r_osd_item) 751 req->r_stamp = jiffies; 752 ret = -EAGAIN; 753 } else { 754 ceph_con_close(&osd->o_con); 755 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, 756 &osdc->osdmap->osd_addr[osd->o_osd]); 757 osd->o_incarnation++; 758 } 759 return ret; 760 } 761 762 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new) 763 { 764 struct rb_node **p = &osdc->osds.rb_node; 765 struct rb_node *parent = NULL; 766 struct ceph_osd *osd = NULL; 767 768 dout("__insert_osd %p osd%d\n", new, new->o_osd); 769 while (*p) { 770 parent = *p; 771 osd = rb_entry(parent, struct ceph_osd, o_node); 772 if (new->o_osd < osd->o_osd) 773 p = &(*p)->rb_left; 774 else if (new->o_osd > osd->o_osd) 775 p = &(*p)->rb_right; 776 else 777 BUG(); 778 } 779 780 rb_link_node(&new->o_node, parent, p); 781 rb_insert_color(&new->o_node, &osdc->osds); 782 } 783 784 static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o) 785 { 786 struct ceph_osd *osd; 787 struct rb_node *n = osdc->osds.rb_node; 788 789 while (n) { 790 osd = rb_entry(n, struct ceph_osd, o_node); 791 if (o < osd->o_osd) 792 n = n->rb_left; 793 else if (o > osd->o_osd) 794 n = n->rb_right; 795 else 796 return osd; 797 } 798 return NULL; 799 } 800 801 static void __schedule_osd_timeout(struct ceph_osd_client *osdc) 802 { 803 schedule_delayed_work(&osdc->timeout_work, 804 osdc->client->options->osd_keepalive_timeout * HZ); 805 } 806 807 static void __cancel_osd_timeout(struct ceph_osd_client *osdc) 808 { 809 cancel_delayed_work(&osdc->timeout_work); 810 } 811 812 /* 813 * Register request, assign tid. If this is the first request, set up 814 * the timeout event. 815 */ 816 static void __register_request(struct ceph_osd_client *osdc, 817 struct ceph_osd_request *req) 818 { 819 req->r_tid = ++osdc->last_tid; 820 req->r_request->hdr.tid = cpu_to_le64(req->r_tid); 821 dout("__register_request %p tid %lld\n", req, req->r_tid); 822 __insert_request(osdc, req); 823 ceph_osdc_get_request(req); 824 osdc->num_requests++; 825 if (osdc->num_requests == 1) { 826 dout(" first request, scheduling timeout\n"); 827 __schedule_osd_timeout(osdc); 828 } 829 } 830 831 static void register_request(struct ceph_osd_client *osdc, 832 struct ceph_osd_request *req) 833 { 834 mutex_lock(&osdc->request_mutex); 835 __register_request(osdc, req); 836 mutex_unlock(&osdc->request_mutex); 837 } 838 839 /* 840 * called under osdc->request_mutex 841 */ 842 static void __unregister_request(struct ceph_osd_client *osdc, 843 struct ceph_osd_request *req) 844 { 845 if (RB_EMPTY_NODE(&req->r_node)) { 846 dout("__unregister_request %p tid %lld not registered\n", 847 req, req->r_tid); 848 return; 849 } 850 851 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 852 rb_erase(&req->r_node, &osdc->requests); 853 osdc->num_requests--; 854 855 if (req->r_osd) { 856 /* make sure the original request isn't in flight. */ 857 ceph_msg_revoke(req->r_request); 858 859 list_del_init(&req->r_osd_item); 860 if (list_empty(&req->r_osd->o_requests) && 861 list_empty(&req->r_osd->o_linger_requests)) { 862 dout("moving osd to %p lru\n", req->r_osd); 863 __move_osd_to_lru(osdc, req->r_osd); 864 } 865 if (list_empty(&req->r_linger_item)) 866 req->r_osd = NULL; 867 } 868 869 ceph_osdc_put_request(req); 870 871 list_del_init(&req->r_req_lru_item); 872 if (osdc->num_requests == 0) { 873 dout(" no requests, canceling timeout\n"); 874 __cancel_osd_timeout(osdc); 875 } 876 } 877 878 /* 879 * Cancel a previously queued request message 880 */ 881 static void __cancel_request(struct ceph_osd_request *req) 882 { 883 if (req->r_sent && req->r_osd) { 884 ceph_msg_revoke(req->r_request); 885 req->r_sent = 0; 886 } 887 } 888 889 static void __register_linger_request(struct ceph_osd_client *osdc, 890 struct ceph_osd_request *req) 891 { 892 dout("__register_linger_request %p\n", req); 893 list_add_tail(&req->r_linger_item, &osdc->req_linger); 894 if (req->r_osd) 895 list_add_tail(&req->r_linger_osd, 896 &req->r_osd->o_linger_requests); 897 } 898 899 static void __unregister_linger_request(struct ceph_osd_client *osdc, 900 struct ceph_osd_request *req) 901 { 902 dout("__unregister_linger_request %p\n", req); 903 if (req->r_osd) { 904 list_del_init(&req->r_linger_item); 905 list_del_init(&req->r_linger_osd); 906 907 if (list_empty(&req->r_osd->o_requests) && 908 list_empty(&req->r_osd->o_linger_requests)) { 909 dout("moving osd to %p lru\n", req->r_osd); 910 __move_osd_to_lru(osdc, req->r_osd); 911 } 912 if (list_empty(&req->r_osd_item)) 913 req->r_osd = NULL; 914 } 915 } 916 917 void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc, 918 struct ceph_osd_request *req) 919 { 920 mutex_lock(&osdc->request_mutex); 921 if (req->r_linger) { 922 __unregister_linger_request(osdc, req); 923 ceph_osdc_put_request(req); 924 } 925 mutex_unlock(&osdc->request_mutex); 926 } 927 EXPORT_SYMBOL(ceph_osdc_unregister_linger_request); 928 929 void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, 930 struct ceph_osd_request *req) 931 { 932 if (!req->r_linger) { 933 dout("set_request_linger %p\n", req); 934 req->r_linger = 1; 935 /* 936 * caller is now responsible for calling 937 * unregister_linger_request 938 */ 939 ceph_osdc_get_request(req); 940 } 941 } 942 EXPORT_SYMBOL(ceph_osdc_set_request_linger); 943 944 /* 945 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct 946 * (as needed), and set the request r_osd appropriately. If there is 947 * no up osd, set r_osd to NULL. Move the request to the appropriate list 948 * (unsent, homeless) or leave on in-flight lru. 949 * 950 * Return 0 if unchanged, 1 if changed, or negative on error. 951 * 952 * Caller should hold map_sem for read and request_mutex. 953 */ 954 static int __map_request(struct ceph_osd_client *osdc, 955 struct ceph_osd_request *req, int force_resend) 956 { 957 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; 958 struct ceph_pg pgid; 959 int acting[CEPH_PG_MAX_SIZE]; 960 int o = -1, num = 0; 961 int err; 962 963 dout("map_request %p tid %lld\n", req, req->r_tid); 964 err = ceph_calc_object_layout(&reqhead->layout, req->r_oid, 965 &req->r_file_layout, osdc->osdmap); 966 if (err) { 967 list_move(&req->r_req_lru_item, &osdc->req_notarget); 968 return err; 969 } 970 pgid = reqhead->layout.ol_pgid; 971 req->r_pgid = pgid; 972 973 err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); 974 if (err > 0) { 975 o = acting[0]; 976 num = err; 977 } 978 979 if ((!force_resend && 980 req->r_osd && req->r_osd->o_osd == o && 981 req->r_sent >= req->r_osd->o_incarnation && 982 req->r_num_pg_osds == num && 983 memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || 984 (req->r_osd == NULL && o == -1)) 985 return 0; /* no change */ 986 987 dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n", 988 req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o, 989 req->r_osd ? req->r_osd->o_osd : -1); 990 991 /* record full pg acting set */ 992 memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num); 993 req->r_num_pg_osds = num; 994 995 if (req->r_osd) { 996 __cancel_request(req); 997 list_del_init(&req->r_osd_item); 998 req->r_osd = NULL; 999 } 1000 1001 req->r_osd = __lookup_osd(osdc, o); 1002 if (!req->r_osd && o >= 0) { 1003 err = -ENOMEM; 1004 req->r_osd = create_osd(osdc, o); 1005 if (!req->r_osd) { 1006 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1007 goto out; 1008 } 1009 1010 dout("map_request osd %p is osd%d\n", req->r_osd, o); 1011 __insert_osd(osdc, req->r_osd); 1012 1013 ceph_con_open(&req->r_osd->o_con, 1014 CEPH_ENTITY_TYPE_OSD, o, 1015 &osdc->osdmap->osd_addr[o]); 1016 } 1017 1018 if (req->r_osd) { 1019 __remove_osd_from_lru(req->r_osd); 1020 list_add(&req->r_osd_item, &req->r_osd->o_requests); 1021 list_move(&req->r_req_lru_item, &osdc->req_unsent); 1022 } else { 1023 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1024 } 1025 err = 1; /* osd or pg changed */ 1026 1027 out: 1028 return err; 1029 } 1030 1031 /* 1032 * caller should hold map_sem (for read) and request_mutex 1033 */ 1034 static void __send_request(struct ceph_osd_client *osdc, 1035 struct ceph_osd_request *req) 1036 { 1037 struct ceph_osd_request_head *reqhead; 1038 1039 dout("send_request %p tid %llu to osd%d flags %d\n", 1040 req, req->r_tid, req->r_osd->o_osd, req->r_flags); 1041 1042 reqhead = req->r_request->front.iov_base; 1043 reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch); 1044 reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */ 1045 reqhead->reassert_version = req->r_reassert_version; 1046 1047 req->r_stamp = jiffies; 1048 list_move_tail(&req->r_req_lru_item, &osdc->req_lru); 1049 1050 ceph_msg_get(req->r_request); /* send consumes a ref */ 1051 ceph_con_send(&req->r_osd->o_con, req->r_request); 1052 req->r_sent = req->r_osd->o_incarnation; 1053 } 1054 1055 /* 1056 * Send any requests in the queue (req_unsent). 1057 */ 1058 static void send_queued(struct ceph_osd_client *osdc) 1059 { 1060 struct ceph_osd_request *req, *tmp; 1061 1062 dout("send_queued\n"); 1063 mutex_lock(&osdc->request_mutex); 1064 list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) { 1065 __send_request(osdc, req); 1066 } 1067 mutex_unlock(&osdc->request_mutex); 1068 } 1069 1070 /* 1071 * Timeout callback, called every N seconds when 1 or more osd 1072 * requests has been active for more than N seconds. When this 1073 * happens, we ping all OSDs with requests who have timed out to 1074 * ensure any communications channel reset is detected. Reset the 1075 * request timeouts another N seconds in the future as we go. 1076 * Reschedule the timeout event another N seconds in future (unless 1077 * there are no open requests). 1078 */ 1079 static void handle_timeout(struct work_struct *work) 1080 { 1081 struct ceph_osd_client *osdc = 1082 container_of(work, struct ceph_osd_client, timeout_work.work); 1083 struct ceph_osd_request *req, *last_req = NULL; 1084 struct ceph_osd *osd; 1085 unsigned long timeout = osdc->client->options->osd_timeout * HZ; 1086 unsigned long keepalive = 1087 osdc->client->options->osd_keepalive_timeout * HZ; 1088 unsigned long last_stamp = 0; 1089 struct list_head slow_osds; 1090 dout("timeout\n"); 1091 down_read(&osdc->map_sem); 1092 1093 ceph_monc_request_next_osdmap(&osdc->client->monc); 1094 1095 mutex_lock(&osdc->request_mutex); 1096 1097 /* 1098 * reset osds that appear to be _really_ unresponsive. this 1099 * is a failsafe measure.. we really shouldn't be getting to 1100 * this point if the system is working properly. the monitors 1101 * should mark the osd as failed and we should find out about 1102 * it from an updated osd map. 1103 */ 1104 while (timeout && !list_empty(&osdc->req_lru)) { 1105 req = list_entry(osdc->req_lru.next, struct ceph_osd_request, 1106 r_req_lru_item); 1107 1108 /* hasn't been long enough since we sent it? */ 1109 if (time_before(jiffies, req->r_stamp + timeout)) 1110 break; 1111 1112 /* hasn't been long enough since it was acked? */ 1113 if (req->r_request->ack_stamp == 0 || 1114 time_before(jiffies, req->r_request->ack_stamp + timeout)) 1115 break; 1116 1117 BUG_ON(req == last_req && req->r_stamp == last_stamp); 1118 last_req = req; 1119 last_stamp = req->r_stamp; 1120 1121 osd = req->r_osd; 1122 BUG_ON(!osd); 1123 pr_warning(" tid %llu timed out on osd%d, will reset osd\n", 1124 req->r_tid, osd->o_osd); 1125 __kick_osd_requests(osdc, osd); 1126 } 1127 1128 /* 1129 * ping osds that are a bit slow. this ensures that if there 1130 * is a break in the TCP connection we will notice, and reopen 1131 * a connection with that osd (from the fault callback). 1132 */ 1133 INIT_LIST_HEAD(&slow_osds); 1134 list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) { 1135 if (time_before(jiffies, req->r_stamp + keepalive)) 1136 break; 1137 1138 osd = req->r_osd; 1139 BUG_ON(!osd); 1140 dout(" tid %llu is slow, will send keepalive on osd%d\n", 1141 req->r_tid, osd->o_osd); 1142 list_move_tail(&osd->o_keepalive_item, &slow_osds); 1143 } 1144 while (!list_empty(&slow_osds)) { 1145 osd = list_entry(slow_osds.next, struct ceph_osd, 1146 o_keepalive_item); 1147 list_del_init(&osd->o_keepalive_item); 1148 ceph_con_keepalive(&osd->o_con); 1149 } 1150 1151 __schedule_osd_timeout(osdc); 1152 mutex_unlock(&osdc->request_mutex); 1153 send_queued(osdc); 1154 up_read(&osdc->map_sem); 1155 } 1156 1157 static void handle_osds_timeout(struct work_struct *work) 1158 { 1159 struct ceph_osd_client *osdc = 1160 container_of(work, struct ceph_osd_client, 1161 osds_timeout_work.work); 1162 unsigned long delay = 1163 osdc->client->options->osd_idle_ttl * HZ >> 2; 1164 1165 dout("osds timeout\n"); 1166 down_read(&osdc->map_sem); 1167 remove_old_osds(osdc); 1168 up_read(&osdc->map_sem); 1169 1170 schedule_delayed_work(&osdc->osds_timeout_work, 1171 round_jiffies_relative(delay)); 1172 } 1173 1174 static void complete_request(struct ceph_osd_request *req) 1175 { 1176 if (req->r_safe_callback) 1177 req->r_safe_callback(req, NULL); 1178 complete_all(&req->r_safe_completion); /* fsync waiter */ 1179 } 1180 1181 /* 1182 * handle osd op reply. either call the callback if it is specified, 1183 * or do the completion to wake up the waiting thread. 1184 */ 1185 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, 1186 struct ceph_connection *con) 1187 { 1188 struct ceph_osd_reply_head *rhead = msg->front.iov_base; 1189 struct ceph_osd_request *req; 1190 u64 tid; 1191 int numops, object_len, flags; 1192 s32 result; 1193 1194 tid = le64_to_cpu(msg->hdr.tid); 1195 if (msg->front.iov_len < sizeof(*rhead)) 1196 goto bad; 1197 numops = le32_to_cpu(rhead->num_ops); 1198 object_len = le32_to_cpu(rhead->object_len); 1199 result = le32_to_cpu(rhead->result); 1200 if (msg->front.iov_len != sizeof(*rhead) + object_len + 1201 numops * sizeof(struct ceph_osd_op)) 1202 goto bad; 1203 dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result); 1204 /* lookup */ 1205 mutex_lock(&osdc->request_mutex); 1206 req = __lookup_request(osdc, tid); 1207 if (req == NULL) { 1208 dout("handle_reply tid %llu dne\n", tid); 1209 mutex_unlock(&osdc->request_mutex); 1210 return; 1211 } 1212 ceph_osdc_get_request(req); 1213 flags = le32_to_cpu(rhead->flags); 1214 1215 /* 1216 * if this connection filled our message, drop our reference now, to 1217 * avoid a (safe but slower) revoke later. 1218 */ 1219 if (req->r_con_filling_msg == con && req->r_reply == msg) { 1220 dout(" dropping con_filling_msg ref %p\n", con); 1221 req->r_con_filling_msg = NULL; 1222 con->ops->put(con); 1223 } 1224 1225 if (!req->r_got_reply) { 1226 unsigned int bytes; 1227 1228 req->r_result = le32_to_cpu(rhead->result); 1229 bytes = le32_to_cpu(msg->hdr.data_len); 1230 dout("handle_reply result %d bytes %d\n", req->r_result, 1231 bytes); 1232 if (req->r_result == 0) 1233 req->r_result = bytes; 1234 1235 /* in case this is a write and we need to replay, */ 1236 req->r_reassert_version = rhead->reassert_version; 1237 1238 req->r_got_reply = 1; 1239 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { 1240 dout("handle_reply tid %llu dup ack\n", tid); 1241 mutex_unlock(&osdc->request_mutex); 1242 goto done; 1243 } 1244 1245 dout("handle_reply tid %llu flags %d\n", tid, flags); 1246 1247 if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK)) 1248 __register_linger_request(osdc, req); 1249 1250 /* either this is a read, or we got the safe response */ 1251 if (result < 0 || 1252 (flags & CEPH_OSD_FLAG_ONDISK) || 1253 ((flags & CEPH_OSD_FLAG_WRITE) == 0)) 1254 __unregister_request(osdc, req); 1255 1256 mutex_unlock(&osdc->request_mutex); 1257 1258 if (req->r_callback) 1259 req->r_callback(req, msg); 1260 else 1261 complete_all(&req->r_completion); 1262 1263 if (flags & CEPH_OSD_FLAG_ONDISK) 1264 complete_request(req); 1265 1266 done: 1267 dout("req=%p req->r_linger=%d\n", req, req->r_linger); 1268 ceph_osdc_put_request(req); 1269 return; 1270 1271 bad: 1272 pr_err("corrupt osd_op_reply got %d %d expected %d\n", 1273 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len), 1274 (int)sizeof(*rhead)); 1275 ceph_msg_dump(msg); 1276 } 1277 1278 static void reset_changed_osds(struct ceph_osd_client *osdc) 1279 { 1280 struct rb_node *p, *n; 1281 1282 for (p = rb_first(&osdc->osds); p; p = n) { 1283 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node); 1284 1285 n = rb_next(p); 1286 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || 1287 memcmp(&osd->o_con.peer_addr, 1288 ceph_osd_addr(osdc->osdmap, 1289 osd->o_osd), 1290 sizeof(struct ceph_entity_addr)) != 0) 1291 __reset_osd(osdc, osd); 1292 } 1293 } 1294 1295 /* 1296 * Requeue requests whose mapping to an OSD has changed. If requests map to 1297 * no osd, request a new map. 1298 * 1299 * Caller should hold map_sem for read and request_mutex. 1300 */ 1301 static void kick_requests(struct ceph_osd_client *osdc, int force_resend) 1302 { 1303 struct ceph_osd_request *req, *nreq; 1304 struct rb_node *p; 1305 int needmap = 0; 1306 int err; 1307 1308 dout("kick_requests %s\n", force_resend ? " (force resend)" : ""); 1309 mutex_lock(&osdc->request_mutex); 1310 for (p = rb_first(&osdc->requests); p; ) { 1311 req = rb_entry(p, struct ceph_osd_request, r_node); 1312 p = rb_next(p); 1313 err = __map_request(osdc, req, force_resend); 1314 if (err < 0) 1315 continue; /* error */ 1316 if (req->r_osd == NULL) { 1317 dout("%p tid %llu maps to no osd\n", req, req->r_tid); 1318 needmap++; /* request a newer map */ 1319 } else if (err > 0) { 1320 if (!req->r_linger) { 1321 dout("%p tid %llu requeued on osd%d\n", req, 1322 req->r_tid, 1323 req->r_osd ? req->r_osd->o_osd : -1); 1324 req->r_flags |= CEPH_OSD_FLAG_RETRY; 1325 } 1326 } 1327 if (req->r_linger && list_empty(&req->r_linger_item)) { 1328 /* 1329 * register as a linger so that we will 1330 * re-submit below and get a new tid 1331 */ 1332 dout("%p tid %llu restart on osd%d\n", 1333 req, req->r_tid, 1334 req->r_osd ? req->r_osd->o_osd : -1); 1335 __register_linger_request(osdc, req); 1336 __unregister_request(osdc, req); 1337 } 1338 } 1339 1340 list_for_each_entry_safe(req, nreq, &osdc->req_linger, 1341 r_linger_item) { 1342 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); 1343 1344 err = __map_request(osdc, req, force_resend); 1345 if (err == 0) 1346 continue; /* no change and no osd was specified */ 1347 if (err < 0) 1348 continue; /* hrm! */ 1349 if (req->r_osd == NULL) { 1350 dout("tid %llu maps to no valid osd\n", req->r_tid); 1351 needmap++; /* request a newer map */ 1352 continue; 1353 } 1354 1355 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid, 1356 req->r_osd ? req->r_osd->o_osd : -1); 1357 __unregister_linger_request(osdc, req); 1358 __register_request(osdc, req); 1359 } 1360 mutex_unlock(&osdc->request_mutex); 1361 1362 if (needmap) { 1363 dout("%d requests for down osds, need new map\n", needmap); 1364 ceph_monc_request_next_osdmap(&osdc->client->monc); 1365 } 1366 } 1367 1368 1369 /* 1370 * Process updated osd map. 1371 * 1372 * The message contains any number of incremental and full maps, normally 1373 * indicating some sort of topology change in the cluster. Kick requests 1374 * off to different OSDs as needed. 1375 */ 1376 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) 1377 { 1378 void *p, *end, *next; 1379 u32 nr_maps, maplen; 1380 u32 epoch; 1381 struct ceph_osdmap *newmap = NULL, *oldmap; 1382 int err; 1383 struct ceph_fsid fsid; 1384 1385 dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0); 1386 p = msg->front.iov_base; 1387 end = p + msg->front.iov_len; 1388 1389 /* verify fsid */ 1390 ceph_decode_need(&p, end, sizeof(fsid), bad); 1391 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 1392 if (ceph_check_fsid(osdc->client, &fsid) < 0) 1393 return; 1394 1395 down_write(&osdc->map_sem); 1396 1397 /* incremental maps */ 1398 ceph_decode_32_safe(&p, end, nr_maps, bad); 1399 dout(" %d inc maps\n", nr_maps); 1400 while (nr_maps > 0) { 1401 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 1402 epoch = ceph_decode_32(&p); 1403 maplen = ceph_decode_32(&p); 1404 ceph_decode_need(&p, end, maplen, bad); 1405 next = p + maplen; 1406 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) { 1407 dout("applying incremental map %u len %d\n", 1408 epoch, maplen); 1409 newmap = osdmap_apply_incremental(&p, next, 1410 osdc->osdmap, 1411 &osdc->client->msgr); 1412 if (IS_ERR(newmap)) { 1413 err = PTR_ERR(newmap); 1414 goto bad; 1415 } 1416 BUG_ON(!newmap); 1417 if (newmap != osdc->osdmap) { 1418 ceph_osdmap_destroy(osdc->osdmap); 1419 osdc->osdmap = newmap; 1420 } 1421 kick_requests(osdc, 0); 1422 reset_changed_osds(osdc); 1423 } else { 1424 dout("ignoring incremental map %u len %d\n", 1425 epoch, maplen); 1426 } 1427 p = next; 1428 nr_maps--; 1429 } 1430 if (newmap) 1431 goto done; 1432 1433 /* full maps */ 1434 ceph_decode_32_safe(&p, end, nr_maps, bad); 1435 dout(" %d full maps\n", nr_maps); 1436 while (nr_maps) { 1437 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 1438 epoch = ceph_decode_32(&p); 1439 maplen = ceph_decode_32(&p); 1440 ceph_decode_need(&p, end, maplen, bad); 1441 if (nr_maps > 1) { 1442 dout("skipping non-latest full map %u len %d\n", 1443 epoch, maplen); 1444 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) { 1445 dout("skipping full map %u len %d, " 1446 "older than our %u\n", epoch, maplen, 1447 osdc->osdmap->epoch); 1448 } else { 1449 int skipped_map = 0; 1450 1451 dout("taking full map %u len %d\n", epoch, maplen); 1452 newmap = osdmap_decode(&p, p+maplen); 1453 if (IS_ERR(newmap)) { 1454 err = PTR_ERR(newmap); 1455 goto bad; 1456 } 1457 BUG_ON(!newmap); 1458 oldmap = osdc->osdmap; 1459 osdc->osdmap = newmap; 1460 if (oldmap) { 1461 if (oldmap->epoch + 1 < newmap->epoch) 1462 skipped_map = 1; 1463 ceph_osdmap_destroy(oldmap); 1464 } 1465 kick_requests(osdc, skipped_map); 1466 } 1467 p += maplen; 1468 nr_maps--; 1469 } 1470 1471 done: 1472 downgrade_write(&osdc->map_sem); 1473 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); 1474 1475 /* 1476 * subscribe to subsequent osdmap updates if full to ensure 1477 * we find out when we are no longer full and stop returning 1478 * ENOSPC. 1479 */ 1480 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) 1481 ceph_monc_request_next_osdmap(&osdc->client->monc); 1482 1483 send_queued(osdc); 1484 up_read(&osdc->map_sem); 1485 wake_up_all(&osdc->client->auth_wq); 1486 return; 1487 1488 bad: 1489 pr_err("osdc handle_map corrupt msg\n"); 1490 ceph_msg_dump(msg); 1491 up_write(&osdc->map_sem); 1492 return; 1493 } 1494 1495 /* 1496 * watch/notify callback event infrastructure 1497 * 1498 * These callbacks are used both for watch and notify operations. 1499 */ 1500 static void __release_event(struct kref *kref) 1501 { 1502 struct ceph_osd_event *event = 1503 container_of(kref, struct ceph_osd_event, kref); 1504 1505 dout("__release_event %p\n", event); 1506 kfree(event); 1507 } 1508 1509 static void get_event(struct ceph_osd_event *event) 1510 { 1511 kref_get(&event->kref); 1512 } 1513 1514 void ceph_osdc_put_event(struct ceph_osd_event *event) 1515 { 1516 kref_put(&event->kref, __release_event); 1517 } 1518 EXPORT_SYMBOL(ceph_osdc_put_event); 1519 1520 static void __insert_event(struct ceph_osd_client *osdc, 1521 struct ceph_osd_event *new) 1522 { 1523 struct rb_node **p = &osdc->event_tree.rb_node; 1524 struct rb_node *parent = NULL; 1525 struct ceph_osd_event *event = NULL; 1526 1527 while (*p) { 1528 parent = *p; 1529 event = rb_entry(parent, struct ceph_osd_event, node); 1530 if (new->cookie < event->cookie) 1531 p = &(*p)->rb_left; 1532 else if (new->cookie > event->cookie) 1533 p = &(*p)->rb_right; 1534 else 1535 BUG(); 1536 } 1537 1538 rb_link_node(&new->node, parent, p); 1539 rb_insert_color(&new->node, &osdc->event_tree); 1540 } 1541 1542 static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc, 1543 u64 cookie) 1544 { 1545 struct rb_node **p = &osdc->event_tree.rb_node; 1546 struct rb_node *parent = NULL; 1547 struct ceph_osd_event *event = NULL; 1548 1549 while (*p) { 1550 parent = *p; 1551 event = rb_entry(parent, struct ceph_osd_event, node); 1552 if (cookie < event->cookie) 1553 p = &(*p)->rb_left; 1554 else if (cookie > event->cookie) 1555 p = &(*p)->rb_right; 1556 else 1557 return event; 1558 } 1559 return NULL; 1560 } 1561 1562 static void __remove_event(struct ceph_osd_event *event) 1563 { 1564 struct ceph_osd_client *osdc = event->osdc; 1565 1566 if (!RB_EMPTY_NODE(&event->node)) { 1567 dout("__remove_event removed %p\n", event); 1568 rb_erase(&event->node, &osdc->event_tree); 1569 ceph_osdc_put_event(event); 1570 } else { 1571 dout("__remove_event didn't remove %p\n", event); 1572 } 1573 } 1574 1575 int ceph_osdc_create_event(struct ceph_osd_client *osdc, 1576 void (*event_cb)(u64, u64, u8, void *), 1577 int one_shot, void *data, 1578 struct ceph_osd_event **pevent) 1579 { 1580 struct ceph_osd_event *event; 1581 1582 event = kmalloc(sizeof(*event), GFP_NOIO); 1583 if (!event) 1584 return -ENOMEM; 1585 1586 dout("create_event %p\n", event); 1587 event->cb = event_cb; 1588 event->one_shot = one_shot; 1589 event->data = data; 1590 event->osdc = osdc; 1591 INIT_LIST_HEAD(&event->osd_node); 1592 kref_init(&event->kref); /* one ref for us */ 1593 kref_get(&event->kref); /* one ref for the caller */ 1594 init_completion(&event->completion); 1595 1596 spin_lock(&osdc->event_lock); 1597 event->cookie = ++osdc->event_count; 1598 __insert_event(osdc, event); 1599 spin_unlock(&osdc->event_lock); 1600 1601 *pevent = event; 1602 return 0; 1603 } 1604 EXPORT_SYMBOL(ceph_osdc_create_event); 1605 1606 void ceph_osdc_cancel_event(struct ceph_osd_event *event) 1607 { 1608 struct ceph_osd_client *osdc = event->osdc; 1609 1610 dout("cancel_event %p\n", event); 1611 spin_lock(&osdc->event_lock); 1612 __remove_event(event); 1613 spin_unlock(&osdc->event_lock); 1614 ceph_osdc_put_event(event); /* caller's */ 1615 } 1616 EXPORT_SYMBOL(ceph_osdc_cancel_event); 1617 1618 1619 static void do_event_work(struct work_struct *work) 1620 { 1621 struct ceph_osd_event_work *event_work = 1622 container_of(work, struct ceph_osd_event_work, work); 1623 struct ceph_osd_event *event = event_work->event; 1624 u64 ver = event_work->ver; 1625 u64 notify_id = event_work->notify_id; 1626 u8 opcode = event_work->opcode; 1627 1628 dout("do_event_work completing %p\n", event); 1629 event->cb(ver, notify_id, opcode, event->data); 1630 complete(&event->completion); 1631 dout("do_event_work completed %p\n", event); 1632 ceph_osdc_put_event(event); 1633 kfree(event_work); 1634 } 1635 1636 1637 /* 1638 * Process osd watch notifications 1639 */ 1640 void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg) 1641 { 1642 void *p, *end; 1643 u8 proto_ver; 1644 u64 cookie, ver, notify_id; 1645 u8 opcode; 1646 struct ceph_osd_event *event; 1647 struct ceph_osd_event_work *event_work; 1648 1649 p = msg->front.iov_base; 1650 end = p + msg->front.iov_len; 1651 1652 ceph_decode_8_safe(&p, end, proto_ver, bad); 1653 ceph_decode_8_safe(&p, end, opcode, bad); 1654 ceph_decode_64_safe(&p, end, cookie, bad); 1655 ceph_decode_64_safe(&p, end, ver, bad); 1656 ceph_decode_64_safe(&p, end, notify_id, bad); 1657 1658 spin_lock(&osdc->event_lock); 1659 event = __find_event(osdc, cookie); 1660 if (event) { 1661 get_event(event); 1662 if (event->one_shot) 1663 __remove_event(event); 1664 } 1665 spin_unlock(&osdc->event_lock); 1666 dout("handle_watch_notify cookie %lld ver %lld event %p\n", 1667 cookie, ver, event); 1668 if (event) { 1669 event_work = kmalloc(sizeof(*event_work), GFP_NOIO); 1670 if (!event_work) { 1671 dout("ERROR: could not allocate event_work\n"); 1672 goto done_err; 1673 } 1674 INIT_WORK(&event_work->work, do_event_work); 1675 event_work->event = event; 1676 event_work->ver = ver; 1677 event_work->notify_id = notify_id; 1678 event_work->opcode = opcode; 1679 if (!queue_work(osdc->notify_wq, &event_work->work)) { 1680 dout("WARNING: failed to queue notify event work\n"); 1681 goto done_err; 1682 } 1683 } 1684 1685 return; 1686 1687 done_err: 1688 complete(&event->completion); 1689 ceph_osdc_put_event(event); 1690 return; 1691 1692 bad: 1693 pr_err("osdc handle_watch_notify corrupt msg\n"); 1694 return; 1695 } 1696 1697 int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout) 1698 { 1699 int err; 1700 1701 dout("wait_event %p\n", event); 1702 err = wait_for_completion_interruptible_timeout(&event->completion, 1703 timeout * HZ); 1704 ceph_osdc_put_event(event); 1705 if (err > 0) 1706 err = 0; 1707 dout("wait_event %p returns %d\n", event, err); 1708 return err; 1709 } 1710 EXPORT_SYMBOL(ceph_osdc_wait_event); 1711 1712 /* 1713 * Register request, send initial attempt. 1714 */ 1715 int ceph_osdc_start_request(struct ceph_osd_client *osdc, 1716 struct ceph_osd_request *req, 1717 bool nofail) 1718 { 1719 int rc = 0; 1720 1721 req->r_request->pages = req->r_pages; 1722 req->r_request->nr_pages = req->r_num_pages; 1723 #ifdef CONFIG_BLOCK 1724 req->r_request->bio = req->r_bio; 1725 #endif 1726 req->r_request->trail = req->r_trail; 1727 1728 register_request(osdc, req); 1729 1730 down_read(&osdc->map_sem); 1731 mutex_lock(&osdc->request_mutex); 1732 /* 1733 * a racing kick_requests() may have sent the message for us 1734 * while we dropped request_mutex above, so only send now if 1735 * the request still han't been touched yet. 1736 */ 1737 if (req->r_sent == 0) { 1738 rc = __map_request(osdc, req, 0); 1739 if (rc < 0) { 1740 if (nofail) { 1741 dout("osdc_start_request failed map, " 1742 " will retry %lld\n", req->r_tid); 1743 rc = 0; 1744 } 1745 goto out_unlock; 1746 } 1747 if (req->r_osd == NULL) { 1748 dout("send_request %p no up osds in pg\n", req); 1749 ceph_monc_request_next_osdmap(&osdc->client->monc); 1750 } else { 1751 __send_request(osdc, req); 1752 } 1753 rc = 0; 1754 } 1755 1756 out_unlock: 1757 mutex_unlock(&osdc->request_mutex); 1758 up_read(&osdc->map_sem); 1759 return rc; 1760 } 1761 EXPORT_SYMBOL(ceph_osdc_start_request); 1762 1763 /* 1764 * wait for a request to complete 1765 */ 1766 int ceph_osdc_wait_request(struct ceph_osd_client *osdc, 1767 struct ceph_osd_request *req) 1768 { 1769 int rc; 1770 1771 rc = wait_for_completion_interruptible(&req->r_completion); 1772 if (rc < 0) { 1773 mutex_lock(&osdc->request_mutex); 1774 __cancel_request(req); 1775 __unregister_request(osdc, req); 1776 mutex_unlock(&osdc->request_mutex); 1777 complete_request(req); 1778 dout("wait_request tid %llu canceled/timed out\n", req->r_tid); 1779 return rc; 1780 } 1781 1782 dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result); 1783 return req->r_result; 1784 } 1785 EXPORT_SYMBOL(ceph_osdc_wait_request); 1786 1787 /* 1788 * sync - wait for all in-flight requests to flush. avoid starvation. 1789 */ 1790 void ceph_osdc_sync(struct ceph_osd_client *osdc) 1791 { 1792 struct ceph_osd_request *req; 1793 u64 last_tid, next_tid = 0; 1794 1795 mutex_lock(&osdc->request_mutex); 1796 last_tid = osdc->last_tid; 1797 while (1) { 1798 req = __lookup_request_ge(osdc, next_tid); 1799 if (!req) 1800 break; 1801 if (req->r_tid > last_tid) 1802 break; 1803 1804 next_tid = req->r_tid + 1; 1805 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0) 1806 continue; 1807 1808 ceph_osdc_get_request(req); 1809 mutex_unlock(&osdc->request_mutex); 1810 dout("sync waiting on tid %llu (last is %llu)\n", 1811 req->r_tid, last_tid); 1812 wait_for_completion(&req->r_safe_completion); 1813 mutex_lock(&osdc->request_mutex); 1814 ceph_osdc_put_request(req); 1815 } 1816 mutex_unlock(&osdc->request_mutex); 1817 dout("sync done (thru tid %llu)\n", last_tid); 1818 } 1819 EXPORT_SYMBOL(ceph_osdc_sync); 1820 1821 /* 1822 * init, shutdown 1823 */ 1824 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) 1825 { 1826 int err; 1827 1828 dout("init\n"); 1829 osdc->client = client; 1830 osdc->osdmap = NULL; 1831 init_rwsem(&osdc->map_sem); 1832 init_completion(&osdc->map_waiters); 1833 osdc->last_requested_map = 0; 1834 mutex_init(&osdc->request_mutex); 1835 osdc->last_tid = 0; 1836 osdc->osds = RB_ROOT; 1837 INIT_LIST_HEAD(&osdc->osd_lru); 1838 osdc->requests = RB_ROOT; 1839 INIT_LIST_HEAD(&osdc->req_lru); 1840 INIT_LIST_HEAD(&osdc->req_unsent); 1841 INIT_LIST_HEAD(&osdc->req_notarget); 1842 INIT_LIST_HEAD(&osdc->req_linger); 1843 osdc->num_requests = 0; 1844 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 1845 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 1846 spin_lock_init(&osdc->event_lock); 1847 osdc->event_tree = RB_ROOT; 1848 osdc->event_count = 0; 1849 1850 schedule_delayed_work(&osdc->osds_timeout_work, 1851 round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ)); 1852 1853 err = -ENOMEM; 1854 osdc->req_mempool = mempool_create_kmalloc_pool(10, 1855 sizeof(struct ceph_osd_request)); 1856 if (!osdc->req_mempool) 1857 goto out; 1858 1859 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, 1860 OSD_OP_FRONT_LEN, 10, true, 1861 "osd_op"); 1862 if (err < 0) 1863 goto out_mempool; 1864 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, 1865 OSD_OPREPLY_FRONT_LEN, 10, true, 1866 "osd_op_reply"); 1867 if (err < 0) 1868 goto out_msgpool; 1869 1870 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); 1871 if (IS_ERR(osdc->notify_wq)) { 1872 err = PTR_ERR(osdc->notify_wq); 1873 osdc->notify_wq = NULL; 1874 goto out_msgpool; 1875 } 1876 return 0; 1877 1878 out_msgpool: 1879 ceph_msgpool_destroy(&osdc->msgpool_op); 1880 out_mempool: 1881 mempool_destroy(osdc->req_mempool); 1882 out: 1883 return err; 1884 } 1885 EXPORT_SYMBOL(ceph_osdc_init); 1886 1887 void ceph_osdc_stop(struct ceph_osd_client *osdc) 1888 { 1889 flush_workqueue(osdc->notify_wq); 1890 destroy_workqueue(osdc->notify_wq); 1891 cancel_delayed_work_sync(&osdc->timeout_work); 1892 cancel_delayed_work_sync(&osdc->osds_timeout_work); 1893 if (osdc->osdmap) { 1894 ceph_osdmap_destroy(osdc->osdmap); 1895 osdc->osdmap = NULL; 1896 } 1897 remove_all_osds(osdc); 1898 mempool_destroy(osdc->req_mempool); 1899 ceph_msgpool_destroy(&osdc->msgpool_op); 1900 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 1901 } 1902 EXPORT_SYMBOL(ceph_osdc_stop); 1903 1904 /* 1905 * Read some contiguous pages. If we cross a stripe boundary, shorten 1906 * *plen. Return number of bytes read, or error. 1907 */ 1908 int ceph_osdc_readpages(struct ceph_osd_client *osdc, 1909 struct ceph_vino vino, struct ceph_file_layout *layout, 1910 u64 off, u64 *plen, 1911 u32 truncate_seq, u64 truncate_size, 1912 struct page **pages, int num_pages, int page_align) 1913 { 1914 struct ceph_osd_request *req; 1915 int rc = 0; 1916 1917 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, 1918 vino.snap, off, *plen); 1919 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1920 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 1921 NULL, 0, truncate_seq, truncate_size, NULL, 1922 false, 1, page_align); 1923 if (!req) 1924 return -ENOMEM; 1925 1926 /* it may be a short read due to an object boundary */ 1927 req->r_pages = pages; 1928 1929 dout("readpages final extent is %llu~%llu (%d pages align %d)\n", 1930 off, *plen, req->r_num_pages, page_align); 1931 1932 rc = ceph_osdc_start_request(osdc, req, false); 1933 if (!rc) 1934 rc = ceph_osdc_wait_request(osdc, req); 1935 1936 ceph_osdc_put_request(req); 1937 dout("readpages result %d\n", rc); 1938 return rc; 1939 } 1940 EXPORT_SYMBOL(ceph_osdc_readpages); 1941 1942 /* 1943 * do a synchronous write on N pages 1944 */ 1945 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, 1946 struct ceph_file_layout *layout, 1947 struct ceph_snap_context *snapc, 1948 u64 off, u64 len, 1949 u32 truncate_seq, u64 truncate_size, 1950 struct timespec *mtime, 1951 struct page **pages, int num_pages, 1952 int flags, int do_sync, bool nofail) 1953 { 1954 struct ceph_osd_request *req; 1955 int rc = 0; 1956 int page_align = off & ~PAGE_MASK; 1957 1958 BUG_ON(vino.snap != CEPH_NOSNAP); 1959 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1960 CEPH_OSD_OP_WRITE, 1961 flags | CEPH_OSD_FLAG_ONDISK | 1962 CEPH_OSD_FLAG_WRITE, 1963 snapc, do_sync, 1964 truncate_seq, truncate_size, mtime, 1965 nofail, 1, page_align); 1966 if (!req) 1967 return -ENOMEM; 1968 1969 /* it may be a short write due to an object boundary */ 1970 req->r_pages = pages; 1971 dout("writepages %llu~%llu (%d pages)\n", off, len, 1972 req->r_num_pages); 1973 1974 rc = ceph_osdc_start_request(osdc, req, nofail); 1975 if (!rc) 1976 rc = ceph_osdc_wait_request(osdc, req); 1977 1978 ceph_osdc_put_request(req); 1979 if (rc == 0) 1980 rc = len; 1981 dout("writepages result %d\n", rc); 1982 return rc; 1983 } 1984 EXPORT_SYMBOL(ceph_osdc_writepages); 1985 1986 /* 1987 * handle incoming message 1988 */ 1989 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 1990 { 1991 struct ceph_osd *osd = con->private; 1992 struct ceph_osd_client *osdc; 1993 int type = le16_to_cpu(msg->hdr.type); 1994 1995 if (!osd) 1996 goto out; 1997 osdc = osd->o_osdc; 1998 1999 switch (type) { 2000 case CEPH_MSG_OSD_MAP: 2001 ceph_osdc_handle_map(osdc, msg); 2002 break; 2003 case CEPH_MSG_OSD_OPREPLY: 2004 handle_reply(osdc, msg, con); 2005 break; 2006 case CEPH_MSG_WATCH_NOTIFY: 2007 handle_watch_notify(osdc, msg); 2008 break; 2009 2010 default: 2011 pr_err("received unknown message type %d %s\n", type, 2012 ceph_msg_type_name(type)); 2013 } 2014 out: 2015 ceph_msg_put(msg); 2016 } 2017 2018 /* 2019 * lookup and return message for incoming reply. set up reply message 2020 * pages. 2021 */ 2022 static struct ceph_msg *get_reply(struct ceph_connection *con, 2023 struct ceph_msg_header *hdr, 2024 int *skip) 2025 { 2026 struct ceph_osd *osd = con->private; 2027 struct ceph_osd_client *osdc = osd->o_osdc; 2028 struct ceph_msg *m; 2029 struct ceph_osd_request *req; 2030 int front = le32_to_cpu(hdr->front_len); 2031 int data_len = le32_to_cpu(hdr->data_len); 2032 u64 tid; 2033 2034 tid = le64_to_cpu(hdr->tid); 2035 mutex_lock(&osdc->request_mutex); 2036 req = __lookup_request(osdc, tid); 2037 if (!req) { 2038 *skip = 1; 2039 m = NULL; 2040 dout("get_reply unknown tid %llu from osd%d\n", tid, 2041 osd->o_osd); 2042 goto out; 2043 } 2044 2045 if (req->r_con_filling_msg) { 2046 dout("%s revoking msg %p from old con %p\n", __func__, 2047 req->r_reply, req->r_con_filling_msg); 2048 ceph_msg_revoke_incoming(req->r_reply); 2049 req->r_con_filling_msg->ops->put(req->r_con_filling_msg); 2050 req->r_con_filling_msg = NULL; 2051 } 2052 2053 if (front > req->r_reply->front.iov_len) { 2054 pr_warning("get_reply front %d > preallocated %d\n", 2055 front, (int)req->r_reply->front.iov_len); 2056 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false); 2057 if (!m) 2058 goto out; 2059 ceph_msg_put(req->r_reply); 2060 req->r_reply = m; 2061 } 2062 m = ceph_msg_get(req->r_reply); 2063 2064 if (data_len > 0) { 2065 int want = calc_pages_for(req->r_page_alignment, data_len); 2066 2067 if (unlikely(req->r_num_pages < want)) { 2068 pr_warning("tid %lld reply has %d bytes %d pages, we" 2069 " had only %d pages ready\n", tid, data_len, 2070 want, req->r_num_pages); 2071 *skip = 1; 2072 ceph_msg_put(m); 2073 m = NULL; 2074 goto out; 2075 } 2076 m->pages = req->r_pages; 2077 m->nr_pages = req->r_num_pages; 2078 m->page_alignment = req->r_page_alignment; 2079 #ifdef CONFIG_BLOCK 2080 m->bio = req->r_bio; 2081 #endif 2082 } 2083 *skip = 0; 2084 req->r_con_filling_msg = con->ops->get(con); 2085 dout("get_reply tid %lld %p\n", tid, m); 2086 2087 out: 2088 mutex_unlock(&osdc->request_mutex); 2089 return m; 2090 2091 } 2092 2093 static struct ceph_msg *alloc_msg(struct ceph_connection *con, 2094 struct ceph_msg_header *hdr, 2095 int *skip) 2096 { 2097 struct ceph_osd *osd = con->private; 2098 int type = le16_to_cpu(hdr->type); 2099 int front = le32_to_cpu(hdr->front_len); 2100 2101 *skip = 0; 2102 switch (type) { 2103 case CEPH_MSG_OSD_MAP: 2104 case CEPH_MSG_WATCH_NOTIFY: 2105 return ceph_msg_new(type, front, GFP_NOFS, false); 2106 case CEPH_MSG_OSD_OPREPLY: 2107 return get_reply(con, hdr, skip); 2108 default: 2109 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type, 2110 osd->o_osd); 2111 *skip = 1; 2112 return NULL; 2113 } 2114 } 2115 2116 /* 2117 * Wrappers to refcount containing ceph_osd struct 2118 */ 2119 static struct ceph_connection *get_osd_con(struct ceph_connection *con) 2120 { 2121 struct ceph_osd *osd = con->private; 2122 if (get_osd(osd)) 2123 return con; 2124 return NULL; 2125 } 2126 2127 static void put_osd_con(struct ceph_connection *con) 2128 { 2129 struct ceph_osd *osd = con->private; 2130 put_osd(osd); 2131 } 2132 2133 /* 2134 * authentication 2135 */ 2136 /* 2137 * Note: returned pointer is the address of a structure that's 2138 * managed separately. Caller must *not* attempt to free it. 2139 */ 2140 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 2141 int *proto, int force_new) 2142 { 2143 struct ceph_osd *o = con->private; 2144 struct ceph_osd_client *osdc = o->o_osdc; 2145 struct ceph_auth_client *ac = osdc->client->monc.auth; 2146 struct ceph_auth_handshake *auth = &o->o_auth; 2147 2148 if (force_new && auth->authorizer) { 2149 if (ac->ops && ac->ops->destroy_authorizer) 2150 ac->ops->destroy_authorizer(ac, auth->authorizer); 2151 auth->authorizer = NULL; 2152 } 2153 if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) { 2154 int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 2155 auth); 2156 if (ret) 2157 return ERR_PTR(ret); 2158 } 2159 *proto = ac->protocol; 2160 2161 return auth; 2162 } 2163 2164 2165 static int verify_authorizer_reply(struct ceph_connection *con, int len) 2166 { 2167 struct ceph_osd *o = con->private; 2168 struct ceph_osd_client *osdc = o->o_osdc; 2169 struct ceph_auth_client *ac = osdc->client->monc.auth; 2170 2171 /* 2172 * XXX If ac->ops or ac->ops->verify_authorizer_reply is null, 2173 * XXX which do we do: succeed or fail? 2174 */ 2175 return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len); 2176 } 2177 2178 static int invalidate_authorizer(struct ceph_connection *con) 2179 { 2180 struct ceph_osd *o = con->private; 2181 struct ceph_osd_client *osdc = o->o_osdc; 2182 struct ceph_auth_client *ac = osdc->client->monc.auth; 2183 2184 if (ac->ops && ac->ops->invalidate_authorizer) 2185 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); 2186 2187 return ceph_monc_validate_auth(&osdc->client->monc); 2188 } 2189 2190 static const struct ceph_connection_operations osd_con_ops = { 2191 .get = get_osd_con, 2192 .put = put_osd_con, 2193 .dispatch = dispatch, 2194 .get_authorizer = get_authorizer, 2195 .verify_authorizer_reply = verify_authorizer_reply, 2196 .invalidate_authorizer = invalidate_authorizer, 2197 .alloc_msg = alloc_msg, 2198 .fault = osd_reset, 2199 }; 2200