#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>

#define OSD_OP_FRONT_LEN	4096
#define OSD_OPREPLY_FRONT_LEN	512

static const struct ceph_connection_operations osd_con_ops;

static void send_queued(struct ceph_osd_client *osdc);
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
static void __register_request(struct ceph_osd_client *osdc,
			       struct ceph_osd_request *req);
static void __unregister_linger_request(struct ceph_osd_client *osdc,
					struct ceph_osd_request *req);
static void __send_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req);

static int op_needs_trail(int op)
{
	switch (op) {
	case CEPH_OSD_OP_GETXATTR:
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY:
		return 1;
	default:
		return 0;
	}
}

static int op_has_extent(int op)
{
	return (op == CEPH_OSD_OP_READ ||
		op == CEPH_OSD_OP_WRITE);
}

int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
			 struct ceph_file_layout *layout,
			 u64 snapid,
			 u64 off, u64 *plen, u64 *bno,
			 struct ceph_osd_request *req,
			 struct ceph_osd_req_op *op)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	u64 orig_len = *plen;
	u64 objoff, objlen;    /* extent in object */
	int r;

	reqhead->snapid = cpu_to_le64(snapid);

	/* object extent? */
	r = ceph_calc_file_object_mapping(layout, off, plen, bno,
					  &objoff, &objlen);
	if (r < 0)
		return r;
	if (*plen < orig_len)
		dout(" skipping last %llu, final file extent %llu~%llu\n",
		     orig_len - *plen, off, *plen);

	if (op_has_extent(op->op)) {
		op->extent.offset = objoff;
		op->extent.length = objlen;
	}
	req->r_num_pages = calc_pages_for(off, *plen);
	req->r_page_alignment = off & ~PAGE_MASK;
	if (op->op == CEPH_OSD_OP_WRITE)
		op->payload_len = *plen;

	dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
	     *bno, objoff, objlen, req->r_num_pages);
	return 0;
}
EXPORT_SYMBOL(ceph_calc_raw_layout);

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */

/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
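 *
 * the resulting object name is "<inode>.<object number>" in hex
 * ("%llx.%08llx"; see the snprintf() below).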
 */
static int calc_layout(struct ceph_osd_client *osdc,
		       struct ceph_vino vino,
		       struct ceph_file_layout *layout,
		       u64 off, u64 *plen,
		       struct ceph_osd_request *req,
		       struct ceph_osd_req_op *op)
{
	u64 bno;
	int r;

	r = ceph_calc_raw_layout(osdc, layout, vino.snap, off,
				 plen, &bno, req, op);
	if (r < 0)
		return r;

	snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
	req->r_oid_len = strlen(req->r_oid);

	return r;
}

/*
 * requests
 */
void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req = container_of(kref,
						    struct ceph_osd_request,
						    r_kref);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_con_filling_msg) {
		dout("%s revoking pages %p from con %p\n", __func__,
		     req->r_pages, req->r_con_filling_msg);
		ceph_msg_revoke_incoming(req->r_reply);
		req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
	}
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_own_pages)
		ceph_release_page_vector(req->r_pages,
					 req->r_num_pages);
#ifdef CONFIG_BLOCK
	if (req->r_bio)
		bio_put(req->r_bio);
#endif
	ceph_put_snap_context(req->r_snapc);
	if (req->r_trail) {
		ceph_pagelist_release(req->r_trail);
		kfree(req->r_trail);
	}
	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else
		kfree(req);
}
EXPORT_SYMBOL(ceph_osdc_release_request);

static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
{
	int i = 0;

	if (needs_trail)
		*needs_trail = 0;
	while (ops[i].op) {
		if (needs_trail && op_needs_trail(ops[i].op))
			*needs_trail = 1;
		i++;
	}

	return i;
}

struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
					       int flags,
					       struct ceph_snap_context *snapc,
					       struct ceph_osd_req_op *ops,
					       bool use_mempool,
					       gfp_t gfp_flags,
					       struct page **pages,
					       struct bio *bio)
{
	struct ceph_osd_request *req;
	struct ceph_msg *msg;
	int needs_trail;
	int num_op = get_num_ops(ops, &needs_trail);
	size_t msg_size = sizeof(struct ceph_osd_request_head);

	msg_size += num_op*sizeof(struct ceph_osd_op);

	if (use_mempool) {
		req = mempool_alloc(osdc->req_mempool, gfp_flags);
		memset(req, 0, sizeof(*req));
	} else {
		req = kzalloc(sizeof(*req), gfp_flags);
	}
	if (req == NULL)
		return NULL;

	req->r_osdc = osdc;
	req->r_mempool = use_mempool;

	kref_init(&req->r_kref);
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	RB_CLEAR_NODE(&req->r_node);
	INIT_LIST_HEAD(&req->r_unsafe_item);
	INIT_LIST_HEAD(&req->r_linger_item);
	INIT_LIST_HEAD(&req->r_linger_osd);
	INIT_LIST_HEAD(&req->r_req_lru_item);
	INIT_LIST_HEAD(&req->r_osd_item);

	req->r_flags = flags;

	WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);

	/* create reply message */
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
				   OSD_OPREPLY_FRONT_LEN, gfp_flags, true);
	if (!msg) {
		ceph_osdc_put_request(req);
		return NULL;
	}
	req->r_reply = msg;

	/* allocate space for the trailing data */
	if (needs_trail) {
		req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
		if (!req->r_trail) {
			ceph_osdc_put_request(req);
			return NULL;
		}
		ceph_pagelist_init(req->r_trail);
	}

	/* create request message; allow space for oid */
	msg_size += MAX_OBJ_NAME_SIZE;
	if (snapc)
		msg_size += sizeof(u64) * snapc->num_snaps;
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true);
	if (!msg) {
		ceph_osdc_put_request(req);
		return NULL;
	}

	memset(msg->front.iov_base, 0, msg->front.iov_len);

	req->r_request = msg;
	req->r_pages = pages;
#ifdef CONFIG_BLOCK
	if (bio) {
		req->r_bio = bio;
		bio_get(req->r_bio);
	}
#endif

	return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);

static void osd_req_encode_op(struct ceph_osd_request *req,
			      struct ceph_osd_op *dst,
			      struct ceph_osd_req_op *src)
{
	dst->op = cpu_to_le16(src->op);

	switch (src->op) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
		dst->extent.offset =
			cpu_to_le64(src->extent.offset);
		dst->extent.length =
			cpu_to_le64(src->extent.length);
		dst->extent.truncate_size =
			cpu_to_le64(src->extent.truncate_size);
		dst->extent.truncate_seq =
			cpu_to_le32(src->extent.truncate_seq);
		break;

	case CEPH_OSD_OP_GETXATTR:
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		BUG_ON(!req->r_trail);

		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
		dst->xattr.cmp_op = src->xattr.cmp_op;
		dst->xattr.cmp_mode = src->xattr.cmp_mode;
		ceph_pagelist_append(req->r_trail, src->xattr.name,
				     src->xattr.name_len);
		ceph_pagelist_append(req->r_trail, src->xattr.val,
				     src->xattr.value_len);
		break;
	case CEPH_OSD_OP_CALL:
		BUG_ON(!req->r_trail);

		dst->cls.class_len = src->cls.class_len;
		dst->cls.method_len = src->cls.method_len;
		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);

		ceph_pagelist_append(req->r_trail, src->cls.class_name,
				     src->cls.class_len);
		ceph_pagelist_append(req->r_trail, src->cls.method_name,
				     src->cls.method_len);
		ceph_pagelist_append(req->r_trail, src->cls.indata,
				     src->cls.indata_len);
		break;
	case CEPH_OSD_OP_ROLLBACK:
		dst->snap.snapid = cpu_to_le64(src->snap.snapid);
		break;
	case CEPH_OSD_OP_STARTSYNC:
		break;
	case CEPH_OSD_OP_NOTIFY:
	{
		__le32 prot_ver = cpu_to_le32(src->watch.prot_ver);
		__le32 timeout = cpu_to_le32(src->watch.timeout);

		BUG_ON(!req->r_trail);

		ceph_pagelist_append(req->r_trail,
				     &prot_ver, sizeof(prot_ver));
		ceph_pagelist_append(req->r_trail,
				     &timeout, sizeof(timeout));
	}
	/* fall through: NOTIFY also encodes the watch fields below */
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
		dst->watch.ver = cpu_to_le64(src->watch.ver);
		dst->watch.flag = src->watch.flag;
		break;
	default:
		pr_err("unrecognized osd opcode %d\n", dst->op);
		WARN_ON(1);
		break;
	}
	dst->payload_len = cpu_to_le32(src->payload_len);
}

/*
 * build new request AND message
 *
 */
void ceph_osdc_build_request(struct ceph_osd_request *req,
			     u64 off, u64 *plen,
			     struct ceph_osd_req_op *src_ops,
			     struct ceph_snap_context *snapc,
			     struct timespec *mtime,
			     const char *oid,
			     int oid_len)
{
	struct ceph_msg *msg = req->r_request;
	struct ceph_osd_request_head *head;
	struct ceph_osd_req_op *src_op;
	struct ceph_osd_op *op;
	void *p;
	int num_op = get_num_ops(src_ops, NULL);
	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
	int flags = req->r_flags;
	u64 data_len = 0;
	int i;

	head = msg->front.iov_base;
	op = (void *)(head + 1);
	p = (void *)(op + num_op);

	req->r_snapc = ceph_get_snap_context(snapc);

	head->client_inc = cpu_to_le32(1); /* always, for now. */
	head->flags = cpu_to_le32(flags);
	if (flags & CEPH_OSD_FLAG_WRITE)
		ceph_encode_timespec(&head->mtime, mtime);
	head->num_ops = cpu_to_le16(num_op);

	/* fill in oid */
	head->object_len = cpu_to_le32(oid_len);
	memcpy(p, oid, oid_len);
	p += oid_len;

	src_op = src_ops;
	while (src_op->op) {
		osd_req_encode_op(req, op, src_op);
		src_op++;
		op++;
	}

	if (req->r_trail)
		data_len += req->r_trail->length;

	if (snapc) {
		head->snap_seq = cpu_to_le64(snapc->seq);
		head->num_snaps = cpu_to_le32(snapc->num_snaps);
		for (i = 0; i < snapc->num_snaps; i++) {
			put_unaligned_le64(snapc->snaps[i], p);
			p += sizeof(u64);
		}
	}

	if (flags & CEPH_OSD_FLAG_WRITE) {
		req->r_request->hdr.data_off = cpu_to_le16(off);
		req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
	} else if (data_len) {
		req->r_request->hdr.data_off = 0;
		req->r_request->hdr.data_len = cpu_to_le32(data_len);
	}

	req->r_request->page_alignment = req->r_page_alignment;

	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
	msg_size = p - msg->front.iov_base;
	msg->front.iov_len = msg_size;
	msg->hdr.front_len = cpu_to_le32(msg_size);
	return;
}
EXPORT_SYMBOL(ceph_osdc_build_request);

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
 *
 * if @do_sync, include a 'startsync' command so that the osd will flush
 * data quickly.
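 *
 * A typical synchronous caller pairs this with start/wait/put, roughly
 * as follows (illustrative sketch only; cf. ceph_osdc_readpages() below):
 *
 *	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, ...);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *	rc = ceph_osdc_start_request(osdc, req, false);
 *	if (!rc)
 *		rc = ceph_osdc_wait_request(osdc, req);
 *	ceph_osdc_put_request(req);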
 */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
					       struct ceph_file_layout *layout,
					       struct ceph_vino vino,
					       u64 off, u64 *plen,
					       int opcode, int flags,
					       struct ceph_snap_context *snapc,
					       int do_sync,
					       u32 truncate_seq,
					       u64 truncate_size,
					       struct timespec *mtime,
					       bool use_mempool, int num_reply,
					       int page_align)
{
	struct ceph_osd_req_op ops[3];
	struct ceph_osd_request *req;
	int r;

	ops[0].op = opcode;
	ops[0].extent.truncate_seq = truncate_seq;
	ops[0].extent.truncate_size = truncate_size;
	ops[0].payload_len = 0;

	if (do_sync) {
		ops[1].op = CEPH_OSD_OP_STARTSYNC;
		ops[1].payload_len = 0;
		ops[2].op = 0;
	} else
		ops[1].op = 0;

	req = ceph_osdc_alloc_request(osdc, flags,
				      snapc, ops,
				      use_mempool,
				      GFP_NOFS, NULL, NULL);
	if (!req)
		return ERR_PTR(-ENOMEM);

	/* calculate max write size */
	r = calc_layout(osdc, vino, layout, off, plen, req, ops);
	if (r < 0)
		return ERR_PTR(r);
	req->r_file_layout = *layout; /* keep a copy */

	/* in case it differs from natural (file) alignment that
	   calc_layout filled in for us */
	req->r_num_pages = calc_pages_for(page_align, *plen);
	req->r_page_alignment = page_align;

	ceph_osdc_build_request(req, off, plen, ops,
				snapc,
				mtime,
				req->r_oid, req->r_oid_len);

	return req;
}
EXPORT_SYMBOL(ceph_osdc_new_request);

/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */
static void __insert_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *new)
{
	struct rb_node **p = &osdc->requests.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_request *req = NULL;

	while (*p) {
		parent = *p;
		req = rb_entry(parent, struct ceph_osd_request, r_node);
		if (new->r_tid < req->r_tid)
			p = &(*p)->rb_left;
		else if (new->r_tid > req->r_tid)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->r_node, parent, p);
	rb_insert_color(&new->r_node, &osdc->requests);
}

static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
						 u64 tid)
{
	struct ceph_osd_request *req;
	struct rb_node *n = osdc->requests.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_osd_request, r_node);
		if (tid < req->r_tid)
			n = n->rb_left;
		else if (tid > req->r_tid)
			n = n->rb_right;
		else
			return req;
	}
	return NULL;
}

static struct ceph_osd_request *
__lookup_request_ge(struct ceph_osd_client *osdc,
		    u64 tid)
{
	struct ceph_osd_request *req;
	struct rb_node *n = osdc->requests.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_osd_request, r_node);
		if (tid < req->r_tid) {
			if (!n->rb_left)
				return req;
			n = n->rb_left;
		} else if (tid > req->r_tid) {
			n = n->rb_right;
		} else {
			return req;
		}
	}
	return NULL;
}

/*
 * Resubmit requests pending on the given osd.
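 *
 * Called with osdc->request_mutex held and map_sem held for read
 * (see osd_reset() below).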
 */
static void __kick_osd_requests(struct ceph_osd_client *osdc,
				struct ceph_osd *osd)
{
	struct ceph_osd_request *req, *nreq;
	int err;

	dout("__kick_osd_requests osd%d\n", osd->o_osd);
	err = __reset_osd(osdc, osd);
	if (err)
		return;

	list_for_each_entry(req, &osd->o_requests, r_osd_item) {
		list_move(&req->r_req_lru_item, &osdc->req_unsent);
		dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
		     osd->o_osd);
		if (!req->r_linger)
			req->r_flags |= CEPH_OSD_FLAG_RETRY;
	}

	list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
				 r_linger_osd) {
		/*
		 * reregister request prior to unregistering linger so
		 * that r_osd is preserved.
		 */
		BUG_ON(!list_empty(&req->r_req_lru_item));
		__register_request(osdc, req);
		list_add(&req->r_req_lru_item, &osdc->req_unsent);
		list_add(&req->r_osd_item, &req->r_osd->o_requests);
		__unregister_linger_request(osdc, req);
		dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid,
		     osd->o_osd);
	}
}

/*
 * If the osd connection drops, we need to resubmit all requests.
 */
static void osd_reset(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;

	if (!osd)
		return;
	dout("osd_reset osd%d\n", osd->o_osd);
	osdc = osd->o_osdc;
	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);
	__kick_osd_requests(osdc, osd);
	mutex_unlock(&osdc->request_mutex);
	send_queued(osdc);
	up_read(&osdc->map_sem);
}

/*
 * Track open sessions with osds.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
{
	struct ceph_osd *osd;

	osd = kzalloc(sizeof(*osd), GFP_NOFS);
	if (!osd)
		return NULL;

	atomic_set(&osd->o_ref, 1);
	osd->o_osdc = osdc;
	osd->o_osd = onum;
	RB_CLEAR_NODE(&osd->o_node);
	INIT_LIST_HEAD(&osd->o_requests);
	INIT_LIST_HEAD(&osd->o_linger_requests);
	INIT_LIST_HEAD(&osd->o_osd_lru);
	osd->o_incarnation = 1;

	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);

	INIT_LIST_HEAD(&osd->o_keepalive_item);
	return osd;
}

static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
	if (atomic_inc_not_zero(&osd->o_ref)) {
		dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
		     atomic_read(&osd->o_ref));
		return osd;
	} else {
		dout("get_osd %p FAIL\n", osd);
		return NULL;
	}
}

static void put_osd(struct ceph_osd *osd)
{
	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
	     atomic_read(&osd->o_ref) - 1);
	if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) {
		struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;

		if (ac->ops && ac->ops->destroy_authorizer)
			ac->ops->destroy_authorizer(ac, osd->o_auth.authorizer);
		kfree(osd);
	}
}

/*
 * remove an osd from our map
 */
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	dout("__remove_osd %p\n", osd);
	BUG_ON(!list_empty(&osd->o_requests));
	rb_erase(&osd->o_node, &osdc->osds);
	list_del_init(&osd->o_osd_lru);
	ceph_con_close(&osd->o_con);
	put_osd(osd);
}

static void remove_all_osds(struct ceph_osd_client *osdc)
{
	dout("%s %p\n", __func__, osdc);
	mutex_lock(&osdc->request_mutex);
	while (!RB_EMPTY_ROOT(&osdc->osds)) {
		struct ceph_osd *osd =
			rb_entry(rb_first(&osdc->osds),
				 struct ceph_osd, o_node);
		__remove_osd(osdc, osd);
	}
	mutex_unlock(&osdc->request_mutex);
}

static void __move_osd_to_lru(struct ceph_osd_client *osdc,
			      struct ceph_osd *osd)
{
	dout("__move_osd_to_lru %p\n", osd);
	BUG_ON(!list_empty(&osd->o_osd_lru));
	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
}

static void __remove_osd_from_lru(struct ceph_osd *osd)
{
	dout("__remove_osd_from_lru %p\n", osd);
	if (!list_empty(&osd->o_osd_lru))
		list_del_init(&osd->o_osd_lru);
}

static void remove_old_osds(struct ceph_osd_client *osdc)
{
	struct ceph_osd *osd, *nosd;

	dout("__remove_old_osds %p\n", osdc);
	mutex_lock(&osdc->request_mutex);
	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
		if (time_before(jiffies, osd->lru_ttl))
			break;
		__remove_osd(osdc, osd);
	}
	mutex_unlock(&osdc->request_mutex);
}

/*
 * reset osd connect
 */
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	struct ceph_osd_request *req;
	int ret = 0;

	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
	if (list_empty(&osd->o_requests) &&
	    list_empty(&osd->o_linger_requests)) {
		__remove_osd(osdc, osd);
		ret = -ENODEV;
	} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
			  &osd->o_con.peer_addr,
			  sizeof(osd->o_con.peer_addr)) == 0 &&
		   !ceph_con_opened(&osd->o_con)) {
		dout(" osd addr hasn't changed and connection never opened,"
		     " letting msgr retry");
		/* touch each r_stamp for handle_timeout()'s benefit */
		list_for_each_entry(req, &osd->o_requests, r_osd_item)
			req->r_stamp = jiffies;
		ret = -EAGAIN;
	} else {
		ceph_con_close(&osd->o_con);
		ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
			      &osdc->osdmap->osd_addr[osd->o_osd]);
		osd->o_incarnation++;
	}
	return ret;
}

static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
{
	struct rb_node **p = &osdc->osds.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd *osd = NULL;

	dout("__insert_osd %p osd%d\n", new, new->o_osd);
	while (*p) {
		parent = *p;
		osd = rb_entry(parent, struct ceph_osd, o_node);
		if (new->o_osd < osd->o_osd)
			p = &(*p)->rb_left;
		else if (new->o_osd > osd->o_osd)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->o_node, parent, p);
	rb_insert_color(&new->o_node, &osdc->osds);
}

static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
{
	struct ceph_osd *osd;
	struct rb_node *n = osdc->osds.rb_node;

	while (n) {
		osd = rb_entry(n, struct ceph_osd, o_node);
		if (o < osd->o_osd)
			n = n->rb_left;
		else if (o > osd->o_osd)
			n = n->rb_right;
		else
			return osd;
	}
	return NULL;
}

static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
{
	schedule_delayed_work(&osdc->timeout_work,
			osdc->client->options->osd_keepalive_timeout * HZ);
}

static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
{
	cancel_delayed_work(&osdc->timeout_work);
}

/*
 * Register request, assign tid.  If this is the first request, set up
 * the timeout event.
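 *
 * Caller holds osdc->request_mutex (see register_request() below).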
 */
static void __register_request(struct ceph_osd_client *osdc,
			       struct ceph_osd_request *req)
{
	req->r_tid = ++osdc->last_tid;
	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
	dout("__register_request %p tid %lld\n", req, req->r_tid);
	__insert_request(osdc, req);
	ceph_osdc_get_request(req);
	osdc->num_requests++;
	if (osdc->num_requests == 1) {
		dout(" first request, scheduling timeout\n");
		__schedule_osd_timeout(osdc);
	}
}

static void register_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *req)
{
	mutex_lock(&osdc->request_mutex);
	__register_request(osdc, req);
	mutex_unlock(&osdc->request_mutex);
}

/*
 * called under osdc->request_mutex
 */
static void __unregister_request(struct ceph_osd_client *osdc,
				 struct ceph_osd_request *req)
{
	if (RB_EMPTY_NODE(&req->r_node)) {
		dout("__unregister_request %p tid %lld not registered\n",
		     req, req->r_tid);
		return;
	}

	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
	rb_erase(&req->r_node, &osdc->requests);
	osdc->num_requests--;

	if (req->r_osd) {
		/* make sure the original request isn't in flight. */
		ceph_msg_revoke(req->r_request);

		list_del_init(&req->r_osd_item);
		if (list_empty(&req->r_osd->o_requests) &&
		    list_empty(&req->r_osd->o_linger_requests)) {
			dout("moving osd to %p lru\n", req->r_osd);
			__move_osd_to_lru(osdc, req->r_osd);
		}
		if (list_empty(&req->r_linger_item))
			req->r_osd = NULL;
	}

	list_del_init(&req->r_req_lru_item);
	ceph_osdc_put_request(req);

	if (osdc->num_requests == 0) {
		dout(" no requests, canceling timeout\n");
		__cancel_osd_timeout(osdc);
	}
}

/*
 * Cancel a previously queued request message
 */
static void __cancel_request(struct ceph_osd_request *req)
{
	if (req->r_sent && req->r_osd) {
		ceph_msg_revoke(req->r_request);
		req->r_sent = 0;
	}
}

static void __register_linger_request(struct ceph_osd_client *osdc,
				      struct ceph_osd_request *req)
{
	dout("__register_linger_request %p\n", req);
	list_add_tail(&req->r_linger_item, &osdc->req_linger);
	if (req->r_osd)
		list_add_tail(&req->r_linger_osd,
			      &req->r_osd->o_linger_requests);
}

static void __unregister_linger_request(struct ceph_osd_client *osdc,
					struct ceph_osd_request *req)
{
	dout("__unregister_linger_request %p\n", req);
	list_del_init(&req->r_linger_item);
	if (req->r_osd) {
		list_del_init(&req->r_linger_osd);

		if (list_empty(&req->r_osd->o_requests) &&
		    list_empty(&req->r_osd->o_linger_requests)) {
			dout("moving osd to %p lru\n", req->r_osd);
			__move_osd_to_lru(osdc, req->r_osd);
		}
		if (list_empty(&req->r_osd_item))
			req->r_osd = NULL;
	}
}

void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
					 struct ceph_osd_request *req)
{
	mutex_lock(&osdc->request_mutex);
	if (req->r_linger) {
		__unregister_linger_request(osdc, req);
		ceph_osdc_put_request(req);
	}
	mutex_unlock(&osdc->request_mutex);
}
EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);

void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
				  struct ceph_osd_request *req)
{
	if (!req->r_linger) {
		dout("set_request_linger %p\n", req);
		req->r_linger = 1;
		/*
		 * caller is now responsible for calling
		 * unregister_linger_request
		 */
		ceph_osdc_get_request(req);
	}
}
EXPORT_SYMBOL(ceph_osdc_set_request_linger);

/*
 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
 * (as needed), and set the request r_osd appropriately.  If there is
 * no up osd, set r_osd to NULL.  Move the request to the appropriate list
 * (unsent, homeless) or leave on in-flight lru.
 *
 * Return 0 if unchanged, 1 if changed, or negative on error.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static int __map_request(struct ceph_osd_client *osdc,
			 struct ceph_osd_request *req, int force_resend)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	struct ceph_pg pgid;
	int acting[CEPH_PG_MAX_SIZE];
	int o = -1, num = 0;
	int err;

	dout("map_request %p tid %lld\n", req, req->r_tid);
	err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
				      &req->r_file_layout, osdc->osdmap);
	if (err) {
		list_move(&req->r_req_lru_item, &osdc->req_notarget);
		return err;
	}
	pgid = reqhead->layout.ol_pgid;
	req->r_pgid = pgid;

	err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
	if (err > 0) {
		o = acting[0];
		num = err;
	}

	if ((!force_resend &&
	     req->r_osd && req->r_osd->o_osd == o &&
	     req->r_sent >= req->r_osd->o_incarnation &&
	     req->r_num_pg_osds == num &&
	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
	    (req->r_osd == NULL && o == -1))
		return 0;  /* no change */

	dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
	     req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
	     req->r_osd ? req->r_osd->o_osd : -1);

	/* record full pg acting set */
	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
	req->r_num_pg_osds = num;

	if (req->r_osd) {
		__cancel_request(req);
		list_del_init(&req->r_osd_item);
		req->r_osd = NULL;
	}

	req->r_osd = __lookup_osd(osdc, o);
	if (!req->r_osd && o >= 0) {
		err = -ENOMEM;
		req->r_osd = create_osd(osdc, o);
		if (!req->r_osd) {
			list_move(&req->r_req_lru_item, &osdc->req_notarget);
			goto out;
		}

		dout("map_request osd %p is osd%d\n", req->r_osd, o);
		__insert_osd(osdc, req->r_osd);

		ceph_con_open(&req->r_osd->o_con,
			      CEPH_ENTITY_TYPE_OSD, o,
			      &osdc->osdmap->osd_addr[o]);
	}

	if (req->r_osd) {
		__remove_osd_from_lru(req->r_osd);
		list_add(&req->r_osd_item, &req->r_osd->o_requests);
		list_move(&req->r_req_lru_item, &osdc->req_unsent);
	} else {
		list_move(&req->r_req_lru_item, &osdc->req_notarget);
	}
	err = 1;   /* osd or pg changed */

out:
	return err;
}

/*
 * caller should hold map_sem (for read) and request_mutex
 */
static void __send_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	struct ceph_osd_request_head *reqhead;

	dout("send_request %p tid %llu to osd%d flags %d\n",
	     req, req->r_tid, req->r_osd->o_osd, req->r_flags);

	reqhead = req->r_request->front.iov_base;
	reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
	reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
	reqhead->reassert_version = req->r_reassert_version;

	req->r_stamp = jiffies;
	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);

	ceph_msg_get(req->r_request); /* send consumes a ref */
	ceph_con_send(&req->r_osd->o_con, req->r_request);
	req->r_sent = req->r_osd->o_incarnation;
}

/*
 * Send any requests in the queue (req_unsent).
 */
static void send_queued(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req, *tmp;

	dout("send_queued\n");
	mutex_lock(&osdc->request_mutex);
	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
		__send_request(osdc, req);
	}
	mutex_unlock(&osdc->request_mutex);
}

/*
 * Timeout callback, called every N seconds when 1 or more osd
 * requests have been active for more than N seconds.  When this
 * happens, we ping all OSDs with requests that have timed out to
 * ensure any communications channel reset is detected.  Reset the
 * request timeouts another N seconds in the future as we go.
 * Reschedule the timeout event another N seconds in future (unless
 * there are no open requests).
 */
static void handle_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client, timeout_work.work);
	struct ceph_osd_request *req;
	struct ceph_osd *osd;
	unsigned long keepalive =
		osdc->client->options->osd_keepalive_timeout * HZ;
	struct list_head slow_osds;
	dout("timeout\n");
	down_read(&osdc->map_sem);

	ceph_monc_request_next_osdmap(&osdc->client->monc);

	mutex_lock(&osdc->request_mutex);

	/*
	 * ping osds that are a bit slow.  this ensures that if there
	 * is a break in the TCP connection we will notice, and reopen
	 * a connection with that osd (from the fault callback).
	 */
	INIT_LIST_HEAD(&slow_osds);
	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
		if (time_before(jiffies, req->r_stamp + keepalive))
			break;

		osd = req->r_osd;
		BUG_ON(!osd);
		dout(" tid %llu is slow, will send keepalive on osd%d\n",
		     req->r_tid, osd->o_osd);
		list_move_tail(&osd->o_keepalive_item, &slow_osds);
	}
	while (!list_empty(&slow_osds)) {
		osd = list_entry(slow_osds.next, struct ceph_osd,
				 o_keepalive_item);
		list_del_init(&osd->o_keepalive_item);
		ceph_con_keepalive(&osd->o_con);
	}

	__schedule_osd_timeout(osdc);
	mutex_unlock(&osdc->request_mutex);
	send_queued(osdc);
	up_read(&osdc->map_sem);
}

static void handle_osds_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client,
			     osds_timeout_work.work);
	unsigned long delay =
		osdc->client->options->osd_idle_ttl * HZ >> 2;

	dout("osds timeout\n");
	down_read(&osdc->map_sem);
	remove_old_osds(osdc);
	up_read(&osdc->map_sem);

	schedule_delayed_work(&osdc->osds_timeout_work,
			      round_jiffies_relative(delay));
}

static void complete_request(struct ceph_osd_request *req)
{
	if (req->r_safe_callback)
		req->r_safe_callback(req, NULL);
	complete_all(&req->r_safe_completion);  /* fsync waiter */
}

/*
 * handle osd op reply.  either call the callback if it is specified,
 * or do the completion to wake up the waiting thread.
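 *
 * A write may see two replies: an initial ack and a final ONDISK
 * (commit) reply.  Only the ONDISK reply triggers complete_request(),
 * i.e. r_safe_completion, which is what ceph_osdc_sync() waits on.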
 */
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
			 struct ceph_connection *con)
{
	struct ceph_osd_reply_head *rhead = msg->front.iov_base;
	struct ceph_osd_request *req;
	u64 tid;
	int numops, object_len, flags;
	s32 result;

	tid = le64_to_cpu(msg->hdr.tid);
	if (msg->front.iov_len < sizeof(*rhead))
		goto bad;
	numops = le32_to_cpu(rhead->num_ops);
	object_len = le32_to_cpu(rhead->object_len);
	result = le32_to_cpu(rhead->result);
	if (msg->front.iov_len != sizeof(*rhead) + object_len +
	    numops * sizeof(struct ceph_osd_op))
		goto bad;
	dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
	/* lookup */
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (req == NULL) {
		dout("handle_reply tid %llu dne\n", tid);
		mutex_unlock(&osdc->request_mutex);
		return;
	}
	ceph_osdc_get_request(req);
	flags = le32_to_cpu(rhead->flags);

	/*
	 * if this connection filled our message, drop our reference now, to
	 * avoid a (safe but slower) revoke later.
	 */
	if (req->r_con_filling_msg == con && req->r_reply == msg) {
		dout(" dropping con_filling_msg ref %p\n", con);
		req->r_con_filling_msg = NULL;
		con->ops->put(con);
	}

	if (!req->r_got_reply) {
		unsigned int bytes;

		req->r_result = le32_to_cpu(rhead->result);
		bytes = le32_to_cpu(msg->hdr.data_len);
		dout("handle_reply result %d bytes %d\n", req->r_result,
		     bytes);
		if (req->r_result == 0)
			req->r_result = bytes;

		/* in case this is a write and we need to replay, */
		req->r_reassert_version = rhead->reassert_version;

		req->r_got_reply = 1;
	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
		dout("handle_reply tid %llu dup ack\n", tid);
		mutex_unlock(&osdc->request_mutex);
		goto done;
	}

	dout("handle_reply tid %llu flags %d\n", tid, flags);

	if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
		__register_linger_request(osdc, req);

	/* either this is a read, or we got the safe response */
	if (result < 0 ||
	    (flags & CEPH_OSD_FLAG_ONDISK) ||
	    ((flags & CEPH_OSD_FLAG_WRITE) == 0))
		__unregister_request(osdc, req);

	mutex_unlock(&osdc->request_mutex);

	if (req->r_callback)
		req->r_callback(req, msg);
	else
		complete_all(&req->r_completion);

	if (flags & CEPH_OSD_FLAG_ONDISK)
		complete_request(req);

done:
	dout("req=%p req->r_linger=%d\n", req, req->r_linger);
	ceph_osdc_put_request(req);
	return;

bad:
	pr_err("corrupt osd_op_reply got %d %d expected %d\n",
	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
	       (int)sizeof(*rhead));
	ceph_msg_dump(msg);
}

static void reset_changed_osds(struct ceph_osd_client *osdc)
{
	struct rb_node *p, *n;

	for (p = rb_first(&osdc->osds); p; p = n) {
		struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);

		n = rb_next(p);
		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
		    memcmp(&osd->o_con.peer_addr,
			   ceph_osd_addr(osdc->osdmap,
					 osd->o_osd),
			   sizeof(struct ceph_entity_addr)) != 0)
			__reset_osd(osdc, osd);
	}
}

/*
 * Requeue requests whose mapping to an OSD has changed.  If requests map to
 * no osd, request a new map.
 *
 * Caller should hold map_sem for read.
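 *
 * force_resend is nonzero when one or more osdmap epochs were skipped
 * while taking a full map (see ceph_osdc_handle_map() below), since
 * request placement may have changed without us seeing every epoch.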
 */
static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
{
	struct ceph_osd_request *req, *nreq;
	struct rb_node *p;
	int needmap = 0;
	int err;

	dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
	mutex_lock(&osdc->request_mutex);
	for (p = rb_first(&osdc->requests); p; ) {
		req = rb_entry(p, struct ceph_osd_request, r_node);
		p = rb_next(p);

		/*
		 * For linger requests that have not yet been
		 * registered, move them to the linger list; they'll
		 * be sent to the osd in the loop below.  Unregister
		 * the request before re-registering it as a linger
		 * request to ensure the __map_request() below
		 * will decide it needs to be sent.
		 */
		if (req->r_linger && list_empty(&req->r_linger_item)) {
			dout("%p tid %llu restart on osd%d\n",
			     req, req->r_tid,
			     req->r_osd ? req->r_osd->o_osd : -1);
			__unregister_request(osdc, req);
			__register_linger_request(osdc, req);
			continue;
		}

		err = __map_request(osdc, req, force_resend);
		if (err < 0)
			continue;  /* error */
		if (req->r_osd == NULL) {
			dout("%p tid %llu maps to no osd\n", req, req->r_tid);
			needmap++;  /* request a newer map */
		} else if (err > 0) {
			if (!req->r_linger) {
				dout("%p tid %llu requeued on osd%d\n", req,
				     req->r_tid,
				     req->r_osd ? req->r_osd->o_osd : -1);
				req->r_flags |= CEPH_OSD_FLAG_RETRY;
			}
		}
	}

	list_for_each_entry_safe(req, nreq, &osdc->req_linger,
				 r_linger_item) {
		dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);

		err = __map_request(osdc, req, force_resend);
		dout("__map_request returned %d\n", err);
		if (err == 0)
			continue;  /* no change and no osd was specified */
		if (err < 0)
			continue;  /* hrm! */
		if (req->r_osd == NULL) {
			dout("tid %llu maps to no valid osd\n", req->r_tid);
			needmap++;  /* request a newer map */
			continue;
		}

		dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
		     req->r_osd ? req->r_osd->o_osd : -1);
		__register_request(osdc, req);
		__unregister_linger_request(osdc, req);
	}
	mutex_unlock(&osdc->request_mutex);

	if (needmap) {
		dout("%d requests for down osds, need new map\n", needmap);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
	}
	reset_changed_osds(osdc);
}


/*
 * Process updated osd map.
 *
 * The message contains any number of incremental and full maps, normally
 * indicating some sort of topology change in the cluster.  Kick requests
 * off to different OSDs as needed.
 */
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
	void *p, *end, *next;
	u32 nr_maps, maplen;
	u32 epoch;
	struct ceph_osdmap *newmap = NULL, *oldmap;
	int err;
	struct ceph_fsid fsid;

	dout("handle_map have %u\n", osdc->osdmap ?
	     osdc->osdmap->epoch : 0);
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	/* verify fsid */
	ceph_decode_need(&p, end, sizeof(fsid), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(osdc->client, &fsid) < 0)
		return;

	down_write(&osdc->map_sem);

	/* incremental maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d inc maps\n", nr_maps);
	while (nr_maps > 0) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		next = p + maplen;
		if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
			dout("applying incremental map %u len %d\n",
			     epoch, maplen);
			newmap = osdmap_apply_incremental(&p, next,
							  osdc->osdmap,
							  &osdc->client->msgr);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			if (newmap != osdc->osdmap) {
				ceph_osdmap_destroy(osdc->osdmap);
				osdc->osdmap = newmap;
			}
			kick_requests(osdc, 0);
		} else {
			dout("ignoring incremental map %u len %d\n",
			     epoch, maplen);
		}
		p = next;
		nr_maps--;
	}
	if (newmap)
		goto done;

	/* full maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d full maps\n", nr_maps);
	while (nr_maps) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		if (nr_maps > 1) {
			dout("skipping non-latest full map %u len %d\n",
			     epoch, maplen);
		} else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
			dout("skipping full map %u len %d, "
			     "older than our %u\n", epoch, maplen,
			     osdc->osdmap->epoch);
		} else {
			int skipped_map = 0;

			dout("taking full map %u len %d\n", epoch, maplen);
			newmap = osdmap_decode(&p, p+maplen);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			oldmap = osdc->osdmap;
			osdc->osdmap = newmap;
			if (oldmap) {
				if (oldmap->epoch + 1 < newmap->epoch)
					skipped_map = 1;
				ceph_osdmap_destroy(oldmap);
			}
			kick_requests(osdc, skipped_map);
		}
		p += maplen;
		nr_maps--;
	}

done:
	downgrade_write(&osdc->map_sem);
	ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);

	/*
	 * subscribe to subsequent osdmap updates if full to ensure
	 * we find out when we are no longer full and stop returning
	 * ENOSPC.
	 */
	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
		ceph_monc_request_next_osdmap(&osdc->client->monc);

	send_queued(osdc);
	up_read(&osdc->map_sem);
	wake_up_all(&osdc->client->auth_wq);
	return;

bad:
	pr_err("osdc handle_map corrupt msg\n");
	ceph_msg_dump(msg);
	up_write(&osdc->map_sem);
	return;
}

/*
 * watch/notify callback event infrastructure
 *
 * These callbacks are used both for watch and notify operations.
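 *
 * Events are kept in an rbtree indexed by cookie.  Incoming
 * CEPH_MSG_WATCH_NOTIFY messages look up the event and hand it to the
 * osdc->notify_wq workqueue, so the user callback runs in process
 * context rather than in the messenger path.
 *
 * A typical caller might drive it like this (illustrative sketch only;
 * "my_cb" and "my_data" are placeholders, not names used elsewhere):
 *
 *	struct ceph_osd_event *event;
 *	int ret;
 *
 *	ret = ceph_osdc_create_event(osdc, my_cb, 1, my_data, &event);
 *	...embed event->cookie in a CEPH_OSD_OP_NOTIFY request...
 *	ret = ceph_osdc_wait_event(event, 10);	/. timeout in seconds ./
 *	ceph_osdc_cancel_event(event);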
 */
static void __release_event(struct kref *kref)
{
	struct ceph_osd_event *event =
		container_of(kref, struct ceph_osd_event, kref);

	dout("__release_event %p\n", event);
	kfree(event);
}

static void get_event(struct ceph_osd_event *event)
{
	kref_get(&event->kref);
}

void ceph_osdc_put_event(struct ceph_osd_event *event)
{
	kref_put(&event->kref, __release_event);
}
EXPORT_SYMBOL(ceph_osdc_put_event);

static void __insert_event(struct ceph_osd_client *osdc,
			   struct ceph_osd_event *new)
{
	struct rb_node **p = &osdc->event_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_event *event = NULL;

	while (*p) {
		parent = *p;
		event = rb_entry(parent, struct ceph_osd_event, node);
		if (new->cookie < event->cookie)
			p = &(*p)->rb_left;
		else if (new->cookie > event->cookie)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->node, parent, p);
	rb_insert_color(&new->node, &osdc->event_tree);
}

static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
					   u64 cookie)
{
	struct rb_node **p = &osdc->event_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_event *event = NULL;

	while (*p) {
		parent = *p;
		event = rb_entry(parent, struct ceph_osd_event, node);
		if (cookie < event->cookie)
			p = &(*p)->rb_left;
		else if (cookie > event->cookie)
			p = &(*p)->rb_right;
		else
			return event;
	}
	return NULL;
}

static void __remove_event(struct ceph_osd_event *event)
{
	struct ceph_osd_client *osdc = event->osdc;

	if (!RB_EMPTY_NODE(&event->node)) {
		dout("__remove_event removed %p\n", event);
		rb_erase(&event->node, &osdc->event_tree);
		ceph_osdc_put_event(event);
	} else {
		dout("__remove_event didn't remove %p\n", event);
	}
}

int ceph_osdc_create_event(struct ceph_osd_client *osdc,
			   void (*event_cb)(u64, u64, u8, void *),
			   int one_shot, void *data,
			   struct ceph_osd_event **pevent)
{
	struct ceph_osd_event *event;

	event = kmalloc(sizeof(*event), GFP_NOIO);
	if (!event)
		return -ENOMEM;

	dout("create_event %p\n", event);
	event->cb = event_cb;
	event->one_shot = one_shot;
	event->data = data;
	event->osdc = osdc;
	INIT_LIST_HEAD(&event->osd_node);
	RB_CLEAR_NODE(&event->node);
	kref_init(&event->kref);   /* one ref for us */
	kref_get(&event->kref);    /* one ref for the caller */
	init_completion(&event->completion);

	spin_lock(&osdc->event_lock);
	event->cookie = ++osdc->event_count;
	__insert_event(osdc, event);
	spin_unlock(&osdc->event_lock);

	*pevent = event;
	return 0;
}
EXPORT_SYMBOL(ceph_osdc_create_event);

void ceph_osdc_cancel_event(struct ceph_osd_event *event)
{
	struct ceph_osd_client *osdc = event->osdc;

	dout("cancel_event %p\n", event);
	spin_lock(&osdc->event_lock);
	__remove_event(event);
	spin_unlock(&osdc->event_lock);
	ceph_osdc_put_event(event); /* caller's */
}
EXPORT_SYMBOL(ceph_osdc_cancel_event);


static void do_event_work(struct work_struct *work)
{
	struct ceph_osd_event_work *event_work =
		container_of(work, struct ceph_osd_event_work, work);
	struct ceph_osd_event *event = event_work->event;
	u64 ver = event_work->ver;
	u64 notify_id = event_work->notify_id;
	u8 opcode = event_work->opcode;

	dout("do_event_work completing %p\n", event);
	event->cb(ver, notify_id, opcode, event->data);
	complete(&event->completion);
	dout("do_event_work completed %p\n", event);
	ceph_osdc_put_event(event);
	kfree(event_work);
}


/*
 * Process osd watch notifications
 */
void handle_watch_notify(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
	void *p, *end;
	u8 proto_ver;
	u64 cookie, ver, notify_id;
	u8 opcode;
	struct ceph_osd_event *event;
	struct ceph_osd_event_work *event_work;

	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	ceph_decode_8_safe(&p, end, proto_ver, bad);
	ceph_decode_8_safe(&p, end, opcode, bad);
	ceph_decode_64_safe(&p, end, cookie, bad);
	ceph_decode_64_safe(&p, end, ver, bad);
	ceph_decode_64_safe(&p, end, notify_id, bad);

	spin_lock(&osdc->event_lock);
	event = __find_event(osdc, cookie);
	if (event) {
		get_event(event);
		if (event->one_shot)
			__remove_event(event);
	}
	spin_unlock(&osdc->event_lock);
	dout("handle_watch_notify cookie %lld ver %lld event %p\n",
	     cookie, ver, event);
	if (event) {
		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
		if (!event_work) {
			dout("ERROR: could not allocate event_work\n");
			goto done_err;
		}
		INIT_WORK(&event_work->work, do_event_work);
		event_work->event = event;
		event_work->ver = ver;
		event_work->notify_id = notify_id;
		event_work->opcode = opcode;
		if (!queue_work(osdc->notify_wq, &event_work->work)) {
			dout("WARNING: failed to queue notify event work\n");
			goto done_err;
		}
	}

	return;

done_err:
	complete(&event->completion);
	ceph_osdc_put_event(event);
	return;

bad:
	pr_err("osdc handle_watch_notify corrupt msg\n");
	return;
}

int ceph_osdc_wait_event(struct ceph_osd_event *event, unsigned long timeout)
{
	int err;

	dout("wait_event %p\n", event);
	err = wait_for_completion_interruptible_timeout(&event->completion,
							timeout * HZ);
	ceph_osdc_put_event(event);
	if (err > 0)
		err = 0;
	dout("wait_event %p returns %d\n", event, err);
	return err;
}
EXPORT_SYMBOL(ceph_osdc_wait_event);

/*
 * Register request, send initial attempt.
 */
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
			    struct ceph_osd_request *req,
			    bool nofail)
{
	int rc = 0;

	req->r_request->pages = req->r_pages;
	req->r_request->nr_pages = req->r_num_pages;
#ifdef CONFIG_BLOCK
	req->r_request->bio = req->r_bio;
#endif
	req->r_request->trail = req->r_trail;

	register_request(osdc, req);

	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);
	/*
	 * a racing kick_requests() may have sent the message for us
	 * while we dropped request_mutex above, so only send now if
	 * the request still hasn't been touched yet.
	 */
	if (req->r_sent == 0) {
		rc = __map_request(osdc, req, 0);
		if (rc < 0) {
			if (nofail) {
				dout("osdc_start_request failed map, "
				     " will retry %lld\n", req->r_tid);
				rc = 0;
			}
			goto out_unlock;
		}
		if (req->r_osd == NULL) {
			dout("send_request %p no up osds in pg\n", req);
			ceph_monc_request_next_osdmap(&osdc->client->monc);
		} else {
			__send_request(osdc, req);
		}
		rc = 0;
	}

out_unlock:
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_start_request);

/*
 * wait for a request to complete
 */
int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	int rc;

	rc = wait_for_completion_interruptible(&req->r_completion);
	if (rc < 0) {
		mutex_lock(&osdc->request_mutex);
		__cancel_request(req);
		__unregister_request(osdc, req);
		mutex_unlock(&osdc->request_mutex);
		complete_request(req);
		dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
		return rc;
	}

	dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
	return req->r_result;
}
EXPORT_SYMBOL(ceph_osdc_wait_request);

/*
 * sync - wait for all in-flight requests to flush.  avoid starvation.
 */
void ceph_osdc_sync(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req;
	u64 last_tid, next_tid = 0;

	mutex_lock(&osdc->request_mutex);
	last_tid = osdc->last_tid;
	while (1) {
		req = __lookup_request_ge(osdc, next_tid);
		if (!req)
			break;
		if (req->r_tid > last_tid)
			break;

		next_tid = req->r_tid + 1;
		if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
			continue;

		ceph_osdc_get_request(req);
		mutex_unlock(&osdc->request_mutex);
		dout("sync waiting on tid %llu (last is %llu)\n",
		     req->r_tid, last_tid);
		wait_for_completion(&req->r_safe_completion);
		mutex_lock(&osdc->request_mutex);
		ceph_osdc_put_request(req);
	}
	mutex_unlock(&osdc->request_mutex);
	dout("sync done (thru tid %llu)\n", last_tid);
}
EXPORT_SYMBOL(ceph_osdc_sync);

/*
 * init, shutdown
 */
int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
{
	int err;

	dout("init\n");
	osdc->client = client;
	osdc->osdmap = NULL;
	init_rwsem(&osdc->map_sem);
	init_completion(&osdc->map_waiters);
	osdc->last_requested_map = 0;
	mutex_init(&osdc->request_mutex);
	osdc->last_tid = 0;
	osdc->osds = RB_ROOT;
	INIT_LIST_HEAD(&osdc->osd_lru);
	osdc->requests = RB_ROOT;
	INIT_LIST_HEAD(&osdc->req_lru);
	INIT_LIST_HEAD(&osdc->req_unsent);
	INIT_LIST_HEAD(&osdc->req_notarget);
	INIT_LIST_HEAD(&osdc->req_linger);
	osdc->num_requests = 0;
	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
	spin_lock_init(&osdc->event_lock);
	osdc->event_tree = RB_ROOT;
	osdc->event_count = 0;

	schedule_delayed_work(&osdc->osds_timeout_work,
	    round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));

	err = -ENOMEM;
	osdc->req_mempool = mempool_create_kmalloc_pool(10,
					sizeof(struct ceph_osd_request));
	if (!osdc->req_mempool)
		goto out;

	err = ceph_msgpool_init(&osdc->msgpool_op,
				CEPH_MSG_OSD_OP,
				OSD_OP_FRONT_LEN, 10, true,
				"osd_op");
	if (err < 0)
		goto out_mempool;
	err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
				OSD_OPREPLY_FRONT_LEN, 10, true,
				"osd_op_reply");
	if (err < 0)
		goto out_msgpool;

	osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
	if (IS_ERR(osdc->notify_wq)) {
		err = PTR_ERR(osdc->notify_wq);
		osdc->notify_wq = NULL;
		goto out_msgpool;
	}
	return 0;

out_msgpool:
	ceph_msgpool_destroy(&osdc->msgpool_op);
out_mempool:
	mempool_destroy(osdc->req_mempool);
out:
	return err;
}
EXPORT_SYMBOL(ceph_osdc_init);

void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
	flush_workqueue(osdc->notify_wq);
	destroy_workqueue(osdc->notify_wq);
	cancel_delayed_work_sync(&osdc->timeout_work);
	cancel_delayed_work_sync(&osdc->osds_timeout_work);
	if (osdc->osdmap) {
		ceph_osdmap_destroy(osdc->osdmap);
		osdc->osdmap = NULL;
	}
	remove_all_osds(osdc);
	mempool_destroy(osdc->req_mempool);
	ceph_msgpool_destroy(&osdc->msgpool_op);
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}
EXPORT_SYMBOL(ceph_osdc_stop);

/*
 * Read some contiguous pages.  If we cross a stripe boundary, shorten
 * *plen.  Return number of bytes read, or error.
 */
int ceph_osdc_readpages(struct ceph_osd_client *osdc,
			struct ceph_vino vino, struct ceph_file_layout *layout,
			u64 off, u64 *plen,
			u32 truncate_seq, u64 truncate_size,
			struct page **pages, int num_pages, int page_align)
{
	struct ceph_osd_request *req;
	int rc = 0;

	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
	     vino.snap, off, *plen);
	req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
				    NULL, 0, truncate_seq, truncate_size, NULL,
				    false, 1, page_align);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* it may be a short read due to an object boundary */
	req->r_pages = pages;

	dout("readpages final extent is %llu~%llu (%d pages align %d)\n",
	     off, *plen, req->r_num_pages, page_align);

	rc = ceph_osdc_start_request(osdc, req, false);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	dout("readpages result %d\n", rc);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_readpages);

/*
 * do a synchronous write on N pages
 */
int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
			 struct ceph_file_layout *layout,
			 struct ceph_snap_context *snapc,
			 u64 off, u64 len,
			 u32 truncate_seq, u64 truncate_size,
			 struct timespec *mtime,
			 struct page **pages, int num_pages,
			 int flags, int do_sync, bool nofail)
{
	struct ceph_osd_request *req;
	int rc = 0;
	int page_align = off & ~PAGE_MASK;

	BUG_ON(vino.snap != CEPH_NOSNAP);
	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
				    CEPH_OSD_OP_WRITE,
				    flags | CEPH_OSD_FLAG_ONDISK |
					    CEPH_OSD_FLAG_WRITE,
				    snapc, do_sync,
				    truncate_seq, truncate_size, mtime,
				    nofail, 1, page_align);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* it may be a short write due to an object boundary */
	req->r_pages = pages;
	dout("writepages %llu~%llu (%d pages)\n", off, len,
	     req->r_num_pages);

	rc = ceph_osdc_start_request(osdc, req, nofail);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	if (rc == 0)
		rc = len;
	dout("writepages result %d\n", rc);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_writepages);

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;
	int type = le16_to_cpu(msg->hdr.type);

	if (!osd)
		goto out;
	osdc = osd->o_osdc;

	switch (type) {
	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(osdc, msg);
		break;
	case CEPH_MSG_OSD_OPREPLY:
		handle_reply(osdc, msg, con);
		break;
	case CEPH_MSG_WATCH_NOTIFY:
		handle_watch_notify(osdc, msg);
		break;

	default:
		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
out:
	ceph_msg_put(msg);
}

/*
 * lookup and return message for incoming reply.  set up reply message
 * pages.
 */
static struct ceph_msg *get_reply(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct ceph_msg *m;
	struct ceph_osd_request *req;
	int front = le32_to_cpu(hdr->front_len);
	int data_len = le32_to_cpu(hdr->data_len);
	u64 tid;

	tid = le64_to_cpu(hdr->tid);
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (!req) {
		*skip = 1;
		m = NULL;
		dout("get_reply unknown tid %llu from osd%d\n", tid,
		     osd->o_osd);
		goto out;
	}

	if (req->r_con_filling_msg) {
		dout("%s revoking msg %p from old con %p\n", __func__,
		     req->r_reply, req->r_con_filling_msg);
		ceph_msg_revoke_incoming(req->r_reply);
		req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
		req->r_con_filling_msg = NULL;
	}

	if (front > req->r_reply->front.iov_len) {
		pr_warning("get_reply front %d > preallocated %d\n",
			   front, (int)req->r_reply->front.iov_len);
		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
		if (!m)
			goto out;
		ceph_msg_put(req->r_reply);
		req->r_reply = m;
	}
	m = ceph_msg_get(req->r_reply);

	if (data_len > 0) {
		int want = calc_pages_for(req->r_page_alignment, data_len);

		if (unlikely(req->r_num_pages < want)) {
			pr_warning("tid %lld reply has %d bytes %d pages, we"
				   " had only %d pages ready\n", tid, data_len,
				   want, req->r_num_pages);
			*skip = 1;
			ceph_msg_put(m);
			m = NULL;
			goto out;
		}
		m->pages = req->r_pages;
		m->nr_pages = req->r_num_pages;
		m->page_alignment = req->r_page_alignment;
#ifdef CONFIG_BLOCK
		m->bio = req->r_bio;
#endif
	}
	*skip = 0;
	req->r_con_filling_msg = con->ops->get(con);
	dout("get_reply tid %lld %p\n", tid, m);

out:
	mutex_unlock(&osdc->request_mutex);
	return m;

}

static struct ceph_msg *alloc_msg(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	int type = le16_to_cpu(hdr->type);
	int front = le32_to_cpu(hdr->front_len);

	*skip = 0;
	switch (type) {
	case CEPH_MSG_OSD_MAP:
	case CEPH_MSG_WATCH_NOTIFY:
		return ceph_msg_new(type, front, GFP_NOFS, false);
	case CEPH_MSG_OSD_OPREPLY:
		return get_reply(con, hdr, skip);
	default:
		pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
			osd->o_osd);
		*skip = 1;
		return NULL;
	}
}

/*
 * Wrappers to refcount containing ceph_osd struct
 */
static struct ceph_connection *get_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	if (get_osd(osd))
		return con;
	return NULL;
}

static void put_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	put_osd(osd);
}

/*
 * authentication
 */
/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
						  int *proto, int force_new)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;
	struct ceph_auth_handshake *auth = &o->o_auth;

	if (force_new && auth->authorizer) {
		if (ac->ops && ac->ops->destroy_authorizer)
			ac->ops->destroy_authorizer(ac, auth->authorizer);
		auth->authorizer = NULL;
	}
	if (!auth->authorizer && ac->ops && ac->ops->create_authorizer) {
		int ret = ac->ops->create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
						     auth);
		if (ret)
			return ERR_PTR(ret);
	}
	*proto = ac->protocol;

	return auth;
}


static int verify_authorizer_reply(struct ceph_connection *con, int len)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	/*
	 * XXX If ac->ops or ac->ops->verify_authorizer_reply is null,
	 * XXX which do we do:  succeed or fail?
	 */
	return ac->ops->verify_authorizer_reply(ac, o->o_auth.authorizer, len);
}

static int invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	if (ac->ops && ac->ops->invalidate_authorizer)
		ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);

	return ceph_monc_validate_auth(&osdc->client->monc);
}

static const struct ceph_connection_operations osd_con_ops = {
	.get = get_osd_con,
	.put = put_osd_con,
	.dispatch = dispatch,
	.get_authorizer = get_authorizer,
	.verify_authorizer_reply = verify_authorizer_reply,
	.invalidate_authorizer = invalidate_authorizer,
	.alloc_msg = alloc_msg,
	.fault = osd_reset,
};