1 #include <linux/ceph/ceph_debug.h> 2 3 #include <linux/module.h> 4 #include <linux/err.h> 5 #include <linux/highmem.h> 6 #include <linux/mm.h> 7 #include <linux/pagemap.h> 8 #include <linux/slab.h> 9 #include <linux/uaccess.h> 10 #ifdef CONFIG_BLOCK 11 #include <linux/bio.h> 12 #endif 13 14 #include <linux/ceph/libceph.h> 15 #include <linux/ceph/osd_client.h> 16 #include <linux/ceph/messenger.h> 17 #include <linux/ceph/decode.h> 18 #include <linux/ceph/auth.h> 19 #include <linux/ceph/pagelist.h> 20 21 #define OSD_OP_FRONT_LEN 4096 22 #define OSD_OPREPLY_FRONT_LEN 512 23 24 static const struct ceph_connection_operations osd_con_ops; 25 26 static void __send_queued(struct ceph_osd_client *osdc); 27 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); 28 static void __register_request(struct ceph_osd_client *osdc, 29 struct ceph_osd_request *req); 30 static void __unregister_linger_request(struct ceph_osd_client *osdc, 31 struct ceph_osd_request *req); 32 static void __send_request(struct ceph_osd_client *osdc, 33 struct ceph_osd_request *req); 34 35 static int op_has_extent(int op) 36 { 37 return (op == CEPH_OSD_OP_READ || 38 op == CEPH_OSD_OP_WRITE); 39 } 40 41 /* 42 * Implement client access to distributed object storage cluster. 43 * 44 * All data objects are stored within a cluster/cloud of OSDs, or 45 * "object storage devices." (Note that Ceph OSDs have _nothing_ to 46 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply 47 * remote daemons serving up and coordinating consistent and safe 48 * access to storage. 49 * 50 * Cluster membership and the mapping of data objects onto storage devices 51 * are described by the osd map. 52 * 53 * We keep track of pending OSD requests (read, write), resubmit 54 * requests to different OSDs when the cluster topology/data layout 55 * change, or retry the affected requests when the communications 56 * channel with an OSD is reset. 57 */ 58 59 /* 60 * calculate the mapping of a file extent onto an object, and fill out the 61 * request accordingly. shorten extent as necessary if it crosses an 62 * object boundary. 63 * 64 * fill osd op in request message. 65 */ 66 static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen, 67 struct ceph_osd_req_op *op, u64 *bno) 68 { 69 u64 orig_len = *plen; 70 u64 objoff = 0; 71 u64 objlen = 0; 72 int r; 73 74 /* object extent? */ 75 r = ceph_calc_file_object_mapping(layout, off, orig_len, bno, 76 &objoff, &objlen); 77 if (r < 0) 78 return r; 79 if (objlen < orig_len) { 80 *plen = objlen; 81 dout(" skipping last %llu, final file extent %llu~%llu\n", 82 orig_len - *plen, off, *plen); 83 } 84 85 if (op_has_extent(op->op)) { 86 u32 osize = le32_to_cpu(layout->fl_object_size); 87 op->extent.offset = objoff; 88 op->extent.length = objlen; 89 if (op->extent.truncate_size <= off - objoff) { 90 op->extent.truncate_size = 0; 91 } else { 92 op->extent.truncate_size -= off - objoff; 93 if (op->extent.truncate_size > osize) 94 op->extent.truncate_size = osize; 95 } 96 } 97 if (op->op == CEPH_OSD_OP_WRITE) 98 op->payload_len = *plen; 99 100 dout("calc_layout bno=%llx %llu~%llu\n", *bno, objoff, objlen); 101 102 return 0; 103 } 104 105 /* 106 * requests 107 */ 108 void ceph_osdc_release_request(struct kref *kref) 109 { 110 int num_pages; 111 struct ceph_osd_request *req = container_of(kref, 112 struct ceph_osd_request, 113 r_kref); 114 115 if (req->r_request) 116 ceph_msg_put(req->r_request); 117 if (req->r_con_filling_msg) { 118 dout("%s revoking msg %p from con %p\n", __func__, 119 req->r_reply, req->r_con_filling_msg); 120 ceph_msg_revoke_incoming(req->r_reply); 121 req->r_con_filling_msg->ops->put(req->r_con_filling_msg); 122 req->r_con_filling_msg = NULL; 123 } 124 if (req->r_reply) 125 ceph_msg_put(req->r_reply); 126 127 if (req->r_data_in.type == CEPH_OSD_DATA_TYPE_PAGES && 128 req->r_data_in.own_pages) { 129 num_pages = calc_pages_for((u64)req->r_data_in.alignment, 130 (u64)req->r_data_in.length); 131 ceph_release_page_vector(req->r_data_in.pages, num_pages); 132 } 133 if (req->r_data_out.type == CEPH_OSD_DATA_TYPE_PAGES && 134 req->r_data_out.own_pages) { 135 num_pages = calc_pages_for((u64)req->r_data_out.alignment, 136 (u64)req->r_data_out.length); 137 ceph_release_page_vector(req->r_data_out.pages, num_pages); 138 } 139 140 ceph_put_snap_context(req->r_snapc); 141 if (req->r_mempool) 142 mempool_free(req, req->r_osdc->req_mempool); 143 else 144 kfree(req); 145 } 146 EXPORT_SYMBOL(ceph_osdc_release_request); 147 148 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 149 struct ceph_snap_context *snapc, 150 unsigned int num_ops, 151 bool use_mempool, 152 gfp_t gfp_flags) 153 { 154 struct ceph_osd_request *req; 155 struct ceph_msg *msg; 156 size_t msg_size; 157 158 msg_size = 4 + 4 + 8 + 8 + 4+8; 159 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ 160 msg_size += 1 + 8 + 4 + 4; /* pg_t */ 161 msg_size += 4 + MAX_OBJ_NAME_SIZE; 162 msg_size += 2 + num_ops*sizeof(struct ceph_osd_op); 163 msg_size += 8; /* snapid */ 164 msg_size += 8; /* snap_seq */ 165 msg_size += 8 * (snapc ? snapc->num_snaps : 0); /* snaps */ 166 msg_size += 4; 167 168 if (use_mempool) { 169 req = mempool_alloc(osdc->req_mempool, gfp_flags); 170 memset(req, 0, sizeof(*req)); 171 } else { 172 req = kzalloc(sizeof(*req), gfp_flags); 173 } 174 if (req == NULL) 175 return NULL; 176 177 req->r_osdc = osdc; 178 req->r_mempool = use_mempool; 179 180 kref_init(&req->r_kref); 181 init_completion(&req->r_completion); 182 init_completion(&req->r_safe_completion); 183 RB_CLEAR_NODE(&req->r_node); 184 INIT_LIST_HEAD(&req->r_unsafe_item); 185 INIT_LIST_HEAD(&req->r_linger_item); 186 INIT_LIST_HEAD(&req->r_linger_osd); 187 INIT_LIST_HEAD(&req->r_req_lru_item); 188 INIT_LIST_HEAD(&req->r_osd_item); 189 190 /* create reply message */ 191 if (use_mempool) 192 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 193 else 194 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, 195 OSD_OPREPLY_FRONT_LEN, gfp_flags, true); 196 if (!msg) { 197 ceph_osdc_put_request(req); 198 return NULL; 199 } 200 req->r_reply = msg; 201 202 req->r_data_in.type = CEPH_OSD_DATA_TYPE_NONE; 203 req->r_data_out.type = CEPH_OSD_DATA_TYPE_NONE; 204 205 /* create request message; allow space for oid */ 206 if (use_mempool) 207 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 208 else 209 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true); 210 if (!msg) { 211 ceph_osdc_put_request(req); 212 return NULL; 213 } 214 215 memset(msg->front.iov_base, 0, msg->front.iov_len); 216 217 req->r_request = msg; 218 219 return req; 220 } 221 EXPORT_SYMBOL(ceph_osdc_alloc_request); 222 223 static u64 osd_req_encode_op(struct ceph_osd_request *req, 224 struct ceph_osd_op *dst, 225 struct ceph_osd_req_op *src) 226 { 227 u64 out_data_len = 0; 228 struct ceph_pagelist *pagelist; 229 230 dst->op = cpu_to_le16(src->op); 231 232 switch (src->op) { 233 case CEPH_OSD_OP_STAT: 234 break; 235 case CEPH_OSD_OP_READ: 236 case CEPH_OSD_OP_WRITE: 237 if (src->op == CEPH_OSD_OP_WRITE) 238 out_data_len = src->extent.length; 239 dst->extent.offset = cpu_to_le64(src->extent.offset); 240 dst->extent.length = cpu_to_le64(src->extent.length); 241 dst->extent.truncate_size = 242 cpu_to_le64(src->extent.truncate_size); 243 dst->extent.truncate_seq = 244 cpu_to_le32(src->extent.truncate_seq); 245 break; 246 case CEPH_OSD_OP_CALL: 247 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); 248 BUG_ON(!pagelist); 249 ceph_pagelist_init(pagelist); 250 251 dst->cls.class_len = src->cls.class_len; 252 dst->cls.method_len = src->cls.method_len; 253 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len); 254 ceph_pagelist_append(pagelist, src->cls.class_name, 255 src->cls.class_len); 256 ceph_pagelist_append(pagelist, src->cls.method_name, 257 src->cls.method_len); 258 ceph_pagelist_append(pagelist, src->cls.indata, 259 src->cls.indata_len); 260 261 req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGELIST; 262 req->r_data_out.pagelist = pagelist; 263 out_data_len = pagelist->length; 264 break; 265 case CEPH_OSD_OP_STARTSYNC: 266 break; 267 case CEPH_OSD_OP_NOTIFY_ACK: 268 case CEPH_OSD_OP_WATCH: 269 dst->watch.cookie = cpu_to_le64(src->watch.cookie); 270 dst->watch.ver = cpu_to_le64(src->watch.ver); 271 dst->watch.flag = src->watch.flag; 272 break; 273 default: 274 pr_err("unrecognized osd opcode %d\n", src->op); 275 WARN_ON(1); 276 break; 277 case CEPH_OSD_OP_MAPEXT: 278 case CEPH_OSD_OP_MASKTRUNC: 279 case CEPH_OSD_OP_SPARSE_READ: 280 case CEPH_OSD_OP_NOTIFY: 281 case CEPH_OSD_OP_ASSERT_VER: 282 case CEPH_OSD_OP_WRITEFULL: 283 case CEPH_OSD_OP_TRUNCATE: 284 case CEPH_OSD_OP_ZERO: 285 case CEPH_OSD_OP_DELETE: 286 case CEPH_OSD_OP_APPEND: 287 case CEPH_OSD_OP_SETTRUNC: 288 case CEPH_OSD_OP_TRIMTRUNC: 289 case CEPH_OSD_OP_TMAPUP: 290 case CEPH_OSD_OP_TMAPPUT: 291 case CEPH_OSD_OP_TMAPGET: 292 case CEPH_OSD_OP_CREATE: 293 case CEPH_OSD_OP_ROLLBACK: 294 case CEPH_OSD_OP_OMAPGETKEYS: 295 case CEPH_OSD_OP_OMAPGETVALS: 296 case CEPH_OSD_OP_OMAPGETHEADER: 297 case CEPH_OSD_OP_OMAPGETVALSBYKEYS: 298 case CEPH_OSD_OP_MODE_RD: 299 case CEPH_OSD_OP_OMAPSETVALS: 300 case CEPH_OSD_OP_OMAPSETHEADER: 301 case CEPH_OSD_OP_OMAPCLEAR: 302 case CEPH_OSD_OP_OMAPRMKEYS: 303 case CEPH_OSD_OP_OMAP_CMP: 304 case CEPH_OSD_OP_CLONERANGE: 305 case CEPH_OSD_OP_ASSERT_SRC_VERSION: 306 case CEPH_OSD_OP_SRC_CMPXATTR: 307 case CEPH_OSD_OP_GETXATTR: 308 case CEPH_OSD_OP_GETXATTRS: 309 case CEPH_OSD_OP_CMPXATTR: 310 case CEPH_OSD_OP_SETXATTR: 311 case CEPH_OSD_OP_SETXATTRS: 312 case CEPH_OSD_OP_RESETXATTRS: 313 case CEPH_OSD_OP_RMXATTR: 314 case CEPH_OSD_OP_PULL: 315 case CEPH_OSD_OP_PUSH: 316 case CEPH_OSD_OP_BALANCEREADS: 317 case CEPH_OSD_OP_UNBALANCEREADS: 318 case CEPH_OSD_OP_SCRUB: 319 case CEPH_OSD_OP_SCRUB_RESERVE: 320 case CEPH_OSD_OP_SCRUB_UNRESERVE: 321 case CEPH_OSD_OP_SCRUB_STOP: 322 case CEPH_OSD_OP_SCRUB_MAP: 323 case CEPH_OSD_OP_WRLOCK: 324 case CEPH_OSD_OP_WRUNLOCK: 325 case CEPH_OSD_OP_RDLOCK: 326 case CEPH_OSD_OP_RDUNLOCK: 327 case CEPH_OSD_OP_UPLOCK: 328 case CEPH_OSD_OP_DNLOCK: 329 case CEPH_OSD_OP_PGLS: 330 case CEPH_OSD_OP_PGLS_FILTER: 331 pr_err("unsupported osd opcode %s\n", 332 ceph_osd_op_name(src->op)); 333 WARN_ON(1); 334 break; 335 } 336 dst->payload_len = cpu_to_le32(src->payload_len); 337 338 return out_data_len; 339 } 340 341 /* 342 * build new request AND message 343 * 344 */ 345 void ceph_osdc_build_request(struct ceph_osd_request *req, 346 u64 off, unsigned int num_ops, 347 struct ceph_osd_req_op *src_ops, 348 struct ceph_snap_context *snapc, u64 snap_id, 349 struct timespec *mtime) 350 { 351 struct ceph_msg *msg = req->r_request; 352 struct ceph_osd_req_op *src_op; 353 void *p; 354 size_t msg_size; 355 int flags = req->r_flags; 356 u64 data_len; 357 int i; 358 359 req->r_num_ops = num_ops; 360 req->r_snapid = snap_id; 361 req->r_snapc = ceph_get_snap_context(snapc); 362 363 /* encode request */ 364 msg->hdr.version = cpu_to_le16(4); 365 366 p = msg->front.iov_base; 367 ceph_encode_32(&p, 1); /* client_inc is always 1 */ 368 req->r_request_osdmap_epoch = p; 369 p += 4; 370 req->r_request_flags = p; 371 p += 4; 372 if (req->r_flags & CEPH_OSD_FLAG_WRITE) 373 ceph_encode_timespec(p, mtime); 374 p += sizeof(struct ceph_timespec); 375 req->r_request_reassert_version = p; 376 p += sizeof(struct ceph_eversion); /* will get filled in */ 377 378 /* oloc */ 379 ceph_encode_8(&p, 4); 380 ceph_encode_8(&p, 4); 381 ceph_encode_32(&p, 8 + 4 + 4); 382 req->r_request_pool = p; 383 p += 8; 384 ceph_encode_32(&p, -1); /* preferred */ 385 ceph_encode_32(&p, 0); /* key len */ 386 387 ceph_encode_8(&p, 1); 388 req->r_request_pgid = p; 389 p += 8 + 4; 390 ceph_encode_32(&p, -1); /* preferred */ 391 392 /* oid */ 393 ceph_encode_32(&p, req->r_oid_len); 394 memcpy(p, req->r_oid, req->r_oid_len); 395 dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len); 396 p += req->r_oid_len; 397 398 /* ops--can imply data */ 399 ceph_encode_16(&p, num_ops); 400 src_op = src_ops; 401 req->r_request_ops = p; 402 data_len = 0; 403 for (i = 0; i < num_ops; i++, src_op++) { 404 data_len += osd_req_encode_op(req, p, src_op); 405 p += sizeof(struct ceph_osd_op); 406 } 407 408 /* snaps */ 409 ceph_encode_64(&p, req->r_snapid); 410 ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0); 411 ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0); 412 if (req->r_snapc) { 413 for (i = 0; i < snapc->num_snaps; i++) { 414 ceph_encode_64(&p, req->r_snapc->snaps[i]); 415 } 416 } 417 418 req->r_request_attempts = p; 419 p += 4; 420 421 /* data */ 422 if (flags & CEPH_OSD_FLAG_WRITE) 423 req->r_request->hdr.data_off = cpu_to_le16(off); 424 req->r_request->hdr.data_len = cpu_to_le32(data_len); 425 426 BUG_ON(p > msg->front.iov_base + msg->front.iov_len); 427 msg_size = p - msg->front.iov_base; 428 msg->front.iov_len = msg_size; 429 msg->hdr.front_len = cpu_to_le32(msg_size); 430 431 dout("build_request msg_size was %d num_ops %d\n", (int)msg_size, 432 num_ops); 433 return; 434 } 435 EXPORT_SYMBOL(ceph_osdc_build_request); 436 437 /* 438 * build new request AND message, calculate layout, and adjust file 439 * extent as needed. 440 * 441 * if the file was recently truncated, we include information about its 442 * old and new size so that the object can be updated appropriately. (we 443 * avoid synchronously deleting truncated objects because it's slow.) 444 * 445 * if @do_sync, include a 'startsync' command so that the osd will flush 446 * data quickly. 447 */ 448 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, 449 struct ceph_file_layout *layout, 450 struct ceph_vino vino, 451 u64 off, u64 *plen, 452 int opcode, int flags, 453 struct ceph_snap_context *snapc, 454 int do_sync, 455 u32 truncate_seq, 456 u64 truncate_size, 457 struct timespec *mtime, 458 bool use_mempool) 459 { 460 struct ceph_osd_req_op ops[2]; 461 struct ceph_osd_request *req; 462 unsigned int num_op = 1; 463 u64 bno = 0; 464 int r; 465 466 memset(&ops, 0, sizeof ops); 467 468 ops[0].op = opcode; 469 ops[0].extent.truncate_seq = truncate_seq; 470 ops[0].extent.truncate_size = truncate_size; 471 472 if (do_sync) { 473 ops[1].op = CEPH_OSD_OP_STARTSYNC; 474 num_op++; 475 } 476 477 req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool, 478 GFP_NOFS); 479 if (!req) 480 return ERR_PTR(-ENOMEM); 481 req->r_flags = flags; 482 483 /* calculate max write size */ 484 r = calc_layout(layout, off, plen, ops, &bno); 485 if (r < 0) { 486 ceph_osdc_put_request(req); 487 return ERR_PTR(r); 488 } 489 req->r_file_layout = *layout; /* keep a copy */ 490 491 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno); 492 req->r_oid_len = strlen(req->r_oid); 493 494 ceph_osdc_build_request(req, off, num_op, ops, 495 snapc, vino.snap, mtime); 496 497 return req; 498 } 499 EXPORT_SYMBOL(ceph_osdc_new_request); 500 501 /* 502 * We keep osd requests in an rbtree, sorted by ->r_tid. 503 */ 504 static void __insert_request(struct ceph_osd_client *osdc, 505 struct ceph_osd_request *new) 506 { 507 struct rb_node **p = &osdc->requests.rb_node; 508 struct rb_node *parent = NULL; 509 struct ceph_osd_request *req = NULL; 510 511 while (*p) { 512 parent = *p; 513 req = rb_entry(parent, struct ceph_osd_request, r_node); 514 if (new->r_tid < req->r_tid) 515 p = &(*p)->rb_left; 516 else if (new->r_tid > req->r_tid) 517 p = &(*p)->rb_right; 518 else 519 BUG(); 520 } 521 522 rb_link_node(&new->r_node, parent, p); 523 rb_insert_color(&new->r_node, &osdc->requests); 524 } 525 526 static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc, 527 u64 tid) 528 { 529 struct ceph_osd_request *req; 530 struct rb_node *n = osdc->requests.rb_node; 531 532 while (n) { 533 req = rb_entry(n, struct ceph_osd_request, r_node); 534 if (tid < req->r_tid) 535 n = n->rb_left; 536 else if (tid > req->r_tid) 537 n = n->rb_right; 538 else 539 return req; 540 } 541 return NULL; 542 } 543 544 static struct ceph_osd_request * 545 __lookup_request_ge(struct ceph_osd_client *osdc, 546 u64 tid) 547 { 548 struct ceph_osd_request *req; 549 struct rb_node *n = osdc->requests.rb_node; 550 551 while (n) { 552 req = rb_entry(n, struct ceph_osd_request, r_node); 553 if (tid < req->r_tid) { 554 if (!n->rb_left) 555 return req; 556 n = n->rb_left; 557 } else if (tid > req->r_tid) { 558 n = n->rb_right; 559 } else { 560 return req; 561 } 562 } 563 return NULL; 564 } 565 566 /* 567 * Resubmit requests pending on the given osd. 568 */ 569 static void __kick_osd_requests(struct ceph_osd_client *osdc, 570 struct ceph_osd *osd) 571 { 572 struct ceph_osd_request *req, *nreq; 573 LIST_HEAD(resend); 574 int err; 575 576 dout("__kick_osd_requests osd%d\n", osd->o_osd); 577 err = __reset_osd(osdc, osd); 578 if (err) 579 return; 580 /* 581 * Build up a list of requests to resend by traversing the 582 * osd's list of requests. Requests for a given object are 583 * sent in tid order, and that is also the order they're 584 * kept on this list. Therefore all requests that are in 585 * flight will be found first, followed by all requests that 586 * have not yet been sent. And to resend requests while 587 * preserving this order we will want to put any sent 588 * requests back on the front of the osd client's unsent 589 * list. 590 * 591 * So we build a separate ordered list of already-sent 592 * requests for the affected osd and splice it onto the 593 * front of the osd client's unsent list. Once we've seen a 594 * request that has not yet been sent we're done. Those 595 * requests are already sitting right where they belong. 596 */ 597 list_for_each_entry(req, &osd->o_requests, r_osd_item) { 598 if (!req->r_sent) 599 break; 600 list_move_tail(&req->r_req_lru_item, &resend); 601 dout("requeueing %p tid %llu osd%d\n", req, req->r_tid, 602 osd->o_osd); 603 if (!req->r_linger) 604 req->r_flags |= CEPH_OSD_FLAG_RETRY; 605 } 606 list_splice(&resend, &osdc->req_unsent); 607 608 /* 609 * Linger requests are re-registered before sending, which 610 * sets up a new tid for each. We add them to the unsent 611 * list at the end to keep things in tid order. 612 */ 613 list_for_each_entry_safe(req, nreq, &osd->o_linger_requests, 614 r_linger_osd) { 615 /* 616 * reregister request prior to unregistering linger so 617 * that r_osd is preserved. 618 */ 619 BUG_ON(!list_empty(&req->r_req_lru_item)); 620 __register_request(osdc, req); 621 list_add_tail(&req->r_req_lru_item, &osdc->req_unsent); 622 list_add(&req->r_osd_item, &req->r_osd->o_requests); 623 __unregister_linger_request(osdc, req); 624 dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid, 625 osd->o_osd); 626 } 627 } 628 629 /* 630 * If the osd connection drops, we need to resubmit all requests. 631 */ 632 static void osd_reset(struct ceph_connection *con) 633 { 634 struct ceph_osd *osd = con->private; 635 struct ceph_osd_client *osdc; 636 637 if (!osd) 638 return; 639 dout("osd_reset osd%d\n", osd->o_osd); 640 osdc = osd->o_osdc; 641 down_read(&osdc->map_sem); 642 mutex_lock(&osdc->request_mutex); 643 __kick_osd_requests(osdc, osd); 644 __send_queued(osdc); 645 mutex_unlock(&osdc->request_mutex); 646 up_read(&osdc->map_sem); 647 } 648 649 /* 650 * Track open sessions with osds. 651 */ 652 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum) 653 { 654 struct ceph_osd *osd; 655 656 osd = kzalloc(sizeof(*osd), GFP_NOFS); 657 if (!osd) 658 return NULL; 659 660 atomic_set(&osd->o_ref, 1); 661 osd->o_osdc = osdc; 662 osd->o_osd = onum; 663 RB_CLEAR_NODE(&osd->o_node); 664 INIT_LIST_HEAD(&osd->o_requests); 665 INIT_LIST_HEAD(&osd->o_linger_requests); 666 INIT_LIST_HEAD(&osd->o_osd_lru); 667 osd->o_incarnation = 1; 668 669 ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr); 670 671 INIT_LIST_HEAD(&osd->o_keepalive_item); 672 return osd; 673 } 674 675 static struct ceph_osd *get_osd(struct ceph_osd *osd) 676 { 677 if (atomic_inc_not_zero(&osd->o_ref)) { 678 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1, 679 atomic_read(&osd->o_ref)); 680 return osd; 681 } else { 682 dout("get_osd %p FAIL\n", osd); 683 return NULL; 684 } 685 } 686 687 static void put_osd(struct ceph_osd *osd) 688 { 689 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), 690 atomic_read(&osd->o_ref) - 1); 691 if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) { 692 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth; 693 694 ceph_auth_destroy_authorizer(ac, osd->o_auth.authorizer); 695 kfree(osd); 696 } 697 } 698 699 /* 700 * remove an osd from our map 701 */ 702 static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 703 { 704 dout("__remove_osd %p\n", osd); 705 BUG_ON(!list_empty(&osd->o_requests)); 706 rb_erase(&osd->o_node, &osdc->osds); 707 list_del_init(&osd->o_osd_lru); 708 ceph_con_close(&osd->o_con); 709 put_osd(osd); 710 } 711 712 static void remove_all_osds(struct ceph_osd_client *osdc) 713 { 714 dout("%s %p\n", __func__, osdc); 715 mutex_lock(&osdc->request_mutex); 716 while (!RB_EMPTY_ROOT(&osdc->osds)) { 717 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), 718 struct ceph_osd, o_node); 719 __remove_osd(osdc, osd); 720 } 721 mutex_unlock(&osdc->request_mutex); 722 } 723 724 static void __move_osd_to_lru(struct ceph_osd_client *osdc, 725 struct ceph_osd *osd) 726 { 727 dout("__move_osd_to_lru %p\n", osd); 728 BUG_ON(!list_empty(&osd->o_osd_lru)); 729 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru); 730 osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ; 731 } 732 733 static void __remove_osd_from_lru(struct ceph_osd *osd) 734 { 735 dout("__remove_osd_from_lru %p\n", osd); 736 if (!list_empty(&osd->o_osd_lru)) 737 list_del_init(&osd->o_osd_lru); 738 } 739 740 static void remove_old_osds(struct ceph_osd_client *osdc) 741 { 742 struct ceph_osd *osd, *nosd; 743 744 dout("__remove_old_osds %p\n", osdc); 745 mutex_lock(&osdc->request_mutex); 746 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) { 747 if (time_before(jiffies, osd->lru_ttl)) 748 break; 749 __remove_osd(osdc, osd); 750 } 751 mutex_unlock(&osdc->request_mutex); 752 } 753 754 /* 755 * reset osd connect 756 */ 757 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 758 { 759 struct ceph_entity_addr *peer_addr; 760 761 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 762 if (list_empty(&osd->o_requests) && 763 list_empty(&osd->o_linger_requests)) { 764 __remove_osd(osdc, osd); 765 766 return -ENODEV; 767 } 768 769 peer_addr = &osdc->osdmap->osd_addr[osd->o_osd]; 770 if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) && 771 !ceph_con_opened(&osd->o_con)) { 772 struct ceph_osd_request *req; 773 774 dout(" osd addr hasn't changed and connection never opened," 775 " letting msgr retry"); 776 /* touch each r_stamp for handle_timeout()'s benfit */ 777 list_for_each_entry(req, &osd->o_requests, r_osd_item) 778 req->r_stamp = jiffies; 779 780 return -EAGAIN; 781 } 782 783 ceph_con_close(&osd->o_con); 784 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr); 785 osd->o_incarnation++; 786 787 return 0; 788 } 789 790 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new) 791 { 792 struct rb_node **p = &osdc->osds.rb_node; 793 struct rb_node *parent = NULL; 794 struct ceph_osd *osd = NULL; 795 796 dout("__insert_osd %p osd%d\n", new, new->o_osd); 797 while (*p) { 798 parent = *p; 799 osd = rb_entry(parent, struct ceph_osd, o_node); 800 if (new->o_osd < osd->o_osd) 801 p = &(*p)->rb_left; 802 else if (new->o_osd > osd->o_osd) 803 p = &(*p)->rb_right; 804 else 805 BUG(); 806 } 807 808 rb_link_node(&new->o_node, parent, p); 809 rb_insert_color(&new->o_node, &osdc->osds); 810 } 811 812 static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o) 813 { 814 struct ceph_osd *osd; 815 struct rb_node *n = osdc->osds.rb_node; 816 817 while (n) { 818 osd = rb_entry(n, struct ceph_osd, o_node); 819 if (o < osd->o_osd) 820 n = n->rb_left; 821 else if (o > osd->o_osd) 822 n = n->rb_right; 823 else 824 return osd; 825 } 826 return NULL; 827 } 828 829 static void __schedule_osd_timeout(struct ceph_osd_client *osdc) 830 { 831 schedule_delayed_work(&osdc->timeout_work, 832 osdc->client->options->osd_keepalive_timeout * HZ); 833 } 834 835 static void __cancel_osd_timeout(struct ceph_osd_client *osdc) 836 { 837 cancel_delayed_work(&osdc->timeout_work); 838 } 839 840 /* 841 * Register request, assign tid. If this is the first request, set up 842 * the timeout event. 843 */ 844 static void __register_request(struct ceph_osd_client *osdc, 845 struct ceph_osd_request *req) 846 { 847 req->r_tid = ++osdc->last_tid; 848 req->r_request->hdr.tid = cpu_to_le64(req->r_tid); 849 dout("__register_request %p tid %lld\n", req, req->r_tid); 850 __insert_request(osdc, req); 851 ceph_osdc_get_request(req); 852 osdc->num_requests++; 853 if (osdc->num_requests == 1) { 854 dout(" first request, scheduling timeout\n"); 855 __schedule_osd_timeout(osdc); 856 } 857 } 858 859 /* 860 * called under osdc->request_mutex 861 */ 862 static void __unregister_request(struct ceph_osd_client *osdc, 863 struct ceph_osd_request *req) 864 { 865 if (RB_EMPTY_NODE(&req->r_node)) { 866 dout("__unregister_request %p tid %lld not registered\n", 867 req, req->r_tid); 868 return; 869 } 870 871 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 872 rb_erase(&req->r_node, &osdc->requests); 873 osdc->num_requests--; 874 875 if (req->r_osd) { 876 /* make sure the original request isn't in flight. */ 877 ceph_msg_revoke(req->r_request); 878 879 list_del_init(&req->r_osd_item); 880 if (list_empty(&req->r_osd->o_requests) && 881 list_empty(&req->r_osd->o_linger_requests)) { 882 dout("moving osd to %p lru\n", req->r_osd); 883 __move_osd_to_lru(osdc, req->r_osd); 884 } 885 if (list_empty(&req->r_linger_item)) 886 req->r_osd = NULL; 887 } 888 889 list_del_init(&req->r_req_lru_item); 890 ceph_osdc_put_request(req); 891 892 if (osdc->num_requests == 0) { 893 dout(" no requests, canceling timeout\n"); 894 __cancel_osd_timeout(osdc); 895 } 896 } 897 898 /* 899 * Cancel a previously queued request message 900 */ 901 static void __cancel_request(struct ceph_osd_request *req) 902 { 903 if (req->r_sent && req->r_osd) { 904 ceph_msg_revoke(req->r_request); 905 req->r_sent = 0; 906 } 907 } 908 909 static void __register_linger_request(struct ceph_osd_client *osdc, 910 struct ceph_osd_request *req) 911 { 912 dout("__register_linger_request %p\n", req); 913 list_add_tail(&req->r_linger_item, &osdc->req_linger); 914 if (req->r_osd) 915 list_add_tail(&req->r_linger_osd, 916 &req->r_osd->o_linger_requests); 917 } 918 919 static void __unregister_linger_request(struct ceph_osd_client *osdc, 920 struct ceph_osd_request *req) 921 { 922 dout("__unregister_linger_request %p\n", req); 923 list_del_init(&req->r_linger_item); 924 if (req->r_osd) { 925 list_del_init(&req->r_linger_osd); 926 927 if (list_empty(&req->r_osd->o_requests) && 928 list_empty(&req->r_osd->o_linger_requests)) { 929 dout("moving osd to %p lru\n", req->r_osd); 930 __move_osd_to_lru(osdc, req->r_osd); 931 } 932 if (list_empty(&req->r_osd_item)) 933 req->r_osd = NULL; 934 } 935 } 936 937 void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc, 938 struct ceph_osd_request *req) 939 { 940 mutex_lock(&osdc->request_mutex); 941 if (req->r_linger) { 942 __unregister_linger_request(osdc, req); 943 ceph_osdc_put_request(req); 944 } 945 mutex_unlock(&osdc->request_mutex); 946 } 947 EXPORT_SYMBOL(ceph_osdc_unregister_linger_request); 948 949 void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, 950 struct ceph_osd_request *req) 951 { 952 if (!req->r_linger) { 953 dout("set_request_linger %p\n", req); 954 req->r_linger = 1; 955 /* 956 * caller is now responsible for calling 957 * unregister_linger_request 958 */ 959 ceph_osdc_get_request(req); 960 } 961 } 962 EXPORT_SYMBOL(ceph_osdc_set_request_linger); 963 964 /* 965 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct 966 * (as needed), and set the request r_osd appropriately. If there is 967 * no up osd, set r_osd to NULL. Move the request to the appropriate list 968 * (unsent, homeless) or leave on in-flight lru. 969 * 970 * Return 0 if unchanged, 1 if changed, or negative on error. 971 * 972 * Caller should hold map_sem for read and request_mutex. 973 */ 974 static int __map_request(struct ceph_osd_client *osdc, 975 struct ceph_osd_request *req, int force_resend) 976 { 977 struct ceph_pg pgid; 978 int acting[CEPH_PG_MAX_SIZE]; 979 int o = -1, num = 0; 980 int err; 981 982 dout("map_request %p tid %lld\n", req, req->r_tid); 983 err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap, 984 ceph_file_layout_pg_pool(req->r_file_layout)); 985 if (err) { 986 list_move(&req->r_req_lru_item, &osdc->req_notarget); 987 return err; 988 } 989 req->r_pgid = pgid; 990 991 err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); 992 if (err > 0) { 993 o = acting[0]; 994 num = err; 995 } 996 997 if ((!force_resend && 998 req->r_osd && req->r_osd->o_osd == o && 999 req->r_sent >= req->r_osd->o_incarnation && 1000 req->r_num_pg_osds == num && 1001 memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || 1002 (req->r_osd == NULL && o == -1)) 1003 return 0; /* no change */ 1004 1005 dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n", 1006 req->r_tid, pgid.pool, pgid.seed, o, 1007 req->r_osd ? req->r_osd->o_osd : -1); 1008 1009 /* record full pg acting set */ 1010 memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num); 1011 req->r_num_pg_osds = num; 1012 1013 if (req->r_osd) { 1014 __cancel_request(req); 1015 list_del_init(&req->r_osd_item); 1016 req->r_osd = NULL; 1017 } 1018 1019 req->r_osd = __lookup_osd(osdc, o); 1020 if (!req->r_osd && o >= 0) { 1021 err = -ENOMEM; 1022 req->r_osd = create_osd(osdc, o); 1023 if (!req->r_osd) { 1024 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1025 goto out; 1026 } 1027 1028 dout("map_request osd %p is osd%d\n", req->r_osd, o); 1029 __insert_osd(osdc, req->r_osd); 1030 1031 ceph_con_open(&req->r_osd->o_con, 1032 CEPH_ENTITY_TYPE_OSD, o, 1033 &osdc->osdmap->osd_addr[o]); 1034 } 1035 1036 if (req->r_osd) { 1037 __remove_osd_from_lru(req->r_osd); 1038 list_add(&req->r_osd_item, &req->r_osd->o_requests); 1039 list_move(&req->r_req_lru_item, &osdc->req_unsent); 1040 } else { 1041 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1042 } 1043 err = 1; /* osd or pg changed */ 1044 1045 out: 1046 return err; 1047 } 1048 1049 /* 1050 * caller should hold map_sem (for read) and request_mutex 1051 */ 1052 static void __send_request(struct ceph_osd_client *osdc, 1053 struct ceph_osd_request *req) 1054 { 1055 void *p; 1056 1057 dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n", 1058 req, req->r_tid, req->r_osd->o_osd, req->r_flags, 1059 (unsigned long long)req->r_pgid.pool, req->r_pgid.seed); 1060 1061 /* fill in message content that changes each time we send it */ 1062 put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); 1063 put_unaligned_le32(req->r_flags, req->r_request_flags); 1064 put_unaligned_le64(req->r_pgid.pool, req->r_request_pool); 1065 p = req->r_request_pgid; 1066 ceph_encode_64(&p, req->r_pgid.pool); 1067 ceph_encode_32(&p, req->r_pgid.seed); 1068 put_unaligned_le64(1, req->r_request_attempts); /* FIXME */ 1069 memcpy(req->r_request_reassert_version, &req->r_reassert_version, 1070 sizeof(req->r_reassert_version)); 1071 1072 req->r_stamp = jiffies; 1073 list_move_tail(&req->r_req_lru_item, &osdc->req_lru); 1074 1075 ceph_msg_get(req->r_request); /* send consumes a ref */ 1076 ceph_con_send(&req->r_osd->o_con, req->r_request); 1077 req->r_sent = req->r_osd->o_incarnation; 1078 } 1079 1080 /* 1081 * Send any requests in the queue (req_unsent). 1082 */ 1083 static void __send_queued(struct ceph_osd_client *osdc) 1084 { 1085 struct ceph_osd_request *req, *tmp; 1086 1087 dout("__send_queued\n"); 1088 list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) 1089 __send_request(osdc, req); 1090 } 1091 1092 /* 1093 * Timeout callback, called every N seconds when 1 or more osd 1094 * requests has been active for more than N seconds. When this 1095 * happens, we ping all OSDs with requests who have timed out to 1096 * ensure any communications channel reset is detected. Reset the 1097 * request timeouts another N seconds in the future as we go. 1098 * Reschedule the timeout event another N seconds in future (unless 1099 * there are no open requests). 1100 */ 1101 static void handle_timeout(struct work_struct *work) 1102 { 1103 struct ceph_osd_client *osdc = 1104 container_of(work, struct ceph_osd_client, timeout_work.work); 1105 struct ceph_osd_request *req; 1106 struct ceph_osd *osd; 1107 unsigned long keepalive = 1108 osdc->client->options->osd_keepalive_timeout * HZ; 1109 struct list_head slow_osds; 1110 dout("timeout\n"); 1111 down_read(&osdc->map_sem); 1112 1113 ceph_monc_request_next_osdmap(&osdc->client->monc); 1114 1115 mutex_lock(&osdc->request_mutex); 1116 1117 /* 1118 * ping osds that are a bit slow. this ensures that if there 1119 * is a break in the TCP connection we will notice, and reopen 1120 * a connection with that osd (from the fault callback). 1121 */ 1122 INIT_LIST_HEAD(&slow_osds); 1123 list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) { 1124 if (time_before(jiffies, req->r_stamp + keepalive)) 1125 break; 1126 1127 osd = req->r_osd; 1128 BUG_ON(!osd); 1129 dout(" tid %llu is slow, will send keepalive on osd%d\n", 1130 req->r_tid, osd->o_osd); 1131 list_move_tail(&osd->o_keepalive_item, &slow_osds); 1132 } 1133 while (!list_empty(&slow_osds)) { 1134 osd = list_entry(slow_osds.next, struct ceph_osd, 1135 o_keepalive_item); 1136 list_del_init(&osd->o_keepalive_item); 1137 ceph_con_keepalive(&osd->o_con); 1138 } 1139 1140 __schedule_osd_timeout(osdc); 1141 __send_queued(osdc); 1142 mutex_unlock(&osdc->request_mutex); 1143 up_read(&osdc->map_sem); 1144 } 1145 1146 static void handle_osds_timeout(struct work_struct *work) 1147 { 1148 struct ceph_osd_client *osdc = 1149 container_of(work, struct ceph_osd_client, 1150 osds_timeout_work.work); 1151 unsigned long delay = 1152 osdc->client->options->osd_idle_ttl * HZ >> 2; 1153 1154 dout("osds timeout\n"); 1155 down_read(&osdc->map_sem); 1156 remove_old_osds(osdc); 1157 up_read(&osdc->map_sem); 1158 1159 schedule_delayed_work(&osdc->osds_timeout_work, 1160 round_jiffies_relative(delay)); 1161 } 1162 1163 static void complete_request(struct ceph_osd_request *req) 1164 { 1165 if (req->r_safe_callback) 1166 req->r_safe_callback(req, NULL); 1167 complete_all(&req->r_safe_completion); /* fsync waiter */ 1168 } 1169 1170 static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid) 1171 { 1172 __u8 v; 1173 1174 ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad); 1175 v = ceph_decode_8(p); 1176 if (v > 1) { 1177 pr_warning("do not understand pg encoding %d > 1", v); 1178 return -EINVAL; 1179 } 1180 pgid->pool = ceph_decode_64(p); 1181 pgid->seed = ceph_decode_32(p); 1182 *p += 4; 1183 return 0; 1184 1185 bad: 1186 pr_warning("incomplete pg encoding"); 1187 return -EINVAL; 1188 } 1189 1190 /* 1191 * handle osd op reply. either call the callback if it is specified, 1192 * or do the completion to wake up the waiting thread. 1193 */ 1194 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, 1195 struct ceph_connection *con) 1196 { 1197 void *p, *end; 1198 struct ceph_osd_request *req; 1199 u64 tid; 1200 int object_len; 1201 int numops, payload_len, flags; 1202 s32 result; 1203 s32 retry_attempt; 1204 struct ceph_pg pg; 1205 int err; 1206 u32 reassert_epoch; 1207 u64 reassert_version; 1208 u32 osdmap_epoch; 1209 int already_completed; 1210 int i; 1211 1212 tid = le64_to_cpu(msg->hdr.tid); 1213 dout("handle_reply %p tid %llu\n", msg, tid); 1214 1215 p = msg->front.iov_base; 1216 end = p + msg->front.iov_len; 1217 1218 ceph_decode_need(&p, end, 4, bad); 1219 object_len = ceph_decode_32(&p); 1220 ceph_decode_need(&p, end, object_len, bad); 1221 p += object_len; 1222 1223 err = __decode_pgid(&p, end, &pg); 1224 if (err) 1225 goto bad; 1226 1227 ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad); 1228 flags = ceph_decode_64(&p); 1229 result = ceph_decode_32(&p); 1230 reassert_epoch = ceph_decode_32(&p); 1231 reassert_version = ceph_decode_64(&p); 1232 osdmap_epoch = ceph_decode_32(&p); 1233 1234 /* lookup */ 1235 mutex_lock(&osdc->request_mutex); 1236 req = __lookup_request(osdc, tid); 1237 if (req == NULL) { 1238 dout("handle_reply tid %llu dne\n", tid); 1239 mutex_unlock(&osdc->request_mutex); 1240 return; 1241 } 1242 ceph_osdc_get_request(req); 1243 1244 dout("handle_reply %p tid %llu req %p result %d\n", msg, tid, 1245 req, result); 1246 1247 ceph_decode_need(&p, end, 4, bad); 1248 numops = ceph_decode_32(&p); 1249 if (numops > CEPH_OSD_MAX_OP) 1250 goto bad_put; 1251 if (numops != req->r_num_ops) 1252 goto bad_put; 1253 payload_len = 0; 1254 ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad); 1255 for (i = 0; i < numops; i++) { 1256 struct ceph_osd_op *op = p; 1257 int len; 1258 1259 len = le32_to_cpu(op->payload_len); 1260 req->r_reply_op_len[i] = len; 1261 dout(" op %d has %d bytes\n", i, len); 1262 payload_len += len; 1263 p += sizeof(*op); 1264 } 1265 if (payload_len != le32_to_cpu(msg->hdr.data_len)) { 1266 pr_warning("sum of op payload lens %d != data_len %d", 1267 payload_len, le32_to_cpu(msg->hdr.data_len)); 1268 goto bad_put; 1269 } 1270 1271 ceph_decode_need(&p, end, 4 + numops * 4, bad); 1272 retry_attempt = ceph_decode_32(&p); 1273 for (i = 0; i < numops; i++) 1274 req->r_reply_op_result[i] = ceph_decode_32(&p); 1275 1276 /* 1277 * if this connection filled our message, drop our reference now, to 1278 * avoid a (safe but slower) revoke later. 1279 */ 1280 if (req->r_con_filling_msg == con && req->r_reply == msg) { 1281 dout(" dropping con_filling_msg ref %p\n", con); 1282 req->r_con_filling_msg = NULL; 1283 con->ops->put(con); 1284 } 1285 1286 if (!req->r_got_reply) { 1287 unsigned int bytes; 1288 1289 req->r_result = result; 1290 bytes = le32_to_cpu(msg->hdr.data_len); 1291 dout("handle_reply result %d bytes %d\n", req->r_result, 1292 bytes); 1293 if (req->r_result == 0) 1294 req->r_result = bytes; 1295 1296 /* in case this is a write and we need to replay, */ 1297 req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch); 1298 req->r_reassert_version.version = cpu_to_le64(reassert_version); 1299 1300 req->r_got_reply = 1; 1301 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { 1302 dout("handle_reply tid %llu dup ack\n", tid); 1303 mutex_unlock(&osdc->request_mutex); 1304 goto done; 1305 } 1306 1307 dout("handle_reply tid %llu flags %d\n", tid, flags); 1308 1309 if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK)) 1310 __register_linger_request(osdc, req); 1311 1312 /* either this is a read, or we got the safe response */ 1313 if (result < 0 || 1314 (flags & CEPH_OSD_FLAG_ONDISK) || 1315 ((flags & CEPH_OSD_FLAG_WRITE) == 0)) 1316 __unregister_request(osdc, req); 1317 1318 already_completed = req->r_completed; 1319 req->r_completed = 1; 1320 mutex_unlock(&osdc->request_mutex); 1321 if (already_completed) 1322 goto done; 1323 1324 if (req->r_callback) 1325 req->r_callback(req, msg); 1326 else 1327 complete_all(&req->r_completion); 1328 1329 if (flags & CEPH_OSD_FLAG_ONDISK) 1330 complete_request(req); 1331 1332 done: 1333 dout("req=%p req->r_linger=%d\n", req, req->r_linger); 1334 ceph_osdc_put_request(req); 1335 return; 1336 1337 bad_put: 1338 ceph_osdc_put_request(req); 1339 bad: 1340 pr_err("corrupt osd_op_reply got %d %d\n", 1341 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len)); 1342 ceph_msg_dump(msg); 1343 } 1344 1345 static void reset_changed_osds(struct ceph_osd_client *osdc) 1346 { 1347 struct rb_node *p, *n; 1348 1349 for (p = rb_first(&osdc->osds); p; p = n) { 1350 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node); 1351 1352 n = rb_next(p); 1353 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || 1354 memcmp(&osd->o_con.peer_addr, 1355 ceph_osd_addr(osdc->osdmap, 1356 osd->o_osd), 1357 sizeof(struct ceph_entity_addr)) != 0) 1358 __reset_osd(osdc, osd); 1359 } 1360 } 1361 1362 /* 1363 * Requeue requests whose mapping to an OSD has changed. If requests map to 1364 * no osd, request a new map. 1365 * 1366 * Caller should hold map_sem for read. 1367 */ 1368 static void kick_requests(struct ceph_osd_client *osdc, int force_resend) 1369 { 1370 struct ceph_osd_request *req, *nreq; 1371 struct rb_node *p; 1372 int needmap = 0; 1373 int err; 1374 1375 dout("kick_requests %s\n", force_resend ? " (force resend)" : ""); 1376 mutex_lock(&osdc->request_mutex); 1377 for (p = rb_first(&osdc->requests); p; ) { 1378 req = rb_entry(p, struct ceph_osd_request, r_node); 1379 p = rb_next(p); 1380 1381 /* 1382 * For linger requests that have not yet been 1383 * registered, move them to the linger list; they'll 1384 * be sent to the osd in the loop below. Unregister 1385 * the request before re-registering it as a linger 1386 * request to ensure the __map_request() below 1387 * will decide it needs to be sent. 1388 */ 1389 if (req->r_linger && list_empty(&req->r_linger_item)) { 1390 dout("%p tid %llu restart on osd%d\n", 1391 req, req->r_tid, 1392 req->r_osd ? req->r_osd->o_osd : -1); 1393 __unregister_request(osdc, req); 1394 __register_linger_request(osdc, req); 1395 continue; 1396 } 1397 1398 err = __map_request(osdc, req, force_resend); 1399 if (err < 0) 1400 continue; /* error */ 1401 if (req->r_osd == NULL) { 1402 dout("%p tid %llu maps to no osd\n", req, req->r_tid); 1403 needmap++; /* request a newer map */ 1404 } else if (err > 0) { 1405 if (!req->r_linger) { 1406 dout("%p tid %llu requeued on osd%d\n", req, 1407 req->r_tid, 1408 req->r_osd ? req->r_osd->o_osd : -1); 1409 req->r_flags |= CEPH_OSD_FLAG_RETRY; 1410 } 1411 } 1412 } 1413 1414 list_for_each_entry_safe(req, nreq, &osdc->req_linger, 1415 r_linger_item) { 1416 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); 1417 1418 err = __map_request(osdc, req, force_resend); 1419 dout("__map_request returned %d\n", err); 1420 if (err == 0) 1421 continue; /* no change and no osd was specified */ 1422 if (err < 0) 1423 continue; /* hrm! */ 1424 if (req->r_osd == NULL) { 1425 dout("tid %llu maps to no valid osd\n", req->r_tid); 1426 needmap++; /* request a newer map */ 1427 continue; 1428 } 1429 1430 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid, 1431 req->r_osd ? req->r_osd->o_osd : -1); 1432 __register_request(osdc, req); 1433 __unregister_linger_request(osdc, req); 1434 } 1435 mutex_unlock(&osdc->request_mutex); 1436 1437 if (needmap) { 1438 dout("%d requests for down osds, need new map\n", needmap); 1439 ceph_monc_request_next_osdmap(&osdc->client->monc); 1440 } 1441 reset_changed_osds(osdc); 1442 } 1443 1444 1445 /* 1446 * Process updated osd map. 1447 * 1448 * The message contains any number of incremental and full maps, normally 1449 * indicating some sort of topology change in the cluster. Kick requests 1450 * off to different OSDs as needed. 1451 */ 1452 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) 1453 { 1454 void *p, *end, *next; 1455 u32 nr_maps, maplen; 1456 u32 epoch; 1457 struct ceph_osdmap *newmap = NULL, *oldmap; 1458 int err; 1459 struct ceph_fsid fsid; 1460 1461 dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0); 1462 p = msg->front.iov_base; 1463 end = p + msg->front.iov_len; 1464 1465 /* verify fsid */ 1466 ceph_decode_need(&p, end, sizeof(fsid), bad); 1467 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 1468 if (ceph_check_fsid(osdc->client, &fsid) < 0) 1469 return; 1470 1471 down_write(&osdc->map_sem); 1472 1473 /* incremental maps */ 1474 ceph_decode_32_safe(&p, end, nr_maps, bad); 1475 dout(" %d inc maps\n", nr_maps); 1476 while (nr_maps > 0) { 1477 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 1478 epoch = ceph_decode_32(&p); 1479 maplen = ceph_decode_32(&p); 1480 ceph_decode_need(&p, end, maplen, bad); 1481 next = p + maplen; 1482 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) { 1483 dout("applying incremental map %u len %d\n", 1484 epoch, maplen); 1485 newmap = osdmap_apply_incremental(&p, next, 1486 osdc->osdmap, 1487 &osdc->client->msgr); 1488 if (IS_ERR(newmap)) { 1489 err = PTR_ERR(newmap); 1490 goto bad; 1491 } 1492 BUG_ON(!newmap); 1493 if (newmap != osdc->osdmap) { 1494 ceph_osdmap_destroy(osdc->osdmap); 1495 osdc->osdmap = newmap; 1496 } 1497 kick_requests(osdc, 0); 1498 } else { 1499 dout("ignoring incremental map %u len %d\n", 1500 epoch, maplen); 1501 } 1502 p = next; 1503 nr_maps--; 1504 } 1505 if (newmap) 1506 goto done; 1507 1508 /* full maps */ 1509 ceph_decode_32_safe(&p, end, nr_maps, bad); 1510 dout(" %d full maps\n", nr_maps); 1511 while (nr_maps) { 1512 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 1513 epoch = ceph_decode_32(&p); 1514 maplen = ceph_decode_32(&p); 1515 ceph_decode_need(&p, end, maplen, bad); 1516 if (nr_maps > 1) { 1517 dout("skipping non-latest full map %u len %d\n", 1518 epoch, maplen); 1519 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) { 1520 dout("skipping full map %u len %d, " 1521 "older than our %u\n", epoch, maplen, 1522 osdc->osdmap->epoch); 1523 } else { 1524 int skipped_map = 0; 1525 1526 dout("taking full map %u len %d\n", epoch, maplen); 1527 newmap = osdmap_decode(&p, p+maplen); 1528 if (IS_ERR(newmap)) { 1529 err = PTR_ERR(newmap); 1530 goto bad; 1531 } 1532 BUG_ON(!newmap); 1533 oldmap = osdc->osdmap; 1534 osdc->osdmap = newmap; 1535 if (oldmap) { 1536 if (oldmap->epoch + 1 < newmap->epoch) 1537 skipped_map = 1; 1538 ceph_osdmap_destroy(oldmap); 1539 } 1540 kick_requests(osdc, skipped_map); 1541 } 1542 p += maplen; 1543 nr_maps--; 1544 } 1545 1546 done: 1547 downgrade_write(&osdc->map_sem); 1548 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); 1549 1550 /* 1551 * subscribe to subsequent osdmap updates if full to ensure 1552 * we find out when we are no longer full and stop returning 1553 * ENOSPC. 1554 */ 1555 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) 1556 ceph_monc_request_next_osdmap(&osdc->client->monc); 1557 1558 mutex_lock(&osdc->request_mutex); 1559 __send_queued(osdc); 1560 mutex_unlock(&osdc->request_mutex); 1561 up_read(&osdc->map_sem); 1562 wake_up_all(&osdc->client->auth_wq); 1563 return; 1564 1565 bad: 1566 pr_err("osdc handle_map corrupt msg\n"); 1567 ceph_msg_dump(msg); 1568 up_write(&osdc->map_sem); 1569 return; 1570 } 1571 1572 /* 1573 * watch/notify callback event infrastructure 1574 * 1575 * These callbacks are used both for watch and notify operations. 1576 */ 1577 static void __release_event(struct kref *kref) 1578 { 1579 struct ceph_osd_event *event = 1580 container_of(kref, struct ceph_osd_event, kref); 1581 1582 dout("__release_event %p\n", event); 1583 kfree(event); 1584 } 1585 1586 static void get_event(struct ceph_osd_event *event) 1587 { 1588 kref_get(&event->kref); 1589 } 1590 1591 void ceph_osdc_put_event(struct ceph_osd_event *event) 1592 { 1593 kref_put(&event->kref, __release_event); 1594 } 1595 EXPORT_SYMBOL(ceph_osdc_put_event); 1596 1597 static void __insert_event(struct ceph_osd_client *osdc, 1598 struct ceph_osd_event *new) 1599 { 1600 struct rb_node **p = &osdc->event_tree.rb_node; 1601 struct rb_node *parent = NULL; 1602 struct ceph_osd_event *event = NULL; 1603 1604 while (*p) { 1605 parent = *p; 1606 event = rb_entry(parent, struct ceph_osd_event, node); 1607 if (new->cookie < event->cookie) 1608 p = &(*p)->rb_left; 1609 else if (new->cookie > event->cookie) 1610 p = &(*p)->rb_right; 1611 else 1612 BUG(); 1613 } 1614 1615 rb_link_node(&new->node, parent, p); 1616 rb_insert_color(&new->node, &osdc->event_tree); 1617 } 1618 1619 static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc, 1620 u64 cookie) 1621 { 1622 struct rb_node **p = &osdc->event_tree.rb_node; 1623 struct rb_node *parent = NULL; 1624 struct ceph_osd_event *event = NULL; 1625 1626 while (*p) { 1627 parent = *p; 1628 event = rb_entry(parent, struct ceph_osd_event, node); 1629 if (cookie < event->cookie) 1630 p = &(*p)->rb_left; 1631 else if (cookie > event->cookie) 1632 p = &(*p)->rb_right; 1633 else 1634 return event; 1635 } 1636 return NULL; 1637 } 1638 1639 static void __remove_event(struct ceph_osd_event *event) 1640 { 1641 struct ceph_osd_client *osdc = event->osdc; 1642 1643 if (!RB_EMPTY_NODE(&event->node)) { 1644 dout("__remove_event removed %p\n", event); 1645 rb_erase(&event->node, &osdc->event_tree); 1646 ceph_osdc_put_event(event); 1647 } else { 1648 dout("__remove_event didn't remove %p\n", event); 1649 } 1650 } 1651 1652 int ceph_osdc_create_event(struct ceph_osd_client *osdc, 1653 void (*event_cb)(u64, u64, u8, void *), 1654 void *data, struct ceph_osd_event **pevent) 1655 { 1656 struct ceph_osd_event *event; 1657 1658 event = kmalloc(sizeof(*event), GFP_NOIO); 1659 if (!event) 1660 return -ENOMEM; 1661 1662 dout("create_event %p\n", event); 1663 event->cb = event_cb; 1664 event->one_shot = 0; 1665 event->data = data; 1666 event->osdc = osdc; 1667 INIT_LIST_HEAD(&event->osd_node); 1668 RB_CLEAR_NODE(&event->node); 1669 kref_init(&event->kref); /* one ref for us */ 1670 kref_get(&event->kref); /* one ref for the caller */ 1671 1672 spin_lock(&osdc->event_lock); 1673 event->cookie = ++osdc->event_count; 1674 __insert_event(osdc, event); 1675 spin_unlock(&osdc->event_lock); 1676 1677 *pevent = event; 1678 return 0; 1679 } 1680 EXPORT_SYMBOL(ceph_osdc_create_event); 1681 1682 void ceph_osdc_cancel_event(struct ceph_osd_event *event) 1683 { 1684 struct ceph_osd_client *osdc = event->osdc; 1685 1686 dout("cancel_event %p\n", event); 1687 spin_lock(&osdc->event_lock); 1688 __remove_event(event); 1689 spin_unlock(&osdc->event_lock); 1690 ceph_osdc_put_event(event); /* caller's */ 1691 } 1692 EXPORT_SYMBOL(ceph_osdc_cancel_event); 1693 1694 1695 static void do_event_work(struct work_struct *work) 1696 { 1697 struct ceph_osd_event_work *event_work = 1698 container_of(work, struct ceph_osd_event_work, work); 1699 struct ceph_osd_event *event = event_work->event; 1700 u64 ver = event_work->ver; 1701 u64 notify_id = event_work->notify_id; 1702 u8 opcode = event_work->opcode; 1703 1704 dout("do_event_work completing %p\n", event); 1705 event->cb(ver, notify_id, opcode, event->data); 1706 dout("do_event_work completed %p\n", event); 1707 ceph_osdc_put_event(event); 1708 kfree(event_work); 1709 } 1710 1711 1712 /* 1713 * Process osd watch notifications 1714 */ 1715 static void handle_watch_notify(struct ceph_osd_client *osdc, 1716 struct ceph_msg *msg) 1717 { 1718 void *p, *end; 1719 u8 proto_ver; 1720 u64 cookie, ver, notify_id; 1721 u8 opcode; 1722 struct ceph_osd_event *event; 1723 struct ceph_osd_event_work *event_work; 1724 1725 p = msg->front.iov_base; 1726 end = p + msg->front.iov_len; 1727 1728 ceph_decode_8_safe(&p, end, proto_ver, bad); 1729 ceph_decode_8_safe(&p, end, opcode, bad); 1730 ceph_decode_64_safe(&p, end, cookie, bad); 1731 ceph_decode_64_safe(&p, end, ver, bad); 1732 ceph_decode_64_safe(&p, end, notify_id, bad); 1733 1734 spin_lock(&osdc->event_lock); 1735 event = __find_event(osdc, cookie); 1736 if (event) { 1737 BUG_ON(event->one_shot); 1738 get_event(event); 1739 } 1740 spin_unlock(&osdc->event_lock); 1741 dout("handle_watch_notify cookie %lld ver %lld event %p\n", 1742 cookie, ver, event); 1743 if (event) { 1744 event_work = kmalloc(sizeof(*event_work), GFP_NOIO); 1745 if (!event_work) { 1746 dout("ERROR: could not allocate event_work\n"); 1747 goto done_err; 1748 } 1749 INIT_WORK(&event_work->work, do_event_work); 1750 event_work->event = event; 1751 event_work->ver = ver; 1752 event_work->notify_id = notify_id; 1753 event_work->opcode = opcode; 1754 if (!queue_work(osdc->notify_wq, &event_work->work)) { 1755 dout("WARNING: failed to queue notify event work\n"); 1756 goto done_err; 1757 } 1758 } 1759 1760 return; 1761 1762 done_err: 1763 ceph_osdc_put_event(event); 1764 return; 1765 1766 bad: 1767 pr_err("osdc handle_watch_notify corrupt msg\n"); 1768 return; 1769 } 1770 1771 static void ceph_osdc_msg_data_set(struct ceph_msg *msg, 1772 struct ceph_osd_data *osd_data) 1773 { 1774 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { 1775 BUG_ON(osd_data->length > (u64) SIZE_MAX); 1776 if (osd_data->length) 1777 ceph_msg_data_set_pages(msg, osd_data->pages, 1778 osd_data->length, osd_data->alignment); 1779 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) { 1780 BUG_ON(!osd_data->pagelist->length); 1781 ceph_msg_data_set_pagelist(msg, osd_data->pagelist); 1782 #ifdef CONFIG_BLOCK 1783 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) { 1784 ceph_msg_data_set_bio(msg, osd_data->bio); 1785 #endif 1786 } else { 1787 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE); 1788 } 1789 } 1790 1791 /* 1792 * Register request, send initial attempt. 1793 */ 1794 int ceph_osdc_start_request(struct ceph_osd_client *osdc, 1795 struct ceph_osd_request *req, 1796 bool nofail) 1797 { 1798 int rc = 0; 1799 1800 /* Set up response incoming data and request outgoing data fields */ 1801 1802 ceph_osdc_msg_data_set(req->r_reply, &req->r_data_in); 1803 ceph_osdc_msg_data_set(req->r_request, &req->r_data_out); 1804 1805 down_read(&osdc->map_sem); 1806 mutex_lock(&osdc->request_mutex); 1807 __register_request(osdc, req); 1808 WARN_ON(req->r_sent); 1809 rc = __map_request(osdc, req, 0); 1810 if (rc < 0) { 1811 if (nofail) { 1812 dout("osdc_start_request failed map, " 1813 " will retry %lld\n", req->r_tid); 1814 rc = 0; 1815 } 1816 goto out_unlock; 1817 } 1818 if (req->r_osd == NULL) { 1819 dout("send_request %p no up osds in pg\n", req); 1820 ceph_monc_request_next_osdmap(&osdc->client->monc); 1821 } else { 1822 __send_request(osdc, req); 1823 } 1824 rc = 0; 1825 out_unlock: 1826 mutex_unlock(&osdc->request_mutex); 1827 up_read(&osdc->map_sem); 1828 return rc; 1829 } 1830 EXPORT_SYMBOL(ceph_osdc_start_request); 1831 1832 /* 1833 * wait for a request to complete 1834 */ 1835 int ceph_osdc_wait_request(struct ceph_osd_client *osdc, 1836 struct ceph_osd_request *req) 1837 { 1838 int rc; 1839 1840 rc = wait_for_completion_interruptible(&req->r_completion); 1841 if (rc < 0) { 1842 mutex_lock(&osdc->request_mutex); 1843 __cancel_request(req); 1844 __unregister_request(osdc, req); 1845 mutex_unlock(&osdc->request_mutex); 1846 complete_request(req); 1847 dout("wait_request tid %llu canceled/timed out\n", req->r_tid); 1848 return rc; 1849 } 1850 1851 dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result); 1852 return req->r_result; 1853 } 1854 EXPORT_SYMBOL(ceph_osdc_wait_request); 1855 1856 /* 1857 * sync - wait for all in-flight requests to flush. avoid starvation. 1858 */ 1859 void ceph_osdc_sync(struct ceph_osd_client *osdc) 1860 { 1861 struct ceph_osd_request *req; 1862 u64 last_tid, next_tid = 0; 1863 1864 mutex_lock(&osdc->request_mutex); 1865 last_tid = osdc->last_tid; 1866 while (1) { 1867 req = __lookup_request_ge(osdc, next_tid); 1868 if (!req) 1869 break; 1870 if (req->r_tid > last_tid) 1871 break; 1872 1873 next_tid = req->r_tid + 1; 1874 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0) 1875 continue; 1876 1877 ceph_osdc_get_request(req); 1878 mutex_unlock(&osdc->request_mutex); 1879 dout("sync waiting on tid %llu (last is %llu)\n", 1880 req->r_tid, last_tid); 1881 wait_for_completion(&req->r_safe_completion); 1882 mutex_lock(&osdc->request_mutex); 1883 ceph_osdc_put_request(req); 1884 } 1885 mutex_unlock(&osdc->request_mutex); 1886 dout("sync done (thru tid %llu)\n", last_tid); 1887 } 1888 EXPORT_SYMBOL(ceph_osdc_sync); 1889 1890 /* 1891 * init, shutdown 1892 */ 1893 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) 1894 { 1895 int err; 1896 1897 dout("init\n"); 1898 osdc->client = client; 1899 osdc->osdmap = NULL; 1900 init_rwsem(&osdc->map_sem); 1901 init_completion(&osdc->map_waiters); 1902 osdc->last_requested_map = 0; 1903 mutex_init(&osdc->request_mutex); 1904 osdc->last_tid = 0; 1905 osdc->osds = RB_ROOT; 1906 INIT_LIST_HEAD(&osdc->osd_lru); 1907 osdc->requests = RB_ROOT; 1908 INIT_LIST_HEAD(&osdc->req_lru); 1909 INIT_LIST_HEAD(&osdc->req_unsent); 1910 INIT_LIST_HEAD(&osdc->req_notarget); 1911 INIT_LIST_HEAD(&osdc->req_linger); 1912 osdc->num_requests = 0; 1913 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 1914 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 1915 spin_lock_init(&osdc->event_lock); 1916 osdc->event_tree = RB_ROOT; 1917 osdc->event_count = 0; 1918 1919 schedule_delayed_work(&osdc->osds_timeout_work, 1920 round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ)); 1921 1922 err = -ENOMEM; 1923 osdc->req_mempool = mempool_create_kmalloc_pool(10, 1924 sizeof(struct ceph_osd_request)); 1925 if (!osdc->req_mempool) 1926 goto out; 1927 1928 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, 1929 OSD_OP_FRONT_LEN, 10, true, 1930 "osd_op"); 1931 if (err < 0) 1932 goto out_mempool; 1933 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, 1934 OSD_OPREPLY_FRONT_LEN, 10, true, 1935 "osd_op_reply"); 1936 if (err < 0) 1937 goto out_msgpool; 1938 1939 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); 1940 if (IS_ERR(osdc->notify_wq)) { 1941 err = PTR_ERR(osdc->notify_wq); 1942 osdc->notify_wq = NULL; 1943 goto out_msgpool; 1944 } 1945 return 0; 1946 1947 out_msgpool: 1948 ceph_msgpool_destroy(&osdc->msgpool_op); 1949 out_mempool: 1950 mempool_destroy(osdc->req_mempool); 1951 out: 1952 return err; 1953 } 1954 1955 void ceph_osdc_stop(struct ceph_osd_client *osdc) 1956 { 1957 flush_workqueue(osdc->notify_wq); 1958 destroy_workqueue(osdc->notify_wq); 1959 cancel_delayed_work_sync(&osdc->timeout_work); 1960 cancel_delayed_work_sync(&osdc->osds_timeout_work); 1961 if (osdc->osdmap) { 1962 ceph_osdmap_destroy(osdc->osdmap); 1963 osdc->osdmap = NULL; 1964 } 1965 remove_all_osds(osdc); 1966 mempool_destroy(osdc->req_mempool); 1967 ceph_msgpool_destroy(&osdc->msgpool_op); 1968 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 1969 } 1970 1971 /* 1972 * Read some contiguous pages. If we cross a stripe boundary, shorten 1973 * *plen. Return number of bytes read, or error. 1974 */ 1975 int ceph_osdc_readpages(struct ceph_osd_client *osdc, 1976 struct ceph_vino vino, struct ceph_file_layout *layout, 1977 u64 off, u64 *plen, 1978 u32 truncate_seq, u64 truncate_size, 1979 struct page **pages, int num_pages, int page_align) 1980 { 1981 struct ceph_osd_request *req; 1982 struct ceph_osd_data *osd_data; 1983 int rc = 0; 1984 1985 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, 1986 vino.snap, off, *plen); 1987 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1988 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 1989 NULL, 0, truncate_seq, truncate_size, NULL, 1990 false); 1991 if (IS_ERR(req)) 1992 return PTR_ERR(req); 1993 1994 /* it may be a short read due to an object boundary */ 1995 1996 osd_data = &req->r_data_in; 1997 osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; 1998 osd_data->pages = pages; 1999 osd_data->length = *plen; 2000 osd_data->alignment = page_align; 2001 2002 dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", 2003 off, *plen, osd_data->length, page_align); 2004 2005 rc = ceph_osdc_start_request(osdc, req, false); 2006 if (!rc) 2007 rc = ceph_osdc_wait_request(osdc, req); 2008 2009 ceph_osdc_put_request(req); 2010 dout("readpages result %d\n", rc); 2011 return rc; 2012 } 2013 EXPORT_SYMBOL(ceph_osdc_readpages); 2014 2015 /* 2016 * do a synchronous write on N pages 2017 */ 2018 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, 2019 struct ceph_file_layout *layout, 2020 struct ceph_snap_context *snapc, 2021 u64 off, u64 len, 2022 u32 truncate_seq, u64 truncate_size, 2023 struct timespec *mtime, 2024 struct page **pages, int num_pages) 2025 { 2026 struct ceph_osd_request *req; 2027 struct ceph_osd_data *osd_data; 2028 int rc = 0; 2029 int page_align = off & ~PAGE_MASK; 2030 2031 BUG_ON(vino.snap != CEPH_NOSNAP); 2032 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 2033 CEPH_OSD_OP_WRITE, 2034 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, 2035 snapc, 0, 2036 truncate_seq, truncate_size, mtime, 2037 true); 2038 if (IS_ERR(req)) 2039 return PTR_ERR(req); 2040 2041 /* it may be a short write due to an object boundary */ 2042 osd_data = &req->r_data_out; 2043 osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; 2044 osd_data->pages = pages; 2045 osd_data->length = len; 2046 osd_data->alignment = page_align; 2047 dout("writepages %llu~%llu (%llu bytes)\n", off, len, osd_data->length); 2048 2049 rc = ceph_osdc_start_request(osdc, req, true); 2050 if (!rc) 2051 rc = ceph_osdc_wait_request(osdc, req); 2052 2053 ceph_osdc_put_request(req); 2054 if (rc == 0) 2055 rc = len; 2056 dout("writepages result %d\n", rc); 2057 return rc; 2058 } 2059 EXPORT_SYMBOL(ceph_osdc_writepages); 2060 2061 /* 2062 * handle incoming message 2063 */ 2064 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 2065 { 2066 struct ceph_osd *osd = con->private; 2067 struct ceph_osd_client *osdc; 2068 int type = le16_to_cpu(msg->hdr.type); 2069 2070 if (!osd) 2071 goto out; 2072 osdc = osd->o_osdc; 2073 2074 switch (type) { 2075 case CEPH_MSG_OSD_MAP: 2076 ceph_osdc_handle_map(osdc, msg); 2077 break; 2078 case CEPH_MSG_OSD_OPREPLY: 2079 handle_reply(osdc, msg, con); 2080 break; 2081 case CEPH_MSG_WATCH_NOTIFY: 2082 handle_watch_notify(osdc, msg); 2083 break; 2084 2085 default: 2086 pr_err("received unknown message type %d %s\n", type, 2087 ceph_msg_type_name(type)); 2088 } 2089 out: 2090 ceph_msg_put(msg); 2091 } 2092 2093 /* 2094 * lookup and return message for incoming reply. set up reply message 2095 * pages. 2096 */ 2097 static struct ceph_msg *get_reply(struct ceph_connection *con, 2098 struct ceph_msg_header *hdr, 2099 int *skip) 2100 { 2101 struct ceph_osd *osd = con->private; 2102 struct ceph_osd_client *osdc = osd->o_osdc; 2103 struct ceph_msg *m; 2104 struct ceph_osd_request *req; 2105 int front = le32_to_cpu(hdr->front_len); 2106 int data_len = le32_to_cpu(hdr->data_len); 2107 u64 tid; 2108 2109 tid = le64_to_cpu(hdr->tid); 2110 mutex_lock(&osdc->request_mutex); 2111 req = __lookup_request(osdc, tid); 2112 if (!req) { 2113 *skip = 1; 2114 m = NULL; 2115 dout("get_reply unknown tid %llu from osd%d\n", tid, 2116 osd->o_osd); 2117 goto out; 2118 } 2119 2120 if (req->r_con_filling_msg) { 2121 dout("%s revoking msg %p from old con %p\n", __func__, 2122 req->r_reply, req->r_con_filling_msg); 2123 ceph_msg_revoke_incoming(req->r_reply); 2124 req->r_con_filling_msg->ops->put(req->r_con_filling_msg); 2125 req->r_con_filling_msg = NULL; 2126 } 2127 2128 if (front > req->r_reply->front.iov_len) { 2129 pr_warning("get_reply front %d > preallocated %d\n", 2130 front, (int)req->r_reply->front.iov_len); 2131 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false); 2132 if (!m) 2133 goto out; 2134 ceph_msg_put(req->r_reply); 2135 req->r_reply = m; 2136 } 2137 m = ceph_msg_get(req->r_reply); 2138 2139 if (data_len > 0) { 2140 struct ceph_osd_data *osd_data = &req->r_data_in; 2141 2142 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { 2143 if (osd_data->pages && 2144 unlikely(osd_data->length < data_len)) { 2145 2146 pr_warning("tid %lld reply has %d bytes " 2147 "we had only %llu bytes ready\n", 2148 tid, data_len, osd_data->length); 2149 *skip = 1; 2150 ceph_msg_put(m); 2151 m = NULL; 2152 goto out; 2153 } 2154 } 2155 } 2156 *skip = 0; 2157 req->r_con_filling_msg = con->ops->get(con); 2158 dout("get_reply tid %lld %p\n", tid, m); 2159 2160 out: 2161 mutex_unlock(&osdc->request_mutex); 2162 return m; 2163 2164 } 2165 2166 static struct ceph_msg *alloc_msg(struct ceph_connection *con, 2167 struct ceph_msg_header *hdr, 2168 int *skip) 2169 { 2170 struct ceph_osd *osd = con->private; 2171 int type = le16_to_cpu(hdr->type); 2172 int front = le32_to_cpu(hdr->front_len); 2173 2174 *skip = 0; 2175 switch (type) { 2176 case CEPH_MSG_OSD_MAP: 2177 case CEPH_MSG_WATCH_NOTIFY: 2178 return ceph_msg_new(type, front, GFP_NOFS, false); 2179 case CEPH_MSG_OSD_OPREPLY: 2180 return get_reply(con, hdr, skip); 2181 default: 2182 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type, 2183 osd->o_osd); 2184 *skip = 1; 2185 return NULL; 2186 } 2187 } 2188 2189 /* 2190 * Wrappers to refcount containing ceph_osd struct 2191 */ 2192 static struct ceph_connection *get_osd_con(struct ceph_connection *con) 2193 { 2194 struct ceph_osd *osd = con->private; 2195 if (get_osd(osd)) 2196 return con; 2197 return NULL; 2198 } 2199 2200 static void put_osd_con(struct ceph_connection *con) 2201 { 2202 struct ceph_osd *osd = con->private; 2203 put_osd(osd); 2204 } 2205 2206 /* 2207 * authentication 2208 */ 2209 /* 2210 * Note: returned pointer is the address of a structure that's 2211 * managed separately. Caller must *not* attempt to free it. 2212 */ 2213 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 2214 int *proto, int force_new) 2215 { 2216 struct ceph_osd *o = con->private; 2217 struct ceph_osd_client *osdc = o->o_osdc; 2218 struct ceph_auth_client *ac = osdc->client->monc.auth; 2219 struct ceph_auth_handshake *auth = &o->o_auth; 2220 2221 if (force_new && auth->authorizer) { 2222 ceph_auth_destroy_authorizer(ac, auth->authorizer); 2223 auth->authorizer = NULL; 2224 } 2225 if (!auth->authorizer) { 2226 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 2227 auth); 2228 if (ret) 2229 return ERR_PTR(ret); 2230 } else { 2231 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 2232 auth); 2233 if (ret) 2234 return ERR_PTR(ret); 2235 } 2236 *proto = ac->protocol; 2237 2238 return auth; 2239 } 2240 2241 2242 static int verify_authorizer_reply(struct ceph_connection *con, int len) 2243 { 2244 struct ceph_osd *o = con->private; 2245 struct ceph_osd_client *osdc = o->o_osdc; 2246 struct ceph_auth_client *ac = osdc->client->monc.auth; 2247 2248 return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len); 2249 } 2250 2251 static int invalidate_authorizer(struct ceph_connection *con) 2252 { 2253 struct ceph_osd *o = con->private; 2254 struct ceph_osd_client *osdc = o->o_osdc; 2255 struct ceph_auth_client *ac = osdc->client->monc.auth; 2256 2257 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); 2258 return ceph_monc_validate_auth(&osdc->client->monc); 2259 } 2260 2261 static const struct ceph_connection_operations osd_con_ops = { 2262 .get = get_osd_con, 2263 .put = put_osd_con, 2264 .dispatch = dispatch, 2265 .get_authorizer = get_authorizer, 2266 .verify_authorizer_reply = verify_authorizer_reply, 2267 .invalidate_authorizer = invalidate_authorizer, 2268 .alloc_msg = alloc_msg, 2269 .fault = osd_reset, 2270 }; 2271