1 #include <linux/ceph/ceph_debug.h> 2 3 #include <linux/module.h> 4 #include <linux/err.h> 5 #include <linux/highmem.h> 6 #include <linux/mm.h> 7 #include <linux/pagemap.h> 8 #include <linux/slab.h> 9 #include <linux/uaccess.h> 10 #ifdef CONFIG_BLOCK 11 #include <linux/bio.h> 12 #endif 13 14 #include <linux/ceph/libceph.h> 15 #include <linux/ceph/osd_client.h> 16 #include <linux/ceph/messenger.h> 17 #include <linux/ceph/decode.h> 18 #include <linux/ceph/auth.h> 19 #include <linux/ceph/pagelist.h> 20 21 #define OSD_OP_FRONT_LEN 4096 22 #define OSD_OPREPLY_FRONT_LEN 512 23 24 static const struct ceph_connection_operations osd_con_ops; 25 26 static void __send_queued(struct ceph_osd_client *osdc); 27 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); 28 static void __register_request(struct ceph_osd_client *osdc, 29 struct ceph_osd_request *req); 30 static void __unregister_linger_request(struct ceph_osd_client *osdc, 31 struct ceph_osd_request *req); 32 static void __send_request(struct ceph_osd_client *osdc, 33 struct ceph_osd_request *req); 34 35 /* 36 * Implement client access to distributed object storage cluster. 37 * 38 * All data objects are stored within a cluster/cloud of OSDs, or 39 * "object storage devices." (Note that Ceph OSDs have _nothing_ to 40 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply 41 * remote daemons serving up and coordinating consistent and safe 42 * access to storage. 43 * 44 * Cluster membership and the mapping of data objects onto storage devices 45 * are described by the osd map. 46 * 47 * We keep track of pending OSD requests (read, write), resubmit 48 * requests to different OSDs when the cluster topology/data layout 49 * change, or retry the affected requests when the communications 50 * channel with an OSD is reset. 51 */ 52 53 /* 54 * calculate the mapping of a file extent onto an object, and fill out the 55 * request accordingly. 
shorten extent as necessary if it crosses an 56 * object boundary. 57 * 58 * fill osd op in request message. 59 */ 60 static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen, 61 u64 *objnum, u64 *objoff, u64 *objlen) 62 { 63 u64 orig_len = *plen; 64 int r; 65 66 /* object extent? */ 67 r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum, 68 objoff, objlen); 69 if (r < 0) 70 return r; 71 if (*objlen < orig_len) { 72 *plen = *objlen; 73 dout(" skipping last %llu, final file extent %llu~%llu\n", 74 orig_len - *plen, off, *plen); 75 } 76 77 dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen); 78 79 return 0; 80 } 81 82 /* 83 * requests 84 */ 85 void ceph_osdc_release_request(struct kref *kref) 86 { 87 int num_pages; 88 struct ceph_osd_request *req = container_of(kref, 89 struct ceph_osd_request, 90 r_kref); 91 92 if (req->r_request) 93 ceph_msg_put(req->r_request); 94 if (req->r_reply) { 95 ceph_msg_revoke_incoming(req->r_reply); 96 ceph_msg_put(req->r_reply); 97 } 98 99 if (req->r_data_in.type == CEPH_OSD_DATA_TYPE_PAGES && 100 req->r_data_in.own_pages) { 101 num_pages = calc_pages_for((u64)req->r_data_in.alignment, 102 (u64)req->r_data_in.length); 103 ceph_release_page_vector(req->r_data_in.pages, num_pages); 104 } 105 if (req->r_data_out.type == CEPH_OSD_DATA_TYPE_PAGES && 106 req->r_data_out.own_pages) { 107 num_pages = calc_pages_for((u64)req->r_data_out.alignment, 108 (u64)req->r_data_out.length); 109 ceph_release_page_vector(req->r_data_out.pages, num_pages); 110 } 111 112 ceph_put_snap_context(req->r_snapc); 113 if (req->r_mempool) 114 mempool_free(req, req->r_osdc->req_mempool); 115 else 116 kfree(req); 117 } 118 EXPORT_SYMBOL(ceph_osdc_release_request); 119 120 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 121 struct ceph_snap_context *snapc, 122 unsigned int num_ops, 123 bool use_mempool, 124 gfp_t gfp_flags) 125 { 126 struct ceph_osd_request *req; 127 struct ceph_msg *msg; 
128 size_t msg_size; 129 130 msg_size = 4 + 4 + 8 + 8 + 4+8; 131 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ 132 msg_size += 1 + 8 + 4 + 4; /* pg_t */ 133 msg_size += 4 + MAX_OBJ_NAME_SIZE; 134 msg_size += 2 + num_ops*sizeof(struct ceph_osd_op); 135 msg_size += 8; /* snapid */ 136 msg_size += 8; /* snap_seq */ 137 msg_size += 8 * (snapc ? snapc->num_snaps : 0); /* snaps */ 138 msg_size += 4; 139 140 if (use_mempool) { 141 req = mempool_alloc(osdc->req_mempool, gfp_flags); 142 memset(req, 0, sizeof(*req)); 143 } else { 144 req = kzalloc(sizeof(*req), gfp_flags); 145 } 146 if (req == NULL) 147 return NULL; 148 149 req->r_osdc = osdc; 150 req->r_mempool = use_mempool; 151 152 kref_init(&req->r_kref); 153 init_completion(&req->r_completion); 154 init_completion(&req->r_safe_completion); 155 RB_CLEAR_NODE(&req->r_node); 156 INIT_LIST_HEAD(&req->r_unsafe_item); 157 INIT_LIST_HEAD(&req->r_linger_item); 158 INIT_LIST_HEAD(&req->r_linger_osd); 159 INIT_LIST_HEAD(&req->r_req_lru_item); 160 INIT_LIST_HEAD(&req->r_osd_item); 161 162 /* create reply message */ 163 if (use_mempool) 164 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 165 else 166 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, 167 OSD_OPREPLY_FRONT_LEN, gfp_flags, true); 168 if (!msg) { 169 ceph_osdc_put_request(req); 170 return NULL; 171 } 172 req->r_reply = msg; 173 174 req->r_data_in.type = CEPH_OSD_DATA_TYPE_NONE; 175 req->r_data_out.type = CEPH_OSD_DATA_TYPE_NONE; 176 177 /* create request message; allow space for oid */ 178 if (use_mempool) 179 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 180 else 181 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true); 182 if (!msg) { 183 ceph_osdc_put_request(req); 184 return NULL; 185 } 186 187 memset(msg->front.iov_base, 0, msg->front.iov_len); 188 189 req->r_request = msg; 190 191 return req; 192 } 193 EXPORT_SYMBOL(ceph_osdc_alloc_request); 194 195 static bool osd_req_opcode_valid(u16 opcode) 196 { 197 switch (opcode) { 198 case CEPH_OSD_OP_READ: 199 case 
CEPH_OSD_OP_STAT: 200 case CEPH_OSD_OP_MAPEXT: 201 case CEPH_OSD_OP_MASKTRUNC: 202 case CEPH_OSD_OP_SPARSE_READ: 203 case CEPH_OSD_OP_NOTIFY: 204 case CEPH_OSD_OP_NOTIFY_ACK: 205 case CEPH_OSD_OP_ASSERT_VER: 206 case CEPH_OSD_OP_WRITE: 207 case CEPH_OSD_OP_WRITEFULL: 208 case CEPH_OSD_OP_TRUNCATE: 209 case CEPH_OSD_OP_ZERO: 210 case CEPH_OSD_OP_DELETE: 211 case CEPH_OSD_OP_APPEND: 212 case CEPH_OSD_OP_STARTSYNC: 213 case CEPH_OSD_OP_SETTRUNC: 214 case CEPH_OSD_OP_TRIMTRUNC: 215 case CEPH_OSD_OP_TMAPUP: 216 case CEPH_OSD_OP_TMAPPUT: 217 case CEPH_OSD_OP_TMAPGET: 218 case CEPH_OSD_OP_CREATE: 219 case CEPH_OSD_OP_ROLLBACK: 220 case CEPH_OSD_OP_WATCH: 221 case CEPH_OSD_OP_OMAPGETKEYS: 222 case CEPH_OSD_OP_OMAPGETVALS: 223 case CEPH_OSD_OP_OMAPGETHEADER: 224 case CEPH_OSD_OP_OMAPGETVALSBYKEYS: 225 case CEPH_OSD_OP_OMAPSETVALS: 226 case CEPH_OSD_OP_OMAPSETHEADER: 227 case CEPH_OSD_OP_OMAPCLEAR: 228 case CEPH_OSD_OP_OMAPRMKEYS: 229 case CEPH_OSD_OP_OMAP_CMP: 230 case CEPH_OSD_OP_CLONERANGE: 231 case CEPH_OSD_OP_ASSERT_SRC_VERSION: 232 case CEPH_OSD_OP_SRC_CMPXATTR: 233 case CEPH_OSD_OP_GETXATTR: 234 case CEPH_OSD_OP_GETXATTRS: 235 case CEPH_OSD_OP_CMPXATTR: 236 case CEPH_OSD_OP_SETXATTR: 237 case CEPH_OSD_OP_SETXATTRS: 238 case CEPH_OSD_OP_RESETXATTRS: 239 case CEPH_OSD_OP_RMXATTR: 240 case CEPH_OSD_OP_PULL: 241 case CEPH_OSD_OP_PUSH: 242 case CEPH_OSD_OP_BALANCEREADS: 243 case CEPH_OSD_OP_UNBALANCEREADS: 244 case CEPH_OSD_OP_SCRUB: 245 case CEPH_OSD_OP_SCRUB_RESERVE: 246 case CEPH_OSD_OP_SCRUB_UNRESERVE: 247 case CEPH_OSD_OP_SCRUB_STOP: 248 case CEPH_OSD_OP_SCRUB_MAP: 249 case CEPH_OSD_OP_WRLOCK: 250 case CEPH_OSD_OP_WRUNLOCK: 251 case CEPH_OSD_OP_RDLOCK: 252 case CEPH_OSD_OP_RDUNLOCK: 253 case CEPH_OSD_OP_UPLOCK: 254 case CEPH_OSD_OP_DNLOCK: 255 case CEPH_OSD_OP_CALL: 256 case CEPH_OSD_OP_PGLS: 257 case CEPH_OSD_OP_PGLS_FILTER: 258 return true; 259 default: 260 return false; 261 } 262 } 263 264 /* 265 * This is an osd op init function for opcodes that have no data or 
266 * other information associated with them. It also serves as a 267 * common init routine for all the other init functions, below. 268 */ 269 void osd_req_op_init(struct ceph_osd_req_op *op, u16 opcode) 270 { 271 BUG_ON(!osd_req_opcode_valid(opcode)); 272 273 memset(op, 0, sizeof (*op)); 274 275 op->op = opcode; 276 } 277 278 void osd_req_op_extent_init(struct ceph_osd_req_op *op, u16 opcode, 279 u64 offset, u64 length, 280 u64 truncate_size, u32 truncate_seq) 281 { 282 size_t payload_len = 0; 283 284 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE); 285 286 osd_req_op_init(op, opcode); 287 288 op->extent.offset = offset; 289 op->extent.length = length; 290 op->extent.truncate_size = truncate_size; 291 op->extent.truncate_seq = truncate_seq; 292 if (opcode == CEPH_OSD_OP_WRITE) 293 payload_len += length; 294 295 op->payload_len = payload_len; 296 } 297 EXPORT_SYMBOL(osd_req_op_extent_init); 298 299 void osd_req_op_cls_init(struct ceph_osd_req_op *op, u16 opcode, 300 const char *class, const char *method, 301 const void *request_data, size_t request_data_size) 302 { 303 size_t payload_len = 0; 304 size_t size; 305 306 BUG_ON(opcode != CEPH_OSD_OP_CALL); 307 308 osd_req_op_init(op, opcode); 309 310 op->cls.class_name = class; 311 size = strlen(class); 312 BUG_ON(size > (size_t) U8_MAX); 313 op->cls.class_len = size; 314 payload_len += size; 315 316 op->cls.method_name = method; 317 size = strlen(method); 318 BUG_ON(size > (size_t) U8_MAX); 319 op->cls.method_len = size; 320 payload_len += size; 321 322 op->cls.indata = request_data; 323 BUG_ON(request_data_size > (size_t) U32_MAX); 324 op->cls.indata_len = (u32) request_data_size; 325 payload_len += request_data_size; 326 327 op->cls.argc = 0; /* currently unused */ 328 329 op->payload_len = payload_len; 330 } 331 EXPORT_SYMBOL(osd_req_op_cls_init); 332 333 void osd_req_op_watch_init(struct ceph_osd_req_op *op, u16 opcode, 334 u64 cookie, u64 version, int flag) 335 { 336 BUG_ON(opcode != 
CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH); 337 338 osd_req_op_init(op, opcode); 339 340 op->watch.cookie = cookie; 341 /* op->watch.ver = version; */ /* XXX 3847 */ 342 op->watch.ver = cpu_to_le64(version); 343 if (opcode == CEPH_OSD_OP_WATCH && flag) 344 op->watch.flag = (u8) 1; 345 } 346 EXPORT_SYMBOL(osd_req_op_watch_init); 347 348 static u64 osd_req_encode_op(struct ceph_osd_request *req, 349 struct ceph_osd_op *dst, 350 struct ceph_osd_req_op *src) 351 { 352 u64 out_data_len = 0; 353 struct ceph_pagelist *pagelist; 354 355 if (WARN_ON(!osd_req_opcode_valid(src->op))) { 356 pr_err("unrecognized osd opcode %d\n", src->op); 357 358 return 0; 359 } 360 361 switch (src->op) { 362 case CEPH_OSD_OP_STAT: 363 break; 364 case CEPH_OSD_OP_READ: 365 case CEPH_OSD_OP_WRITE: 366 if (src->op == CEPH_OSD_OP_WRITE) 367 out_data_len = src->extent.length; 368 dst->extent.offset = cpu_to_le64(src->extent.offset); 369 dst->extent.length = cpu_to_le64(src->extent.length); 370 dst->extent.truncate_size = 371 cpu_to_le64(src->extent.truncate_size); 372 dst->extent.truncate_seq = 373 cpu_to_le32(src->extent.truncate_seq); 374 break; 375 case CEPH_OSD_OP_CALL: 376 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); 377 BUG_ON(!pagelist); 378 ceph_pagelist_init(pagelist); 379 380 dst->cls.class_len = src->cls.class_len; 381 dst->cls.method_len = src->cls.method_len; 382 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len); 383 ceph_pagelist_append(pagelist, src->cls.class_name, 384 src->cls.class_len); 385 ceph_pagelist_append(pagelist, src->cls.method_name, 386 src->cls.method_len); 387 ceph_pagelist_append(pagelist, src->cls.indata, 388 src->cls.indata_len); 389 390 req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGELIST; 391 req->r_data_out.pagelist = pagelist; 392 out_data_len = pagelist->length; 393 break; 394 case CEPH_OSD_OP_STARTSYNC: 395 break; 396 case CEPH_OSD_OP_NOTIFY_ACK: 397 case CEPH_OSD_OP_WATCH: 398 dst->watch.cookie = cpu_to_le64(src->watch.cookie); 399 
dst->watch.ver = cpu_to_le64(src->watch.ver); 400 dst->watch.flag = src->watch.flag; 401 break; 402 default: 403 pr_err("unsupported osd opcode %s\n", 404 ceph_osd_op_name(src->op)); 405 WARN_ON(1); 406 407 return 0; 408 } 409 dst->op = cpu_to_le16(src->op); 410 dst->payload_len = cpu_to_le32(src->payload_len); 411 412 return out_data_len; 413 } 414 415 /* 416 * build new request AND message 417 * 418 */ 419 void ceph_osdc_build_request(struct ceph_osd_request *req, 420 u64 off, unsigned int num_ops, 421 struct ceph_osd_req_op *src_ops, 422 struct ceph_snap_context *snapc, u64 snap_id, 423 struct timespec *mtime) 424 { 425 struct ceph_msg *msg = req->r_request; 426 struct ceph_osd_req_op *src_op; 427 void *p; 428 size_t msg_size; 429 int flags = req->r_flags; 430 u64 data_len; 431 int i; 432 433 req->r_num_ops = num_ops; 434 req->r_snapid = snap_id; 435 req->r_snapc = ceph_get_snap_context(snapc); 436 437 /* encode request */ 438 msg->hdr.version = cpu_to_le16(4); 439 440 p = msg->front.iov_base; 441 ceph_encode_32(&p, 1); /* client_inc is always 1 */ 442 req->r_request_osdmap_epoch = p; 443 p += 4; 444 req->r_request_flags = p; 445 p += 4; 446 if (req->r_flags & CEPH_OSD_FLAG_WRITE) 447 ceph_encode_timespec(p, mtime); 448 p += sizeof(struct ceph_timespec); 449 req->r_request_reassert_version = p; 450 p += sizeof(struct ceph_eversion); /* will get filled in */ 451 452 /* oloc */ 453 ceph_encode_8(&p, 4); 454 ceph_encode_8(&p, 4); 455 ceph_encode_32(&p, 8 + 4 + 4); 456 req->r_request_pool = p; 457 p += 8; 458 ceph_encode_32(&p, -1); /* preferred */ 459 ceph_encode_32(&p, 0); /* key len */ 460 461 ceph_encode_8(&p, 1); 462 req->r_request_pgid = p; 463 p += 8 + 4; 464 ceph_encode_32(&p, -1); /* preferred */ 465 466 /* oid */ 467 ceph_encode_32(&p, req->r_oid_len); 468 memcpy(p, req->r_oid, req->r_oid_len); 469 dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len); 470 p += req->r_oid_len; 471 472 /* ops--can imply data */ 473 ceph_encode_16(&p, 
num_ops); 474 src_op = src_ops; 475 req->r_request_ops = p; 476 data_len = 0; 477 for (i = 0; i < num_ops; i++, src_op++) { 478 data_len += osd_req_encode_op(req, p, src_op); 479 p += sizeof(struct ceph_osd_op); 480 } 481 482 /* snaps */ 483 ceph_encode_64(&p, req->r_snapid); 484 ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0); 485 ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0); 486 if (req->r_snapc) { 487 for (i = 0; i < snapc->num_snaps; i++) { 488 ceph_encode_64(&p, req->r_snapc->snaps[i]); 489 } 490 } 491 492 req->r_request_attempts = p; 493 p += 4; 494 495 /* data */ 496 if (flags & CEPH_OSD_FLAG_WRITE) { 497 u16 data_off; 498 499 /* 500 * The header "data_off" is a hint to the receiver 501 * allowing it to align received data into its 502 * buffers such that there's no need to re-copy 503 * it before writing it to disk (direct I/O). 504 */ 505 data_off = (u16) (off & 0xffff); 506 req->r_request->hdr.data_off = cpu_to_le16(data_off); 507 } 508 req->r_request->hdr.data_len = cpu_to_le32(data_len); 509 510 BUG_ON(p > msg->front.iov_base + msg->front.iov_len); 511 msg_size = p - msg->front.iov_base; 512 msg->front.iov_len = msg_size; 513 msg->hdr.front_len = cpu_to_le32(msg_size); 514 515 dout("build_request msg_size was %d num_ops %d\n", (int)msg_size, 516 num_ops); 517 return; 518 } 519 EXPORT_SYMBOL(ceph_osdc_build_request); 520 521 /* 522 * build new request AND message, calculate layout, and adjust file 523 * extent as needed. 524 * 525 * if the file was recently truncated, we include information about its 526 * old and new size so that the object can be updated appropriately. (we 527 * avoid synchronously deleting truncated objects because it's slow.) 528 * 529 * if @do_sync, include a 'startsync' command so that the osd will flush 530 * data quickly. 
531 */ 532 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, 533 struct ceph_file_layout *layout, 534 struct ceph_vino vino, 535 u64 off, u64 *plen, 536 int opcode, int flags, 537 struct ceph_snap_context *snapc, 538 int do_sync, 539 u32 truncate_seq, 540 u64 truncate_size, 541 struct timespec *mtime, 542 bool use_mempool) 543 { 544 struct ceph_osd_req_op ops[2]; 545 struct ceph_osd_request *req; 546 unsigned int num_op = do_sync ? 2 : 1; 547 u64 objnum = 0; 548 u64 objoff = 0; 549 u64 objlen = 0; 550 u32 object_size; 551 u64 object_base; 552 int r; 553 554 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE); 555 556 req = ceph_osdc_alloc_request(osdc, snapc, num_op, use_mempool, 557 GFP_NOFS); 558 if (!req) 559 return ERR_PTR(-ENOMEM); 560 req->r_flags = flags; 561 562 /* calculate max write size */ 563 r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen); 564 if (r < 0) { 565 ceph_osdc_put_request(req); 566 return ERR_PTR(r); 567 } 568 569 object_size = le32_to_cpu(layout->fl_object_size); 570 object_base = off - objoff; 571 if (truncate_size <= object_base) { 572 truncate_size = 0; 573 } else { 574 truncate_size -= object_base; 575 if (truncate_size > object_size) 576 truncate_size = object_size; 577 } 578 579 osd_req_op_extent_init(&ops[0], opcode, objoff, objlen, 580 truncate_size, truncate_seq); 581 if (do_sync) 582 osd_req_op_init(&ops[1], CEPH_OSD_OP_STARTSYNC); 583 584 req->r_file_layout = *layout; /* keep a copy */ 585 586 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", 587 vino.ino, objnum); 588 req->r_oid_len = strlen(req->r_oid); 589 590 ceph_osdc_build_request(req, off, num_op, ops, 591 snapc, vino.snap, mtime); 592 593 return req; 594 } 595 EXPORT_SYMBOL(ceph_osdc_new_request); 596 597 /* 598 * We keep osd requests in an rbtree, sorted by ->r_tid. 
599 */ 600 static void __insert_request(struct ceph_osd_client *osdc, 601 struct ceph_osd_request *new) 602 { 603 struct rb_node **p = &osdc->requests.rb_node; 604 struct rb_node *parent = NULL; 605 struct ceph_osd_request *req = NULL; 606 607 while (*p) { 608 parent = *p; 609 req = rb_entry(parent, struct ceph_osd_request, r_node); 610 if (new->r_tid < req->r_tid) 611 p = &(*p)->rb_left; 612 else if (new->r_tid > req->r_tid) 613 p = &(*p)->rb_right; 614 else 615 BUG(); 616 } 617 618 rb_link_node(&new->r_node, parent, p); 619 rb_insert_color(&new->r_node, &osdc->requests); 620 } 621 622 static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc, 623 u64 tid) 624 { 625 struct ceph_osd_request *req; 626 struct rb_node *n = osdc->requests.rb_node; 627 628 while (n) { 629 req = rb_entry(n, struct ceph_osd_request, r_node); 630 if (tid < req->r_tid) 631 n = n->rb_left; 632 else if (tid > req->r_tid) 633 n = n->rb_right; 634 else 635 return req; 636 } 637 return NULL; 638 } 639 640 static struct ceph_osd_request * 641 __lookup_request_ge(struct ceph_osd_client *osdc, 642 u64 tid) 643 { 644 struct ceph_osd_request *req; 645 struct rb_node *n = osdc->requests.rb_node; 646 647 while (n) { 648 req = rb_entry(n, struct ceph_osd_request, r_node); 649 if (tid < req->r_tid) { 650 if (!n->rb_left) 651 return req; 652 n = n->rb_left; 653 } else if (tid > req->r_tid) { 654 n = n->rb_right; 655 } else { 656 return req; 657 } 658 } 659 return NULL; 660 } 661 662 /* 663 * Resubmit requests pending on the given osd. 664 */ 665 static void __kick_osd_requests(struct ceph_osd_client *osdc, 666 struct ceph_osd *osd) 667 { 668 struct ceph_osd_request *req, *nreq; 669 LIST_HEAD(resend); 670 int err; 671 672 dout("__kick_osd_requests osd%d\n", osd->o_osd); 673 err = __reset_osd(osdc, osd); 674 if (err) 675 return; 676 /* 677 * Build up a list of requests to resend by traversing the 678 * osd's list of requests. 
Requests for a given object are 679 * sent in tid order, and that is also the order they're 680 * kept on this list. Therefore all requests that are in 681 * flight will be found first, followed by all requests that 682 * have not yet been sent. And to resend requests while 683 * preserving this order we will want to put any sent 684 * requests back on the front of the osd client's unsent 685 * list. 686 * 687 * So we build a separate ordered list of already-sent 688 * requests for the affected osd and splice it onto the 689 * front of the osd client's unsent list. Once we've seen a 690 * request that has not yet been sent we're done. Those 691 * requests are already sitting right where they belong. 692 */ 693 list_for_each_entry(req, &osd->o_requests, r_osd_item) { 694 if (!req->r_sent) 695 break; 696 list_move_tail(&req->r_req_lru_item, &resend); 697 dout("requeueing %p tid %llu osd%d\n", req, req->r_tid, 698 osd->o_osd); 699 if (!req->r_linger) 700 req->r_flags |= CEPH_OSD_FLAG_RETRY; 701 } 702 list_splice(&resend, &osdc->req_unsent); 703 704 /* 705 * Linger requests are re-registered before sending, which 706 * sets up a new tid for each. We add them to the unsent 707 * list at the end to keep things in tid order. 708 */ 709 list_for_each_entry_safe(req, nreq, &osd->o_linger_requests, 710 r_linger_osd) { 711 /* 712 * reregister request prior to unregistering linger so 713 * that r_osd is preserved. 714 */ 715 BUG_ON(!list_empty(&req->r_req_lru_item)); 716 __register_request(osdc, req); 717 list_add_tail(&req->r_req_lru_item, &osdc->req_unsent); 718 list_add_tail(&req->r_osd_item, &req->r_osd->o_requests); 719 __unregister_linger_request(osdc, req); 720 dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid, 721 osd->o_osd); 722 } 723 } 724 725 /* 726 * If the osd connection drops, we need to resubmit all requests. 
727 */ 728 static void osd_reset(struct ceph_connection *con) 729 { 730 struct ceph_osd *osd = con->private; 731 struct ceph_osd_client *osdc; 732 733 if (!osd) 734 return; 735 dout("osd_reset osd%d\n", osd->o_osd); 736 osdc = osd->o_osdc; 737 down_read(&osdc->map_sem); 738 mutex_lock(&osdc->request_mutex); 739 __kick_osd_requests(osdc, osd); 740 __send_queued(osdc); 741 mutex_unlock(&osdc->request_mutex); 742 up_read(&osdc->map_sem); 743 } 744 745 /* 746 * Track open sessions with osds. 747 */ 748 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum) 749 { 750 struct ceph_osd *osd; 751 752 osd = kzalloc(sizeof(*osd), GFP_NOFS); 753 if (!osd) 754 return NULL; 755 756 atomic_set(&osd->o_ref, 1); 757 osd->o_osdc = osdc; 758 osd->o_osd = onum; 759 RB_CLEAR_NODE(&osd->o_node); 760 INIT_LIST_HEAD(&osd->o_requests); 761 INIT_LIST_HEAD(&osd->o_linger_requests); 762 INIT_LIST_HEAD(&osd->o_osd_lru); 763 osd->o_incarnation = 1; 764 765 ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr); 766 767 INIT_LIST_HEAD(&osd->o_keepalive_item); 768 return osd; 769 } 770 771 static struct ceph_osd *get_osd(struct ceph_osd *osd) 772 { 773 if (atomic_inc_not_zero(&osd->o_ref)) { 774 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1, 775 atomic_read(&osd->o_ref)); 776 return osd; 777 } else { 778 dout("get_osd %p FAIL\n", osd); 779 return NULL; 780 } 781 } 782 783 static void put_osd(struct ceph_osd *osd) 784 { 785 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), 786 atomic_read(&osd->o_ref) - 1); 787 if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) { 788 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth; 789 790 ceph_auth_destroy_authorizer(ac, osd->o_auth.authorizer); 791 kfree(osd); 792 } 793 } 794 795 /* 796 * remove an osd from our map 797 */ 798 static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 799 { 800 dout("__remove_osd %p\n", osd); 801 
BUG_ON(!list_empty(&osd->o_requests)); 802 rb_erase(&osd->o_node, &osdc->osds); 803 list_del_init(&osd->o_osd_lru); 804 ceph_con_close(&osd->o_con); 805 put_osd(osd); 806 } 807 808 static void remove_all_osds(struct ceph_osd_client *osdc) 809 { 810 dout("%s %p\n", __func__, osdc); 811 mutex_lock(&osdc->request_mutex); 812 while (!RB_EMPTY_ROOT(&osdc->osds)) { 813 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), 814 struct ceph_osd, o_node); 815 __remove_osd(osdc, osd); 816 } 817 mutex_unlock(&osdc->request_mutex); 818 } 819 820 static void __move_osd_to_lru(struct ceph_osd_client *osdc, 821 struct ceph_osd *osd) 822 { 823 dout("__move_osd_to_lru %p\n", osd); 824 BUG_ON(!list_empty(&osd->o_osd_lru)); 825 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru); 826 osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ; 827 } 828 829 static void __remove_osd_from_lru(struct ceph_osd *osd) 830 { 831 dout("__remove_osd_from_lru %p\n", osd); 832 if (!list_empty(&osd->o_osd_lru)) 833 list_del_init(&osd->o_osd_lru); 834 } 835 836 static void remove_old_osds(struct ceph_osd_client *osdc) 837 { 838 struct ceph_osd *osd, *nosd; 839 840 dout("__remove_old_osds %p\n", osdc); 841 mutex_lock(&osdc->request_mutex); 842 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) { 843 if (time_before(jiffies, osd->lru_ttl)) 844 break; 845 __remove_osd(osdc, osd); 846 } 847 mutex_unlock(&osdc->request_mutex); 848 } 849 850 /* 851 * reset osd connect 852 */ 853 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 854 { 855 struct ceph_entity_addr *peer_addr; 856 857 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 858 if (list_empty(&osd->o_requests) && 859 list_empty(&osd->o_linger_requests)) { 860 __remove_osd(osdc, osd); 861 862 return -ENODEV; 863 } 864 865 peer_addr = &osdc->osdmap->osd_addr[osd->o_osd]; 866 if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) && 867 !ceph_con_opened(&osd->o_con)) { 868 struct ceph_osd_request 
*req; 869 870 dout(" osd addr hasn't changed and connection never opened," 871 " letting msgr retry"); 872 /* touch each r_stamp for handle_timeout()'s benfit */ 873 list_for_each_entry(req, &osd->o_requests, r_osd_item) 874 req->r_stamp = jiffies; 875 876 return -EAGAIN; 877 } 878 879 ceph_con_close(&osd->o_con); 880 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr); 881 osd->o_incarnation++; 882 883 return 0; 884 } 885 886 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new) 887 { 888 struct rb_node **p = &osdc->osds.rb_node; 889 struct rb_node *parent = NULL; 890 struct ceph_osd *osd = NULL; 891 892 dout("__insert_osd %p osd%d\n", new, new->o_osd); 893 while (*p) { 894 parent = *p; 895 osd = rb_entry(parent, struct ceph_osd, o_node); 896 if (new->o_osd < osd->o_osd) 897 p = &(*p)->rb_left; 898 else if (new->o_osd > osd->o_osd) 899 p = &(*p)->rb_right; 900 else 901 BUG(); 902 } 903 904 rb_link_node(&new->o_node, parent, p); 905 rb_insert_color(&new->o_node, &osdc->osds); 906 } 907 908 static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o) 909 { 910 struct ceph_osd *osd; 911 struct rb_node *n = osdc->osds.rb_node; 912 913 while (n) { 914 osd = rb_entry(n, struct ceph_osd, o_node); 915 if (o < osd->o_osd) 916 n = n->rb_left; 917 else if (o > osd->o_osd) 918 n = n->rb_right; 919 else 920 return osd; 921 } 922 return NULL; 923 } 924 925 static void __schedule_osd_timeout(struct ceph_osd_client *osdc) 926 { 927 schedule_delayed_work(&osdc->timeout_work, 928 osdc->client->options->osd_keepalive_timeout * HZ); 929 } 930 931 static void __cancel_osd_timeout(struct ceph_osd_client *osdc) 932 { 933 cancel_delayed_work(&osdc->timeout_work); 934 } 935 936 /* 937 * Register request, assign tid. If this is the first request, set up 938 * the timeout event. 
939 */ 940 static void __register_request(struct ceph_osd_client *osdc, 941 struct ceph_osd_request *req) 942 { 943 req->r_tid = ++osdc->last_tid; 944 req->r_request->hdr.tid = cpu_to_le64(req->r_tid); 945 dout("__register_request %p tid %lld\n", req, req->r_tid); 946 __insert_request(osdc, req); 947 ceph_osdc_get_request(req); 948 osdc->num_requests++; 949 if (osdc->num_requests == 1) { 950 dout(" first request, scheduling timeout\n"); 951 __schedule_osd_timeout(osdc); 952 } 953 } 954 955 /* 956 * called under osdc->request_mutex 957 */ 958 static void __unregister_request(struct ceph_osd_client *osdc, 959 struct ceph_osd_request *req) 960 { 961 if (RB_EMPTY_NODE(&req->r_node)) { 962 dout("__unregister_request %p tid %lld not registered\n", 963 req, req->r_tid); 964 return; 965 } 966 967 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 968 rb_erase(&req->r_node, &osdc->requests); 969 osdc->num_requests--; 970 971 if (req->r_osd) { 972 /* make sure the original request isn't in flight. 
*/ 973 ceph_msg_revoke(req->r_request); 974 975 list_del_init(&req->r_osd_item); 976 if (list_empty(&req->r_osd->o_requests) && 977 list_empty(&req->r_osd->o_linger_requests)) { 978 dout("moving osd to %p lru\n", req->r_osd); 979 __move_osd_to_lru(osdc, req->r_osd); 980 } 981 if (list_empty(&req->r_linger_item)) 982 req->r_osd = NULL; 983 } 984 985 list_del_init(&req->r_req_lru_item); 986 ceph_osdc_put_request(req); 987 988 if (osdc->num_requests == 0) { 989 dout(" no requests, canceling timeout\n"); 990 __cancel_osd_timeout(osdc); 991 } 992 } 993 994 /* 995 * Cancel a previously queued request message 996 */ 997 static void __cancel_request(struct ceph_osd_request *req) 998 { 999 if (req->r_sent && req->r_osd) { 1000 ceph_msg_revoke(req->r_request); 1001 req->r_sent = 0; 1002 } 1003 } 1004 1005 static void __register_linger_request(struct ceph_osd_client *osdc, 1006 struct ceph_osd_request *req) 1007 { 1008 dout("__register_linger_request %p\n", req); 1009 list_add_tail(&req->r_linger_item, &osdc->req_linger); 1010 if (req->r_osd) 1011 list_add_tail(&req->r_linger_osd, 1012 &req->r_osd->o_linger_requests); 1013 } 1014 1015 static void __unregister_linger_request(struct ceph_osd_client *osdc, 1016 struct ceph_osd_request *req) 1017 { 1018 dout("__unregister_linger_request %p\n", req); 1019 list_del_init(&req->r_linger_item); 1020 if (req->r_osd) { 1021 list_del_init(&req->r_linger_osd); 1022 1023 if (list_empty(&req->r_osd->o_requests) && 1024 list_empty(&req->r_osd->o_linger_requests)) { 1025 dout("moving osd to %p lru\n", req->r_osd); 1026 __move_osd_to_lru(osdc, req->r_osd); 1027 } 1028 if (list_empty(&req->r_osd_item)) 1029 req->r_osd = NULL; 1030 } 1031 } 1032 1033 void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc, 1034 struct ceph_osd_request *req) 1035 { 1036 mutex_lock(&osdc->request_mutex); 1037 if (req->r_linger) { 1038 __unregister_linger_request(osdc, req); 1039 ceph_osdc_put_request(req); 1040 } 1041 
mutex_unlock(&osdc->request_mutex);
}
EXPORT_SYMBOL(ceph_osdc_unregister_linger_request);

/*
 * Mark @req as a linger request.  Only the first call has an effect: it
 * sets r_linger and takes an extra reference on the request, which the
 * caller must later drop via ceph_osdc_unregister_linger_request().
 */
void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
				  struct ceph_osd_request *req)
{
	if (!req->r_linger) {
		dout("set_request_linger %p\n", req);
		req->r_linger = 1;
		/*
		 * caller is now responsible for calling
		 * unregister_linger_request
		 */
		ceph_osdc_get_request(req);
	}
}
EXPORT_SYMBOL(ceph_osdc_set_request_linger);

/*
 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
 * (as needed), and set the request r_osd appropriately.  If there is
 * no up osd, set r_osd to NULL.  When the mapping changed, move the
 * request to req_unsent (it has a target osd) or req_notarget (it has
 * none); when nothing changed, leave it on whatever list it is on.
 *
 * @force_resend: report a change (and requeue) even if the target osd
 * and the acting set are identical to the previous mapping.
 *
 * Return 0 if unchanged, 1 if changed, or negative on error.  On error
 * (pg calculation failure, or -ENOMEM when the osd struct cannot be
 * created) the request is moved to req_notarget.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static int __map_request(struct ceph_osd_client *osdc,
			 struct ceph_osd_request *req, int force_resend)
{
	struct ceph_pg pgid;
	int acting[CEPH_PG_MAX_SIZE];
	int o = -1, num = 0;	/* primary osd and acting set size; -1/0 = none */
	int err;

	dout("map_request %p tid %lld\n", req, req->r_tid);
	err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap,
				ceph_file_layout_pg_pool(req->r_file_layout));
	if (err) {
		list_move(&req->r_req_lru_item, &osdc->req_notarget);
		return err;
	}
	req->r_pgid = pgid;

	/* a positive return is the number of osds in the acting set */
	err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
	if (err > 0) {
		o = acting[0];
		num = err;
	}

	/*
	 * Unchanged if (not forced) the request is already on the same
	 * primary, was sent during that osd's current incarnation, and the
	 * acting set is identical -- or if it had no osd and still has none.
	 */
	if ((!force_resend &&
	     req->r_osd && req->r_osd->o_osd == o &&
	     req->r_sent >= req->r_osd->o_incarnation &&
	     req->r_num_pg_osds == num &&
	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
	    (req->r_osd == NULL && o == -1))
		return 0;  /* no change */

	dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
	     req->r_tid, pgid.pool, pgid.seed, o,
	     req->r_osd ? req->r_osd->o_osd : -1);

	/* record full pg acting set */
	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
	req->r_num_pg_osds = num;

	/* detach from the previous osd before (re)targeting */
	if (req->r_osd) {
		__cancel_request(req);
		list_del_init(&req->r_osd_item);
		req->r_osd = NULL;
	}

	req->r_osd = __lookup_osd(osdc, o);
	if (!req->r_osd && o >= 0) {
		/* first request for this osd: create it and open a connection */
		err = -ENOMEM;
		req->r_osd = create_osd(osdc, o);
		if (!req->r_osd) {
			list_move(&req->r_req_lru_item, &osdc->req_notarget);
			goto out;
		}

		dout("map_request osd %p is osd%d\n", req->r_osd, o);
		__insert_osd(osdc, req->r_osd);

		ceph_con_open(&req->r_osd->o_con,
			      CEPH_ENTITY_TYPE_OSD, o,
			      &osdc->osdmap->osd_addr[o]);
	}

	if (req->r_osd) {
		/* the osd is in use again; queue the request for sending */
		__remove_osd_from_lru(req->r_osd);
		list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
		list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
	} else {
		list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
	}
	err = 1;   /* osd or pg changed */

out:
	return err;
}

/*
 * Transmit @req to its current osd (req->r_osd must be set).
 *
 * Refreshes the fields of the prebuilt request message that can change
 * between attempts (osdmap epoch, flags, pool/pgid, attempts, reassert
 * version), stamps the request with the send time, moves it to the tail
 * of req_lru and records the osd incarnation it was sent on in r_sent
 * (compared against o_incarnation by __map_request()).
 *
 * caller should hold map_sem (for read) and request_mutex
 */
static void __send_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	void *p;

	dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
	     req, req->r_tid, req->r_osd->o_osd, req->r_flags,
	     (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);

	/* fill in message content that changes each time we send it */
	put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
	put_unaligned_le32(req->r_flags, req->r_request_flags);
	put_unaligned_le64(req->r_pgid.pool, req->r_request_pool);
	p = req->r_request_pgid;
	ceph_encode_64(&p, req->r_pgid.pool);
	ceph_encode_32(&p, req->r_pgid.seed);
	put_unaligned_le64(1, req->r_request_attempts);  /* FIXME */
	memcpy(req->r_request_reassert_version, &req->r_reassert_version,
	       sizeof(req->r_reassert_version));

	/* req_lru stays ordered by r_stamp; handle_timeout() relies on it */
	req->r_stamp = jiffies;
	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);

	ceph_msg_get(req->r_request); /* send consumes a ref */
	ceph_con_send(&req->r_osd->o_con, req->r_request);
	req->r_sent = req->r_osd->o_incarnation;
}

/*
 * Send any requests in the queue (req_unsent).
 *
 * The _safe iterator is required because __send_request() moves each
 * request off req_unsent and onto req_lru.
 */
static void __send_queued(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req, *tmp;

	dout("__send_queued\n");
	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
		__send_request(osdc, req);
}

/*
 * Timeout callback, called every N seconds when 1 or more osd
 * requests has been active for more than N seconds.  When this
 * happens, we ping all OSDs with requests who have timed out to
 * ensure any communications channel reset is detected.  Request
 * stamps (r_stamp) are not modified here; they only change when a
 * request is (re)sent.  The timeout is then rescheduled via
 * __schedule_osd_timeout(), and any requests waiting on req_unsent
 * are sent.
 */
static void handle_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client, timeout_work.work);
	struct ceph_osd_request *req;
	struct ceph_osd *osd;
	unsigned long keepalive =
		osdc->client->options->osd_keepalive_timeout * HZ;
	struct list_head slow_osds;
	dout("timeout\n");
	down_read(&osdc->map_sem);

	ceph_monc_request_next_osdmap(&osdc->client->monc);

	mutex_lock(&osdc->request_mutex);

	/*
	 * ping osds that are a bit slow.  this ensures that if there
	 * is a break in the TCP connection we will notice, and reopen
	 * a connection with that osd (from the fault callback).
	 *
	 * req_lru is ordered oldest-sent first, so the walk stops at the
	 * first request that is still within the keepalive window.  Each
	 * slow osd is collected once on slow_osds (list_move_tail), so it
	 * is pinged once even if several of its requests are slow.
	 */
	INIT_LIST_HEAD(&slow_osds);
	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
		if (time_before(jiffies, req->r_stamp + keepalive))
			break;

		osd = req->r_osd;
		BUG_ON(!osd);
		dout(" tid %llu is slow, will send keepalive on osd%d\n",
		     req->r_tid, osd->o_osd);
		list_move_tail(&osd->o_keepalive_item, &slow_osds);
	}
	while (!list_empty(&slow_osds)) {
		osd = list_entry(slow_osds.next, struct ceph_osd,
				 o_keepalive_item);
		list_del_init(&osd->o_keepalive_item);
		ceph_con_keepalive(&osd->o_con);
	}

	__schedule_osd_timeout(osdc);
	__send_queued(osdc);
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
}

/*
 * Periodic work: let remove_old_osds() reap idle osd sessions, then
 * re-arm itself for a quarter of osd_idle_ttl (in seconds) from now.
 */
static void handle_osds_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client,
			     osds_timeout_work.work);
	unsigned long delay =
		osdc->client->options->osd_idle_ttl * HZ >> 2;

	dout("osds timeout\n");
	down_read(&osdc->map_sem);
	remove_old_osds(osdc);
	up_read(&osdc->map_sem);

	schedule_delayed_work(&osdc->osds_timeout_work,
			      round_jiffies_relative(delay));
}

/*
 * Signal that @req is safe: invoke its r_safe_callback (if any, with a
 * NULL message) and wake everyone waiting on r_safe_completion.
 */
static void complete_request(struct ceph_osd_request *req)
{
	if (req->r_safe_callback)
		req->r_safe_callback(req, NULL);
	complete_all(&req->r_safe_completion);  /* fsync waiter */
}

/*
 * handle osd op reply.  either call the callback if it is specified,
 * or do the completion to wake up the waiting thread.
1269 */ 1270 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, 1271 struct ceph_connection *con) 1272 { 1273 void *p, *end; 1274 struct ceph_osd_request *req; 1275 u64 tid; 1276 int object_len; 1277 int numops, payload_len, flags; 1278 s32 result; 1279 s32 retry_attempt; 1280 struct ceph_pg pg; 1281 int err; 1282 u32 reassert_epoch; 1283 u64 reassert_version; 1284 u32 osdmap_epoch; 1285 int already_completed; 1286 int i; 1287 1288 tid = le64_to_cpu(msg->hdr.tid); 1289 dout("handle_reply %p tid %llu\n", msg, tid); 1290 1291 p = msg->front.iov_base; 1292 end = p + msg->front.iov_len; 1293 1294 ceph_decode_need(&p, end, 4, bad); 1295 object_len = ceph_decode_32(&p); 1296 ceph_decode_need(&p, end, object_len, bad); 1297 p += object_len; 1298 1299 err = ceph_decode_pgid(&p, end, &pg); 1300 if (err) 1301 goto bad; 1302 1303 ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad); 1304 flags = ceph_decode_64(&p); 1305 result = ceph_decode_32(&p); 1306 reassert_epoch = ceph_decode_32(&p); 1307 reassert_version = ceph_decode_64(&p); 1308 osdmap_epoch = ceph_decode_32(&p); 1309 1310 /* lookup */ 1311 mutex_lock(&osdc->request_mutex); 1312 req = __lookup_request(osdc, tid); 1313 if (req == NULL) { 1314 dout("handle_reply tid %llu dne\n", tid); 1315 goto bad_mutex; 1316 } 1317 ceph_osdc_get_request(req); 1318 1319 dout("handle_reply %p tid %llu req %p result %d\n", msg, tid, 1320 req, result); 1321 1322 ceph_decode_need(&p, end, 4, bad); 1323 numops = ceph_decode_32(&p); 1324 if (numops > CEPH_OSD_MAX_OP) 1325 goto bad_put; 1326 if (numops != req->r_num_ops) 1327 goto bad_put; 1328 payload_len = 0; 1329 ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad); 1330 for (i = 0; i < numops; i++) { 1331 struct ceph_osd_op *op = p; 1332 int len; 1333 1334 len = le32_to_cpu(op->payload_len); 1335 req->r_reply_op_len[i] = len; 1336 dout(" op %d has %d bytes\n", i, len); 1337 payload_len += len; 1338 p += sizeof(*op); 1339 } 1340 if (payload_len != 
le32_to_cpu(msg->hdr.data_len)) { 1341 pr_warning("sum of op payload lens %d != data_len %d", 1342 payload_len, le32_to_cpu(msg->hdr.data_len)); 1343 goto bad_put; 1344 } 1345 1346 ceph_decode_need(&p, end, 4 + numops * 4, bad); 1347 retry_attempt = ceph_decode_32(&p); 1348 for (i = 0; i < numops; i++) 1349 req->r_reply_op_result[i] = ceph_decode_32(&p); 1350 1351 if (!req->r_got_reply) { 1352 unsigned int bytes; 1353 1354 req->r_result = result; 1355 bytes = le32_to_cpu(msg->hdr.data_len); 1356 dout("handle_reply result %d bytes %d\n", req->r_result, 1357 bytes); 1358 if (req->r_result == 0) 1359 req->r_result = bytes; 1360 1361 /* in case this is a write and we need to replay, */ 1362 req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch); 1363 req->r_reassert_version.version = cpu_to_le64(reassert_version); 1364 1365 req->r_got_reply = 1; 1366 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { 1367 dout("handle_reply tid %llu dup ack\n", tid); 1368 mutex_unlock(&osdc->request_mutex); 1369 goto done; 1370 } 1371 1372 dout("handle_reply tid %llu flags %d\n", tid, flags); 1373 1374 if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK)) 1375 __register_linger_request(osdc, req); 1376 1377 /* either this is a read, or we got the safe response */ 1378 if (result < 0 || 1379 (flags & CEPH_OSD_FLAG_ONDISK) || 1380 ((flags & CEPH_OSD_FLAG_WRITE) == 0)) 1381 __unregister_request(osdc, req); 1382 1383 already_completed = req->r_completed; 1384 req->r_completed = 1; 1385 mutex_unlock(&osdc->request_mutex); 1386 if (already_completed) 1387 goto done; 1388 1389 if (req->r_callback) 1390 req->r_callback(req, msg); 1391 else 1392 complete_all(&req->r_completion); 1393 1394 if (flags & CEPH_OSD_FLAG_ONDISK) 1395 complete_request(req); 1396 1397 done: 1398 dout("req=%p req->r_linger=%d\n", req, req->r_linger); 1399 ceph_osdc_put_request(req); 1400 return; 1401 1402 bad_put: 1403 ceph_osdc_put_request(req); 1404 bad_mutex: 1405 mutex_unlock(&osdc->request_mutex); 1406 bad: 1407 
pr_err("corrupt osd_op_reply got %d %d\n", 1408 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len)); 1409 ceph_msg_dump(msg); 1410 } 1411 1412 static void reset_changed_osds(struct ceph_osd_client *osdc) 1413 { 1414 struct rb_node *p, *n; 1415 1416 for (p = rb_first(&osdc->osds); p; p = n) { 1417 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node); 1418 1419 n = rb_next(p); 1420 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || 1421 memcmp(&osd->o_con.peer_addr, 1422 ceph_osd_addr(osdc->osdmap, 1423 osd->o_osd), 1424 sizeof(struct ceph_entity_addr)) != 0) 1425 __reset_osd(osdc, osd); 1426 } 1427 } 1428 1429 /* 1430 * Requeue requests whose mapping to an OSD has changed. If requests map to 1431 * no osd, request a new map. 1432 * 1433 * Caller should hold map_sem for read. 1434 */ 1435 static void kick_requests(struct ceph_osd_client *osdc, int force_resend) 1436 { 1437 struct ceph_osd_request *req, *nreq; 1438 struct rb_node *p; 1439 int needmap = 0; 1440 int err; 1441 1442 dout("kick_requests %s\n", force_resend ? " (force resend)" : ""); 1443 mutex_lock(&osdc->request_mutex); 1444 for (p = rb_first(&osdc->requests); p; ) { 1445 req = rb_entry(p, struct ceph_osd_request, r_node); 1446 p = rb_next(p); 1447 1448 /* 1449 * For linger requests that have not yet been 1450 * registered, move them to the linger list; they'll 1451 * be sent to the osd in the loop below. Unregister 1452 * the request before re-registering it as a linger 1453 * request to ensure the __map_request() below 1454 * will decide it needs to be sent. 1455 */ 1456 if (req->r_linger && list_empty(&req->r_linger_item)) { 1457 dout("%p tid %llu restart on osd%d\n", 1458 req, req->r_tid, 1459 req->r_osd ? 
req->r_osd->o_osd : -1); 1460 __unregister_request(osdc, req); 1461 __register_linger_request(osdc, req); 1462 continue; 1463 } 1464 1465 err = __map_request(osdc, req, force_resend); 1466 if (err < 0) 1467 continue; /* error */ 1468 if (req->r_osd == NULL) { 1469 dout("%p tid %llu maps to no osd\n", req, req->r_tid); 1470 needmap++; /* request a newer map */ 1471 } else if (err > 0) { 1472 if (!req->r_linger) { 1473 dout("%p tid %llu requeued on osd%d\n", req, 1474 req->r_tid, 1475 req->r_osd ? req->r_osd->o_osd : -1); 1476 req->r_flags |= CEPH_OSD_FLAG_RETRY; 1477 } 1478 } 1479 } 1480 1481 list_for_each_entry_safe(req, nreq, &osdc->req_linger, 1482 r_linger_item) { 1483 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); 1484 1485 err = __map_request(osdc, req, force_resend); 1486 dout("__map_request returned %d\n", err); 1487 if (err == 0) 1488 continue; /* no change and no osd was specified */ 1489 if (err < 0) 1490 continue; /* hrm! */ 1491 if (req->r_osd == NULL) { 1492 dout("tid %llu maps to no valid osd\n", req->r_tid); 1493 needmap++; /* request a newer map */ 1494 continue; 1495 } 1496 1497 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid, 1498 req->r_osd ? req->r_osd->o_osd : -1); 1499 __register_request(osdc, req); 1500 __unregister_linger_request(osdc, req); 1501 } 1502 mutex_unlock(&osdc->request_mutex); 1503 1504 if (needmap) { 1505 dout("%d requests for down osds, need new map\n", needmap); 1506 ceph_monc_request_next_osdmap(&osdc->client->monc); 1507 } 1508 reset_changed_osds(osdc); 1509 } 1510 1511 1512 /* 1513 * Process updated osd map. 1514 * 1515 * The message contains any number of incremental and full maps, normally 1516 * indicating some sort of topology change in the cluster. Kick requests 1517 * off to different OSDs as needed. 
1518 */ 1519 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) 1520 { 1521 void *p, *end, *next; 1522 u32 nr_maps, maplen; 1523 u32 epoch; 1524 struct ceph_osdmap *newmap = NULL, *oldmap; 1525 int err; 1526 struct ceph_fsid fsid; 1527 1528 dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0); 1529 p = msg->front.iov_base; 1530 end = p + msg->front.iov_len; 1531 1532 /* verify fsid */ 1533 ceph_decode_need(&p, end, sizeof(fsid), bad); 1534 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 1535 if (ceph_check_fsid(osdc->client, &fsid) < 0) 1536 return; 1537 1538 down_write(&osdc->map_sem); 1539 1540 /* incremental maps */ 1541 ceph_decode_32_safe(&p, end, nr_maps, bad); 1542 dout(" %d inc maps\n", nr_maps); 1543 while (nr_maps > 0) { 1544 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 1545 epoch = ceph_decode_32(&p); 1546 maplen = ceph_decode_32(&p); 1547 ceph_decode_need(&p, end, maplen, bad); 1548 next = p + maplen; 1549 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) { 1550 dout("applying incremental map %u len %d\n", 1551 epoch, maplen); 1552 newmap = osdmap_apply_incremental(&p, next, 1553 osdc->osdmap, 1554 &osdc->client->msgr); 1555 if (IS_ERR(newmap)) { 1556 err = PTR_ERR(newmap); 1557 goto bad; 1558 } 1559 BUG_ON(!newmap); 1560 if (newmap != osdc->osdmap) { 1561 ceph_osdmap_destroy(osdc->osdmap); 1562 osdc->osdmap = newmap; 1563 } 1564 kick_requests(osdc, 0); 1565 } else { 1566 dout("ignoring incremental map %u len %d\n", 1567 epoch, maplen); 1568 } 1569 p = next; 1570 nr_maps--; 1571 } 1572 if (newmap) 1573 goto done; 1574 1575 /* full maps */ 1576 ceph_decode_32_safe(&p, end, nr_maps, bad); 1577 dout(" %d full maps\n", nr_maps); 1578 while (nr_maps) { 1579 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 1580 epoch = ceph_decode_32(&p); 1581 maplen = ceph_decode_32(&p); 1582 ceph_decode_need(&p, end, maplen, bad); 1583 if (nr_maps > 1) { 1584 dout("skipping non-latest full map %u len %d\n", 1585 epoch, maplen); 1586 } 
else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) { 1587 dout("skipping full map %u len %d, " 1588 "older than our %u\n", epoch, maplen, 1589 osdc->osdmap->epoch); 1590 } else { 1591 int skipped_map = 0; 1592 1593 dout("taking full map %u len %d\n", epoch, maplen); 1594 newmap = osdmap_decode(&p, p+maplen); 1595 if (IS_ERR(newmap)) { 1596 err = PTR_ERR(newmap); 1597 goto bad; 1598 } 1599 BUG_ON(!newmap); 1600 oldmap = osdc->osdmap; 1601 osdc->osdmap = newmap; 1602 if (oldmap) { 1603 if (oldmap->epoch + 1 < newmap->epoch) 1604 skipped_map = 1; 1605 ceph_osdmap_destroy(oldmap); 1606 } 1607 kick_requests(osdc, skipped_map); 1608 } 1609 p += maplen; 1610 nr_maps--; 1611 } 1612 1613 done: 1614 downgrade_write(&osdc->map_sem); 1615 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); 1616 1617 /* 1618 * subscribe to subsequent osdmap updates if full to ensure 1619 * we find out when we are no longer full and stop returning 1620 * ENOSPC. 1621 */ 1622 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) 1623 ceph_monc_request_next_osdmap(&osdc->client->monc); 1624 1625 mutex_lock(&osdc->request_mutex); 1626 __send_queued(osdc); 1627 mutex_unlock(&osdc->request_mutex); 1628 up_read(&osdc->map_sem); 1629 wake_up_all(&osdc->client->auth_wq); 1630 return; 1631 1632 bad: 1633 pr_err("osdc handle_map corrupt msg\n"); 1634 ceph_msg_dump(msg); 1635 up_write(&osdc->map_sem); 1636 return; 1637 } 1638 1639 /* 1640 * watch/notify callback event infrastructure 1641 * 1642 * These callbacks are used both for watch and notify operations. 
1643 */ 1644 static void __release_event(struct kref *kref) 1645 { 1646 struct ceph_osd_event *event = 1647 container_of(kref, struct ceph_osd_event, kref); 1648 1649 dout("__release_event %p\n", event); 1650 kfree(event); 1651 } 1652 1653 static void get_event(struct ceph_osd_event *event) 1654 { 1655 kref_get(&event->kref); 1656 } 1657 1658 void ceph_osdc_put_event(struct ceph_osd_event *event) 1659 { 1660 kref_put(&event->kref, __release_event); 1661 } 1662 EXPORT_SYMBOL(ceph_osdc_put_event); 1663 1664 static void __insert_event(struct ceph_osd_client *osdc, 1665 struct ceph_osd_event *new) 1666 { 1667 struct rb_node **p = &osdc->event_tree.rb_node; 1668 struct rb_node *parent = NULL; 1669 struct ceph_osd_event *event = NULL; 1670 1671 while (*p) { 1672 parent = *p; 1673 event = rb_entry(parent, struct ceph_osd_event, node); 1674 if (new->cookie < event->cookie) 1675 p = &(*p)->rb_left; 1676 else if (new->cookie > event->cookie) 1677 p = &(*p)->rb_right; 1678 else 1679 BUG(); 1680 } 1681 1682 rb_link_node(&new->node, parent, p); 1683 rb_insert_color(&new->node, &osdc->event_tree); 1684 } 1685 1686 static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc, 1687 u64 cookie) 1688 { 1689 struct rb_node **p = &osdc->event_tree.rb_node; 1690 struct rb_node *parent = NULL; 1691 struct ceph_osd_event *event = NULL; 1692 1693 while (*p) { 1694 parent = *p; 1695 event = rb_entry(parent, struct ceph_osd_event, node); 1696 if (cookie < event->cookie) 1697 p = &(*p)->rb_left; 1698 else if (cookie > event->cookie) 1699 p = &(*p)->rb_right; 1700 else 1701 return event; 1702 } 1703 return NULL; 1704 } 1705 1706 static void __remove_event(struct ceph_osd_event *event) 1707 { 1708 struct ceph_osd_client *osdc = event->osdc; 1709 1710 if (!RB_EMPTY_NODE(&event->node)) { 1711 dout("__remove_event removed %p\n", event); 1712 rb_erase(&event->node, &osdc->event_tree); 1713 ceph_osdc_put_event(event); 1714 } else { 1715 dout("__remove_event didn't remove %p\n", event); 
1716 } 1717 } 1718 1719 int ceph_osdc_create_event(struct ceph_osd_client *osdc, 1720 void (*event_cb)(u64, u64, u8, void *), 1721 void *data, struct ceph_osd_event **pevent) 1722 { 1723 struct ceph_osd_event *event; 1724 1725 event = kmalloc(sizeof(*event), GFP_NOIO); 1726 if (!event) 1727 return -ENOMEM; 1728 1729 dout("create_event %p\n", event); 1730 event->cb = event_cb; 1731 event->one_shot = 0; 1732 event->data = data; 1733 event->osdc = osdc; 1734 INIT_LIST_HEAD(&event->osd_node); 1735 RB_CLEAR_NODE(&event->node); 1736 kref_init(&event->kref); /* one ref for us */ 1737 kref_get(&event->kref); /* one ref for the caller */ 1738 1739 spin_lock(&osdc->event_lock); 1740 event->cookie = ++osdc->event_count; 1741 __insert_event(osdc, event); 1742 spin_unlock(&osdc->event_lock); 1743 1744 *pevent = event; 1745 return 0; 1746 } 1747 EXPORT_SYMBOL(ceph_osdc_create_event); 1748 1749 void ceph_osdc_cancel_event(struct ceph_osd_event *event) 1750 { 1751 struct ceph_osd_client *osdc = event->osdc; 1752 1753 dout("cancel_event %p\n", event); 1754 spin_lock(&osdc->event_lock); 1755 __remove_event(event); 1756 spin_unlock(&osdc->event_lock); 1757 ceph_osdc_put_event(event); /* caller's */ 1758 } 1759 EXPORT_SYMBOL(ceph_osdc_cancel_event); 1760 1761 1762 static void do_event_work(struct work_struct *work) 1763 { 1764 struct ceph_osd_event_work *event_work = 1765 container_of(work, struct ceph_osd_event_work, work); 1766 struct ceph_osd_event *event = event_work->event; 1767 u64 ver = event_work->ver; 1768 u64 notify_id = event_work->notify_id; 1769 u8 opcode = event_work->opcode; 1770 1771 dout("do_event_work completing %p\n", event); 1772 event->cb(ver, notify_id, opcode, event->data); 1773 dout("do_event_work completed %p\n", event); 1774 ceph_osdc_put_event(event); 1775 kfree(event_work); 1776 } 1777 1778 1779 /* 1780 * Process osd watch notifications 1781 */ 1782 static void handle_watch_notify(struct ceph_osd_client *osdc, 1783 struct ceph_msg *msg) 1784 { 1785 void *p, 
*end; 1786 u8 proto_ver; 1787 u64 cookie, ver, notify_id; 1788 u8 opcode; 1789 struct ceph_osd_event *event; 1790 struct ceph_osd_event_work *event_work; 1791 1792 p = msg->front.iov_base; 1793 end = p + msg->front.iov_len; 1794 1795 ceph_decode_8_safe(&p, end, proto_ver, bad); 1796 ceph_decode_8_safe(&p, end, opcode, bad); 1797 ceph_decode_64_safe(&p, end, cookie, bad); 1798 ceph_decode_64_safe(&p, end, ver, bad); 1799 ceph_decode_64_safe(&p, end, notify_id, bad); 1800 1801 spin_lock(&osdc->event_lock); 1802 event = __find_event(osdc, cookie); 1803 if (event) { 1804 BUG_ON(event->one_shot); 1805 get_event(event); 1806 } 1807 spin_unlock(&osdc->event_lock); 1808 dout("handle_watch_notify cookie %lld ver %lld event %p\n", 1809 cookie, ver, event); 1810 if (event) { 1811 event_work = kmalloc(sizeof(*event_work), GFP_NOIO); 1812 if (!event_work) { 1813 dout("ERROR: could not allocate event_work\n"); 1814 goto done_err; 1815 } 1816 INIT_WORK(&event_work->work, do_event_work); 1817 event_work->event = event; 1818 event_work->ver = ver; 1819 event_work->notify_id = notify_id; 1820 event_work->opcode = opcode; 1821 if (!queue_work(osdc->notify_wq, &event_work->work)) { 1822 dout("WARNING: failed to queue notify event work\n"); 1823 goto done_err; 1824 } 1825 } 1826 1827 return; 1828 1829 done_err: 1830 ceph_osdc_put_event(event); 1831 return; 1832 1833 bad: 1834 pr_err("osdc handle_watch_notify corrupt msg\n"); 1835 return; 1836 } 1837 1838 static void ceph_osdc_msg_data_set(struct ceph_msg *msg, 1839 struct ceph_osd_data *osd_data) 1840 { 1841 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { 1842 BUG_ON(osd_data->length > (u64) SIZE_MAX); 1843 if (osd_data->length) 1844 ceph_msg_data_set_pages(msg, osd_data->pages, 1845 osd_data->length, osd_data->alignment); 1846 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) { 1847 BUG_ON(!osd_data->pagelist->length); 1848 ceph_msg_data_set_pagelist(msg, osd_data->pagelist); 1849 #ifdef CONFIG_BLOCK 1850 } else if 
(osd_data->type == CEPH_OSD_DATA_TYPE_BIO) { 1851 ceph_msg_data_set_bio(msg, osd_data->bio); 1852 #endif 1853 } else { 1854 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE); 1855 } 1856 } 1857 1858 /* 1859 * Register request, send initial attempt. 1860 */ 1861 int ceph_osdc_start_request(struct ceph_osd_client *osdc, 1862 struct ceph_osd_request *req, 1863 bool nofail) 1864 { 1865 int rc = 0; 1866 1867 /* Set up response incoming data and request outgoing data fields */ 1868 1869 ceph_osdc_msg_data_set(req->r_reply, &req->r_data_in); 1870 ceph_osdc_msg_data_set(req->r_request, &req->r_data_out); 1871 1872 down_read(&osdc->map_sem); 1873 mutex_lock(&osdc->request_mutex); 1874 __register_request(osdc, req); 1875 WARN_ON(req->r_sent); 1876 rc = __map_request(osdc, req, 0); 1877 if (rc < 0) { 1878 if (nofail) { 1879 dout("osdc_start_request failed map, " 1880 " will retry %lld\n", req->r_tid); 1881 rc = 0; 1882 } 1883 goto out_unlock; 1884 } 1885 if (req->r_osd == NULL) { 1886 dout("send_request %p no up osds in pg\n", req); 1887 ceph_monc_request_next_osdmap(&osdc->client->monc); 1888 } else { 1889 __send_queued(osdc); 1890 } 1891 rc = 0; 1892 out_unlock: 1893 mutex_unlock(&osdc->request_mutex); 1894 up_read(&osdc->map_sem); 1895 return rc; 1896 } 1897 EXPORT_SYMBOL(ceph_osdc_start_request); 1898 1899 /* 1900 * wait for a request to complete 1901 */ 1902 int ceph_osdc_wait_request(struct ceph_osd_client *osdc, 1903 struct ceph_osd_request *req) 1904 { 1905 int rc; 1906 1907 rc = wait_for_completion_interruptible(&req->r_completion); 1908 if (rc < 0) { 1909 mutex_lock(&osdc->request_mutex); 1910 __cancel_request(req); 1911 __unregister_request(osdc, req); 1912 mutex_unlock(&osdc->request_mutex); 1913 complete_request(req); 1914 dout("wait_request tid %llu canceled/timed out\n", req->r_tid); 1915 return rc; 1916 } 1917 1918 dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result); 1919 return req->r_result; 1920 } 1921 EXPORT_SYMBOL(ceph_osdc_wait_request); 
1922 1923 /* 1924 * sync - wait for all in-flight requests to flush. avoid starvation. 1925 */ 1926 void ceph_osdc_sync(struct ceph_osd_client *osdc) 1927 { 1928 struct ceph_osd_request *req; 1929 u64 last_tid, next_tid = 0; 1930 1931 mutex_lock(&osdc->request_mutex); 1932 last_tid = osdc->last_tid; 1933 while (1) { 1934 req = __lookup_request_ge(osdc, next_tid); 1935 if (!req) 1936 break; 1937 if (req->r_tid > last_tid) 1938 break; 1939 1940 next_tid = req->r_tid + 1; 1941 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0) 1942 continue; 1943 1944 ceph_osdc_get_request(req); 1945 mutex_unlock(&osdc->request_mutex); 1946 dout("sync waiting on tid %llu (last is %llu)\n", 1947 req->r_tid, last_tid); 1948 wait_for_completion(&req->r_safe_completion); 1949 mutex_lock(&osdc->request_mutex); 1950 ceph_osdc_put_request(req); 1951 } 1952 mutex_unlock(&osdc->request_mutex); 1953 dout("sync done (thru tid %llu)\n", last_tid); 1954 } 1955 EXPORT_SYMBOL(ceph_osdc_sync); 1956 1957 /* 1958 * init, shutdown 1959 */ 1960 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) 1961 { 1962 int err; 1963 1964 dout("init\n"); 1965 osdc->client = client; 1966 osdc->osdmap = NULL; 1967 init_rwsem(&osdc->map_sem); 1968 init_completion(&osdc->map_waiters); 1969 osdc->last_requested_map = 0; 1970 mutex_init(&osdc->request_mutex); 1971 osdc->last_tid = 0; 1972 osdc->osds = RB_ROOT; 1973 INIT_LIST_HEAD(&osdc->osd_lru); 1974 osdc->requests = RB_ROOT; 1975 INIT_LIST_HEAD(&osdc->req_lru); 1976 INIT_LIST_HEAD(&osdc->req_unsent); 1977 INIT_LIST_HEAD(&osdc->req_notarget); 1978 INIT_LIST_HEAD(&osdc->req_linger); 1979 osdc->num_requests = 0; 1980 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 1981 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 1982 spin_lock_init(&osdc->event_lock); 1983 osdc->event_tree = RB_ROOT; 1984 osdc->event_count = 0; 1985 1986 schedule_delayed_work(&osdc->osds_timeout_work, 1987 
round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ)); 1988 1989 err = -ENOMEM; 1990 osdc->req_mempool = mempool_create_kmalloc_pool(10, 1991 sizeof(struct ceph_osd_request)); 1992 if (!osdc->req_mempool) 1993 goto out; 1994 1995 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, 1996 OSD_OP_FRONT_LEN, 10, true, 1997 "osd_op"); 1998 if (err < 0) 1999 goto out_mempool; 2000 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, 2001 OSD_OPREPLY_FRONT_LEN, 10, true, 2002 "osd_op_reply"); 2003 if (err < 0) 2004 goto out_msgpool; 2005 2006 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); 2007 if (IS_ERR(osdc->notify_wq)) { 2008 err = PTR_ERR(osdc->notify_wq); 2009 osdc->notify_wq = NULL; 2010 goto out_msgpool; 2011 } 2012 return 0; 2013 2014 out_msgpool: 2015 ceph_msgpool_destroy(&osdc->msgpool_op); 2016 out_mempool: 2017 mempool_destroy(osdc->req_mempool); 2018 out: 2019 return err; 2020 } 2021 2022 void ceph_osdc_stop(struct ceph_osd_client *osdc) 2023 { 2024 flush_workqueue(osdc->notify_wq); 2025 destroy_workqueue(osdc->notify_wq); 2026 cancel_delayed_work_sync(&osdc->timeout_work); 2027 cancel_delayed_work_sync(&osdc->osds_timeout_work); 2028 if (osdc->osdmap) { 2029 ceph_osdmap_destroy(osdc->osdmap); 2030 osdc->osdmap = NULL; 2031 } 2032 remove_all_osds(osdc); 2033 mempool_destroy(osdc->req_mempool); 2034 ceph_msgpool_destroy(&osdc->msgpool_op); 2035 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 2036 } 2037 2038 /* 2039 * Read some contiguous pages. If we cross a stripe boundary, shorten 2040 * *plen. Return number of bytes read, or error. 
2041 */ 2042 int ceph_osdc_readpages(struct ceph_osd_client *osdc, 2043 struct ceph_vino vino, struct ceph_file_layout *layout, 2044 u64 off, u64 *plen, 2045 u32 truncate_seq, u64 truncate_size, 2046 struct page **pages, int num_pages, int page_align) 2047 { 2048 struct ceph_osd_request *req; 2049 struct ceph_osd_data *osd_data; 2050 int rc = 0; 2051 2052 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, 2053 vino.snap, off, *plen); 2054 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 2055 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 2056 NULL, 0, truncate_seq, truncate_size, NULL, 2057 false); 2058 if (IS_ERR(req)) 2059 return PTR_ERR(req); 2060 2061 /* it may be a short read due to an object boundary */ 2062 2063 osd_data = &req->r_data_in; 2064 osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; 2065 osd_data->pages = pages; 2066 osd_data->length = *plen; 2067 osd_data->alignment = page_align; 2068 2069 dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", 2070 off, *plen, osd_data->length, page_align); 2071 2072 rc = ceph_osdc_start_request(osdc, req, false); 2073 if (!rc) 2074 rc = ceph_osdc_wait_request(osdc, req); 2075 2076 ceph_osdc_put_request(req); 2077 dout("readpages result %d\n", rc); 2078 return rc; 2079 } 2080 EXPORT_SYMBOL(ceph_osdc_readpages); 2081 2082 /* 2083 * do a synchronous write on N pages 2084 */ 2085 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, 2086 struct ceph_file_layout *layout, 2087 struct ceph_snap_context *snapc, 2088 u64 off, u64 len, 2089 u32 truncate_seq, u64 truncate_size, 2090 struct timespec *mtime, 2091 struct page **pages, int num_pages) 2092 { 2093 struct ceph_osd_request *req; 2094 struct ceph_osd_data *osd_data; 2095 int rc = 0; 2096 int page_align = off & ~PAGE_MASK; 2097 2098 BUG_ON(vino.snap != CEPH_NOSNAP); 2099 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 2100 CEPH_OSD_OP_WRITE, 2101 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, 2102 snapc, 0, 2103 
truncate_seq, truncate_size, mtime, 2104 true); 2105 if (IS_ERR(req)) 2106 return PTR_ERR(req); 2107 2108 /* it may be a short write due to an object boundary */ 2109 osd_data = &req->r_data_out; 2110 osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; 2111 osd_data->pages = pages; 2112 osd_data->length = len; 2113 osd_data->alignment = page_align; 2114 dout("writepages %llu~%llu (%llu bytes)\n", off, len, osd_data->length); 2115 2116 rc = ceph_osdc_start_request(osdc, req, true); 2117 if (!rc) 2118 rc = ceph_osdc_wait_request(osdc, req); 2119 2120 ceph_osdc_put_request(req); 2121 if (rc == 0) 2122 rc = len; 2123 dout("writepages result %d\n", rc); 2124 return rc; 2125 } 2126 EXPORT_SYMBOL(ceph_osdc_writepages); 2127 2128 /* 2129 * handle incoming message 2130 */ 2131 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 2132 { 2133 struct ceph_osd *osd = con->private; 2134 struct ceph_osd_client *osdc; 2135 int type = le16_to_cpu(msg->hdr.type); 2136 2137 if (!osd) 2138 goto out; 2139 osdc = osd->o_osdc; 2140 2141 switch (type) { 2142 case CEPH_MSG_OSD_MAP: 2143 ceph_osdc_handle_map(osdc, msg); 2144 break; 2145 case CEPH_MSG_OSD_OPREPLY: 2146 handle_reply(osdc, msg, con); 2147 break; 2148 case CEPH_MSG_WATCH_NOTIFY: 2149 handle_watch_notify(osdc, msg); 2150 break; 2151 2152 default: 2153 pr_err("received unknown message type %d %s\n", type, 2154 ceph_msg_type_name(type)); 2155 } 2156 out: 2157 ceph_msg_put(msg); 2158 } 2159 2160 /* 2161 * lookup and return message for incoming reply. set up reply message 2162 * pages. 
*/
/*
 * Messenger alloc_msg hook for CEPH_MSG_OSD_OPREPLY.
 *
 * Looks up the request by the header's tid (under request_mutex) and
 * returns a new reference to its preallocated reply message, so the
 * reply is read straight into the request's buffers.  Sets *skip = 1 and
 * returns NULL for an unknown tid, or when the incoming data is larger
 * than the page buffer prepared for it.  If allocating a larger front
 * fails, NULL is returned with *skip left unchanged (alloc_msg() sets it
 * to 0 before calling here).
 */
static struct ceph_msg *get_reply(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct ceph_msg *m;
	struct ceph_osd_request *req;
	int front = le32_to_cpu(hdr->front_len);
	int data_len = le32_to_cpu(hdr->data_len);
	u64 tid;

	tid = le64_to_cpu(hdr->tid);
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (!req) {
		*skip = 1;
		m = NULL;
		dout("get_reply unknown tid %llu from osd%d\n", tid,
		     osd->o_osd);
		goto out;
	}

	/* detach r_reply from any connection still reading into it */
	if (req->r_reply->con)
		dout("%s revoking msg %p from old con %p\n", __func__,
		     req->r_reply, req->r_reply->con);
	ceph_msg_revoke_incoming(req->r_reply);

	if (front > req->r_reply->front.iov_len) {
		/*
		 * The preallocated front is too small: replace r_reply with
		 * a larger message.
		 * NOTE(review): the replacement does not receive the data
		 * items that ceph_osdc_start_request() attached to the old
		 * r_reply via ceph_osdc_msg_data_set() -- confirm whether
		 * reply data is lost in this case.
		 */
		pr_warning("get_reply front %d > preallocated %d\n",
			   front, (int)req->r_reply->front.iov_len);
		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
		if (!m)
			goto out;
		ceph_msg_put(req->r_reply);
		req->r_reply = m;
	}
	m = ceph_msg_get(req->r_reply);

	if (data_len > 0) {
		struct ceph_osd_data *osd_data = &req->r_data_in;

		/* refuse replies carrying more data than we have pages for */
		if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
			if (osd_data->pages &&
				unlikely(osd_data->length < data_len)) {

				pr_warning("tid %lld reply has %d bytes "
					"we had only %llu bytes ready\n",
					tid, data_len, osd_data->length);
				*skip = 1;
				ceph_msg_put(m);
				m = NULL;
				goto out;
			}
		}
	}
	*skip = 0;
	dout("get_reply tid %lld %p\n", tid, m);

out:
	mutex_unlock(&osdc->request_mutex);
	return m;

}

/*
 * Messenger alloc_msg hook: osdmap and watch-notify messages get a fresh
 * message sized to the header's front_len; op replies reuse the request's
 * preallocated reply via get_reply(); any other type is skipped.
 */
static struct ceph_msg *alloc_msg(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	int type = le16_to_cpu(hdr->type);
	int front = le32_to_cpu(hdr->front_len);

	*skip = 0;
	switch (type) {
	case CEPH_MSG_OSD_MAP:
	case CEPH_MSG_WATCH_NOTIFY:
		return ceph_msg_new(type, front, GFP_NOFS, false);
	case CEPH_MSG_OSD_OPREPLY:
		return get_reply(con, hdr, skip);
	default:
		pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
			osd->o_osd);
		*skip = 1;
		return NULL;
	}
}

/*
 * Wrappers to refcount containing ceph_osd struct
 *
 * get_osd_con() returns @con if a reference on the owning osd could be
 * taken, NULL otherwise; put_osd_con() drops that reference.
 */
static struct ceph_connection *get_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	if (get_osd(osd))
		return con;
	return NULL;
}

static void put_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	put_osd(osd);
}

/*
 * authentication
 */
/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 *
 * Returns the osd's embedded o_auth handshake.  With @force_new any
 * existing authorizer is destroyed first; a missing authorizer is
 * created, an existing one is updated.  *proto is set to the auth
 * client's protocol.  Returns ERR_PTR() if create/update fails.
 */
static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
					int *proto, int force_new)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;
	struct ceph_auth_handshake *auth = &o->o_auth;

	if (force_new && auth->authorizer) {
		ceph_auth_destroy_authorizer(ac, auth->authorizer);
		auth->authorizer = NULL;
	}
	if (!auth->authorizer) {
		int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
						      auth);
		if (ret)
			return ERR_PTR(ret);
	} else {
		int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
						     auth);
		if (ret)
			return ERR_PTR(ret);
	}
	*proto = ac->protocol;

	return auth;
}


static int verify_authorizer_reply(struct ceph_connection *con, int len)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
2309 struct ceph_auth_client *ac = osdc->client->monc.auth; 2310 2311 return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len); 2312 } 2313 2314 static int invalidate_authorizer(struct ceph_connection *con) 2315 { 2316 struct ceph_osd *o = con->private; 2317 struct ceph_osd_client *osdc = o->o_osdc; 2318 struct ceph_auth_client *ac = osdc->client->monc.auth; 2319 2320 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); 2321 return ceph_monc_validate_auth(&osdc->client->monc); 2322 } 2323 2324 static const struct ceph_connection_operations osd_con_ops = { 2325 .get = get_osd_con, 2326 .put = put_osd_con, 2327 .dispatch = dispatch, 2328 .get_authorizer = get_authorizer, 2329 .verify_authorizer_reply = verify_authorizer_reply, 2330 .invalidate_authorizer = invalidate_authorizer, 2331 .alloc_msg = alloc_msg, 2332 .fault = osd_reset, 2333 }; 2334