1 #include <linux/ceph/ceph_debug.h> 2 3 #include <linux/module.h> 4 #include <linux/err.h> 5 #include <linux/highmem.h> 6 #include <linux/mm.h> 7 #include <linux/pagemap.h> 8 #include <linux/slab.h> 9 #include <linux/uaccess.h> 10 #ifdef CONFIG_BLOCK 11 #include <linux/bio.h> 12 #endif 13 14 #include <linux/ceph/libceph.h> 15 #include <linux/ceph/osd_client.h> 16 #include <linux/ceph/messenger.h> 17 #include <linux/ceph/decode.h> 18 #include <linux/ceph/auth.h> 19 #include <linux/ceph/pagelist.h> 20 21 #define OSD_OP_FRONT_LEN 4096 22 #define OSD_OPREPLY_FRONT_LEN 512 23 24 static const struct ceph_connection_operations osd_con_ops; 25 26 static void __send_queued(struct ceph_osd_client *osdc); 27 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); 28 static void __register_request(struct ceph_osd_client *osdc, 29 struct ceph_osd_request *req); 30 static void __unregister_linger_request(struct ceph_osd_client *osdc, 31 struct ceph_osd_request *req); 32 static void __send_request(struct ceph_osd_client *osdc, 33 struct ceph_osd_request *req); 34 35 static int op_has_extent(int op) 36 { 37 return (op == CEPH_OSD_OP_READ || 38 op == CEPH_OSD_OP_WRITE); 39 } 40 41 /* 42 * Implement client access to distributed object storage cluster. 43 * 44 * All data objects are stored within a cluster/cloud of OSDs, or 45 * "object storage devices." (Note that Ceph OSDs have _nothing_ to 46 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply 47 * remote daemons serving up and coordinating consistent and safe 48 * access to storage. 49 * 50 * Cluster membership and the mapping of data objects onto storage devices 51 * are described by the osd map. 52 * 53 * We keep track of pending OSD requests (read, write), resubmit 54 * requests to different OSDs when the cluster topology/data layout 55 * change, or retry the affected requests when the communications 56 * channel with an OSD is reset. 
57 */ 58 59 /* 60 * calculate the mapping of a file extent onto an object, and fill out the 61 * request accordingly. shorten extent as necessary if it crosses an 62 * object boundary. 63 * 64 * fill osd op in request message. 65 */ 66 static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen, 67 struct ceph_osd_req_op *op, u64 *bno) 68 { 69 u64 orig_len = *plen; 70 u64 objoff = 0; 71 u64 objlen = 0; 72 int r; 73 74 /* object extent? */ 75 r = ceph_calc_file_object_mapping(layout, off, orig_len, bno, 76 &objoff, &objlen); 77 if (r < 0) 78 return r; 79 if (objlen < orig_len) { 80 *plen = objlen; 81 dout(" skipping last %llu, final file extent %llu~%llu\n", 82 orig_len - *plen, off, *plen); 83 } 84 85 if (op_has_extent(op->op)) { 86 u32 osize = le32_to_cpu(layout->fl_object_size); 87 op->extent.offset = objoff; 88 op->extent.length = objlen; 89 if (op->extent.truncate_size <= off - objoff) { 90 op->extent.truncate_size = 0; 91 } else { 92 op->extent.truncate_size -= off - objoff; 93 if (op->extent.truncate_size > osize) 94 op->extent.truncate_size = osize; 95 } 96 } 97 if (op->op == CEPH_OSD_OP_WRITE) 98 op->payload_len = *plen; 99 100 dout("calc_layout bno=%llx %llu~%llu\n", *bno, objoff, objlen); 101 102 return 0; 103 } 104 105 /* 106 * requests 107 */ 108 void ceph_osdc_release_request(struct kref *kref) 109 { 110 int num_pages; 111 struct ceph_osd_request *req = container_of(kref, 112 struct ceph_osd_request, 113 r_kref); 114 115 if (req->r_request) 116 ceph_msg_put(req->r_request); 117 if (req->r_con_filling_msg) { 118 dout("%s revoking msg %p from con %p\n", __func__, 119 req->r_reply, req->r_con_filling_msg); 120 ceph_msg_revoke_incoming(req->r_reply); 121 req->r_con_filling_msg->ops->put(req->r_con_filling_msg); 122 req->r_con_filling_msg = NULL; 123 } 124 if (req->r_reply) 125 ceph_msg_put(req->r_reply); 126 127 if (req->r_data_in.type == CEPH_OSD_DATA_TYPE_PAGES && 128 req->r_data_in.own_pages) { 129 num_pages = 
calc_pages_for((u64)req->r_data_in.alignment, 130 (u64)req->r_data_in.length); 131 ceph_release_page_vector(req->r_data_in.pages, num_pages); 132 } 133 if (req->r_data_out.type == CEPH_OSD_DATA_TYPE_PAGES && 134 req->r_data_out.own_pages) { 135 num_pages = calc_pages_for((u64)req->r_data_out.alignment, 136 (u64)req->r_data_out.length); 137 ceph_release_page_vector(req->r_data_out.pages, num_pages); 138 } 139 140 ceph_put_snap_context(req->r_snapc); 141 if (req->r_mempool) 142 mempool_free(req, req->r_osdc->req_mempool); 143 else 144 kfree(req); 145 } 146 EXPORT_SYMBOL(ceph_osdc_release_request); 147 148 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 149 struct ceph_snap_context *snapc, 150 unsigned int num_ops, 151 bool use_mempool, 152 gfp_t gfp_flags) 153 { 154 struct ceph_osd_request *req; 155 struct ceph_msg *msg; 156 size_t msg_size; 157 158 msg_size = 4 + 4 + 8 + 8 + 4+8; 159 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ 160 msg_size += 1 + 8 + 4 + 4; /* pg_t */ 161 msg_size += 4 + MAX_OBJ_NAME_SIZE; 162 msg_size += 2 + num_ops*sizeof(struct ceph_osd_op); 163 msg_size += 8; /* snapid */ 164 msg_size += 8; /* snap_seq */ 165 msg_size += 8 * (snapc ? 
snapc->num_snaps : 0); /* snaps */ 166 msg_size += 4; 167 168 if (use_mempool) { 169 req = mempool_alloc(osdc->req_mempool, gfp_flags); 170 memset(req, 0, sizeof(*req)); 171 } else { 172 req = kzalloc(sizeof(*req), gfp_flags); 173 } 174 if (req == NULL) 175 return NULL; 176 177 req->r_osdc = osdc; 178 req->r_mempool = use_mempool; 179 180 kref_init(&req->r_kref); 181 init_completion(&req->r_completion); 182 init_completion(&req->r_safe_completion); 183 RB_CLEAR_NODE(&req->r_node); 184 INIT_LIST_HEAD(&req->r_unsafe_item); 185 INIT_LIST_HEAD(&req->r_linger_item); 186 INIT_LIST_HEAD(&req->r_linger_osd); 187 INIT_LIST_HEAD(&req->r_req_lru_item); 188 INIT_LIST_HEAD(&req->r_osd_item); 189 190 /* create reply message */ 191 if (use_mempool) 192 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 193 else 194 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, 195 OSD_OPREPLY_FRONT_LEN, gfp_flags, true); 196 if (!msg) { 197 ceph_osdc_put_request(req); 198 return NULL; 199 } 200 req->r_reply = msg; 201 202 req->r_data_in.type = CEPH_OSD_DATA_TYPE_NONE; 203 req->r_data_out.type = CEPH_OSD_DATA_TYPE_NONE; 204 205 /* create request message; allow space for oid */ 206 if (use_mempool) 207 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 208 else 209 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true); 210 if (!msg) { 211 ceph_osdc_put_request(req); 212 return NULL; 213 } 214 215 memset(msg->front.iov_base, 0, msg->front.iov_len); 216 217 req->r_request = msg; 218 219 return req; 220 } 221 EXPORT_SYMBOL(ceph_osdc_alloc_request); 222 223 static u64 osd_req_encode_op(struct ceph_osd_request *req, 224 struct ceph_osd_op *dst, 225 struct ceph_osd_req_op *src) 226 { 227 u64 out_data_len = 0; 228 struct ceph_pagelist *pagelist; 229 230 dst->op = cpu_to_le16(src->op); 231 232 switch (src->op) { 233 case CEPH_OSD_OP_STAT: 234 break; 235 case CEPH_OSD_OP_READ: 236 case CEPH_OSD_OP_WRITE: 237 if (src->op == CEPH_OSD_OP_WRITE) 238 out_data_len = src->extent.length; 239 dst->extent.offset = 
cpu_to_le64(src->extent.offset); 240 dst->extent.length = cpu_to_le64(src->extent.length); 241 dst->extent.truncate_size = 242 cpu_to_le64(src->extent.truncate_size); 243 dst->extent.truncate_seq = 244 cpu_to_le32(src->extent.truncate_seq); 245 break; 246 case CEPH_OSD_OP_CALL: 247 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); 248 BUG_ON(!pagelist); 249 ceph_pagelist_init(pagelist); 250 251 dst->cls.class_len = src->cls.class_len; 252 dst->cls.method_len = src->cls.method_len; 253 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len); 254 ceph_pagelist_append(pagelist, src->cls.class_name, 255 src->cls.class_len); 256 ceph_pagelist_append(pagelist, src->cls.method_name, 257 src->cls.method_len); 258 ceph_pagelist_append(pagelist, src->cls.indata, 259 src->cls.indata_len); 260 261 req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGELIST; 262 req->r_data_out.pagelist = pagelist; 263 out_data_len = pagelist->length; 264 break; 265 case CEPH_OSD_OP_STARTSYNC: 266 break; 267 case CEPH_OSD_OP_NOTIFY_ACK: 268 case CEPH_OSD_OP_WATCH: 269 dst->watch.cookie = cpu_to_le64(src->watch.cookie); 270 dst->watch.ver = cpu_to_le64(src->watch.ver); 271 dst->watch.flag = src->watch.flag; 272 break; 273 default: 274 pr_err("unrecognized osd opcode %d\n", src->op); 275 WARN_ON(1); 276 break; 277 case CEPH_OSD_OP_MAPEXT: 278 case CEPH_OSD_OP_MASKTRUNC: 279 case CEPH_OSD_OP_SPARSE_READ: 280 case CEPH_OSD_OP_NOTIFY: 281 case CEPH_OSD_OP_ASSERT_VER: 282 case CEPH_OSD_OP_WRITEFULL: 283 case CEPH_OSD_OP_TRUNCATE: 284 case CEPH_OSD_OP_ZERO: 285 case CEPH_OSD_OP_DELETE: 286 case CEPH_OSD_OP_APPEND: 287 case CEPH_OSD_OP_SETTRUNC: 288 case CEPH_OSD_OP_TRIMTRUNC: 289 case CEPH_OSD_OP_TMAPUP: 290 case CEPH_OSD_OP_TMAPPUT: 291 case CEPH_OSD_OP_TMAPGET: 292 case CEPH_OSD_OP_CREATE: 293 case CEPH_OSD_OP_ROLLBACK: 294 case CEPH_OSD_OP_OMAPGETKEYS: 295 case CEPH_OSD_OP_OMAPGETVALS: 296 case CEPH_OSD_OP_OMAPGETHEADER: 297 case CEPH_OSD_OP_OMAPGETVALSBYKEYS: 298 case CEPH_OSD_OP_MODE_RD: 299 case 
CEPH_OSD_OP_OMAPSETVALS: 300 case CEPH_OSD_OP_OMAPSETHEADER: 301 case CEPH_OSD_OP_OMAPCLEAR: 302 case CEPH_OSD_OP_OMAPRMKEYS: 303 case CEPH_OSD_OP_OMAP_CMP: 304 case CEPH_OSD_OP_CLONERANGE: 305 case CEPH_OSD_OP_ASSERT_SRC_VERSION: 306 case CEPH_OSD_OP_SRC_CMPXATTR: 307 case CEPH_OSD_OP_GETXATTR: 308 case CEPH_OSD_OP_GETXATTRS: 309 case CEPH_OSD_OP_CMPXATTR: 310 case CEPH_OSD_OP_SETXATTR: 311 case CEPH_OSD_OP_SETXATTRS: 312 case CEPH_OSD_OP_RESETXATTRS: 313 case CEPH_OSD_OP_RMXATTR: 314 case CEPH_OSD_OP_PULL: 315 case CEPH_OSD_OP_PUSH: 316 case CEPH_OSD_OP_BALANCEREADS: 317 case CEPH_OSD_OP_UNBALANCEREADS: 318 case CEPH_OSD_OP_SCRUB: 319 case CEPH_OSD_OP_SCRUB_RESERVE: 320 case CEPH_OSD_OP_SCRUB_UNRESERVE: 321 case CEPH_OSD_OP_SCRUB_STOP: 322 case CEPH_OSD_OP_SCRUB_MAP: 323 case CEPH_OSD_OP_WRLOCK: 324 case CEPH_OSD_OP_WRUNLOCK: 325 case CEPH_OSD_OP_RDLOCK: 326 case CEPH_OSD_OP_RDUNLOCK: 327 case CEPH_OSD_OP_UPLOCK: 328 case CEPH_OSD_OP_DNLOCK: 329 case CEPH_OSD_OP_PGLS: 330 case CEPH_OSD_OP_PGLS_FILTER: 331 pr_err("unsupported osd opcode %s\n", 332 ceph_osd_op_name(src->op)); 333 WARN_ON(1); 334 break; 335 } 336 dst->payload_len = cpu_to_le32(src->payload_len); 337 338 return out_data_len; 339 } 340 341 /* 342 * build new request AND message 343 * 344 */ 345 void ceph_osdc_build_request(struct ceph_osd_request *req, 346 u64 off, unsigned int num_ops, 347 struct ceph_osd_req_op *src_ops, 348 struct ceph_snap_context *snapc, u64 snap_id, 349 struct timespec *mtime) 350 { 351 struct ceph_msg *msg = req->r_request; 352 struct ceph_osd_req_op *src_op; 353 void *p; 354 size_t msg_size; 355 int flags = req->r_flags; 356 u64 data_len; 357 int i; 358 359 req->r_num_ops = num_ops; 360 req->r_snapid = snap_id; 361 req->r_snapc = ceph_get_snap_context(snapc); 362 363 /* encode request */ 364 msg->hdr.version = cpu_to_le16(4); 365 366 p = msg->front.iov_base; 367 ceph_encode_32(&p, 1); /* client_inc is always 1 */ 368 req->r_request_osdmap_epoch = p; 369 p += 4; 370 
req->r_request_flags = p; 371 p += 4; 372 if (req->r_flags & CEPH_OSD_FLAG_WRITE) 373 ceph_encode_timespec(p, mtime); 374 p += sizeof(struct ceph_timespec); 375 req->r_request_reassert_version = p; 376 p += sizeof(struct ceph_eversion); /* will get filled in */ 377 378 /* oloc */ 379 ceph_encode_8(&p, 4); 380 ceph_encode_8(&p, 4); 381 ceph_encode_32(&p, 8 + 4 + 4); 382 req->r_request_pool = p; 383 p += 8; 384 ceph_encode_32(&p, -1); /* preferred */ 385 ceph_encode_32(&p, 0); /* key len */ 386 387 ceph_encode_8(&p, 1); 388 req->r_request_pgid = p; 389 p += 8 + 4; 390 ceph_encode_32(&p, -1); /* preferred */ 391 392 /* oid */ 393 ceph_encode_32(&p, req->r_oid_len); 394 memcpy(p, req->r_oid, req->r_oid_len); 395 dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len); 396 p += req->r_oid_len; 397 398 /* ops--can imply data */ 399 ceph_encode_16(&p, num_ops); 400 src_op = src_ops; 401 req->r_request_ops = p; 402 data_len = 0; 403 for (i = 0; i < num_ops; i++, src_op++) { 404 data_len += osd_req_encode_op(req, p, src_op); 405 p += sizeof(struct ceph_osd_op); 406 } 407 408 /* snaps */ 409 ceph_encode_64(&p, req->r_snapid); 410 ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0); 411 ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0); 412 if (req->r_snapc) { 413 for (i = 0; i < snapc->num_snaps; i++) { 414 ceph_encode_64(&p, req->r_snapc->snaps[i]); 415 } 416 } 417 418 req->r_request_attempts = p; 419 p += 4; 420 421 /* data */ 422 if (flags & CEPH_OSD_FLAG_WRITE) { 423 u16 data_off; 424 425 /* 426 * The header "data_off" is a hint to the receiver 427 * allowing it to align received data into its 428 * buffers such that there's no need to re-copy 429 * it before writing it to disk (direct I/O). 
430 */ 431 data_off = (u16) (off & 0xffff); 432 req->r_request->hdr.data_off = cpu_to_le16(data_off); 433 } 434 req->r_request->hdr.data_len = cpu_to_le32(data_len); 435 436 BUG_ON(p > msg->front.iov_base + msg->front.iov_len); 437 msg_size = p - msg->front.iov_base; 438 msg->front.iov_len = msg_size; 439 msg->hdr.front_len = cpu_to_le32(msg_size); 440 441 dout("build_request msg_size was %d num_ops %d\n", (int)msg_size, 442 num_ops); 443 return; 444 } 445 EXPORT_SYMBOL(ceph_osdc_build_request); 446 447 /* 448 * build new request AND message, calculate layout, and adjust file 449 * extent as needed. 450 * 451 * if the file was recently truncated, we include information about its 452 * old and new size so that the object can be updated appropriately. (we 453 * avoid synchronously deleting truncated objects because it's slow.) 454 * 455 * if @do_sync, include a 'startsync' command so that the osd will flush 456 * data quickly. 457 */ 458 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, 459 struct ceph_file_layout *layout, 460 struct ceph_vino vino, 461 u64 off, u64 *plen, 462 int opcode, int flags, 463 struct ceph_snap_context *snapc, 464 int do_sync, 465 u32 truncate_seq, 466 u64 truncate_size, 467 struct timespec *mtime, 468 bool use_mempool) 469 { 470 struct ceph_osd_req_op ops[2]; 471 struct ceph_osd_request *req; 472 unsigned int num_op = 1; 473 u64 bno = 0; 474 int r; 475 476 memset(&ops, 0, sizeof ops); 477 478 ops[0].op = opcode; 479 ops[0].extent.truncate_seq = truncate_seq; 480 ops[0].extent.truncate_size = truncate_size; 481 482 if (do_sync) { 483 ops[1].op = CEPH_OSD_OP_STARTSYNC; 484 num_op++; 485 } 486 487 req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool, 488 GFP_NOFS); 489 if (!req) 490 return ERR_PTR(-ENOMEM); 491 req->r_flags = flags; 492 493 /* calculate max write size */ 494 r = calc_layout(layout, off, plen, ops, &bno); 495 if (r < 0) { 496 ceph_osdc_put_request(req); 497 return ERR_PTR(r); 498 } 499 
req->r_file_layout = *layout; /* keep a copy */ 500 501 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno); 502 req->r_oid_len = strlen(req->r_oid); 503 504 ceph_osdc_build_request(req, off, num_op, ops, 505 snapc, vino.snap, mtime); 506 507 return req; 508 } 509 EXPORT_SYMBOL(ceph_osdc_new_request); 510 511 /* 512 * We keep osd requests in an rbtree, sorted by ->r_tid. 513 */ 514 static void __insert_request(struct ceph_osd_client *osdc, 515 struct ceph_osd_request *new) 516 { 517 struct rb_node **p = &osdc->requests.rb_node; 518 struct rb_node *parent = NULL; 519 struct ceph_osd_request *req = NULL; 520 521 while (*p) { 522 parent = *p; 523 req = rb_entry(parent, struct ceph_osd_request, r_node); 524 if (new->r_tid < req->r_tid) 525 p = &(*p)->rb_left; 526 else if (new->r_tid > req->r_tid) 527 p = &(*p)->rb_right; 528 else 529 BUG(); 530 } 531 532 rb_link_node(&new->r_node, parent, p); 533 rb_insert_color(&new->r_node, &osdc->requests); 534 } 535 536 static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc, 537 u64 tid) 538 { 539 struct ceph_osd_request *req; 540 struct rb_node *n = osdc->requests.rb_node; 541 542 while (n) { 543 req = rb_entry(n, struct ceph_osd_request, r_node); 544 if (tid < req->r_tid) 545 n = n->rb_left; 546 else if (tid > req->r_tid) 547 n = n->rb_right; 548 else 549 return req; 550 } 551 return NULL; 552 } 553 554 static struct ceph_osd_request * 555 __lookup_request_ge(struct ceph_osd_client *osdc, 556 u64 tid) 557 { 558 struct ceph_osd_request *req; 559 struct rb_node *n = osdc->requests.rb_node; 560 561 while (n) { 562 req = rb_entry(n, struct ceph_osd_request, r_node); 563 if (tid < req->r_tid) { 564 if (!n->rb_left) 565 return req; 566 n = n->rb_left; 567 } else if (tid > req->r_tid) { 568 n = n->rb_right; 569 } else { 570 return req; 571 } 572 } 573 return NULL; 574 } 575 576 /* 577 * Resubmit requests pending on the given osd. 
578 */ 579 static void __kick_osd_requests(struct ceph_osd_client *osdc, 580 struct ceph_osd *osd) 581 { 582 struct ceph_osd_request *req, *nreq; 583 LIST_HEAD(resend); 584 int err; 585 586 dout("__kick_osd_requests osd%d\n", osd->o_osd); 587 err = __reset_osd(osdc, osd); 588 if (err) 589 return; 590 /* 591 * Build up a list of requests to resend by traversing the 592 * osd's list of requests. Requests for a given object are 593 * sent in tid order, and that is also the order they're 594 * kept on this list. Therefore all requests that are in 595 * flight will be found first, followed by all requests that 596 * have not yet been sent. And to resend requests while 597 * preserving this order we will want to put any sent 598 * requests back on the front of the osd client's unsent 599 * list. 600 * 601 * So we build a separate ordered list of already-sent 602 * requests for the affected osd and splice it onto the 603 * front of the osd client's unsent list. Once we've seen a 604 * request that has not yet been sent we're done. Those 605 * requests are already sitting right where they belong. 606 */ 607 list_for_each_entry(req, &osd->o_requests, r_osd_item) { 608 if (!req->r_sent) 609 break; 610 list_move_tail(&req->r_req_lru_item, &resend); 611 dout("requeueing %p tid %llu osd%d\n", req, req->r_tid, 612 osd->o_osd); 613 if (!req->r_linger) 614 req->r_flags |= CEPH_OSD_FLAG_RETRY; 615 } 616 list_splice(&resend, &osdc->req_unsent); 617 618 /* 619 * Linger requests are re-registered before sending, which 620 * sets up a new tid for each. We add them to the unsent 621 * list at the end to keep things in tid order. 622 */ 623 list_for_each_entry_safe(req, nreq, &osd->o_linger_requests, 624 r_linger_osd) { 625 /* 626 * reregister request prior to unregistering linger so 627 * that r_osd is preserved. 
628 */ 629 BUG_ON(!list_empty(&req->r_req_lru_item)); 630 __register_request(osdc, req); 631 list_add_tail(&req->r_req_lru_item, &osdc->req_unsent); 632 list_add_tail(&req->r_osd_item, &req->r_osd->o_requests); 633 __unregister_linger_request(osdc, req); 634 dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid, 635 osd->o_osd); 636 } 637 } 638 639 /* 640 * If the osd connection drops, we need to resubmit all requests. 641 */ 642 static void osd_reset(struct ceph_connection *con) 643 { 644 struct ceph_osd *osd = con->private; 645 struct ceph_osd_client *osdc; 646 647 if (!osd) 648 return; 649 dout("osd_reset osd%d\n", osd->o_osd); 650 osdc = osd->o_osdc; 651 down_read(&osdc->map_sem); 652 mutex_lock(&osdc->request_mutex); 653 __kick_osd_requests(osdc, osd); 654 __send_queued(osdc); 655 mutex_unlock(&osdc->request_mutex); 656 up_read(&osdc->map_sem); 657 } 658 659 /* 660 * Track open sessions with osds. 661 */ 662 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum) 663 { 664 struct ceph_osd *osd; 665 666 osd = kzalloc(sizeof(*osd), GFP_NOFS); 667 if (!osd) 668 return NULL; 669 670 atomic_set(&osd->o_ref, 1); 671 osd->o_osdc = osdc; 672 osd->o_osd = onum; 673 RB_CLEAR_NODE(&osd->o_node); 674 INIT_LIST_HEAD(&osd->o_requests); 675 INIT_LIST_HEAD(&osd->o_linger_requests); 676 INIT_LIST_HEAD(&osd->o_osd_lru); 677 osd->o_incarnation = 1; 678 679 ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr); 680 681 INIT_LIST_HEAD(&osd->o_keepalive_item); 682 return osd; 683 } 684 685 static struct ceph_osd *get_osd(struct ceph_osd *osd) 686 { 687 if (atomic_inc_not_zero(&osd->o_ref)) { 688 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1, 689 atomic_read(&osd->o_ref)); 690 return osd; 691 } else { 692 dout("get_osd %p FAIL\n", osd); 693 return NULL; 694 } 695 } 696 697 static void put_osd(struct ceph_osd *osd) 698 { 699 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), 700 atomic_read(&osd->o_ref) - 1); 701 if 
(atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) { 702 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth; 703 704 ceph_auth_destroy_authorizer(ac, osd->o_auth.authorizer); 705 kfree(osd); 706 } 707 } 708 709 /* 710 * remove an osd from our map 711 */ 712 static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 713 { 714 dout("__remove_osd %p\n", osd); 715 BUG_ON(!list_empty(&osd->o_requests)); 716 rb_erase(&osd->o_node, &osdc->osds); 717 list_del_init(&osd->o_osd_lru); 718 ceph_con_close(&osd->o_con); 719 put_osd(osd); 720 } 721 722 static void remove_all_osds(struct ceph_osd_client *osdc) 723 { 724 dout("%s %p\n", __func__, osdc); 725 mutex_lock(&osdc->request_mutex); 726 while (!RB_EMPTY_ROOT(&osdc->osds)) { 727 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), 728 struct ceph_osd, o_node); 729 __remove_osd(osdc, osd); 730 } 731 mutex_unlock(&osdc->request_mutex); 732 } 733 734 static void __move_osd_to_lru(struct ceph_osd_client *osdc, 735 struct ceph_osd *osd) 736 { 737 dout("__move_osd_to_lru %p\n", osd); 738 BUG_ON(!list_empty(&osd->o_osd_lru)); 739 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru); 740 osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ; 741 } 742 743 static void __remove_osd_from_lru(struct ceph_osd *osd) 744 { 745 dout("__remove_osd_from_lru %p\n", osd); 746 if (!list_empty(&osd->o_osd_lru)) 747 list_del_init(&osd->o_osd_lru); 748 } 749 750 static void remove_old_osds(struct ceph_osd_client *osdc) 751 { 752 struct ceph_osd *osd, *nosd; 753 754 dout("__remove_old_osds %p\n", osdc); 755 mutex_lock(&osdc->request_mutex); 756 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) { 757 if (time_before(jiffies, osd->lru_ttl)) 758 break; 759 __remove_osd(osdc, osd); 760 } 761 mutex_unlock(&osdc->request_mutex); 762 } 763 764 /* 765 * reset osd connect 766 */ 767 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 768 { 769 struct ceph_entity_addr 
*peer_addr; 770 771 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 772 if (list_empty(&osd->o_requests) && 773 list_empty(&osd->o_linger_requests)) { 774 __remove_osd(osdc, osd); 775 776 return -ENODEV; 777 } 778 779 peer_addr = &osdc->osdmap->osd_addr[osd->o_osd]; 780 if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) && 781 !ceph_con_opened(&osd->o_con)) { 782 struct ceph_osd_request *req; 783 784 dout(" osd addr hasn't changed and connection never opened," 785 " letting msgr retry"); 786 /* touch each r_stamp for handle_timeout()'s benfit */ 787 list_for_each_entry(req, &osd->o_requests, r_osd_item) 788 req->r_stamp = jiffies; 789 790 return -EAGAIN; 791 } 792 793 ceph_con_close(&osd->o_con); 794 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr); 795 osd->o_incarnation++; 796 797 return 0; 798 } 799 800 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new) 801 { 802 struct rb_node **p = &osdc->osds.rb_node; 803 struct rb_node *parent = NULL; 804 struct ceph_osd *osd = NULL; 805 806 dout("__insert_osd %p osd%d\n", new, new->o_osd); 807 while (*p) { 808 parent = *p; 809 osd = rb_entry(parent, struct ceph_osd, o_node); 810 if (new->o_osd < osd->o_osd) 811 p = &(*p)->rb_left; 812 else if (new->o_osd > osd->o_osd) 813 p = &(*p)->rb_right; 814 else 815 BUG(); 816 } 817 818 rb_link_node(&new->o_node, parent, p); 819 rb_insert_color(&new->o_node, &osdc->osds); 820 } 821 822 static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o) 823 { 824 struct ceph_osd *osd; 825 struct rb_node *n = osdc->osds.rb_node; 826 827 while (n) { 828 osd = rb_entry(n, struct ceph_osd, o_node); 829 if (o < osd->o_osd) 830 n = n->rb_left; 831 else if (o > osd->o_osd) 832 n = n->rb_right; 833 else 834 return osd; 835 } 836 return NULL; 837 } 838 839 static void __schedule_osd_timeout(struct ceph_osd_client *osdc) 840 { 841 schedule_delayed_work(&osdc->timeout_work, 842 osdc->client->options->osd_keepalive_timeout * 
HZ); 843 } 844 845 static void __cancel_osd_timeout(struct ceph_osd_client *osdc) 846 { 847 cancel_delayed_work(&osdc->timeout_work); 848 } 849 850 /* 851 * Register request, assign tid. If this is the first request, set up 852 * the timeout event. 853 */ 854 static void __register_request(struct ceph_osd_client *osdc, 855 struct ceph_osd_request *req) 856 { 857 req->r_tid = ++osdc->last_tid; 858 req->r_request->hdr.tid = cpu_to_le64(req->r_tid); 859 dout("__register_request %p tid %lld\n", req, req->r_tid); 860 __insert_request(osdc, req); 861 ceph_osdc_get_request(req); 862 osdc->num_requests++; 863 if (osdc->num_requests == 1) { 864 dout(" first request, scheduling timeout\n"); 865 __schedule_osd_timeout(osdc); 866 } 867 } 868 869 /* 870 * called under osdc->request_mutex 871 */ 872 static void __unregister_request(struct ceph_osd_client *osdc, 873 struct ceph_osd_request *req) 874 { 875 if (RB_EMPTY_NODE(&req->r_node)) { 876 dout("__unregister_request %p tid %lld not registered\n", 877 req, req->r_tid); 878 return; 879 } 880 881 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 882 rb_erase(&req->r_node, &osdc->requests); 883 osdc->num_requests--; 884 885 if (req->r_osd) { 886 /* make sure the original request isn't in flight. 
*/ 887 ceph_msg_revoke(req->r_request); 888 889 list_del_init(&req->r_osd_item); 890 if (list_empty(&req->r_osd->o_requests) && 891 list_empty(&req->r_osd->o_linger_requests)) { 892 dout("moving osd to %p lru\n", req->r_osd); 893 __move_osd_to_lru(osdc, req->r_osd); 894 } 895 if (list_empty(&req->r_linger_item)) 896 req->r_osd = NULL; 897 } 898 899 list_del_init(&req->r_req_lru_item); 900 ceph_osdc_put_request(req); 901 902 if (osdc->num_requests == 0) { 903 dout(" no requests, canceling timeout\n"); 904 __cancel_osd_timeout(osdc); 905 } 906 } 907 908 /* 909 * Cancel a previously queued request message 910 */ 911 static void __cancel_request(struct ceph_osd_request *req) 912 { 913 if (req->r_sent && req->r_osd) { 914 ceph_msg_revoke(req->r_request); 915 req->r_sent = 0; 916 } 917 } 918 919 static void __register_linger_request(struct ceph_osd_client *osdc, 920 struct ceph_osd_request *req) 921 { 922 dout("__register_linger_request %p\n", req); 923 list_add_tail(&req->r_linger_item, &osdc->req_linger); 924 if (req->r_osd) 925 list_add_tail(&req->r_linger_osd, 926 &req->r_osd->o_linger_requests); 927 } 928 929 static void __unregister_linger_request(struct ceph_osd_client *osdc, 930 struct ceph_osd_request *req) 931 { 932 dout("__unregister_linger_request %p\n", req); 933 list_del_init(&req->r_linger_item); 934 if (req->r_osd) { 935 list_del_init(&req->r_linger_osd); 936 937 if (list_empty(&req->r_osd->o_requests) && 938 list_empty(&req->r_osd->o_linger_requests)) { 939 dout("moving osd to %p lru\n", req->r_osd); 940 __move_osd_to_lru(osdc, req->r_osd); 941 } 942 if (list_empty(&req->r_osd_item)) 943 req->r_osd = NULL; 944 } 945 } 946 947 void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc, 948 struct ceph_osd_request *req) 949 { 950 mutex_lock(&osdc->request_mutex); 951 if (req->r_linger) { 952 __unregister_linger_request(osdc, req); 953 ceph_osdc_put_request(req); 954 } 955 mutex_unlock(&osdc->request_mutex); 956 } 957 
EXPORT_SYMBOL(ceph_osdc_unregister_linger_request); 958 959 void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, 960 struct ceph_osd_request *req) 961 { 962 if (!req->r_linger) { 963 dout("set_request_linger %p\n", req); 964 req->r_linger = 1; 965 /* 966 * caller is now responsible for calling 967 * unregister_linger_request 968 */ 969 ceph_osdc_get_request(req); 970 } 971 } 972 EXPORT_SYMBOL(ceph_osdc_set_request_linger); 973 974 /* 975 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct 976 * (as needed), and set the request r_osd appropriately. If there is 977 * no up osd, set r_osd to NULL. Move the request to the appropriate list 978 * (unsent, homeless) or leave on in-flight lru. 979 * 980 * Return 0 if unchanged, 1 if changed, or negative on error. 981 * 982 * Caller should hold map_sem for read and request_mutex. 983 */ 984 static int __map_request(struct ceph_osd_client *osdc, 985 struct ceph_osd_request *req, int force_resend) 986 { 987 struct ceph_pg pgid; 988 int acting[CEPH_PG_MAX_SIZE]; 989 int o = -1, num = 0; 990 int err; 991 992 dout("map_request %p tid %lld\n", req, req->r_tid); 993 err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap, 994 ceph_file_layout_pg_pool(req->r_file_layout)); 995 if (err) { 996 list_move(&req->r_req_lru_item, &osdc->req_notarget); 997 return err; 998 } 999 req->r_pgid = pgid; 1000 1001 err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); 1002 if (err > 0) { 1003 o = acting[0]; 1004 num = err; 1005 } 1006 1007 if ((!force_resend && 1008 req->r_osd && req->r_osd->o_osd == o && 1009 req->r_sent >= req->r_osd->o_incarnation && 1010 req->r_num_pg_osds == num && 1011 memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || 1012 (req->r_osd == NULL && o == -1)) 1013 return 0; /* no change */ 1014 1015 dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n", 1016 req->r_tid, pgid.pool, pgid.seed, o, 1017 req->r_osd ? 
req->r_osd->o_osd : -1);

	/* record full pg acting set */
	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
	req->r_num_pg_osds = num;

	/* detach the request from the osd it was previously targeted at */
	if (req->r_osd) {
		__cancel_request(req);
		list_del_init(&req->r_osd_item);
		req->r_osd = NULL;
	}

	/*
	 * Reuse an existing session to the new target osd, or create,
	 * insert and open one.  err is preset so that a create_osd()
	 * failure returns -ENOMEM with the request parked on req_notarget.
	 */
	req->r_osd = __lookup_osd(osdc, o);
	if (!req->r_osd && o >= 0) {
		err = -ENOMEM;
		req->r_osd = create_osd(osdc, o);
		if (!req->r_osd) {
			list_move(&req->r_req_lru_item, &osdc->req_notarget);
			goto out;
		}

		dout("map_request osd %p is osd%d\n", req->r_osd, o);
		__insert_osd(osdc, req->r_osd);

		ceph_con_open(&req->r_osd->o_con,
			      CEPH_ENTITY_TYPE_OSD, o,
			      &osdc->osdmap->osd_addr[o]);
	}

	/*
	 * With a target: queue on the osd's request list and on req_unsent
	 * (drained by __send_queued()).  Without one: park on req_notarget.
	 */
	if (req->r_osd) {
		__remove_osd_from_lru(req->r_osd);
		list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
		list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
	} else {
		list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
	}
	err = 1;   /* osd or pg changed */

out:
	return err;
}

/*
 * caller should hold map_sem (for read) and request_mutex
 *
 * Refresh the per-send fields of the already-encoded request message
 * (osdmap epoch, flags, pool/pgid, attempt count -- currently always 1,
 * see FIXME -- and reassert version), move the request to the tail of
 * req_lru with a fresh r_stamp, and queue the message on the target
 * osd's connection.  req->r_osd must be non-NULL: it is dereferenced
 * unconditionally.
 */
static void __send_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	void *p;

	dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
	     req, req->r_tid, req->r_osd->o_osd, req->r_flags,
	     (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);

	/* fill in message content that changes each time we send it */
	put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
	put_unaligned_le32(req->r_flags, req->r_request_flags);
	put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
	p = req->r_request_pgid;
	ceph_encode_64(&p, req->r_pgid.pool);
	ceph_encode_32(&p, req->r_pgid.seed);
	put_unaligned_le64(1, req->r_request_attempts);  /* FIXME */
	memcpy(req->r_request_reassert_version, &req->r_reassert_version,
	       sizeof(req->r_reassert_version));

	/* req_lru stays ordered by send time; handle_timeout relies on it */
	req->r_stamp = jiffies;
	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);

	ceph_msg_get(req->r_request); /* send consumes a ref */
	ceph_con_send(&req->r_osd->o_con, req->r_request);
	req->r_sent = req->r_osd->o_incarnation;
}

/*
 * Send any requests in the queue (req_unsent).
 *
 * The _safe iterator is required because __send_request() moves each
 * entry off req_unsent and onto req_lru.
 */
static void __send_queued(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req, *tmp;

	dout("__send_queued\n");
	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
		__send_request(osdc, req);
}

/*
 * Timeout callback, called every N seconds when 1 or more osd
 * requests have been active for more than N seconds.  When this
 * happens, we ping all OSDs with requests who have timed out to
 * ensure any communications channel reset is detected.  Reset the
 * request timeouts another N seconds in the future as we go.
 * Reschedule the timeout event another N seconds in future (unless
 * there are no open requests).
 *
 * It also asks the monitor for the next osdmap and flushes req_unsent.
 */
static void handle_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client, timeout_work.work);
	struct ceph_osd_request *req;
	struct ceph_osd *osd;
	unsigned long keepalive =
		osdc->client->options->osd_keepalive_timeout * HZ;
	struct list_head slow_osds;
	dout("timeout\n");
	down_read(&osdc->map_sem);

	ceph_monc_request_next_osdmap(&osdc->client->monc);

	mutex_lock(&osdc->request_mutex);

	/*
	 * ping osds that are a bit slow.  this ensures that if there
	 * is a break in the TCP connection we will notice, and reopen
	 * a connection with that osd (from the fault callback).
	 *
	 * req_lru is in send order (see __send_request), so the first
	 * request that is still within the keepalive window ends the scan.
	 * list_move_tail() onto slow_osds collects each osd only once.
	 */
	INIT_LIST_HEAD(&slow_osds);
	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
		if (time_before(jiffies, req->r_stamp + keepalive))
			break;

		osd = req->r_osd;
		BUG_ON(!osd);
		dout(" tid %llu is slow, will send keepalive on osd%d\n",
		     req->r_tid, osd->o_osd);
		list_move_tail(&osd->o_keepalive_item, &slow_osds);
	}
	while (!list_empty(&slow_osds)) {
		osd = list_entry(slow_osds.next, struct ceph_osd,
				 o_keepalive_item);
		list_del_init(&osd->o_keepalive_item);
		ceph_con_keepalive(&osd->o_con);
	}

	__schedule_osd_timeout(osdc);
	__send_queued(osdc);
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
}

/*
 * Periodic work: call remove_old_osds() under map_sem (read), then
 * re-arm itself to run again after a quarter of osd_idle_ttl seconds.
 * NOTE(review): presumably remove_old_osds() reaps osd sessions idle
 * longer than osd_idle_ttl -- confirm against its definition.
 */
static void handle_osds_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client,
			     osds_timeout_work.work);
	unsigned long delay =
		osdc->client->options->osd_idle_ttl * HZ >> 2;

	dout("osds timeout\n");
	down_read(&osdc->map_sem);
	remove_old_osds(osdc);
	up_read(&osdc->map_sem);

	schedule_delayed_work(&osdc->osds_timeout_work,
			      round_jiffies_relative(delay));
}

/*
 * Signal the "safe" completion of a request: invoke r_safe_callback
 * (with a NULL msg) if one is set, then wake every waiter on
 * r_safe_completion (e.g. ceph_osdc_sync()).  Called when an ONDISK
 * reply arrives, and when a wait on the request is aborted.
 */
static void complete_request(struct ceph_osd_request *req)
{
	if (req->r_safe_callback)
		req->r_safe_callback(req, NULL);
	complete_all(&req->r_safe_completion);  /* fsync waiter */
}

/*
 * Decode a versioned pg id: u8 version (must be <= 1), u64 pool and
 * u32 seed, followed by 4 bytes that are skipped.  NOTE(review): the
 * skipped word is presumably the legacy "preferred" field -- confirm
 * against the wire format.
 *
 * Returns 0 on success, or -EINVAL on a short buffer or unknown version.
 */
static int __decode_pgid(void **p, void *end, struct ceph_pg *pgid)
{
	__u8 v;

	ceph_decode_need(p, end, 1 + 8 + 4 + 4, bad);
	v = ceph_decode_8(p);
	if (v > 1) {
		pr_warning("do not understand pg encoding %d > 1", v);
		return -EINVAL;
	}
	pgid->pool = ceph_decode_64(p);
	pgid->seed = ceph_decode_32(p);
	*p += 4;
	return 0;

bad:
	pr_warning("incomplete pg encoding");
	return -EINVAL;
}

/*
 * handle osd op reply.
either call the callback if it is specified, 1202 * or do the completion to wake up the waiting thread. 1203 */ 1204 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, 1205 struct ceph_connection *con) 1206 { 1207 void *p, *end; 1208 struct ceph_osd_request *req; 1209 u64 tid; 1210 int object_len; 1211 int numops, payload_len, flags; 1212 s32 result; 1213 s32 retry_attempt; 1214 struct ceph_pg pg; 1215 int err; 1216 u32 reassert_epoch; 1217 u64 reassert_version; 1218 u32 osdmap_epoch; 1219 int already_completed; 1220 int i; 1221 1222 tid = le64_to_cpu(msg->hdr.tid); 1223 dout("handle_reply %p tid %llu\n", msg, tid); 1224 1225 p = msg->front.iov_base; 1226 end = p + msg->front.iov_len; 1227 1228 ceph_decode_need(&p, end, 4, bad); 1229 object_len = ceph_decode_32(&p); 1230 ceph_decode_need(&p, end, object_len, bad); 1231 p += object_len; 1232 1233 err = __decode_pgid(&p, end, &pg); 1234 if (err) 1235 goto bad; 1236 1237 ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad); 1238 flags = ceph_decode_64(&p); 1239 result = ceph_decode_32(&p); 1240 reassert_epoch = ceph_decode_32(&p); 1241 reassert_version = ceph_decode_64(&p); 1242 osdmap_epoch = ceph_decode_32(&p); 1243 1244 /* lookup */ 1245 mutex_lock(&osdc->request_mutex); 1246 req = __lookup_request(osdc, tid); 1247 if (req == NULL) { 1248 dout("handle_reply tid %llu dne\n", tid); 1249 mutex_unlock(&osdc->request_mutex); 1250 return; 1251 } 1252 ceph_osdc_get_request(req); 1253 1254 dout("handle_reply %p tid %llu req %p result %d\n", msg, tid, 1255 req, result); 1256 1257 ceph_decode_need(&p, end, 4, bad); 1258 numops = ceph_decode_32(&p); 1259 if (numops > CEPH_OSD_MAX_OP) 1260 goto bad_put; 1261 if (numops != req->r_num_ops) 1262 goto bad_put; 1263 payload_len = 0; 1264 ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad); 1265 for (i = 0; i < numops; i++) { 1266 struct ceph_osd_op *op = p; 1267 int len; 1268 1269 len = le32_to_cpu(op->payload_len); 1270 req->r_reply_op_len[i] 
= len; 1271 dout(" op %d has %d bytes\n", i, len); 1272 payload_len += len; 1273 p += sizeof(*op); 1274 } 1275 if (payload_len != le32_to_cpu(msg->hdr.data_len)) { 1276 pr_warning("sum of op payload lens %d != data_len %d", 1277 payload_len, le32_to_cpu(msg->hdr.data_len)); 1278 goto bad_put; 1279 } 1280 1281 ceph_decode_need(&p, end, 4 + numops * 4, bad); 1282 retry_attempt = ceph_decode_32(&p); 1283 for (i = 0; i < numops; i++) 1284 req->r_reply_op_result[i] = ceph_decode_32(&p); 1285 1286 /* 1287 * if this connection filled our message, drop our reference now, to 1288 * avoid a (safe but slower) revoke later. 1289 */ 1290 if (req->r_con_filling_msg == con && req->r_reply == msg) { 1291 dout(" dropping con_filling_msg ref %p\n", con); 1292 req->r_con_filling_msg = NULL; 1293 con->ops->put(con); 1294 } 1295 1296 if (!req->r_got_reply) { 1297 unsigned int bytes; 1298 1299 req->r_result = result; 1300 bytes = le32_to_cpu(msg->hdr.data_len); 1301 dout("handle_reply result %d bytes %d\n", req->r_result, 1302 bytes); 1303 if (req->r_result == 0) 1304 req->r_result = bytes; 1305 1306 /* in case this is a write and we need to replay, */ 1307 req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch); 1308 req->r_reassert_version.version = cpu_to_le64(reassert_version); 1309 1310 req->r_got_reply = 1; 1311 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { 1312 dout("handle_reply tid %llu dup ack\n", tid); 1313 mutex_unlock(&osdc->request_mutex); 1314 goto done; 1315 } 1316 1317 dout("handle_reply tid %llu flags %d\n", tid, flags); 1318 1319 if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK)) 1320 __register_linger_request(osdc, req); 1321 1322 /* either this is a read, or we got the safe response */ 1323 if (result < 0 || 1324 (flags & CEPH_OSD_FLAG_ONDISK) || 1325 ((flags & CEPH_OSD_FLAG_WRITE) == 0)) 1326 __unregister_request(osdc, req); 1327 1328 already_completed = req->r_completed; 1329 req->r_completed = 1; 1330 mutex_unlock(&osdc->request_mutex); 1331 if 
(already_completed) 1332 goto done; 1333 1334 if (req->r_callback) 1335 req->r_callback(req, msg); 1336 else 1337 complete_all(&req->r_completion); 1338 1339 if (flags & CEPH_OSD_FLAG_ONDISK) 1340 complete_request(req); 1341 1342 done: 1343 dout("req=%p req->r_linger=%d\n", req, req->r_linger); 1344 ceph_osdc_put_request(req); 1345 return; 1346 1347 bad_put: 1348 ceph_osdc_put_request(req); 1349 bad: 1350 pr_err("corrupt osd_op_reply got %d %d\n", 1351 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len)); 1352 ceph_msg_dump(msg); 1353 } 1354 1355 static void reset_changed_osds(struct ceph_osd_client *osdc) 1356 { 1357 struct rb_node *p, *n; 1358 1359 for (p = rb_first(&osdc->osds); p; p = n) { 1360 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node); 1361 1362 n = rb_next(p); 1363 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || 1364 memcmp(&osd->o_con.peer_addr, 1365 ceph_osd_addr(osdc->osdmap, 1366 osd->o_osd), 1367 sizeof(struct ceph_entity_addr)) != 0) 1368 __reset_osd(osdc, osd); 1369 } 1370 } 1371 1372 /* 1373 * Requeue requests whose mapping to an OSD has changed. If requests map to 1374 * no osd, request a new map. 1375 * 1376 * Caller should hold map_sem for read. 1377 */ 1378 static void kick_requests(struct ceph_osd_client *osdc, int force_resend) 1379 { 1380 struct ceph_osd_request *req, *nreq; 1381 struct rb_node *p; 1382 int needmap = 0; 1383 int err; 1384 1385 dout("kick_requests %s\n", force_resend ? " (force resend)" : ""); 1386 mutex_lock(&osdc->request_mutex); 1387 for (p = rb_first(&osdc->requests); p; ) { 1388 req = rb_entry(p, struct ceph_osd_request, r_node); 1389 p = rb_next(p); 1390 1391 /* 1392 * For linger requests that have not yet been 1393 * registered, move them to the linger list; they'll 1394 * be sent to the osd in the loop below. Unregister 1395 * the request before re-registering it as a linger 1396 * request to ensure the __map_request() below 1397 * will decide it needs to be sent. 
1398 */ 1399 if (req->r_linger && list_empty(&req->r_linger_item)) { 1400 dout("%p tid %llu restart on osd%d\n", 1401 req, req->r_tid, 1402 req->r_osd ? req->r_osd->o_osd : -1); 1403 __unregister_request(osdc, req); 1404 __register_linger_request(osdc, req); 1405 continue; 1406 } 1407 1408 err = __map_request(osdc, req, force_resend); 1409 if (err < 0) 1410 continue; /* error */ 1411 if (req->r_osd == NULL) { 1412 dout("%p tid %llu maps to no osd\n", req, req->r_tid); 1413 needmap++; /* request a newer map */ 1414 } else if (err > 0) { 1415 if (!req->r_linger) { 1416 dout("%p tid %llu requeued on osd%d\n", req, 1417 req->r_tid, 1418 req->r_osd ? req->r_osd->o_osd : -1); 1419 req->r_flags |= CEPH_OSD_FLAG_RETRY; 1420 } 1421 } 1422 } 1423 1424 list_for_each_entry_safe(req, nreq, &osdc->req_linger, 1425 r_linger_item) { 1426 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); 1427 1428 err = __map_request(osdc, req, force_resend); 1429 dout("__map_request returned %d\n", err); 1430 if (err == 0) 1431 continue; /* no change and no osd was specified */ 1432 if (err < 0) 1433 continue; /* hrm! */ 1434 if (req->r_osd == NULL) { 1435 dout("tid %llu maps to no valid osd\n", req->r_tid); 1436 needmap++; /* request a newer map */ 1437 continue; 1438 } 1439 1440 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid, 1441 req->r_osd ? req->r_osd->o_osd : -1); 1442 __register_request(osdc, req); 1443 __unregister_linger_request(osdc, req); 1444 } 1445 mutex_unlock(&osdc->request_mutex); 1446 1447 if (needmap) { 1448 dout("%d requests for down osds, need new map\n", needmap); 1449 ceph_monc_request_next_osdmap(&osdc->client->monc); 1450 } 1451 reset_changed_osds(osdc); 1452 } 1453 1454 1455 /* 1456 * Process updated osd map. 1457 * 1458 * The message contains any number of incremental and full maps, normally 1459 * indicating some sort of topology change in the cluster. Kick requests 1460 * off to different OSDs as needed. 
1461 */ 1462 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) 1463 { 1464 void *p, *end, *next; 1465 u32 nr_maps, maplen; 1466 u32 epoch; 1467 struct ceph_osdmap *newmap = NULL, *oldmap; 1468 int err; 1469 struct ceph_fsid fsid; 1470 1471 dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0); 1472 p = msg->front.iov_base; 1473 end = p + msg->front.iov_len; 1474 1475 /* verify fsid */ 1476 ceph_decode_need(&p, end, sizeof(fsid), bad); 1477 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 1478 if (ceph_check_fsid(osdc->client, &fsid) < 0) 1479 return; 1480 1481 down_write(&osdc->map_sem); 1482 1483 /* incremental maps */ 1484 ceph_decode_32_safe(&p, end, nr_maps, bad); 1485 dout(" %d inc maps\n", nr_maps); 1486 while (nr_maps > 0) { 1487 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 1488 epoch = ceph_decode_32(&p); 1489 maplen = ceph_decode_32(&p); 1490 ceph_decode_need(&p, end, maplen, bad); 1491 next = p + maplen; 1492 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) { 1493 dout("applying incremental map %u len %d\n", 1494 epoch, maplen); 1495 newmap = osdmap_apply_incremental(&p, next, 1496 osdc->osdmap, 1497 &osdc->client->msgr); 1498 if (IS_ERR(newmap)) { 1499 err = PTR_ERR(newmap); 1500 goto bad; 1501 } 1502 BUG_ON(!newmap); 1503 if (newmap != osdc->osdmap) { 1504 ceph_osdmap_destroy(osdc->osdmap); 1505 osdc->osdmap = newmap; 1506 } 1507 kick_requests(osdc, 0); 1508 } else { 1509 dout("ignoring incremental map %u len %d\n", 1510 epoch, maplen); 1511 } 1512 p = next; 1513 nr_maps--; 1514 } 1515 if (newmap) 1516 goto done; 1517 1518 /* full maps */ 1519 ceph_decode_32_safe(&p, end, nr_maps, bad); 1520 dout(" %d full maps\n", nr_maps); 1521 while (nr_maps) { 1522 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 1523 epoch = ceph_decode_32(&p); 1524 maplen = ceph_decode_32(&p); 1525 ceph_decode_need(&p, end, maplen, bad); 1526 if (nr_maps > 1) { 1527 dout("skipping non-latest full map %u len %d\n", 1528 epoch, maplen); 1529 } 
else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) { 1530 dout("skipping full map %u len %d, " 1531 "older than our %u\n", epoch, maplen, 1532 osdc->osdmap->epoch); 1533 } else { 1534 int skipped_map = 0; 1535 1536 dout("taking full map %u len %d\n", epoch, maplen); 1537 newmap = osdmap_decode(&p, p+maplen); 1538 if (IS_ERR(newmap)) { 1539 err = PTR_ERR(newmap); 1540 goto bad; 1541 } 1542 BUG_ON(!newmap); 1543 oldmap = osdc->osdmap; 1544 osdc->osdmap = newmap; 1545 if (oldmap) { 1546 if (oldmap->epoch + 1 < newmap->epoch) 1547 skipped_map = 1; 1548 ceph_osdmap_destroy(oldmap); 1549 } 1550 kick_requests(osdc, skipped_map); 1551 } 1552 p += maplen; 1553 nr_maps--; 1554 } 1555 1556 done: 1557 downgrade_write(&osdc->map_sem); 1558 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); 1559 1560 /* 1561 * subscribe to subsequent osdmap updates if full to ensure 1562 * we find out when we are no longer full and stop returning 1563 * ENOSPC. 1564 */ 1565 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) 1566 ceph_monc_request_next_osdmap(&osdc->client->monc); 1567 1568 mutex_lock(&osdc->request_mutex); 1569 __send_queued(osdc); 1570 mutex_unlock(&osdc->request_mutex); 1571 up_read(&osdc->map_sem); 1572 wake_up_all(&osdc->client->auth_wq); 1573 return; 1574 1575 bad: 1576 pr_err("osdc handle_map corrupt msg\n"); 1577 ceph_msg_dump(msg); 1578 up_write(&osdc->map_sem); 1579 return; 1580 } 1581 1582 /* 1583 * watch/notify callback event infrastructure 1584 * 1585 * These callbacks are used both for watch and notify operations. 
1586 */ 1587 static void __release_event(struct kref *kref) 1588 { 1589 struct ceph_osd_event *event = 1590 container_of(kref, struct ceph_osd_event, kref); 1591 1592 dout("__release_event %p\n", event); 1593 kfree(event); 1594 } 1595 1596 static void get_event(struct ceph_osd_event *event) 1597 { 1598 kref_get(&event->kref); 1599 } 1600 1601 void ceph_osdc_put_event(struct ceph_osd_event *event) 1602 { 1603 kref_put(&event->kref, __release_event); 1604 } 1605 EXPORT_SYMBOL(ceph_osdc_put_event); 1606 1607 static void __insert_event(struct ceph_osd_client *osdc, 1608 struct ceph_osd_event *new) 1609 { 1610 struct rb_node **p = &osdc->event_tree.rb_node; 1611 struct rb_node *parent = NULL; 1612 struct ceph_osd_event *event = NULL; 1613 1614 while (*p) { 1615 parent = *p; 1616 event = rb_entry(parent, struct ceph_osd_event, node); 1617 if (new->cookie < event->cookie) 1618 p = &(*p)->rb_left; 1619 else if (new->cookie > event->cookie) 1620 p = &(*p)->rb_right; 1621 else 1622 BUG(); 1623 } 1624 1625 rb_link_node(&new->node, parent, p); 1626 rb_insert_color(&new->node, &osdc->event_tree); 1627 } 1628 1629 static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc, 1630 u64 cookie) 1631 { 1632 struct rb_node **p = &osdc->event_tree.rb_node; 1633 struct rb_node *parent = NULL; 1634 struct ceph_osd_event *event = NULL; 1635 1636 while (*p) { 1637 parent = *p; 1638 event = rb_entry(parent, struct ceph_osd_event, node); 1639 if (cookie < event->cookie) 1640 p = &(*p)->rb_left; 1641 else if (cookie > event->cookie) 1642 p = &(*p)->rb_right; 1643 else 1644 return event; 1645 } 1646 return NULL; 1647 } 1648 1649 static void __remove_event(struct ceph_osd_event *event) 1650 { 1651 struct ceph_osd_client *osdc = event->osdc; 1652 1653 if (!RB_EMPTY_NODE(&event->node)) { 1654 dout("__remove_event removed %p\n", event); 1655 rb_erase(&event->node, &osdc->event_tree); 1656 ceph_osdc_put_event(event); 1657 } else { 1658 dout("__remove_event didn't remove %p\n", event); 
1659 } 1660 } 1661 1662 int ceph_osdc_create_event(struct ceph_osd_client *osdc, 1663 void (*event_cb)(u64, u64, u8, void *), 1664 void *data, struct ceph_osd_event **pevent) 1665 { 1666 struct ceph_osd_event *event; 1667 1668 event = kmalloc(sizeof(*event), GFP_NOIO); 1669 if (!event) 1670 return -ENOMEM; 1671 1672 dout("create_event %p\n", event); 1673 event->cb = event_cb; 1674 event->one_shot = 0; 1675 event->data = data; 1676 event->osdc = osdc; 1677 INIT_LIST_HEAD(&event->osd_node); 1678 RB_CLEAR_NODE(&event->node); 1679 kref_init(&event->kref); /* one ref for us */ 1680 kref_get(&event->kref); /* one ref for the caller */ 1681 1682 spin_lock(&osdc->event_lock); 1683 event->cookie = ++osdc->event_count; 1684 __insert_event(osdc, event); 1685 spin_unlock(&osdc->event_lock); 1686 1687 *pevent = event; 1688 return 0; 1689 } 1690 EXPORT_SYMBOL(ceph_osdc_create_event); 1691 1692 void ceph_osdc_cancel_event(struct ceph_osd_event *event) 1693 { 1694 struct ceph_osd_client *osdc = event->osdc; 1695 1696 dout("cancel_event %p\n", event); 1697 spin_lock(&osdc->event_lock); 1698 __remove_event(event); 1699 spin_unlock(&osdc->event_lock); 1700 ceph_osdc_put_event(event); /* caller's */ 1701 } 1702 EXPORT_SYMBOL(ceph_osdc_cancel_event); 1703 1704 1705 static void do_event_work(struct work_struct *work) 1706 { 1707 struct ceph_osd_event_work *event_work = 1708 container_of(work, struct ceph_osd_event_work, work); 1709 struct ceph_osd_event *event = event_work->event; 1710 u64 ver = event_work->ver; 1711 u64 notify_id = event_work->notify_id; 1712 u8 opcode = event_work->opcode; 1713 1714 dout("do_event_work completing %p\n", event); 1715 event->cb(ver, notify_id, opcode, event->data); 1716 dout("do_event_work completed %p\n", event); 1717 ceph_osdc_put_event(event); 1718 kfree(event_work); 1719 } 1720 1721 1722 /* 1723 * Process osd watch notifications 1724 */ 1725 static void handle_watch_notify(struct ceph_osd_client *osdc, 1726 struct ceph_msg *msg) 1727 { 1728 void *p, 
*end; 1729 u8 proto_ver; 1730 u64 cookie, ver, notify_id; 1731 u8 opcode; 1732 struct ceph_osd_event *event; 1733 struct ceph_osd_event_work *event_work; 1734 1735 p = msg->front.iov_base; 1736 end = p + msg->front.iov_len; 1737 1738 ceph_decode_8_safe(&p, end, proto_ver, bad); 1739 ceph_decode_8_safe(&p, end, opcode, bad); 1740 ceph_decode_64_safe(&p, end, cookie, bad); 1741 ceph_decode_64_safe(&p, end, ver, bad); 1742 ceph_decode_64_safe(&p, end, notify_id, bad); 1743 1744 spin_lock(&osdc->event_lock); 1745 event = __find_event(osdc, cookie); 1746 if (event) { 1747 BUG_ON(event->one_shot); 1748 get_event(event); 1749 } 1750 spin_unlock(&osdc->event_lock); 1751 dout("handle_watch_notify cookie %lld ver %lld event %p\n", 1752 cookie, ver, event); 1753 if (event) { 1754 event_work = kmalloc(sizeof(*event_work), GFP_NOIO); 1755 if (!event_work) { 1756 dout("ERROR: could not allocate event_work\n"); 1757 goto done_err; 1758 } 1759 INIT_WORK(&event_work->work, do_event_work); 1760 event_work->event = event; 1761 event_work->ver = ver; 1762 event_work->notify_id = notify_id; 1763 event_work->opcode = opcode; 1764 if (!queue_work(osdc->notify_wq, &event_work->work)) { 1765 dout("WARNING: failed to queue notify event work\n"); 1766 goto done_err; 1767 } 1768 } 1769 1770 return; 1771 1772 done_err: 1773 ceph_osdc_put_event(event); 1774 return; 1775 1776 bad: 1777 pr_err("osdc handle_watch_notify corrupt msg\n"); 1778 return; 1779 } 1780 1781 static void ceph_osdc_msg_data_set(struct ceph_msg *msg, 1782 struct ceph_osd_data *osd_data) 1783 { 1784 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { 1785 BUG_ON(osd_data->length > (u64) SIZE_MAX); 1786 if (osd_data->length) 1787 ceph_msg_data_set_pages(msg, osd_data->pages, 1788 osd_data->length, osd_data->alignment); 1789 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) { 1790 BUG_ON(!osd_data->pagelist->length); 1791 ceph_msg_data_set_pagelist(msg, osd_data->pagelist); 1792 #ifdef CONFIG_BLOCK 1793 } else if 
(osd_data->type == CEPH_OSD_DATA_TYPE_BIO) { 1794 ceph_msg_data_set_bio(msg, osd_data->bio); 1795 #endif 1796 } else { 1797 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE); 1798 } 1799 } 1800 1801 /* 1802 * Register request, send initial attempt. 1803 */ 1804 int ceph_osdc_start_request(struct ceph_osd_client *osdc, 1805 struct ceph_osd_request *req, 1806 bool nofail) 1807 { 1808 int rc = 0; 1809 1810 /* Set up response incoming data and request outgoing data fields */ 1811 1812 ceph_osdc_msg_data_set(req->r_reply, &req->r_data_in); 1813 ceph_osdc_msg_data_set(req->r_request, &req->r_data_out); 1814 1815 down_read(&osdc->map_sem); 1816 mutex_lock(&osdc->request_mutex); 1817 __register_request(osdc, req); 1818 WARN_ON(req->r_sent); 1819 rc = __map_request(osdc, req, 0); 1820 if (rc < 0) { 1821 if (nofail) { 1822 dout("osdc_start_request failed map, " 1823 " will retry %lld\n", req->r_tid); 1824 rc = 0; 1825 } 1826 goto out_unlock; 1827 } 1828 if (req->r_osd == NULL) { 1829 dout("send_request %p no up osds in pg\n", req); 1830 ceph_monc_request_next_osdmap(&osdc->client->monc); 1831 } else { 1832 __send_queued(osdc); 1833 } 1834 rc = 0; 1835 out_unlock: 1836 mutex_unlock(&osdc->request_mutex); 1837 up_read(&osdc->map_sem); 1838 return rc; 1839 } 1840 EXPORT_SYMBOL(ceph_osdc_start_request); 1841 1842 /* 1843 * wait for a request to complete 1844 */ 1845 int ceph_osdc_wait_request(struct ceph_osd_client *osdc, 1846 struct ceph_osd_request *req) 1847 { 1848 int rc; 1849 1850 rc = wait_for_completion_interruptible(&req->r_completion); 1851 if (rc < 0) { 1852 mutex_lock(&osdc->request_mutex); 1853 __cancel_request(req); 1854 __unregister_request(osdc, req); 1855 mutex_unlock(&osdc->request_mutex); 1856 complete_request(req); 1857 dout("wait_request tid %llu canceled/timed out\n", req->r_tid); 1858 return rc; 1859 } 1860 1861 dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result); 1862 return req->r_result; 1863 } 1864 EXPORT_SYMBOL(ceph_osdc_wait_request); 
1865 1866 /* 1867 * sync - wait for all in-flight requests to flush. avoid starvation. 1868 */ 1869 void ceph_osdc_sync(struct ceph_osd_client *osdc) 1870 { 1871 struct ceph_osd_request *req; 1872 u64 last_tid, next_tid = 0; 1873 1874 mutex_lock(&osdc->request_mutex); 1875 last_tid = osdc->last_tid; 1876 while (1) { 1877 req = __lookup_request_ge(osdc, next_tid); 1878 if (!req) 1879 break; 1880 if (req->r_tid > last_tid) 1881 break; 1882 1883 next_tid = req->r_tid + 1; 1884 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0) 1885 continue; 1886 1887 ceph_osdc_get_request(req); 1888 mutex_unlock(&osdc->request_mutex); 1889 dout("sync waiting on tid %llu (last is %llu)\n", 1890 req->r_tid, last_tid); 1891 wait_for_completion(&req->r_safe_completion); 1892 mutex_lock(&osdc->request_mutex); 1893 ceph_osdc_put_request(req); 1894 } 1895 mutex_unlock(&osdc->request_mutex); 1896 dout("sync done (thru tid %llu)\n", last_tid); 1897 } 1898 EXPORT_SYMBOL(ceph_osdc_sync); 1899 1900 /* 1901 * init, shutdown 1902 */ 1903 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) 1904 { 1905 int err; 1906 1907 dout("init\n"); 1908 osdc->client = client; 1909 osdc->osdmap = NULL; 1910 init_rwsem(&osdc->map_sem); 1911 init_completion(&osdc->map_waiters); 1912 osdc->last_requested_map = 0; 1913 mutex_init(&osdc->request_mutex); 1914 osdc->last_tid = 0; 1915 osdc->osds = RB_ROOT; 1916 INIT_LIST_HEAD(&osdc->osd_lru); 1917 osdc->requests = RB_ROOT; 1918 INIT_LIST_HEAD(&osdc->req_lru); 1919 INIT_LIST_HEAD(&osdc->req_unsent); 1920 INIT_LIST_HEAD(&osdc->req_notarget); 1921 INIT_LIST_HEAD(&osdc->req_linger); 1922 osdc->num_requests = 0; 1923 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 1924 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 1925 spin_lock_init(&osdc->event_lock); 1926 osdc->event_tree = RB_ROOT; 1927 osdc->event_count = 0; 1928 1929 schedule_delayed_work(&osdc->osds_timeout_work, 1930 
round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ)); 1931 1932 err = -ENOMEM; 1933 osdc->req_mempool = mempool_create_kmalloc_pool(10, 1934 sizeof(struct ceph_osd_request)); 1935 if (!osdc->req_mempool) 1936 goto out; 1937 1938 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, 1939 OSD_OP_FRONT_LEN, 10, true, 1940 "osd_op"); 1941 if (err < 0) 1942 goto out_mempool; 1943 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, 1944 OSD_OPREPLY_FRONT_LEN, 10, true, 1945 "osd_op_reply"); 1946 if (err < 0) 1947 goto out_msgpool; 1948 1949 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); 1950 if (IS_ERR(osdc->notify_wq)) { 1951 err = PTR_ERR(osdc->notify_wq); 1952 osdc->notify_wq = NULL; 1953 goto out_msgpool; 1954 } 1955 return 0; 1956 1957 out_msgpool: 1958 ceph_msgpool_destroy(&osdc->msgpool_op); 1959 out_mempool: 1960 mempool_destroy(osdc->req_mempool); 1961 out: 1962 return err; 1963 } 1964 1965 void ceph_osdc_stop(struct ceph_osd_client *osdc) 1966 { 1967 flush_workqueue(osdc->notify_wq); 1968 destroy_workqueue(osdc->notify_wq); 1969 cancel_delayed_work_sync(&osdc->timeout_work); 1970 cancel_delayed_work_sync(&osdc->osds_timeout_work); 1971 if (osdc->osdmap) { 1972 ceph_osdmap_destroy(osdc->osdmap); 1973 osdc->osdmap = NULL; 1974 } 1975 remove_all_osds(osdc); 1976 mempool_destroy(osdc->req_mempool); 1977 ceph_msgpool_destroy(&osdc->msgpool_op); 1978 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 1979 } 1980 1981 /* 1982 * Read some contiguous pages. If we cross a stripe boundary, shorten 1983 * *plen. Return number of bytes read, or error. 
1984 */ 1985 int ceph_osdc_readpages(struct ceph_osd_client *osdc, 1986 struct ceph_vino vino, struct ceph_file_layout *layout, 1987 u64 off, u64 *plen, 1988 u32 truncate_seq, u64 truncate_size, 1989 struct page **pages, int num_pages, int page_align) 1990 { 1991 struct ceph_osd_request *req; 1992 struct ceph_osd_data *osd_data; 1993 int rc = 0; 1994 1995 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, 1996 vino.snap, off, *plen); 1997 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1998 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 1999 NULL, 0, truncate_seq, truncate_size, NULL, 2000 false); 2001 if (IS_ERR(req)) 2002 return PTR_ERR(req); 2003 2004 /* it may be a short read due to an object boundary */ 2005 2006 osd_data = &req->r_data_in; 2007 osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; 2008 osd_data->pages = pages; 2009 osd_data->length = *plen; 2010 osd_data->alignment = page_align; 2011 2012 dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", 2013 off, *plen, osd_data->length, page_align); 2014 2015 rc = ceph_osdc_start_request(osdc, req, false); 2016 if (!rc) 2017 rc = ceph_osdc_wait_request(osdc, req); 2018 2019 ceph_osdc_put_request(req); 2020 dout("readpages result %d\n", rc); 2021 return rc; 2022 } 2023 EXPORT_SYMBOL(ceph_osdc_readpages); 2024 2025 /* 2026 * do a synchronous write on N pages 2027 */ 2028 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, 2029 struct ceph_file_layout *layout, 2030 struct ceph_snap_context *snapc, 2031 u64 off, u64 len, 2032 u32 truncate_seq, u64 truncate_size, 2033 struct timespec *mtime, 2034 struct page **pages, int num_pages) 2035 { 2036 struct ceph_osd_request *req; 2037 struct ceph_osd_data *osd_data; 2038 int rc = 0; 2039 int page_align = off & ~PAGE_MASK; 2040 2041 BUG_ON(vino.snap != CEPH_NOSNAP); 2042 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 2043 CEPH_OSD_OP_WRITE, 2044 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, 2045 snapc, 0, 2046 
truncate_seq, truncate_size, mtime, 2047 true); 2048 if (IS_ERR(req)) 2049 return PTR_ERR(req); 2050 2051 /* it may be a short write due to an object boundary */ 2052 osd_data = &req->r_data_out; 2053 osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; 2054 osd_data->pages = pages; 2055 osd_data->length = len; 2056 osd_data->alignment = page_align; 2057 dout("writepages %llu~%llu (%llu bytes)\n", off, len, osd_data->length); 2058 2059 rc = ceph_osdc_start_request(osdc, req, true); 2060 if (!rc) 2061 rc = ceph_osdc_wait_request(osdc, req); 2062 2063 ceph_osdc_put_request(req); 2064 if (rc == 0) 2065 rc = len; 2066 dout("writepages result %d\n", rc); 2067 return rc; 2068 } 2069 EXPORT_SYMBOL(ceph_osdc_writepages); 2070 2071 /* 2072 * handle incoming message 2073 */ 2074 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 2075 { 2076 struct ceph_osd *osd = con->private; 2077 struct ceph_osd_client *osdc; 2078 int type = le16_to_cpu(msg->hdr.type); 2079 2080 if (!osd) 2081 goto out; 2082 osdc = osd->o_osdc; 2083 2084 switch (type) { 2085 case CEPH_MSG_OSD_MAP: 2086 ceph_osdc_handle_map(osdc, msg); 2087 break; 2088 case CEPH_MSG_OSD_OPREPLY: 2089 handle_reply(osdc, msg, con); 2090 break; 2091 case CEPH_MSG_WATCH_NOTIFY: 2092 handle_watch_notify(osdc, msg); 2093 break; 2094 2095 default: 2096 pr_err("received unknown message type %d %s\n", type, 2097 ceph_msg_type_name(type)); 2098 } 2099 out: 2100 ceph_msg_put(msg); 2101 } 2102 2103 /* 2104 * lookup and return message for incoming reply. set up reply message 2105 * pages. 
*/
static struct ceph_msg *get_reply(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	/*
	 * Called via alloc_msg() for OPREPLY messages.  Returns a new
	 * reference to req->r_reply.  Returns NULL with *skip = 1 when the
	 * reply should be discarded: unknown tid, or more data than the
	 * request's pages can hold.  If growing the front buffer fails,
	 * NULL is returned without touching *skip; alloc_msg() presets it
	 * to 0.  On success the connection is recorded in
	 * r_con_filling_msg, holding a con reference, so it can later be
	 * dropped by handle_reply() or revoked here or on request release.
	 */
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct ceph_msg *m;
	struct ceph_osd_request *req;
	int front = le32_to_cpu(hdr->front_len);
	int data_len = le32_to_cpu(hdr->data_len);
	u64 tid;

	tid = le64_to_cpu(hdr->tid);
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (!req) {
		*skip = 1;
		m = NULL;
		dout("get_reply unknown tid %llu from osd%d\n", tid,
		     osd->o_osd);
		goto out;
	}

	/*
	 * Another connection is still filling r_reply: revoke it and drop
	 * that connection's reference before reusing the message here.
	 */
	if (req->r_con_filling_msg) {
		dout("%s revoking msg %p from old con %p\n", __func__,
		     req->r_reply, req->r_con_filling_msg);
		ceph_msg_revoke_incoming(req->r_reply);
		req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
		req->r_con_filling_msg = NULL;
	}

	/* replace the preallocated reply if its front buffer is too small */
	if (front > req->r_reply->front.iov_len) {
		pr_warning("get_reply front %d > preallocated %d\n",
			   front, (int)req->r_reply->front.iov_len);
		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
		if (!m)
			goto out;
		ceph_msg_put(req->r_reply);
		req->r_reply = m;
	}
	m = ceph_msg_get(req->r_reply);

	/* refuse data that would overrun the caller-supplied pages */
	if (data_len > 0) {
		struct ceph_osd_data *osd_data = &req->r_data_in;

		if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
			if (osd_data->pages &&
				unlikely(osd_data->length < data_len)) {

				pr_warning("tid %lld reply has %d bytes "
					"we had only %llu bytes ready\n",
					tid, data_len, osd_data->length);
				*skip = 1;
				ceph_msg_put(m);
				m = NULL;
				goto out;
			}
		}
	}
	*skip = 0;
	req->r_con_filling_msg = con->ops->get(con);
	dout("get_reply tid %lld %p\n", tid, m);

out:
	mutex_unlock(&osdc->request_mutex);
	return m;

}

/*
 * Messenger alloc_msg hook: allocate osdmap and watch-notify messages
 * fresh, hand OPREPLY to get_reply(), and skip any other type.
 */
static struct ceph_msg *alloc_msg(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	int type = le16_to_cpu(hdr->type);
	int front = le32_to_cpu(hdr->front_len);

	*skip = 0;
	switch (type) {
	case CEPH_MSG_OSD_MAP:
	case CEPH_MSG_WATCH_NOTIFY:
		return ceph_msg_new(type, front, GFP_NOFS, false);
	case CEPH_MSG_OSD_OPREPLY:
		return get_reply(con, hdr, skip);
	default:
		pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
			osd->o_osd);
		*skip = 1;
		return NULL;
	}
}

/*
 * Wrappers to refcount containing ceph_osd struct
 */
/* Returns con if get_osd() succeeded on the owning osd, else NULL. */
static struct ceph_connection *get_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	if (get_osd(osd))
		return con;
	return NULL;
}

static void put_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	put_osd(osd);
}

/*
 * authentication
 */
/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
2222 */ 2223 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 2224 int *proto, int force_new) 2225 { 2226 struct ceph_osd *o = con->private; 2227 struct ceph_osd_client *osdc = o->o_osdc; 2228 struct ceph_auth_client *ac = osdc->client->monc.auth; 2229 struct ceph_auth_handshake *auth = &o->o_auth; 2230 2231 if (force_new && auth->authorizer) { 2232 ceph_auth_destroy_authorizer(ac, auth->authorizer); 2233 auth->authorizer = NULL; 2234 } 2235 if (!auth->authorizer) { 2236 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 2237 auth); 2238 if (ret) 2239 return ERR_PTR(ret); 2240 } else { 2241 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 2242 auth); 2243 if (ret) 2244 return ERR_PTR(ret); 2245 } 2246 *proto = ac->protocol; 2247 2248 return auth; 2249 } 2250 2251 2252 static int verify_authorizer_reply(struct ceph_connection *con, int len) 2253 { 2254 struct ceph_osd *o = con->private; 2255 struct ceph_osd_client *osdc = o->o_osdc; 2256 struct ceph_auth_client *ac = osdc->client->monc.auth; 2257 2258 return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len); 2259 } 2260 2261 static int invalidate_authorizer(struct ceph_connection *con) 2262 { 2263 struct ceph_osd *o = con->private; 2264 struct ceph_osd_client *osdc = o->o_osdc; 2265 struct ceph_auth_client *ac = osdc->client->monc.auth; 2266 2267 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); 2268 return ceph_monc_validate_auth(&osdc->client->monc); 2269 } 2270 2271 static const struct ceph_connection_operations osd_con_ops = { 2272 .get = get_osd_con, 2273 .put = put_osd_con, 2274 .dispatch = dispatch, 2275 .get_authorizer = get_authorizer, 2276 .verify_authorizer_reply = verify_authorizer_reply, 2277 .invalidate_authorizer = invalidate_authorizer, 2278 .alloc_msg = alloc_msg, 2279 .fault = osd_reset, 2280 }; 2281