1 #include <linux/ceph/ceph_debug.h> 2 3 #include <linux/module.h> 4 #include <linux/err.h> 5 #include <linux/highmem.h> 6 #include <linux/mm.h> 7 #include <linux/pagemap.h> 8 #include <linux/slab.h> 9 #include <linux/uaccess.h> 10 #ifdef CONFIG_BLOCK 11 #include <linux/bio.h> 12 #endif 13 14 #include <linux/ceph/libceph.h> 15 #include <linux/ceph/osd_client.h> 16 #include <linux/ceph/messenger.h> 17 #include <linux/ceph/decode.h> 18 #include <linux/ceph/auth.h> 19 #include <linux/ceph/pagelist.h> 20 21 #define OSD_OP_FRONT_LEN 4096 22 #define OSD_OPREPLY_FRONT_LEN 512 23 24 static const struct ceph_connection_operations osd_con_ops; 25 26 static void __send_queued(struct ceph_osd_client *osdc); 27 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); 28 static void __register_request(struct ceph_osd_client *osdc, 29 struct ceph_osd_request *req); 30 static void __unregister_linger_request(struct ceph_osd_client *osdc, 31 struct ceph_osd_request *req); 32 static void __send_request(struct ceph_osd_client *osdc, 33 struct ceph_osd_request *req); 34 35 /* 36 * Implement client access to distributed object storage cluster. 37 * 38 * All data objects are stored within a cluster/cloud of OSDs, or 39 * "object storage devices." (Note that Ceph OSDs have _nothing_ to 40 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply 41 * remote daemons serving up and coordinating consistent and safe 42 * access to storage. 43 * 44 * Cluster membership and the mapping of data objects onto storage devices 45 * are described by the osd map. 46 * 47 * We keep track of pending OSD requests (read, write), resubmit 48 * requests to different OSDs when the cluster topology/data layout 49 * change, or retry the affected requests when the communications 50 * channel with an OSD is reset. 51 */ 52 53 /* 54 * calculate the mapping of a file extent onto an object, and fill out the 55 * request accordingly. shorten extent as necessary if it crosses an 56 * object boundary. 57 * 58 * fill osd op in request message. 59 */ 60 static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen, 61 u64 *objnum, u64 *objoff, u64 *objlen) 62 { 63 u64 orig_len = *plen; 64 int r; 65 66 /* object extent? */ 67 r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum, 68 objoff, objlen); 69 if (r < 0) 70 return r; 71 if (*objlen < orig_len) { 72 *plen = *objlen; 73 dout(" skipping last %llu, final file extent %llu~%llu\n", 74 orig_len - *plen, off, *plen); 75 } 76 77 dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen); 78 79 return 0; 80 } 81 82 static void ceph_osd_data_init(struct ceph_osd_data *osd_data) 83 { 84 memset(osd_data, 0, sizeof (*osd_data)); 85 osd_data->type = CEPH_OSD_DATA_TYPE_NONE; 86 } 87 88 void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data, 89 struct page **pages, u64 length, u32 alignment, 90 bool pages_from_pool, bool own_pages) 91 { 92 osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; 93 osd_data->pages = pages; 94 osd_data->length = length; 95 osd_data->alignment = alignment; 96 osd_data->pages_from_pool = pages_from_pool; 97 osd_data->own_pages = own_pages; 98 } 99 EXPORT_SYMBOL(ceph_osd_data_pages_init); 100 101 void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data, 102 struct ceph_pagelist *pagelist) 103 { 104 osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST; 105 osd_data->pagelist = pagelist; 106 } 107 EXPORT_SYMBOL(ceph_osd_data_pagelist_init); 108 109 #ifdef CONFIG_BLOCK 110 void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data, 111 struct bio *bio, size_t bio_length) 112 { 113 osd_data->type = CEPH_OSD_DATA_TYPE_BIO; 114 osd_data->bio = bio; 115 osd_data->bio_length = bio_length; 116 } 117 EXPORT_SYMBOL(ceph_osd_data_bio_init); 118 #endif /* CONFIG_BLOCK */ 119 120 static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data) 121 { 122 switch (osd_data->type) { 123 case CEPH_OSD_DATA_TYPE_NONE: 124 return 0; 125 case CEPH_OSD_DATA_TYPE_PAGES: 126 return osd_data->length; 127 case CEPH_OSD_DATA_TYPE_PAGELIST: 128 return (u64)osd_data->pagelist->length; 129 #ifdef CONFIG_BLOCK 130 case CEPH_OSD_DATA_TYPE_BIO: 131 return (u64)osd_data->bio_length; 132 #endif /* CONFIG_BLOCK */ 133 default: 134 WARN(true, "unrecognized data type %d\n", (int)osd_data->type); 135 return 0; 136 } 137 } 138 139 static void ceph_osd_data_release(struct ceph_osd_data *osd_data) 140 { 141 if (osd_data->type != CEPH_OSD_DATA_TYPE_PAGES) 142 return; 143 144 if (osd_data->own_pages) { 145 int num_pages; 146 147 num_pages = calc_pages_for((u64)osd_data->alignment, 148 (u64)osd_data->length); 149 ceph_release_page_vector(osd_data->pages, num_pages); 150 } 151 } 152 153 /* 154 * requests 155 */ 156 void ceph_osdc_release_request(struct kref *kref) 157 { 158 struct ceph_osd_request *req; 159 160 req = container_of(kref, struct ceph_osd_request, r_kref); 161 if (req->r_request) 162 ceph_msg_put(req->r_request); 163 if (req->r_reply) { 164 ceph_msg_revoke_incoming(req->r_reply); 165 ceph_msg_put(req->r_reply); 166 } 167 168 ceph_osd_data_release(&req->r_data_in); 169 ceph_osd_data_release(&req->r_data_out); 170 171 ceph_put_snap_context(req->r_snapc); 172 if (req->r_mempool) 173 mempool_free(req, req->r_osdc->req_mempool); 174 else 175 kfree(req); 176 } 177 EXPORT_SYMBOL(ceph_osdc_release_request); 178 179 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 180 struct ceph_snap_context *snapc, 181 unsigned int num_ops, 182 bool use_mempool, 183 gfp_t gfp_flags) 184 { 185 struct ceph_osd_request *req; 186 struct ceph_msg *msg; 187 size_t msg_size; 188 189 BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX); 190 BUG_ON(num_ops > CEPH_OSD_MAX_OP); 191 192 msg_size = 4 + 4 + 8 + 8 + 4+8; 193 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ 194 msg_size += 1 + 8 + 4 + 4; /* pg_t */ 195 msg_size += 4 + MAX_OBJ_NAME_SIZE; 196 msg_size += 2 + num_ops*sizeof(struct ceph_osd_op); 197 msg_size += 8; /* snapid */ 198 msg_size += 8; /* snap_seq */ 199 msg_size += 8 * (snapc ? snapc->num_snaps : 0); /* snaps */ 200 msg_size += 4; 201 202 if (use_mempool) { 203 req = mempool_alloc(osdc->req_mempool, gfp_flags); 204 memset(req, 0, sizeof(*req)); 205 } else { 206 req = kzalloc(sizeof(*req), gfp_flags); 207 } 208 if (req == NULL) 209 return NULL; 210 211 req->r_osdc = osdc; 212 req->r_mempool = use_mempool; 213 req->r_num_ops = num_ops; 214 215 kref_init(&req->r_kref); 216 init_completion(&req->r_completion); 217 init_completion(&req->r_safe_completion); 218 RB_CLEAR_NODE(&req->r_node); 219 INIT_LIST_HEAD(&req->r_unsafe_item); 220 INIT_LIST_HEAD(&req->r_linger_item); 221 INIT_LIST_HEAD(&req->r_linger_osd); 222 INIT_LIST_HEAD(&req->r_req_lru_item); 223 INIT_LIST_HEAD(&req->r_osd_item); 224 225 /* create reply message */ 226 if (use_mempool) 227 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 228 else 229 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, 230 OSD_OPREPLY_FRONT_LEN, gfp_flags, true); 231 if (!msg) { 232 ceph_osdc_put_request(req); 233 return NULL; 234 } 235 req->r_reply = msg; 236 237 ceph_osd_data_init(&req->r_data_in); 238 ceph_osd_data_init(&req->r_data_out); 239 240 /* create request message; allow space for oid */ 241 if (use_mempool) 242 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 243 else 244 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true); 245 if (!msg) { 246 ceph_osdc_put_request(req); 247 return NULL; 248 } 249 250 memset(msg->front.iov_base, 0, msg->front.iov_len); 251 252 req->r_request = msg; 253 254 return req; 255 } 256 EXPORT_SYMBOL(ceph_osdc_alloc_request); 257 258 static bool osd_req_opcode_valid(u16 opcode) 259 { 260 switch (opcode) { 261 case CEPH_OSD_OP_READ: 262 case CEPH_OSD_OP_STAT: 263 case CEPH_OSD_OP_MAPEXT: 264 case CEPH_OSD_OP_MASKTRUNC: 265 case CEPH_OSD_OP_SPARSE_READ: 266 case CEPH_OSD_OP_NOTIFY: 267 case CEPH_OSD_OP_NOTIFY_ACK: 268 case CEPH_OSD_OP_ASSERT_VER: 269 case CEPH_OSD_OP_WRITE: 270 case CEPH_OSD_OP_WRITEFULL: 271 case CEPH_OSD_OP_TRUNCATE: 272 case CEPH_OSD_OP_ZERO: 273 case CEPH_OSD_OP_DELETE: 274 case CEPH_OSD_OP_APPEND: 275 case CEPH_OSD_OP_STARTSYNC: 276 case CEPH_OSD_OP_SETTRUNC: 277 case CEPH_OSD_OP_TRIMTRUNC: 278 case CEPH_OSD_OP_TMAPUP: 279 case CEPH_OSD_OP_TMAPPUT: 280 case CEPH_OSD_OP_TMAPGET: 281 case CEPH_OSD_OP_CREATE: 282 case CEPH_OSD_OP_ROLLBACK: 283 case CEPH_OSD_OP_WATCH: 284 case CEPH_OSD_OP_OMAPGETKEYS: 285 case CEPH_OSD_OP_OMAPGETVALS: 286 case CEPH_OSD_OP_OMAPGETHEADER: 287 case CEPH_OSD_OP_OMAPGETVALSBYKEYS: 288 case CEPH_OSD_OP_OMAPSETVALS: 289 case CEPH_OSD_OP_OMAPSETHEADER: 290 case CEPH_OSD_OP_OMAPCLEAR: 291 case CEPH_OSD_OP_OMAPRMKEYS: 292 case CEPH_OSD_OP_OMAP_CMP: 293 case CEPH_OSD_OP_CLONERANGE: 294 case CEPH_OSD_OP_ASSERT_SRC_VERSION: 295 case CEPH_OSD_OP_SRC_CMPXATTR: 296 case CEPH_OSD_OP_GETXATTR: 297 case CEPH_OSD_OP_GETXATTRS: 298 case CEPH_OSD_OP_CMPXATTR: 299 case CEPH_OSD_OP_SETXATTR: 300 case CEPH_OSD_OP_SETXATTRS: 301 case CEPH_OSD_OP_RESETXATTRS: 302 case CEPH_OSD_OP_RMXATTR: 303 case CEPH_OSD_OP_PULL: 304 case CEPH_OSD_OP_PUSH: 305 case CEPH_OSD_OP_BALANCEREADS: 306 case CEPH_OSD_OP_UNBALANCEREADS: 307 case CEPH_OSD_OP_SCRUB: 308 case CEPH_OSD_OP_SCRUB_RESERVE: 309 case CEPH_OSD_OP_SCRUB_UNRESERVE: 310 case CEPH_OSD_OP_SCRUB_STOP: 311 case CEPH_OSD_OP_SCRUB_MAP: 312 case CEPH_OSD_OP_WRLOCK: 313 case CEPH_OSD_OP_WRUNLOCK: 314 case CEPH_OSD_OP_RDLOCK: 315 case CEPH_OSD_OP_RDUNLOCK: 316 case CEPH_OSD_OP_UPLOCK: 317 case CEPH_OSD_OP_DNLOCK: 318 case CEPH_OSD_OP_CALL: 319 case CEPH_OSD_OP_PGLS: 320 case CEPH_OSD_OP_PGLS_FILTER: 321 return true; 322 default: 323 return false; 324 } 325 } 326 327 /* 328 * This is an osd op init function for opcodes that have no data or 329 * other information associated with them. It also serves as a 330 * common init routine for all the other init functions, below. 331 */ 332 void osd_req_op_init(struct ceph_osd_req_op *op, u16 opcode) 333 { 334 BUG_ON(!osd_req_opcode_valid(opcode)); 335 336 memset(op, 0, sizeof (*op)); 337 338 op->op = opcode; 339 } 340 341 void osd_req_op_extent_init(struct ceph_osd_req_op *op, u16 opcode, 342 u64 offset, u64 length, 343 u64 truncate_size, u32 truncate_seq) 344 { 345 size_t payload_len = 0; 346 347 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE); 348 349 osd_req_op_init(op, opcode); 350 351 op->extent.offset = offset; 352 op->extent.length = length; 353 op->extent.truncate_size = truncate_size; 354 op->extent.truncate_seq = truncate_seq; 355 if (opcode == CEPH_OSD_OP_WRITE) 356 payload_len += length; 357 358 op->payload_len = payload_len; 359 } 360 EXPORT_SYMBOL(osd_req_op_extent_init); 361 362 void osd_req_op_extent_update(struct ceph_osd_req_op *op, u64 length) 363 { 364 u64 previous = op->extent.length; 365 366 if (length == previous) 367 return; /* Nothing to do */ 368 BUG_ON(length > previous); 369 370 op->extent.length = length; 371 op->payload_len -= previous - length; 372 } 373 EXPORT_SYMBOL(osd_req_op_extent_update); 374 375 void osd_req_op_cls_init(struct ceph_osd_req_op *op, u16 opcode, 376 const char *class, const char *method, 377 const void *request_data, size_t request_data_size) 378 { 379 size_t payload_len = 0; 380 size_t size; 381 382 BUG_ON(opcode != CEPH_OSD_OP_CALL); 383 384 osd_req_op_init(op, opcode); 385 386 op->cls.class_name = class; 387 size = strlen(class); 388 BUG_ON(size > (size_t) U8_MAX); 389 op->cls.class_len = size; 390 payload_len += size; 391 392 op->cls.method_name = method; 393 size = strlen(method); 394 BUG_ON(size > (size_t) U8_MAX); 395 op->cls.method_len = size; 396 payload_len += size; 397 398 op->cls.request_data = request_data; 399 BUG_ON(request_data_size > (size_t) U32_MAX); 400 op->cls.request_data_len = (u32) request_data_size; 401 payload_len += request_data_size; 402 403 op->cls.argc = 0; /* currently unused */ 404 405 op->payload_len = payload_len; 406 } 407 EXPORT_SYMBOL(osd_req_op_cls_init); 408 409 void osd_req_op_watch_init(struct ceph_osd_req_op *op, u16 opcode, 410 u64 cookie, u64 version, int flag) 411 { 412 BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH); 413 414 osd_req_op_init(op, opcode); 415 416 op->watch.cookie = cookie; 417 /* op->watch.ver = version; */ /* XXX 3847 */ 418 op->watch.ver = cpu_to_le64(version); 419 if (opcode == CEPH_OSD_OP_WATCH && flag) 420 op->watch.flag = (u8) 1; 421 } 422 EXPORT_SYMBOL(osd_req_op_watch_init); 423 424 static u64 osd_req_encode_op(struct ceph_osd_request *req, 425 struct ceph_osd_op *dst, unsigned int which) 426 { 427 struct ceph_osd_req_op *src; 428 u64 request_data_len = 0; 429 struct ceph_pagelist *pagelist; 430 431 BUG_ON(which >= req->r_num_ops); 432 src = &req->r_ops[which]; 433 if (WARN_ON(!osd_req_opcode_valid(src->op))) { 434 pr_err("unrecognized osd opcode %d\n", src->op); 435 436 return 0; 437 } 438 439 switch (src->op) { 440 case CEPH_OSD_OP_STAT: 441 break; 442 case CEPH_OSD_OP_READ: 443 case CEPH_OSD_OP_WRITE: 444 if (src->op == CEPH_OSD_OP_WRITE) 445 request_data_len = src->extent.length; 446 dst->extent.offset = cpu_to_le64(src->extent.offset); 447 dst->extent.length = cpu_to_le64(src->extent.length); 448 dst->extent.truncate_size = 449 cpu_to_le64(src->extent.truncate_size); 450 dst->extent.truncate_seq = 451 cpu_to_le32(src->extent.truncate_seq); 452 break; 453 case CEPH_OSD_OP_CALL: 454 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); 455 BUG_ON(!pagelist); 456 ceph_pagelist_init(pagelist); 457 458 dst->cls.class_len = src->cls.class_len; 459 dst->cls.method_len = src->cls.method_len; 460 dst->cls.indata_len = cpu_to_le32(src->cls.request_data_len); 461 ceph_pagelist_append(pagelist, src->cls.class_name, 462 src->cls.class_len); 463 ceph_pagelist_append(pagelist, src->cls.method_name, 464 src->cls.method_len); 465 ceph_pagelist_append(pagelist, src->cls.request_data, 466 src->cls.request_data_len); 467 468 ceph_osd_data_pagelist_init(&req->r_data_out, pagelist); 469 request_data_len = pagelist->length; 470 break; 471 case CEPH_OSD_OP_STARTSYNC: 472 break; 473 case CEPH_OSD_OP_NOTIFY_ACK: 474 case CEPH_OSD_OP_WATCH: 475 dst->watch.cookie = cpu_to_le64(src->watch.cookie); 476 dst->watch.ver = cpu_to_le64(src->watch.ver); 477 dst->watch.flag = src->watch.flag; 478 break; 479 default: 480 pr_err("unsupported osd opcode %s\n", 481 ceph_osd_op_name(src->op)); 482 WARN_ON(1); 483 484 return 0; 485 } 486 dst->op = cpu_to_le16(src->op); 487 dst->payload_len = cpu_to_le32(src->payload_len); 488 489 return request_data_len; 490 } 491 492 /* 493 * build new request AND message 494 * 495 */ 496 void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, 497 struct ceph_snap_context *snapc, u64 snap_id, 498 struct timespec *mtime) 499 { 500 struct ceph_msg *msg = req->r_request; 501 void *p; 502 size_t msg_size; 503 int flags = req->r_flags; 504 u64 data_len; 505 unsigned int i; 506 507 req->r_snapid = snap_id; 508 req->r_snapc = ceph_get_snap_context(snapc); 509 510 /* encode request */ 511 msg->hdr.version = cpu_to_le16(4); 512 513 p = msg->front.iov_base; 514 ceph_encode_32(&p, 1); /* client_inc is always 1 */ 515 req->r_request_osdmap_epoch = p; 516 p += 4; 517 req->r_request_flags = p; 518 p += 4; 519 if (req->r_flags & CEPH_OSD_FLAG_WRITE) 520 ceph_encode_timespec(p, mtime); 521 p += sizeof(struct ceph_timespec); 522 req->r_request_reassert_version = p; 523 p += sizeof(struct ceph_eversion); /* will get filled in */ 524 525 /* oloc */ 526 ceph_encode_8(&p, 4); 527 ceph_encode_8(&p, 4); 528 ceph_encode_32(&p, 8 + 4 + 4); 529 req->r_request_pool = p; 530 p += 8; 531 ceph_encode_32(&p, -1); /* preferred */ 532 ceph_encode_32(&p, 0); /* key len */ 533 534 ceph_encode_8(&p, 1); 535 req->r_request_pgid = p; 536 p += 8 + 4; 537 ceph_encode_32(&p, -1); /* preferred */ 538 539 /* oid */ 540 ceph_encode_32(&p, req->r_oid_len); 541 memcpy(p, req->r_oid, req->r_oid_len); 542 dout("oid '%.*s' len %d\n", req->r_oid_len, req->r_oid, req->r_oid_len); 543 p += req->r_oid_len; 544 545 /* ops--can imply data */ 546 ceph_encode_16(&p, (u16)req->r_num_ops); 547 data_len = 0; 548 for (i = 0; i < req->r_num_ops; i++) { 549 data_len += osd_req_encode_op(req, p, i); 550 p += sizeof(struct ceph_osd_op); 551 } 552 553 /* snaps */ 554 ceph_encode_64(&p, req->r_snapid); 555 ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0); 556 ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0); 557 if (req->r_snapc) { 558 for (i = 0; i < snapc->num_snaps; i++) { 559 ceph_encode_64(&p, req->r_snapc->snaps[i]); 560 } 561 } 562 563 req->r_request_attempts = p; 564 p += 4; 565 566 /* data */ 567 if (flags & CEPH_OSD_FLAG_WRITE) { 568 u16 data_off; 569 570 /* 571 * The header "data_off" is a hint to the receiver 572 * allowing it to align received data into its 573 * buffers such that there's no need to re-copy 574 * it before writing it to disk (direct I/O). 575 */ 576 data_off = (u16) (off & 0xffff); 577 req->r_request->hdr.data_off = cpu_to_le16(data_off); 578 } 579 req->r_request->hdr.data_len = cpu_to_le32(data_len); 580 581 BUG_ON(p > msg->front.iov_base + msg->front.iov_len); 582 msg_size = p - msg->front.iov_base; 583 msg->front.iov_len = msg_size; 584 msg->hdr.front_len = cpu_to_le32(msg_size); 585 586 dout("build_request msg_size was %d\n", (int)msg_size); 587 } 588 EXPORT_SYMBOL(ceph_osdc_build_request); 589 590 /* 591 * build new request AND message, calculate layout, and adjust file 592 * extent as needed. 593 * 594 * if the file was recently truncated, we include information about its 595 * old and new size so that the object can be updated appropriately. (we 596 * avoid synchronously deleting truncated objects because it's slow.) 597 * 598 * if @do_sync, include a 'startsync' command so that the osd will flush 599 * data quickly. 600 */ 601 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, 602 struct ceph_file_layout *layout, 603 struct ceph_vino vino, 604 u64 off, u64 *plen, int num_ops, 605 int opcode, int flags, 606 struct ceph_snap_context *snapc, 607 u32 truncate_seq, 608 u64 truncate_size, 609 bool use_mempool) 610 { 611 struct ceph_osd_request *req; 612 struct ceph_osd_req_op *op; 613 u64 objnum = 0; 614 u64 objoff = 0; 615 u64 objlen = 0; 616 u32 object_size; 617 u64 object_base; 618 int r; 619 620 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE); 621 622 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool, 623 GFP_NOFS); 624 if (!req) 625 return ERR_PTR(-ENOMEM); 626 627 req->r_flags = flags; 628 629 /* calculate max write size */ 630 r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen); 631 if (r < 0) { 632 ceph_osdc_put_request(req); 633 return ERR_PTR(r); 634 } 635 636 object_size = le32_to_cpu(layout->fl_object_size); 637 object_base = off - objoff; 638 if (truncate_size <= object_base) { 639 truncate_size = 0; 640 } else { 641 truncate_size -= object_base; 642 if (truncate_size > object_size) 643 truncate_size = object_size; 644 } 645 646 op = &req->r_ops[0]; 647 osd_req_op_extent_init(op, opcode, objoff, objlen, 648 truncate_size, truncate_seq); 649 /* 650 * A second op in the ops array means the caller wants to 651 * also issue a include a 'startsync' command so that the 652 * osd will flush data quickly. 653 */ 654 if (num_ops > 1) 655 osd_req_op_init(++op, CEPH_OSD_OP_STARTSYNC); 656 657 req->r_file_layout = *layout; /* keep a copy */ 658 659 snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", 660 vino.ino, objnum); 661 req->r_oid_len = strlen(req->r_oid); 662 663 return req; 664 } 665 EXPORT_SYMBOL(ceph_osdc_new_request); 666 667 /* 668 * We keep osd requests in an rbtree, sorted by ->r_tid. 669 */ 670 static void __insert_request(struct ceph_osd_client *osdc, 671 struct ceph_osd_request *new) 672 { 673 struct rb_node **p = &osdc->requests.rb_node; 674 struct rb_node *parent = NULL; 675 struct ceph_osd_request *req = NULL; 676 677 while (*p) { 678 parent = *p; 679 req = rb_entry(parent, struct ceph_osd_request, r_node); 680 if (new->r_tid < req->r_tid) 681 p = &(*p)->rb_left; 682 else if (new->r_tid > req->r_tid) 683 p = &(*p)->rb_right; 684 else 685 BUG(); 686 } 687 688 rb_link_node(&new->r_node, parent, p); 689 rb_insert_color(&new->r_node, &osdc->requests); 690 } 691 692 static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc, 693 u64 tid) 694 { 695 struct ceph_osd_request *req; 696 struct rb_node *n = osdc->requests.rb_node; 697 698 while (n) { 699 req = rb_entry(n, struct ceph_osd_request, r_node); 700 if (tid < req->r_tid) 701 n = n->rb_left; 702 else if (tid > req->r_tid) 703 n = n->rb_right; 704 else 705 return req; 706 } 707 return NULL; 708 } 709 710 static struct ceph_osd_request * 711 __lookup_request_ge(struct ceph_osd_client *osdc, 712 u64 tid) 713 { 714 struct ceph_osd_request *req; 715 struct rb_node *n = osdc->requests.rb_node; 716 717 while (n) { 718 req = rb_entry(n, struct ceph_osd_request, r_node); 719 if (tid < req->r_tid) { 720 if (!n->rb_left) 721 return req; 722 n = n->rb_left; 723 } else if (tid > req->r_tid) { 724 n = n->rb_right; 725 } else { 726 return req; 727 } 728 } 729 return NULL; 730 } 731 732 /* 733 * Resubmit requests pending on the given osd. 734 */ 735 static void __kick_osd_requests(struct ceph_osd_client *osdc, 736 struct ceph_osd *osd) 737 { 738 struct ceph_osd_request *req, *nreq; 739 LIST_HEAD(resend); 740 int err; 741 742 dout("__kick_osd_requests osd%d\n", osd->o_osd); 743 err = __reset_osd(osdc, osd); 744 if (err) 745 return; 746 /* 747 * Build up a list of requests to resend by traversing the 748 * osd's list of requests. Requests for a given object are 749 * sent in tid order, and that is also the order they're 750 * kept on this list. Therefore all requests that are in 751 * flight will be found first, followed by all requests that 752 * have not yet been sent. And to resend requests while 753 * preserving this order we will want to put any sent 754 * requests back on the front of the osd client's unsent 755 * list. 756 * 757 * So we build a separate ordered list of already-sent 758 * requests for the affected osd and splice it onto the 759 * front of the osd client's unsent list. Once we've seen a 760 * request that has not yet been sent we're done. Those 761 * requests are already sitting right where they belong. 762 */ 763 list_for_each_entry(req, &osd->o_requests, r_osd_item) { 764 if (!req->r_sent) 765 break; 766 list_move_tail(&req->r_req_lru_item, &resend); 767 dout("requeueing %p tid %llu osd%d\n", req, req->r_tid, 768 osd->o_osd); 769 if (!req->r_linger) 770 req->r_flags |= CEPH_OSD_FLAG_RETRY; 771 } 772 list_splice(&resend, &osdc->req_unsent); 773 774 /* 775 * Linger requests are re-registered before sending, which 776 * sets up a new tid for each. We add them to the unsent 777 * list at the end to keep things in tid order. 778 */ 779 list_for_each_entry_safe(req, nreq, &osd->o_linger_requests, 780 r_linger_osd) { 781 /* 782 * reregister request prior to unregistering linger so 783 * that r_osd is preserved. 784 */ 785 BUG_ON(!list_empty(&req->r_req_lru_item)); 786 __register_request(osdc, req); 787 list_add_tail(&req->r_req_lru_item, &osdc->req_unsent); 788 list_add_tail(&req->r_osd_item, &req->r_osd->o_requests); 789 __unregister_linger_request(osdc, req); 790 dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid, 791 osd->o_osd); 792 } 793 } 794 795 /* 796 * If the osd connection drops, we need to resubmit all requests. 797 */ 798 static void osd_reset(struct ceph_connection *con) 799 { 800 struct ceph_osd *osd = con->private; 801 struct ceph_osd_client *osdc; 802 803 if (!osd) 804 return; 805 dout("osd_reset osd%d\n", osd->o_osd); 806 osdc = osd->o_osdc; 807 down_read(&osdc->map_sem); 808 mutex_lock(&osdc->request_mutex); 809 __kick_osd_requests(osdc, osd); 810 __send_queued(osdc); 811 mutex_unlock(&osdc->request_mutex); 812 up_read(&osdc->map_sem); 813 } 814 815 /* 816 * Track open sessions with osds. 817 */ 818 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum) 819 { 820 struct ceph_osd *osd; 821 822 osd = kzalloc(sizeof(*osd), GFP_NOFS); 823 if (!osd) 824 return NULL; 825 826 atomic_set(&osd->o_ref, 1); 827 osd->o_osdc = osdc; 828 osd->o_osd = onum; 829 RB_CLEAR_NODE(&osd->o_node); 830 INIT_LIST_HEAD(&osd->o_requests); 831 INIT_LIST_HEAD(&osd->o_linger_requests); 832 INIT_LIST_HEAD(&osd->o_osd_lru); 833 osd->o_incarnation = 1; 834 835 ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr); 836 837 INIT_LIST_HEAD(&osd->o_keepalive_item); 838 return osd; 839 } 840 841 static struct ceph_osd *get_osd(struct ceph_osd *osd) 842 { 843 if (atomic_inc_not_zero(&osd->o_ref)) { 844 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1, 845 atomic_read(&osd->o_ref)); 846 return osd; 847 } else { 848 dout("get_osd %p FAIL\n", osd); 849 return NULL; 850 } 851 } 852 853 static void put_osd(struct ceph_osd *osd) 854 { 855 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), 856 atomic_read(&osd->o_ref) - 1); 857 if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) { 858 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth; 859 860 ceph_auth_destroy_authorizer(ac, osd->o_auth.authorizer); 861 kfree(osd); 862 } 863 } 864 865 /* 866 * remove an osd from our map 867 */ 868 static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 869 { 870 dout("__remove_osd %p\n", osd); 871 BUG_ON(!list_empty(&osd->o_requests)); 872 rb_erase(&osd->o_node, &osdc->osds); 873 list_del_init(&osd->o_osd_lru); 874 ceph_con_close(&osd->o_con); 875 put_osd(osd); 876 } 877 878 static void remove_all_osds(struct ceph_osd_client *osdc) 879 { 880 dout("%s %p\n", __func__, osdc); 881 mutex_lock(&osdc->request_mutex); 882 while (!RB_EMPTY_ROOT(&osdc->osds)) { 883 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), 884 struct ceph_osd, o_node); 885 __remove_osd(osdc, osd); 886 } 887 mutex_unlock(&osdc->request_mutex); 888 } 889 890 static void __move_osd_to_lru(struct ceph_osd_client *osdc, 891 struct ceph_osd *osd) 892 { 893 dout("__move_osd_to_lru %p\n", osd); 894 BUG_ON(!list_empty(&osd->o_osd_lru)); 895 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru); 896 osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ; 897 } 898 899 static void __remove_osd_from_lru(struct ceph_osd *osd) 900 { 901 dout("__remove_osd_from_lru %p\n", osd); 902 if (!list_empty(&osd->o_osd_lru)) 903 list_del_init(&osd->o_osd_lru); 904 } 905 906 static void remove_old_osds(struct ceph_osd_client *osdc) 907 { 908 struct ceph_osd *osd, *nosd; 909 910 dout("__remove_old_osds %p\n", osdc); 911 mutex_lock(&osdc->request_mutex); 912 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) { 913 if (time_before(jiffies, osd->lru_ttl)) 914 break; 915 __remove_osd(osdc, osd); 916 } 917 mutex_unlock(&osdc->request_mutex); 918 } 919 920 /* 921 * reset osd connect 922 */ 923 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 924 { 925 struct ceph_entity_addr *peer_addr; 926 927 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 928 if (list_empty(&osd->o_requests) && 929 list_empty(&osd->o_linger_requests)) { 930 __remove_osd(osdc, osd); 931 932 return -ENODEV; 933 } 934 935 peer_addr = &osdc->osdmap->osd_addr[osd->o_osd]; 936 if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) && 937 !ceph_con_opened(&osd->o_con)) { 938 struct ceph_osd_request *req; 939 940 dout(" osd addr hasn't changed and connection never opened," 941 " letting msgr retry"); 942 /* touch each r_stamp for handle_timeout()'s benfit */ 943 list_for_each_entry(req, &osd->o_requests, r_osd_item) 944 req->r_stamp = jiffies; 945 946 return -EAGAIN; 947 } 948 949 ceph_con_close(&osd->o_con); 950 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr); 951 osd->o_incarnation++; 952 953 return 0; 954 } 955 956 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new) 957 { 958 struct rb_node **p = &osdc->osds.rb_node; 959 struct rb_node *parent = NULL; 960 struct ceph_osd *osd = NULL; 961 962 dout("__insert_osd %p osd%d\n", new, new->o_osd); 963 while (*p) { 964 parent = *p; 965 osd = rb_entry(parent, struct ceph_osd, o_node); 966 if (new->o_osd < osd->o_osd) 967 p = &(*p)->rb_left; 968 else if (new->o_osd > osd->o_osd) 969 p = &(*p)->rb_right; 970 else 971 BUG(); 972 } 973 974 rb_link_node(&new->o_node, parent, p); 975 rb_insert_color(&new->o_node, &osdc->osds); 976 } 977 978 static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o) 979 { 980 struct ceph_osd *osd; 981 struct rb_node *n = osdc->osds.rb_node; 982 983 while (n) { 984 osd = rb_entry(n, struct ceph_osd, o_node); 985 if (o < osd->o_osd) 986 n = n->rb_left; 987 else if (o > osd->o_osd) 988 n = n->rb_right; 989 else 990 return osd; 991 } 992 return NULL; 993 } 994 995 static void __schedule_osd_timeout(struct ceph_osd_client *osdc) 996 { 997 schedule_delayed_work(&osdc->timeout_work, 998 osdc->client->options->osd_keepalive_timeout * HZ); 999 } 1000 1001 static void __cancel_osd_timeout(struct ceph_osd_client *osdc) 1002 { 1003 cancel_delayed_work(&osdc->timeout_work); 1004 } 1005 1006 /* 1007 * Register request, assign tid. If this is the first request, set up 1008 * the timeout event. 1009 */ 1010 static void __register_request(struct ceph_osd_client *osdc, 1011 struct ceph_osd_request *req) 1012 { 1013 req->r_tid = ++osdc->last_tid; 1014 req->r_request->hdr.tid = cpu_to_le64(req->r_tid); 1015 dout("__register_request %p tid %lld\n", req, req->r_tid); 1016 __insert_request(osdc, req); 1017 ceph_osdc_get_request(req); 1018 osdc->num_requests++; 1019 if (osdc->num_requests == 1) { 1020 dout(" first request, scheduling timeout\n"); 1021 __schedule_osd_timeout(osdc); 1022 } 1023 } 1024 1025 /* 1026 * called under osdc->request_mutex 1027 */ 1028 static void __unregister_request(struct ceph_osd_client *osdc, 1029 struct ceph_osd_request *req) 1030 { 1031 if (RB_EMPTY_NODE(&req->r_node)) { 1032 dout("__unregister_request %p tid %lld not registered\n", 1033 req, req->r_tid); 1034 return; 1035 } 1036 1037 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 1038 rb_erase(&req->r_node, &osdc->requests); 1039 osdc->num_requests--; 1040 1041 if (req->r_osd) { 1042 /* make sure the original request isn't in flight. */ 1043 ceph_msg_revoke(req->r_request); 1044 1045 list_del_init(&req->r_osd_item); 1046 if (list_empty(&req->r_osd->o_requests) && 1047 list_empty(&req->r_osd->o_linger_requests)) { 1048 dout("moving osd to %p lru\n", req->r_osd); 1049 __move_osd_to_lru(osdc, req->r_osd); 1050 } 1051 if (list_empty(&req->r_linger_item)) 1052 req->r_osd = NULL; 1053 } 1054 1055 list_del_init(&req->r_req_lru_item); 1056 ceph_osdc_put_request(req); 1057 1058 if (osdc->num_requests == 0) { 1059 dout(" no requests, canceling timeout\n"); 1060 __cancel_osd_timeout(osdc); 1061 } 1062 } 1063 1064 /* 1065 * Cancel a previously queued request message 1066 */ 1067 static void __cancel_request(struct ceph_osd_request *req) 1068 { 1069 if (req->r_sent && req->r_osd) { 1070 ceph_msg_revoke(req->r_request); 1071 req->r_sent = 0; 1072 } 1073 } 1074 1075 static void __register_linger_request(struct ceph_osd_client *osdc, 1076 struct ceph_osd_request *req) 1077 { 1078 dout("__register_linger_request %p\n", req); 1079 list_add_tail(&req->r_linger_item, &osdc->req_linger); 1080 if (req->r_osd) 1081 list_add_tail(&req->r_linger_osd, 1082 &req->r_osd->o_linger_requests); 1083 } 1084 1085 static void __unregister_linger_request(struct ceph_osd_client *osdc, 1086 struct ceph_osd_request *req) 1087 { 1088 dout("__unregister_linger_request %p\n", req); 1089 list_del_init(&req->r_linger_item); 1090 if (req->r_osd) { 1091 list_del_init(&req->r_linger_osd); 1092 1093 if (list_empty(&req->r_osd->o_requests) && 1094 list_empty(&req->r_osd->o_linger_requests)) { 1095 dout("moving osd to %p lru\n", req->r_osd); 1096 __move_osd_to_lru(osdc, req->r_osd); 1097 } 1098 if (list_empty(&req->r_osd_item)) 1099 req->r_osd = NULL; 1100 } 1101 } 1102 1103 void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc, 1104 struct ceph_osd_request *req) 1105 { 1106 mutex_lock(&osdc->request_mutex); 1107 if (req->r_linger) { 1108 __unregister_linger_request(osdc, req); 1109 ceph_osdc_put_request(req); 1110 } 1111 mutex_unlock(&osdc->request_mutex); 1112 } 1113 EXPORT_SYMBOL(ceph_osdc_unregister_linger_request); 1114 1115 void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, 1116 struct ceph_osd_request *req) 1117 { 1118 if (!req->r_linger) { 1119 dout("set_request_linger %p\n", req); 1120 req->r_linger = 1; 1121 /* 1122 * caller is now responsible for calling 1123 * unregister_linger_request 1124 */ 1125 ceph_osdc_get_request(req); 1126 } 1127 } 1128 EXPORT_SYMBOL(ceph_osdc_set_request_linger); 1129 1130 /* 1131 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct 1132 * (as needed), and set the request r_osd appropriately. If there is 1133 * no up osd, set r_osd to NULL. Move the request to the appropriate list 1134 * (unsent, homeless) or leave on in-flight lru. 1135 * 1136 * Return 0 if unchanged, 1 if changed, or negative on error. 1137 * 1138 * Caller should hold map_sem for read and request_mutex. 1139 */ 1140 static int __map_request(struct ceph_osd_client *osdc, 1141 struct ceph_osd_request *req, int force_resend) 1142 { 1143 struct ceph_pg pgid; 1144 int acting[CEPH_PG_MAX_SIZE]; 1145 int o = -1, num = 0; 1146 int err; 1147 1148 dout("map_request %p tid %lld\n", req, req->r_tid); 1149 err = ceph_calc_ceph_pg(&pgid, req->r_oid, osdc->osdmap, 1150 ceph_file_layout_pg_pool(req->r_file_layout)); 1151 if (err) { 1152 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1153 return err; 1154 } 1155 req->r_pgid = pgid; 1156 1157 err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting); 1158 if (err > 0) { 1159 o = acting[0]; 1160 num = err; 1161 } 1162 1163 if ((!force_resend && 1164 req->r_osd && req->r_osd->o_osd == o && 1165 req->r_sent >= req->r_osd->o_incarnation && 1166 req->r_num_pg_osds == num && 1167 memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || 1168 (req->r_osd == NULL && o == -1)) 1169 return 0; /* no change */ 1170 1171 dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n", 1172 req->r_tid, pgid.pool, pgid.seed, o, 1173 req->r_osd ? req->r_osd->o_osd : -1); 1174 1175 /* record full pg acting set */ 1176 memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num); 1177 req->r_num_pg_osds = num; 1178 1179 if (req->r_osd) { 1180 __cancel_request(req); 1181 list_del_init(&req->r_osd_item); 1182 req->r_osd = NULL; 1183 } 1184 1185 req->r_osd = __lookup_osd(osdc, o); 1186 if (!req->r_osd && o >= 0) { 1187 err = -ENOMEM; 1188 req->r_osd = create_osd(osdc, o); 1189 if (!req->r_osd) { 1190 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1191 goto out; 1192 } 1193 1194 dout("map_request osd %p is osd%d\n", req->r_osd, o); 1195 __insert_osd(osdc, req->r_osd); 1196 1197 ceph_con_open(&req->r_osd->o_con, 1198 CEPH_ENTITY_TYPE_OSD, o, 1199 &osdc->osdmap->osd_addr[o]); 1200 } 1201 1202 if (req->r_osd) { 1203 __remove_osd_from_lru(req->r_osd); 1204 list_add_tail(&req->r_osd_item, &req->r_osd->o_requests); 1205 list_move_tail(&req->r_req_lru_item, &osdc->req_unsent); 1206 } else { 1207 list_move_tail(&req->r_req_lru_item, &osdc->req_notarget); 1208 } 1209 err = 1; /* osd or pg changed */ 1210 1211 out: 1212 return err; 1213 } 1214 1215 /* 1216 * caller should hold map_sem (for read) and request_mutex 1217 */ 1218 static void __send_request(struct ceph_osd_client *osdc, 1219 struct ceph_osd_request *req) 1220 { 1221 void *p; 1222 1223 dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n", 1224 req, req->r_tid, req->r_osd->o_osd, req->r_flags, 1225 (unsigned long long)req->r_pgid.pool, req->r_pgid.seed); 1226 1227 /* fill in message content that changes each time we send it */ 1228 put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); 1229 put_unaligned_le32(req->r_flags, req->r_request_flags); 1230 put_unaligned_le64(req->r_pgid.pool, req->r_request_pool); 1231 p = req->r_request_pgid; 1232 ceph_encode_64(&p, req->r_pgid.pool); 1233 ceph_encode_32(&p, req->r_pgid.seed); 1234 put_unaligned_le64(1, req->r_request_attempts); /* FIXME */ 1235 memcpy(req->r_request_reassert_version, &req->r_reassert_version, 1236 sizeof(req->r_reassert_version)); 1237 1238 req->r_stamp = jiffies; 1239 list_move_tail(&req->r_req_lru_item, &osdc->req_lru); 1240 1241 ceph_msg_get(req->r_request); /* send consumes a ref */ 1242 ceph_con_send(&req->r_osd->o_con, req->r_request); 1243 req->r_sent = req->r_osd->o_incarnation; 1244 } 1245 1246 /* 1247 * Send any requests in the queue (req_unsent). 1248 */ 1249 static void __send_queued(struct ceph_osd_client *osdc) 1250 { 1251 struct ceph_osd_request *req, *tmp; 1252 1253 dout("__send_queued\n"); 1254 list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) 1255 __send_request(osdc, req); 1256 } 1257 1258 /* 1259 * Timeout callback, called every N seconds when 1 or more osd 1260 * requests has been active for more than N seconds. When this 1261 * happens, we ping all OSDs with requests who have timed out to 1262 * ensure any communications channel reset is detected. Reset the 1263 * request timeouts another N seconds in the future as we go. 1264 * Reschedule the timeout event another N seconds in future (unless 1265 * there are no open requests). 1266 */ 1267 static void handle_timeout(struct work_struct *work) 1268 { 1269 struct ceph_osd_client *osdc = 1270 container_of(work, struct ceph_osd_client, timeout_work.work); 1271 struct ceph_osd_request *req; 1272 struct ceph_osd *osd; 1273 unsigned long keepalive = 1274 osdc->client->options->osd_keepalive_timeout * HZ; 1275 struct list_head slow_osds; 1276 dout("timeout\n"); 1277 down_read(&osdc->map_sem); 1278 1279 ceph_monc_request_next_osdmap(&osdc->client->monc); 1280 1281 mutex_lock(&osdc->request_mutex); 1282 1283 /* 1284 * ping osds that are a bit slow. this ensures that if there 1285 * is a break in the TCP connection we will notice, and reopen 1286 * a connection with that osd (from the fault callback). 1287 */ 1288 INIT_LIST_HEAD(&slow_osds); 1289 list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) { 1290 if (time_before(jiffies, req->r_stamp + keepalive)) 1291 break; 1292 1293 osd = req->r_osd; 1294 BUG_ON(!osd); 1295 dout(" tid %llu is slow, will send keepalive on osd%d\n", 1296 req->r_tid, osd->o_osd); 1297 list_move_tail(&osd->o_keepalive_item, &slow_osds); 1298 } 1299 while (!list_empty(&slow_osds)) { 1300 osd = list_entry(slow_osds.next, struct ceph_osd, 1301 o_keepalive_item); 1302 list_del_init(&osd->o_keepalive_item); 1303 ceph_con_keepalive(&osd->o_con); 1304 } 1305 1306 __schedule_osd_timeout(osdc); 1307 __send_queued(osdc); 1308 mutex_unlock(&osdc->request_mutex); 1309 up_read(&osdc->map_sem); 1310 } 1311 1312 static void handle_osds_timeout(struct work_struct *work) 1313 { 1314 struct ceph_osd_client *osdc = 1315 container_of(work, struct ceph_osd_client, 1316 osds_timeout_work.work); 1317 unsigned long delay = 1318 osdc->client->options->osd_idle_ttl * HZ >> 2; 1319 1320 dout("osds timeout\n"); 1321 down_read(&osdc->map_sem); 1322 remove_old_osds(osdc); 1323 up_read(&osdc->map_sem); 1324 1325 schedule_delayed_work(&osdc->osds_timeout_work, 1326 round_jiffies_relative(delay)); 1327 } 1328 1329 static void complete_request(struct ceph_osd_request *req) 1330 { 1331 if (req->r_safe_callback) 1332 req->r_safe_callback(req, NULL); 1333 complete_all(&req->r_safe_completion); /* fsync waiter */ 1334 } 1335 1336 /* 1337 * handle osd op reply. either call the callback if it is specified, 1338 * or do the completion to wake up the waiting thread. 1339 */ 1340 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, 1341 struct ceph_connection *con) 1342 { 1343 void *p, *end; 1344 struct ceph_osd_request *req; 1345 u64 tid; 1346 int object_len; 1347 unsigned int numops; 1348 int payload_len, flags; 1349 s32 result; 1350 s32 retry_attempt; 1351 struct ceph_pg pg; 1352 int err; 1353 u32 reassert_epoch; 1354 u64 reassert_version; 1355 u32 osdmap_epoch; 1356 int already_completed; 1357 u32 bytes; 1358 unsigned int i; 1359 1360 tid = le64_to_cpu(msg->hdr.tid); 1361 dout("handle_reply %p tid %llu\n", msg, tid); 1362 1363 p = msg->front.iov_base; 1364 end = p + msg->front.iov_len; 1365 1366 ceph_decode_need(&p, end, 4, bad); 1367 object_len = ceph_decode_32(&p); 1368 ceph_decode_need(&p, end, object_len, bad); 1369 p += object_len; 1370 1371 err = ceph_decode_pgid(&p, end, &pg); 1372 if (err) 1373 goto bad; 1374 1375 ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad); 1376 flags = ceph_decode_64(&p); 1377 result = ceph_decode_32(&p); 1378 reassert_epoch = ceph_decode_32(&p); 1379 reassert_version = ceph_decode_64(&p); 1380 osdmap_epoch = ceph_decode_32(&p); 1381 1382 /* lookup */ 1383 mutex_lock(&osdc->request_mutex); 1384 req = __lookup_request(osdc, tid); 1385 if (req == NULL) { 1386 dout("handle_reply tid %llu dne\n", tid); 1387 goto bad_mutex; 1388 } 1389 ceph_osdc_get_request(req); 1390 1391 dout("handle_reply %p tid %llu req %p result %d\n", msg, tid, 1392 req, result); 1393 1394 ceph_decode_need(&p, end, 4, bad); 1395 numops = ceph_decode_32(&p); 1396 if (numops > CEPH_OSD_MAX_OP) 1397 goto bad_put; 1398 if (numops != req->r_num_ops) 1399 goto bad_put; 1400 payload_len = 0; 1401 ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad); 1402 for (i = 0; i < numops; i++) { 1403 struct ceph_osd_op *op = p; 1404 int len; 1405 1406 len = le32_to_cpu(op->payload_len); 1407 req->r_reply_op_len[i] = len; 1408 dout(" op %d has %d bytes\n", i, len); 1409 payload_len += len; 1410 p += sizeof(*op); 1411 } 1412 bytes = le32_to_cpu(msg->hdr.data_len); 1413 if (payload_len != bytes) { 1414 pr_warning("sum of op payload lens %d != data_len %d", 1415 payload_len, bytes); 1416 goto bad_put; 1417 } 1418 1419 ceph_decode_need(&p, end, 4 + numops * 4, bad); 1420 retry_attempt = ceph_decode_32(&p); 1421 for (i = 0; i < numops; i++) 1422 req->r_reply_op_result[i] = ceph_decode_32(&p); 1423 1424 if (!req->r_got_reply) { 1425 1426 req->r_result = result; 1427 dout("handle_reply result %d bytes %d\n", req->r_result, 1428 bytes); 1429 if (req->r_result == 0) 1430 req->r_result = bytes; 1431 1432 /* in case this is a write and we need to replay, */ 1433 req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch); 1434 req->r_reassert_version.version = cpu_to_le64(reassert_version); 1435 1436 req->r_got_reply = 1; 1437 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { 1438 dout("handle_reply tid %llu dup ack\n", tid); 1439 mutex_unlock(&osdc->request_mutex); 1440 goto done; 1441 } 1442 1443 dout("handle_reply tid %llu flags %d\n", tid, flags); 1444 1445 if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK)) 1446 __register_linger_request(osdc, req); 1447 1448 /* either this is a read, or we got the safe response */ 1449 if (result < 0 || 1450 (flags & CEPH_OSD_FLAG_ONDISK) || 1451 ((flags & CEPH_OSD_FLAG_WRITE) == 0)) 1452 __unregister_request(osdc, req); 1453 1454 already_completed = req->r_completed; 1455 req->r_completed = 1; 1456 mutex_unlock(&osdc->request_mutex); 1457 if (already_completed) 1458 goto done; 1459 1460 if (req->r_callback) 1461 req->r_callback(req, msg); 1462 else 1463 complete_all(&req->r_completion); 1464 1465 if (flags & CEPH_OSD_FLAG_ONDISK) 1466 complete_request(req); 1467 1468 done: 1469 dout("req=%p req->r_linger=%d\n", req, req->r_linger); 1470 ceph_osdc_put_request(req); 1471 return; 1472 1473 bad_put: 1474 ceph_osdc_put_request(req); 1475 bad_mutex: 1476 mutex_unlock(&osdc->request_mutex); 1477 bad: 1478 pr_err("corrupt osd_op_reply got %d %d\n", 1479 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len)); 1480 ceph_msg_dump(msg); 1481 } 1482 1483 static void reset_changed_osds(struct ceph_osd_client *osdc) 1484 { 1485 struct rb_node *p, *n; 1486 1487 for (p = rb_first(&osdc->osds); p; p = n) { 1488 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node); 1489 1490 n = rb_next(p); 1491 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || 1492 memcmp(&osd->o_con.peer_addr, 1493 ceph_osd_addr(osdc->osdmap, 1494 osd->o_osd), 1495 sizeof(struct ceph_entity_addr)) != 0) 1496 __reset_osd(osdc, osd); 1497 } 1498 } 1499 1500 /* 1501 * Requeue requests whose mapping to an OSD has changed. If requests map to 1502 * no osd, request a new map. 1503 * 1504 * Caller should hold map_sem for read. 1505 */ 1506 static void kick_requests(struct ceph_osd_client *osdc, int force_resend) 1507 { 1508 struct ceph_osd_request *req, *nreq; 1509 struct rb_node *p; 1510 int needmap = 0; 1511 int err; 1512 1513 dout("kick_requests %s\n", force_resend ? " (force resend)" : ""); 1514 mutex_lock(&osdc->request_mutex); 1515 for (p = rb_first(&osdc->requests); p; ) { 1516 req = rb_entry(p, struct ceph_osd_request, r_node); 1517 p = rb_next(p); 1518 1519 /* 1520 * For linger requests that have not yet been 1521 * registered, move them to the linger list; they'll 1522 * be sent to the osd in the loop below. Unregister 1523 * the request before re-registering it as a linger 1524 * request to ensure the __map_request() below 1525 * will decide it needs to be sent. 1526 */ 1527 if (req->r_linger && list_empty(&req->r_linger_item)) { 1528 dout("%p tid %llu restart on osd%d\n", 1529 req, req->r_tid, 1530 req->r_osd ? req->r_osd->o_osd : -1); 1531 __unregister_request(osdc, req); 1532 __register_linger_request(osdc, req); 1533 continue; 1534 } 1535 1536 err = __map_request(osdc, req, force_resend); 1537 if (err < 0) 1538 continue; /* error */ 1539 if (req->r_osd == NULL) { 1540 dout("%p tid %llu maps to no osd\n", req, req->r_tid); 1541 needmap++; /* request a newer map */ 1542 } else if (err > 0) { 1543 if (!req->r_linger) { 1544 dout("%p tid %llu requeued on osd%d\n", req, 1545 req->r_tid, 1546 req->r_osd ? req->r_osd->o_osd : -1); 1547 req->r_flags |= CEPH_OSD_FLAG_RETRY; 1548 } 1549 } 1550 } 1551 1552 list_for_each_entry_safe(req, nreq, &osdc->req_linger, 1553 r_linger_item) { 1554 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); 1555 1556 err = __map_request(osdc, req, force_resend); 1557 dout("__map_request returned %d\n", err); 1558 if (err == 0) 1559 continue; /* no change and no osd was specified */ 1560 if (err < 0) 1561 continue; /* hrm! */ 1562 if (req->r_osd == NULL) { 1563 dout("tid %llu maps to no valid osd\n", req->r_tid); 1564 needmap++; /* request a newer map */ 1565 continue; 1566 } 1567 1568 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid, 1569 req->r_osd ? req->r_osd->o_osd : -1); 1570 __register_request(osdc, req); 1571 __unregister_linger_request(osdc, req); 1572 } 1573 mutex_unlock(&osdc->request_mutex); 1574 1575 if (needmap) { 1576 dout("%d requests for down osds, need new map\n", needmap); 1577 ceph_monc_request_next_osdmap(&osdc->client->monc); 1578 } 1579 reset_changed_osds(osdc); 1580 } 1581 1582 1583 /* 1584 * Process updated osd map. 1585 * 1586 * The message contains any number of incremental and full maps, normally 1587 * indicating some sort of topology change in the cluster. Kick requests 1588 * off to different OSDs as needed. 1589 */ 1590 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) 1591 { 1592 void *p, *end, *next; 1593 u32 nr_maps, maplen; 1594 u32 epoch; 1595 struct ceph_osdmap *newmap = NULL, *oldmap; 1596 int err; 1597 struct ceph_fsid fsid; 1598 1599 dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0); 1600 p = msg->front.iov_base; 1601 end = p + msg->front.iov_len; 1602 1603 /* verify fsid */ 1604 ceph_decode_need(&p, end, sizeof(fsid), bad); 1605 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 1606 if (ceph_check_fsid(osdc->client, &fsid) < 0) 1607 return; 1608 1609 down_write(&osdc->map_sem); 1610 1611 /* incremental maps */ 1612 ceph_decode_32_safe(&p, end, nr_maps, bad); 1613 dout(" %d inc maps\n", nr_maps); 1614 while (nr_maps > 0) { 1615 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 1616 epoch = ceph_decode_32(&p); 1617 maplen = ceph_decode_32(&p); 1618 ceph_decode_need(&p, end, maplen, bad); 1619 next = p + maplen; 1620 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) { 1621 dout("applying incremental map %u len %d\n", 1622 epoch, maplen); 1623 newmap = osdmap_apply_incremental(&p, next, 1624 osdc->osdmap, 1625 &osdc->client->msgr); 1626 if (IS_ERR(newmap)) { 1627 err = PTR_ERR(newmap); 1628 goto bad; 1629 } 1630 BUG_ON(!newmap); 1631 if (newmap != osdc->osdmap) { 1632 ceph_osdmap_destroy(osdc->osdmap); 1633 osdc->osdmap = newmap; 1634 } 1635 kick_requests(osdc, 0); 1636 } else { 1637 dout("ignoring incremental map %u len %d\n", 1638 epoch, maplen); 1639 } 1640 p = next; 1641 nr_maps--; 1642 } 1643 if (newmap) 1644 goto done; 1645 1646 /* full maps */ 1647 ceph_decode_32_safe(&p, end, nr_maps, bad); 1648 dout(" %d full maps\n", nr_maps); 1649 while (nr_maps) { 1650 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 1651 epoch = ceph_decode_32(&p); 1652 maplen = ceph_decode_32(&p); 1653 ceph_decode_need(&p, end, maplen, bad); 1654 if (nr_maps > 1) { 1655 dout("skipping non-latest full map %u len %d\n", 1656 epoch, maplen); 1657 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) { 1658 dout("skipping full map %u len %d, " 1659 "older than our %u\n", epoch, maplen, 1660 osdc->osdmap->epoch); 1661 } else { 1662 int skipped_map = 0; 1663 1664 dout("taking full map %u len %d\n", epoch, maplen); 1665 newmap = osdmap_decode(&p, p+maplen); 1666 if (IS_ERR(newmap)) { 1667 err = PTR_ERR(newmap); 1668 goto bad; 1669 } 1670 BUG_ON(!newmap); 1671 oldmap = osdc->osdmap; 1672 osdc->osdmap = newmap; 1673 if (oldmap) { 1674 if (oldmap->epoch + 1 < newmap->epoch) 1675 skipped_map = 1; 1676 ceph_osdmap_destroy(oldmap); 1677 } 1678 kick_requests(osdc, skipped_map); 1679 } 1680 p += maplen; 1681 nr_maps--; 1682 } 1683 1684 done: 1685 downgrade_write(&osdc->map_sem); 1686 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); 1687 1688 /* 1689 * subscribe to subsequent osdmap updates if full to ensure 1690 * we find out when we are no longer full and stop returning 1691 * ENOSPC. 1692 */ 1693 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) 1694 ceph_monc_request_next_osdmap(&osdc->client->monc); 1695 1696 mutex_lock(&osdc->request_mutex); 1697 __send_queued(osdc); 1698 mutex_unlock(&osdc->request_mutex); 1699 up_read(&osdc->map_sem); 1700 wake_up_all(&osdc->client->auth_wq); 1701 return; 1702 1703 bad: 1704 pr_err("osdc handle_map corrupt msg\n"); 1705 ceph_msg_dump(msg); 1706 up_write(&osdc->map_sem); 1707 return; 1708 } 1709 1710 /* 1711 * watch/notify callback event infrastructure 1712 * 1713 * These callbacks are used both for watch and notify operations. 1714 */ 1715 static void __release_event(struct kref *kref) 1716 { 1717 struct ceph_osd_event *event = 1718 container_of(kref, struct ceph_osd_event, kref); 1719 1720 dout("__release_event %p\n", event); 1721 kfree(event); 1722 } 1723 1724 static void get_event(struct ceph_osd_event *event) 1725 { 1726 kref_get(&event->kref); 1727 } 1728 1729 void ceph_osdc_put_event(struct ceph_osd_event *event) 1730 { 1731 kref_put(&event->kref, __release_event); 1732 } 1733 EXPORT_SYMBOL(ceph_osdc_put_event); 1734 1735 static void __insert_event(struct ceph_osd_client *osdc, 1736 struct ceph_osd_event *new) 1737 { 1738 struct rb_node **p = &osdc->event_tree.rb_node; 1739 struct rb_node *parent = NULL; 1740 struct ceph_osd_event *event = NULL; 1741 1742 while (*p) { 1743 parent = *p; 1744 event = rb_entry(parent, struct ceph_osd_event, node); 1745 if (new->cookie < event->cookie) 1746 p = &(*p)->rb_left; 1747 else if (new->cookie > event->cookie) 1748 p = &(*p)->rb_right; 1749 else 1750 BUG(); 1751 } 1752 1753 rb_link_node(&new->node, parent, p); 1754 rb_insert_color(&new->node, &osdc->event_tree); 1755 } 1756 1757 static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc, 1758 u64 cookie) 1759 { 1760 struct rb_node **p = &osdc->event_tree.rb_node; 1761 struct rb_node *parent = NULL; 1762 struct ceph_osd_event *event = NULL; 1763 1764 while (*p) { 1765 parent = *p; 1766 event = rb_entry(parent, struct ceph_osd_event, node); 1767 if (cookie < event->cookie) 1768 p = &(*p)->rb_left; 1769 else if (cookie > event->cookie) 1770 p = &(*p)->rb_right; 1771 else 1772 return event; 1773 } 1774 return NULL; 1775 } 1776 1777 static void __remove_event(struct ceph_osd_event *event) 1778 { 1779 struct ceph_osd_client *osdc = event->osdc; 1780 1781 if (!RB_EMPTY_NODE(&event->node)) { 1782 dout("__remove_event removed %p\n", event); 1783 rb_erase(&event->node, &osdc->event_tree); 1784 ceph_osdc_put_event(event); 1785 } else { 1786 dout("__remove_event didn't remove %p\n", event); 1787 } 1788 } 1789 1790 int ceph_osdc_create_event(struct ceph_osd_client *osdc, 1791 void (*event_cb)(u64, u64, u8, void *), 1792 void *data, struct ceph_osd_event **pevent) 1793 { 1794 struct ceph_osd_event *event; 1795 1796 event = kmalloc(sizeof(*event), GFP_NOIO); 1797 if (!event) 1798 return -ENOMEM; 1799 1800 dout("create_event %p\n", event); 1801 event->cb = event_cb; 1802 event->one_shot = 0; 1803 event->data = data; 1804 event->osdc = osdc; 1805 INIT_LIST_HEAD(&event->osd_node); 1806 RB_CLEAR_NODE(&event->node); 1807 kref_init(&event->kref); /* one ref for us */ 1808 kref_get(&event->kref); /* one ref for the caller */ 1809 1810 spin_lock(&osdc->event_lock); 1811 event->cookie = ++osdc->event_count; 1812 __insert_event(osdc, event); 1813 spin_unlock(&osdc->event_lock); 1814 1815 *pevent = event; 1816 return 0; 1817 } 1818 EXPORT_SYMBOL(ceph_osdc_create_event); 1819 1820 void ceph_osdc_cancel_event(struct ceph_osd_event *event) 1821 { 1822 struct ceph_osd_client *osdc = event->osdc; 1823 1824 dout("cancel_event %p\n", event); 1825 spin_lock(&osdc->event_lock); 1826 __remove_event(event); 1827 spin_unlock(&osdc->event_lock); 1828 ceph_osdc_put_event(event); /* caller's */ 1829 } 1830 EXPORT_SYMBOL(ceph_osdc_cancel_event); 1831 1832 1833 static void do_event_work(struct work_struct *work) 1834 { 1835 struct ceph_osd_event_work *event_work = 1836 container_of(work, struct ceph_osd_event_work, work); 1837 struct ceph_osd_event *event = event_work->event; 1838 u64 ver = event_work->ver; 1839 u64 notify_id = event_work->notify_id; 1840 u8 opcode = event_work->opcode; 1841 1842 dout("do_event_work completing %p\n", event); 1843 event->cb(ver, notify_id, opcode, event->data); 1844 dout("do_event_work completed %p\n", event); 1845 ceph_osdc_put_event(event); 1846 kfree(event_work); 1847 } 1848 1849 1850 /* 1851 * Process osd watch notifications 1852 */ 1853 static void handle_watch_notify(struct ceph_osd_client *osdc, 1854 struct ceph_msg *msg) 1855 { 1856 void *p, *end; 1857 u8 proto_ver; 1858 u64 cookie, ver, notify_id; 1859 u8 opcode; 1860 struct ceph_osd_event *event; 1861 struct ceph_osd_event_work *event_work; 1862 1863 p = msg->front.iov_base; 1864 end = p + msg->front.iov_len; 1865 1866 ceph_decode_8_safe(&p, end, proto_ver, bad); 1867 ceph_decode_8_safe(&p, end, opcode, bad); 1868 ceph_decode_64_safe(&p, end, cookie, bad); 1869 ceph_decode_64_safe(&p, end, ver, bad); 1870 ceph_decode_64_safe(&p, end, notify_id, bad); 1871 1872 spin_lock(&osdc->event_lock); 1873 event = __find_event(osdc, cookie); 1874 if (event) { 1875 BUG_ON(event->one_shot); 1876 get_event(event); 1877 } 1878 spin_unlock(&osdc->event_lock); 1879 dout("handle_watch_notify cookie %lld ver %lld event %p\n", 1880 cookie, ver, event); 1881 if (event) { 1882 event_work = kmalloc(sizeof(*event_work), GFP_NOIO); 1883 if (!event_work) { 1884 dout("ERROR: could not allocate event_work\n"); 1885 goto done_err; 1886 } 1887 INIT_WORK(&event_work->work, do_event_work); 1888 event_work->event = event; 1889 event_work->ver = ver; 1890 event_work->notify_id = notify_id; 1891 event_work->opcode = opcode; 1892 if (!queue_work(osdc->notify_wq, &event_work->work)) { 1893 dout("WARNING: failed to queue notify event work\n"); 1894 goto done_err; 1895 } 1896 } 1897 1898 return; 1899 1900 done_err: 1901 ceph_osdc_put_event(event); 1902 return; 1903 1904 bad: 1905 pr_err("osdc handle_watch_notify corrupt msg\n"); 1906 return; 1907 } 1908 1909 static void ceph_osdc_msg_data_set(struct ceph_msg *msg, 1910 struct ceph_osd_data *osd_data) 1911 { 1912 u64 length = ceph_osd_data_length(osd_data); 1913 1914 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { 1915 BUG_ON(length > (u64) SIZE_MAX); 1916 if (length) 1917 ceph_msg_data_set_pages(msg, osd_data->pages, 1918 length, osd_data->alignment); 1919 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) { 1920 BUG_ON(!length); 1921 ceph_msg_data_set_pagelist(msg, osd_data->pagelist); 1922 #ifdef CONFIG_BLOCK 1923 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) { 1924 ceph_msg_data_set_bio(msg, osd_data->bio, length); 1925 #endif 1926 } else { 1927 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE); 1928 } 1929 } 1930 1931 /* 1932 * Register request, send initial attempt. 1933 */ 1934 int ceph_osdc_start_request(struct ceph_osd_client *osdc, 1935 struct ceph_osd_request *req, 1936 bool nofail) 1937 { 1938 int rc = 0; 1939 1940 /* Set up response incoming data and request outgoing data fields */ 1941 1942 ceph_osdc_msg_data_set(req->r_reply, &req->r_data_in); 1943 ceph_osdc_msg_data_set(req->r_request, &req->r_data_out); 1944 1945 down_read(&osdc->map_sem); 1946 mutex_lock(&osdc->request_mutex); 1947 __register_request(osdc, req); 1948 WARN_ON(req->r_sent); 1949 rc = __map_request(osdc, req, 0); 1950 if (rc < 0) { 1951 if (nofail) { 1952 dout("osdc_start_request failed map, " 1953 " will retry %lld\n", req->r_tid); 1954 rc = 0; 1955 } 1956 goto out_unlock; 1957 } 1958 if (req->r_osd == NULL) { 1959 dout("send_request %p no up osds in pg\n", req); 1960 ceph_monc_request_next_osdmap(&osdc->client->monc); 1961 } else { 1962 __send_queued(osdc); 1963 } 1964 rc = 0; 1965 out_unlock: 1966 mutex_unlock(&osdc->request_mutex); 1967 up_read(&osdc->map_sem); 1968 return rc; 1969 } 1970 EXPORT_SYMBOL(ceph_osdc_start_request); 1971 1972 /* 1973 * wait for a request to complete 1974 */ 1975 int ceph_osdc_wait_request(struct ceph_osd_client *osdc, 1976 struct ceph_osd_request *req) 1977 { 1978 int rc; 1979 1980 rc = wait_for_completion_interruptible(&req->r_completion); 1981 if (rc < 0) { 1982 mutex_lock(&osdc->request_mutex); 1983 __cancel_request(req); 1984 __unregister_request(osdc, req); 1985 mutex_unlock(&osdc->request_mutex); 1986 complete_request(req); 1987 dout("wait_request tid %llu canceled/timed out\n", req->r_tid); 1988 return rc; 1989 } 1990 1991 dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result); 1992 return req->r_result; 1993 } 1994 EXPORT_SYMBOL(ceph_osdc_wait_request); 1995 1996 /* 1997 * sync - wait for all in-flight requests to flush. avoid starvation. 1998 */ 1999 void ceph_osdc_sync(struct ceph_osd_client *osdc) 2000 { 2001 struct ceph_osd_request *req; 2002 u64 last_tid, next_tid = 0; 2003 2004 mutex_lock(&osdc->request_mutex); 2005 last_tid = osdc->last_tid; 2006 while (1) { 2007 req = __lookup_request_ge(osdc, next_tid); 2008 if (!req) 2009 break; 2010 if (req->r_tid > last_tid) 2011 break; 2012 2013 next_tid = req->r_tid + 1; 2014 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0) 2015 continue; 2016 2017 ceph_osdc_get_request(req); 2018 mutex_unlock(&osdc->request_mutex); 2019 dout("sync waiting on tid %llu (last is %llu)\n", 2020 req->r_tid, last_tid); 2021 wait_for_completion(&req->r_safe_completion); 2022 mutex_lock(&osdc->request_mutex); 2023 ceph_osdc_put_request(req); 2024 } 2025 mutex_unlock(&osdc->request_mutex); 2026 dout("sync done (thru tid %llu)\n", last_tid); 2027 } 2028 EXPORT_SYMBOL(ceph_osdc_sync); 2029 2030 /* 2031 * init, shutdown 2032 */ 2033 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) 2034 { 2035 int err; 2036 2037 dout("init\n"); 2038 osdc->client = client; 2039 osdc->osdmap = NULL; 2040 init_rwsem(&osdc->map_sem); 2041 init_completion(&osdc->map_waiters); 2042 osdc->last_requested_map = 0; 2043 mutex_init(&osdc->request_mutex); 2044 osdc->last_tid = 0; 2045 osdc->osds = RB_ROOT; 2046 INIT_LIST_HEAD(&osdc->osd_lru); 2047 osdc->requests = RB_ROOT; 2048 INIT_LIST_HEAD(&osdc->req_lru); 2049 INIT_LIST_HEAD(&osdc->req_unsent); 2050 INIT_LIST_HEAD(&osdc->req_notarget); 2051 INIT_LIST_HEAD(&osdc->req_linger); 2052 osdc->num_requests = 0; 2053 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 2054 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 2055 spin_lock_init(&osdc->event_lock); 2056 osdc->event_tree = RB_ROOT; 2057 osdc->event_count = 0; 2058 2059 schedule_delayed_work(&osdc->osds_timeout_work, 2060 round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ)); 2061 2062 err = -ENOMEM; 2063 osdc->req_mempool = mempool_create_kmalloc_pool(10, 2064 sizeof(struct ceph_osd_request)); 2065 if (!osdc->req_mempool) 2066 goto out; 2067 2068 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, 2069 OSD_OP_FRONT_LEN, 10, true, 2070 "osd_op"); 2071 if (err < 0) 2072 goto out_mempool; 2073 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, 2074 OSD_OPREPLY_FRONT_LEN, 10, true, 2075 "osd_op_reply"); 2076 if (err < 0) 2077 goto out_msgpool; 2078 2079 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); 2080 if (IS_ERR(osdc->notify_wq)) { 2081 err = PTR_ERR(osdc->notify_wq); 2082 osdc->notify_wq = NULL; 2083 goto out_msgpool; 2084 } 2085 return 0; 2086 2087 out_msgpool: 2088 ceph_msgpool_destroy(&osdc->msgpool_op); 2089 out_mempool: 2090 mempool_destroy(osdc->req_mempool); 2091 out: 2092 return err; 2093 } 2094 2095 void ceph_osdc_stop(struct ceph_osd_client *osdc) 2096 { 2097 flush_workqueue(osdc->notify_wq); 2098 destroy_workqueue(osdc->notify_wq); 2099 cancel_delayed_work_sync(&osdc->timeout_work); 2100 cancel_delayed_work_sync(&osdc->osds_timeout_work); 2101 if (osdc->osdmap) { 2102 ceph_osdmap_destroy(osdc->osdmap); 2103 osdc->osdmap = NULL; 2104 } 2105 remove_all_osds(osdc); 2106 mempool_destroy(osdc->req_mempool); 2107 ceph_msgpool_destroy(&osdc->msgpool_op); 2108 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 2109 } 2110 2111 /* 2112 * Read some contiguous pages. If we cross a stripe boundary, shorten 2113 * *plen. Return number of bytes read, or error. 2114 */ 2115 int ceph_osdc_readpages(struct ceph_osd_client *osdc, 2116 struct ceph_vino vino, struct ceph_file_layout *layout, 2117 u64 off, u64 *plen, 2118 u32 truncate_seq, u64 truncate_size, 2119 struct page **pages, int num_pages, int page_align) 2120 { 2121 struct ceph_osd_request *req; 2122 int rc = 0; 2123 2124 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, 2125 vino.snap, off, *plen); 2126 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1, 2127 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 2128 NULL, truncate_seq, truncate_size, 2129 false); 2130 if (IS_ERR(req)) 2131 return PTR_ERR(req); 2132 2133 /* it may be a short read due to an object boundary */ 2134 2135 ceph_osd_data_pages_init(&req->r_data_in, pages, *plen, page_align, 2136 false, false); 2137 2138 dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", 2139 off, *plen, *plen, page_align); 2140 2141 ceph_osdc_build_request(req, off, NULL, vino.snap, NULL); 2142 2143 rc = ceph_osdc_start_request(osdc, req, false); 2144 if (!rc) 2145 rc = ceph_osdc_wait_request(osdc, req); 2146 2147 ceph_osdc_put_request(req); 2148 dout("readpages result %d\n", rc); 2149 return rc; 2150 } 2151 EXPORT_SYMBOL(ceph_osdc_readpages); 2152 2153 /* 2154 * do a synchronous write on N pages 2155 */ 2156 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, 2157 struct ceph_file_layout *layout, 2158 struct ceph_snap_context *snapc, 2159 u64 off, u64 len, 2160 u32 truncate_seq, u64 truncate_size, 2161 struct timespec *mtime, 2162 struct page **pages, int num_pages) 2163 { 2164 struct ceph_osd_request *req; 2165 int rc = 0; 2166 int page_align = off & ~PAGE_MASK; 2167 2168 BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */ 2169 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1, 2170 CEPH_OSD_OP_WRITE, 2171 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, 2172 snapc, truncate_seq, truncate_size, 2173 true); 2174 if (IS_ERR(req)) 2175 return PTR_ERR(req); 2176 2177 /* it may be a short write due to an object boundary */ 2178 ceph_osd_data_pages_init(&req->r_data_out, pages, len, page_align, 2179 false, false); 2180 dout("writepages %llu~%llu (%llu bytes)\n", off, len, len); 2181 2182 ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime); 2183 2184 rc = ceph_osdc_start_request(osdc, req, true); 2185 if (!rc) 2186 rc = ceph_osdc_wait_request(osdc, req); 2187 2188 ceph_osdc_put_request(req); 2189 if (rc == 0) 2190 rc = len; 2191 dout("writepages result %d\n", rc); 2192 return rc; 2193 } 2194 EXPORT_SYMBOL(ceph_osdc_writepages); 2195 2196 /* 2197 * handle incoming message 2198 */ 2199 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 2200 { 2201 struct ceph_osd *osd = con->private; 2202 struct ceph_osd_client *osdc; 2203 int type = le16_to_cpu(msg->hdr.type); 2204 2205 if (!osd) 2206 goto out; 2207 osdc = osd->o_osdc; 2208 2209 switch (type) { 2210 case CEPH_MSG_OSD_MAP: 2211 ceph_osdc_handle_map(osdc, msg); 2212 break; 2213 case CEPH_MSG_OSD_OPREPLY: 2214 handle_reply(osdc, msg, con); 2215 break; 2216 case CEPH_MSG_WATCH_NOTIFY: 2217 handle_watch_notify(osdc, msg); 2218 break; 2219 2220 default: 2221 pr_err("received unknown message type %d %s\n", type, 2222 ceph_msg_type_name(type)); 2223 } 2224 out: 2225 ceph_msg_put(msg); 2226 } 2227 2228 /* 2229 * lookup and return message for incoming reply. set up reply message 2230 * pages. 2231 */ 2232 static struct ceph_msg *get_reply(struct ceph_connection *con, 2233 struct ceph_msg_header *hdr, 2234 int *skip) 2235 { 2236 struct ceph_osd *osd = con->private; 2237 struct ceph_osd_client *osdc = osd->o_osdc; 2238 struct ceph_msg *m; 2239 struct ceph_osd_request *req; 2240 int front = le32_to_cpu(hdr->front_len); 2241 int data_len = le32_to_cpu(hdr->data_len); 2242 u64 tid; 2243 2244 tid = le64_to_cpu(hdr->tid); 2245 mutex_lock(&osdc->request_mutex); 2246 req = __lookup_request(osdc, tid); 2247 if (!req) { 2248 *skip = 1; 2249 m = NULL; 2250 dout("get_reply unknown tid %llu from osd%d\n", tid, 2251 osd->o_osd); 2252 goto out; 2253 } 2254 2255 if (req->r_reply->con) 2256 dout("%s revoking msg %p from old con %p\n", __func__, 2257 req->r_reply, req->r_reply->con); 2258 ceph_msg_revoke_incoming(req->r_reply); 2259 2260 if (front > req->r_reply->front.iov_len) { 2261 pr_warning("get_reply front %d > preallocated %d\n", 2262 front, (int)req->r_reply->front.iov_len); 2263 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false); 2264 if (!m) 2265 goto out; 2266 ceph_msg_put(req->r_reply); 2267 req->r_reply = m; 2268 } 2269 m = ceph_msg_get(req->r_reply); 2270 2271 if (data_len > 0) { 2272 struct ceph_osd_data *osd_data = &req->r_data_in; 2273 2274 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { 2275 if (osd_data->pages && 2276 unlikely(osd_data->length < data_len)) { 2277 2278 pr_warning("tid %lld reply has %d bytes " 2279 "we had only %llu bytes ready\n", 2280 tid, data_len, osd_data->length); 2281 *skip = 1; 2282 ceph_msg_put(m); 2283 m = NULL; 2284 goto out; 2285 } 2286 } 2287 } 2288 *skip = 0; 2289 dout("get_reply tid %lld %p\n", tid, m); 2290 2291 out: 2292 mutex_unlock(&osdc->request_mutex); 2293 return m; 2294 2295 } 2296 2297 static struct ceph_msg *alloc_msg(struct ceph_connection *con, 2298 struct ceph_msg_header *hdr, 2299 int *skip) 2300 { 2301 struct ceph_osd *osd = con->private; 2302 int type = le16_to_cpu(hdr->type); 2303 int front = le32_to_cpu(hdr->front_len); 2304 2305 *skip = 0; 2306 switch (type) { 2307 case CEPH_MSG_OSD_MAP: 2308 case CEPH_MSG_WATCH_NOTIFY: 2309 return ceph_msg_new(type, front, GFP_NOFS, false); 2310 case CEPH_MSG_OSD_OPREPLY: 2311 return get_reply(con, hdr, skip); 2312 default: 2313 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type, 2314 osd->o_osd); 2315 *skip = 1; 2316 return NULL; 2317 } 2318 } 2319 2320 /* 2321 * Wrappers to refcount containing ceph_osd struct 2322 */ 2323 static struct ceph_connection *get_osd_con(struct ceph_connection *con) 2324 { 2325 struct ceph_osd *osd = con->private; 2326 if (get_osd(osd)) 2327 return con; 2328 return NULL; 2329 } 2330 2331 static void put_osd_con(struct ceph_connection *con) 2332 { 2333 struct ceph_osd *osd = con->private; 2334 put_osd(osd); 2335 } 2336 2337 /* 2338 * authentication 2339 */ 2340 /* 2341 * Note: returned pointer is the address of a structure that's 2342 * managed separately. Caller must *not* attempt to free it. 2343 */ 2344 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 2345 int *proto, int force_new) 2346 { 2347 struct ceph_osd *o = con->private; 2348 struct ceph_osd_client *osdc = o->o_osdc; 2349 struct ceph_auth_client *ac = osdc->client->monc.auth; 2350 struct ceph_auth_handshake *auth = &o->o_auth; 2351 2352 if (force_new && auth->authorizer) { 2353 ceph_auth_destroy_authorizer(ac, auth->authorizer); 2354 auth->authorizer = NULL; 2355 } 2356 if (!auth->authorizer) { 2357 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 2358 auth); 2359 if (ret) 2360 return ERR_PTR(ret); 2361 } else { 2362 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 2363 auth); 2364 if (ret) 2365 return ERR_PTR(ret); 2366 } 2367 *proto = ac->protocol; 2368 2369 return auth; 2370 } 2371 2372 2373 static int verify_authorizer_reply(struct ceph_connection *con, int len) 2374 { 2375 struct ceph_osd *o = con->private; 2376 struct ceph_osd_client *osdc = o->o_osdc; 2377 struct ceph_auth_client *ac = osdc->client->monc.auth; 2378 2379 return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len); 2380 } 2381 2382 static int invalidate_authorizer(struct ceph_connection *con) 2383 { 2384 struct ceph_osd *o = con->private; 2385 struct ceph_osd_client *osdc = o->o_osdc; 2386 struct ceph_auth_client *ac = osdc->client->monc.auth; 2387 2388 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); 2389 return ceph_monc_validate_auth(&osdc->client->monc); 2390 } 2391 2392 static const struct ceph_connection_operations osd_con_ops = { 2393 .get = get_osd_con, 2394 .put = put_osd_con, 2395 .dispatch = dispatch, 2396 .get_authorizer = get_authorizer, 2397 .verify_authorizer_reply = verify_authorizer_reply, 2398 .invalidate_authorizer = invalidate_authorizer, 2399 .alloc_msg = alloc_msg, 2400 .fault = osd_reset, 2401 }; 2402