#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>

/* front-section sizes for outgoing op and incoming op-reply messages */
#define OSD_OP_FRONT_LEN	4096
#define OSD_OPREPLY_FRONT_LEN	512

/* slab cache for requests with at most CEPH_OSD_SLAB_OPS ops */
static struct kmem_cache	*ceph_osd_request_cache;

static const struct ceph_connection_operations osd_con_ops;

static void __send_queued(struct ceph_osd_client *osdc);
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
static void __register_request(struct ceph_osd_client *osdc,
			       struct ceph_osd_request *req);
static void __unregister_request(struct ceph_osd_client *osdc,
				 struct ceph_osd_request *req);
static void __unregister_linger_request(struct ceph_osd_client *osdc,
					struct ceph_osd_request *req);
static void __enqueue_request(struct ceph_osd_request *req);
static void __send_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req);

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */

/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 *
 * On success *plen may be reduced to the portion of the extent that fits
 * in a single object; *objnum/*objoff/*objlen describe that object piece.
 */
static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
		       u64 *objnum, u64 *objoff, u64 *objlen)
{
	u64 orig_len = *plen;
	int r;

	/* object extent? */
	r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
					  objoff, objlen);
	if (r < 0)
		return r;
	if (*objlen < orig_len) {
		/* extent crosses an object boundary - clip to this object
		 * and report the shortened length back to the caller */
		*plen = *objlen;
		dout(" skipping last %llu, final file extent %llu~%llu\n",
		     orig_len - *plen, off, *plen);
	}

	dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);

	return 0;
}

/* reset an osd_data descriptor to the empty (TYPE_NONE) state */
static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
{
	memset(osd_data, 0, sizeof (*osd_data));
	osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
}

/*
 * Point an osd_data descriptor at a page vector.  @own_pages indicates
 * whether the descriptor takes ownership (pages are released in
 * ceph_osd_data_release()).
 */
static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
			struct page **pages, u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
	osd_data->pages = pages;
	osd_data->length = length;
	osd_data->alignment = alignment;
	osd_data->pages_from_pool = pages_from_pool;
	osd_data->own_pages = own_pages;
}

/* Point an osd_data descriptor at a pagelist (caller retains ownership). */
static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
			struct ceph_pagelist *pagelist)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
	osd_data->pagelist = pagelist;
}

#ifdef CONFIG_BLOCK
/* Point an osd_data descriptor at a bio chain (caller retains ownership). */
static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
			struct bio *bio, size_t bio_length)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
	osd_data->bio = bio;
	osd_data->bio_length = bio_length;
}
#endif /* CONFIG_BLOCK */

/*
 * Return a pointer to the osd_data field @fld of op @whch's @typ union
 * member, with a bounds check on the op index.
 */
#define osd_req_op_data(oreq, whch, typ, fld)	\
	({						\
		struct ceph_osd_request *__oreq = (oreq);	\
		unsigned int __whch = (whch);		\
		BUG_ON(__whch >= __oreq->r_num_ops);	\
		&__oreq->r_ops[__whch].typ.fld;		\
	})

static struct ceph_osd_data *
osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
{
	BUG_ON(which >= osd_req->r_num_ops);

	return &osd_req->r_ops[which].raw_data_in;
}

struct ceph_osd_data *
osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
			unsigned int which)
{
	return osd_req_op_data(osd_req, which, extent, osd_data);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data);

struct ceph_osd_data *
osd_req_op_cls_response_data(struct ceph_osd_request *osd_req,
			unsigned int which)
{
	return osd_req_op_data(osd_req, which, cls, response_data);
}
EXPORT_SYMBOL(osd_req_op_cls_response_data);	/* ??? */

/* attach a page vector as the raw incoming data buffer for op @which */
void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages,
			u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_raw_data_in(osd_req, which);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);

/* attach a page vector as the data buffer of extent op @which */
void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages,
			u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);

/* attach a pagelist as the data buffer of extent op @which */
void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);

#ifdef CONFIG_BLOCK
/* attach a bio chain as the data buffer of extent op @which */
void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
			unsigned int which, struct bio *bio, size_t bio_length)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_bio_init(osd_data, bio, bio_length);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
#endif /* CONFIG_BLOCK */

/* attach the class/method name pagelist to CALL op @which (request_info) */
static void osd_req_op_cls_request_info_pagelist(
			struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_info);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}

/* attach a pagelist as the input data of CALL op @which */
void osd_req_op_cls_request_data_pagelist(
			struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);

/* attach a page vector as the input data of CALL op @which */
void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages, u64 length,
			u32 alignment, bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);

/* attach a page vector to receive the response of CALL op @which */
void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages, u64 length,
			u32 alignment, bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, response_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);

/* total byte length carried by an osd_data descriptor, 0 for TYPE_NONE */
static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
{
	switch (osd_data->type) {
	case CEPH_OSD_DATA_TYPE_NONE:
		return 0;
	case CEPH_OSD_DATA_TYPE_PAGES:
		return osd_data->length;
	case CEPH_OSD_DATA_TYPE_PAGELIST:
		return (u64)osd_data->pagelist->length;
#ifdef CONFIG_BLOCK
	case CEPH_OSD_DATA_TYPE_BIO:
		return (u64)osd_data->bio_length;
#endif /* CONFIG_BLOCK */
	default:
		WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
		return 0;
	}
}

/*
 * Release any pages owned by the descriptor and reset it to TYPE_NONE.
 * Non-owned pages, pagelists and bios are left for their owners to free.
 */
static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
{
	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
		int num_pages;

		num_pages = calc_pages_for((u64)osd_data->alignment,
						(u64)osd_data->length);
		ceph_release_page_vector(osd_data->pages, num_pages);
	}
	ceph_osd_data_init(osd_data);
}

/* release the data buffers associated with op @which, per opcode */
static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
			unsigned int which)
{
	struct ceph_osd_req_op *op;

	BUG_ON(which >= osd_req->r_num_ops);
	op = &osd_req->r_ops[which];

	switch (op->op) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
		ceph_osd_data_release(&op->extent.osd_data);
		break;
	case CEPH_OSD_OP_CALL:
		ceph_osd_data_release(&op->cls.request_info);
		ceph_osd_data_release(&op->cls.request_data);
		ceph_osd_data_release(&op->cls.response_data);
		break;
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		ceph_osd_data_release(&op->xattr.osd_data);
		break;
	case CEPH_OSD_OP_STAT:
		ceph_osd_data_release(&op->raw_data_in);
		break;
	default:
		break;
	}
}

/*
 * requests
 */

/*
 * kref release callback: drop messages, per-op data, the snap context
 * and finally the request itself (mempool, slab or kmalloc, matching
 * how it was allocated in ceph_osdc_alloc_request()).
 */
static void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req = container_of(kref,
					    struct ceph_osd_request, r_kref);
	unsigned int which;

	dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
	     req->r_request, req->r_reply);
	/* the request must already be unlinked from all tracking structures */
	WARN_ON(!RB_EMPTY_NODE(&req->r_node));
	WARN_ON(!list_empty(&req->r_req_lru_item));
	WARN_ON(!list_empty(&req->r_osd_item));
	WARN_ON(!list_empty(&req->r_linger_item));
	WARN_ON(!list_empty(&req->r_linger_osd_item));
	WARN_ON(req->r_osd);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply) {
		ceph_msg_revoke_incoming(req->r_reply);
		ceph_msg_put(req->r_reply);
	}

	for (which = 0; which < req->r_num_ops; which++)
		osd_req_op_data_release(req, which);

	ceph_put_snap_context(req->r_snapc);
	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
		kmem_cache_free(ceph_osd_request_cache, req);
	else
		kfree(req);
}

/* take a reference on @req */
void ceph_osdc_get_request(struct ceph_osd_request *req)
{
	dout("%s %p (was %d)\n", __func__, req,
	     atomic_read(&req->r_kref.refcount));
	kref_get(&req->r_kref);
}
EXPORT_SYMBOL(ceph_osdc_get_request);

/* drop a reference on @req (NULL is tolerated); may free the request */
void ceph_osdc_put_request(struct ceph_osd_request *req)
{
	if (req) {
		dout("%s %p (was %d)\n", __func__, req,
		     atomic_read(&req->r_kref.refcount));
		kref_put(&req->r_kref, ceph_osdc_release_request);
	}
}
EXPORT_SYMBOL(ceph_osdc_put_request);

/*
 * Allocate a request with room for @num_ops ops plus its request and
 * reply messages.  Small requests come from the slab cache (or the
 * mempool, which is limited to CEPH_OSD_SLAB_OPS); larger ones are
 * kmalloc'ed with the op array tacked on the end.  Returns NULL on
 * allocation failure.
 */
struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
					       struct ceph_snap_context *snapc,
					       unsigned int num_ops,
					       bool use_mempool,
					       gfp_t gfp_flags)
{
	struct ceph_osd_request *req;
	struct ceph_msg *msg;
	size_t msg_size;

	if (use_mempool) {
		BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
		req = mempool_alloc(osdc->req_mempool, gfp_flags);
	} else if (num_ops <= CEPH_OSD_SLAB_OPS) {
		req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
	} else {
		BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
		req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]),
			      gfp_flags);
	}
	if (unlikely(!req))
		return NULL;

	/* req only, each op is zeroed in _osd_req_op_init() */
	memset(req, 0, sizeof(*req));

	req->r_osdc = osdc;
	req->r_mempool = use_mempool;
	req->r_num_ops = num_ops;
	req->r_snapid = CEPH_NOSNAP;
	req->r_snapc = ceph_get_snap_context(snapc);

	kref_init(&req->r_kref);
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	RB_CLEAR_NODE(&req->r_node);
	INIT_LIST_HEAD(&req->r_unsafe_item);
	INIT_LIST_HEAD(&req->r_linger_item);
	INIT_LIST_HEAD(&req->r_linger_osd_item);
	INIT_LIST_HEAD(&req->r_req_lru_item);
	INIT_LIST_HEAD(&req->r_osd_item);

	req->r_base_oloc.pool = -1;
	req->r_target_oloc.pool = -1;

	/* reply front: fixed part plus per-op data beyond what the
	 * preallocated OSD_OPREPLY_FRONT_LEN already covers */
	msg_size = OSD_OPREPLY_FRONT_LEN;
	if (num_ops > CEPH_OSD_SLAB_OPS) {
		/* ceph_osd_op and rval */
		msg_size += (num_ops - CEPH_OSD_SLAB_OPS) *
			    (sizeof(struct ceph_osd_op) + 4);
	}

	/* create reply message */
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size,
				   gfp_flags, true);
	if (!msg) {
		ceph_osdc_put_request(req);
		return NULL;
	}
	req->r_reply = msg;

	/* request front: sized to hold the encoded MOSDOp header fields */
	msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
	msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
	msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
	msg_size += 1 + 8 + 4 + 4; /* pgid */
	msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */
	msg_size += 2 + num_ops * sizeof(struct ceph_osd_op);
	msg_size += 8; /* snapid */
	msg_size += 8; /* snap_seq */
	msg_size += 4 + 8 * (snapc ? snapc->num_snaps : 0); /* snaps */
	msg_size += 4; /* retry_attempt */

	/* create request message; allow space for oid */
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true);
	if (!msg) {
		ceph_osdc_put_request(req);
		return NULL;
	}

	memset(msg->front.iov_base, 0, msg->front.iov_len);

	req->r_request = msg;

	return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);

/* true iff @opcode is one of the known CEPH_OSD_OP_* values */
static bool osd_req_opcode_valid(u16 opcode)
{
	switch (opcode) {
#define GENERATE_CASE(op, opcode, str)	case CEPH_OSD_OP_##op: return true;
	__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
#undef GENERATE_CASE
	default:
		return false;
	}
}

/*
 * This is an osd op init function for opcodes that have no data or
 * other information associated with them.  It also serves as a
 * common init routine for all the other init functions, below.
 */
static struct ceph_osd_req_op *
_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
		 u16 opcode, u32 flags)
{
	struct ceph_osd_req_op *op;

	BUG_ON(which >= osd_req->r_num_ops);
	BUG_ON(!osd_req_opcode_valid(opcode));

	op = &osd_req->r_ops[which];
	memset(op, 0, sizeof (*op));
	op->op = opcode;
	op->flags = flags;

	return op;
}

/* public wrapper for _osd_req_op_init() that discards the op pointer */
void osd_req_op_init(struct ceph_osd_request *osd_req,
		     unsigned int which, u16 opcode, u32 flags)
{
	(void)_osd_req_op_init(osd_req, which, opcode, flags);
}
EXPORT_SYMBOL(osd_req_op_init);

/*
 * Initialize an extent op (READ/WRITE/WRITEFULL/ZERO/TRUNCATE).
 * Writes carry @length bytes of outgoing payload; other opcodes have
 * no outgoing data.
 */
void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
				unsigned int which, u16 opcode,
				u64 offset, u64 length,
				u64 truncate_size, u32 truncate_seq)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      opcode, 0);
	size_t payload_len = 0;

	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
	       opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
	       opcode != CEPH_OSD_OP_TRUNCATE);

	op->extent.offset = offset;
	op->extent.length = length;
	op->extent.truncate_size = truncate_size;
	op->extent.truncate_seq = truncate_seq;
	if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
		payload_len += length;

	op->indata_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_extent_init);

/*
 * Shrink (never grow) the length of a previously initialized extent op,
 * adjusting the outgoing payload length to match.
 */
void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
				unsigned int which, u64 length)
{
	struct ceph_osd_req_op *op;
	u64 previous;

	BUG_ON(which >= osd_req->r_num_ops);
	op = &osd_req->r_ops[which];
	previous = op->extent.length;

	if (length == previous)
		return;		/* Nothing to do */
	BUG_ON(length > previous);

	op->extent.length = length;
	op->indata_len -= previous - length;
}
EXPORT_SYMBOL(osd_req_op_extent_update); 540 541 void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req, 542 unsigned int which, u64 offset_inc) 543 { 544 struct ceph_osd_req_op *op, *prev_op; 545 546 BUG_ON(which + 1 >= osd_req->r_num_ops); 547 548 prev_op = &osd_req->r_ops[which]; 549 op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags); 550 /* dup previous one */ 551 op->indata_len = prev_op->indata_len; 552 op->outdata_len = prev_op->outdata_len; 553 op->extent = prev_op->extent; 554 /* adjust offset */ 555 op->extent.offset += offset_inc; 556 op->extent.length -= offset_inc; 557 558 if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL) 559 op->indata_len -= offset_inc; 560 } 561 EXPORT_SYMBOL(osd_req_op_extent_dup_last); 562 563 void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, 564 u16 opcode, const char *class, const char *method) 565 { 566 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 567 opcode, 0); 568 struct ceph_pagelist *pagelist; 569 size_t payload_len = 0; 570 size_t size; 571 572 BUG_ON(opcode != CEPH_OSD_OP_CALL); 573 574 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); 575 BUG_ON(!pagelist); 576 ceph_pagelist_init(pagelist); 577 578 op->cls.class_name = class; 579 size = strlen(class); 580 BUG_ON(size > (size_t) U8_MAX); 581 op->cls.class_len = size; 582 ceph_pagelist_append(pagelist, class, size); 583 payload_len += size; 584 585 op->cls.method_name = method; 586 size = strlen(method); 587 BUG_ON(size > (size_t) U8_MAX); 588 op->cls.method_len = size; 589 ceph_pagelist_append(pagelist, method, size); 590 payload_len += size; 591 592 osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist); 593 594 op->cls.argc = 0; /* currently unused */ 595 596 op->indata_len = payload_len; 597 } 598 EXPORT_SYMBOL(osd_req_op_cls_init); 599 600 int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which, 601 u16 opcode, const char *name, const void 
*value, 602 size_t size, u8 cmp_op, u8 cmp_mode) 603 { 604 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 605 opcode, 0); 606 struct ceph_pagelist *pagelist; 607 size_t payload_len; 608 609 BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR); 610 611 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); 612 if (!pagelist) 613 return -ENOMEM; 614 615 ceph_pagelist_init(pagelist); 616 617 payload_len = strlen(name); 618 op->xattr.name_len = payload_len; 619 ceph_pagelist_append(pagelist, name, payload_len); 620 621 op->xattr.value_len = size; 622 ceph_pagelist_append(pagelist, value, size); 623 payload_len += size; 624 625 op->xattr.cmp_op = cmp_op; 626 op->xattr.cmp_mode = cmp_mode; 627 628 ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist); 629 op->indata_len = payload_len; 630 return 0; 631 } 632 EXPORT_SYMBOL(osd_req_op_xattr_init); 633 634 void osd_req_op_watch_init(struct ceph_osd_request *osd_req, 635 unsigned int which, u16 opcode, 636 u64 cookie, u64 version, int flag) 637 { 638 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 639 opcode, 0); 640 641 BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH); 642 643 op->watch.cookie = cookie; 644 op->watch.ver = version; 645 if (opcode == CEPH_OSD_OP_WATCH && flag) 646 op->watch.flag = (u8)1; 647 } 648 EXPORT_SYMBOL(osd_req_op_watch_init); 649 650 void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req, 651 unsigned int which, 652 u64 expected_object_size, 653 u64 expected_write_size) 654 { 655 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 656 CEPH_OSD_OP_SETALLOCHINT, 657 0); 658 659 op->alloc_hint.expected_object_size = expected_object_size; 660 op->alloc_hint.expected_write_size = expected_write_size; 661 662 /* 663 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed 664 * not worth a feature bit. Set FAILOK per-op flag to make 665 * sure older osds don't trip over an unsupported opcode. 
666 */ 667 op->flags |= CEPH_OSD_OP_FLAG_FAILOK; 668 } 669 EXPORT_SYMBOL(osd_req_op_alloc_hint_init); 670 671 static void ceph_osdc_msg_data_add(struct ceph_msg *msg, 672 struct ceph_osd_data *osd_data) 673 { 674 u64 length = ceph_osd_data_length(osd_data); 675 676 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { 677 BUG_ON(length > (u64) SIZE_MAX); 678 if (length) 679 ceph_msg_data_add_pages(msg, osd_data->pages, 680 length, osd_data->alignment); 681 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) { 682 BUG_ON(!length); 683 ceph_msg_data_add_pagelist(msg, osd_data->pagelist); 684 #ifdef CONFIG_BLOCK 685 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) { 686 ceph_msg_data_add_bio(msg, osd_data->bio, length); 687 #endif 688 } else { 689 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE); 690 } 691 } 692 693 static u64 osd_req_encode_op(struct ceph_osd_request *req, 694 struct ceph_osd_op *dst, unsigned int which) 695 { 696 struct ceph_osd_req_op *src; 697 struct ceph_osd_data *osd_data; 698 u64 request_data_len = 0; 699 u64 data_length; 700 701 BUG_ON(which >= req->r_num_ops); 702 src = &req->r_ops[which]; 703 if (WARN_ON(!osd_req_opcode_valid(src->op))) { 704 pr_err("unrecognized osd opcode %d\n", src->op); 705 706 return 0; 707 } 708 709 switch (src->op) { 710 case CEPH_OSD_OP_STAT: 711 osd_data = &src->raw_data_in; 712 ceph_osdc_msg_data_add(req->r_reply, osd_data); 713 break; 714 case CEPH_OSD_OP_READ: 715 case CEPH_OSD_OP_WRITE: 716 case CEPH_OSD_OP_WRITEFULL: 717 case CEPH_OSD_OP_ZERO: 718 case CEPH_OSD_OP_TRUNCATE: 719 if (src->op == CEPH_OSD_OP_WRITE || 720 src->op == CEPH_OSD_OP_WRITEFULL) 721 request_data_len = src->extent.length; 722 dst->extent.offset = cpu_to_le64(src->extent.offset); 723 dst->extent.length = cpu_to_le64(src->extent.length); 724 dst->extent.truncate_size = 725 cpu_to_le64(src->extent.truncate_size); 726 dst->extent.truncate_seq = 727 cpu_to_le32(src->extent.truncate_seq); 728 osd_data = &src->extent.osd_data; 729 if 
(src->op == CEPH_OSD_OP_WRITE || 730 src->op == CEPH_OSD_OP_WRITEFULL) 731 ceph_osdc_msg_data_add(req->r_request, osd_data); 732 else 733 ceph_osdc_msg_data_add(req->r_reply, osd_data); 734 break; 735 case CEPH_OSD_OP_CALL: 736 dst->cls.class_len = src->cls.class_len; 737 dst->cls.method_len = src->cls.method_len; 738 osd_data = &src->cls.request_info; 739 ceph_osdc_msg_data_add(req->r_request, osd_data); 740 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST); 741 request_data_len = osd_data->pagelist->length; 742 743 osd_data = &src->cls.request_data; 744 data_length = ceph_osd_data_length(osd_data); 745 if (data_length) { 746 BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE); 747 dst->cls.indata_len = cpu_to_le32(data_length); 748 ceph_osdc_msg_data_add(req->r_request, osd_data); 749 src->indata_len += data_length; 750 request_data_len += data_length; 751 } 752 osd_data = &src->cls.response_data; 753 ceph_osdc_msg_data_add(req->r_reply, osd_data); 754 break; 755 case CEPH_OSD_OP_STARTSYNC: 756 break; 757 case CEPH_OSD_OP_NOTIFY_ACK: 758 case CEPH_OSD_OP_WATCH: 759 dst->watch.cookie = cpu_to_le64(src->watch.cookie); 760 dst->watch.ver = cpu_to_le64(src->watch.ver); 761 dst->watch.flag = src->watch.flag; 762 break; 763 case CEPH_OSD_OP_SETALLOCHINT: 764 dst->alloc_hint.expected_object_size = 765 cpu_to_le64(src->alloc_hint.expected_object_size); 766 dst->alloc_hint.expected_write_size = 767 cpu_to_le64(src->alloc_hint.expected_write_size); 768 break; 769 case CEPH_OSD_OP_SETXATTR: 770 case CEPH_OSD_OP_CMPXATTR: 771 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len); 772 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len); 773 dst->xattr.cmp_op = src->xattr.cmp_op; 774 dst->xattr.cmp_mode = src->xattr.cmp_mode; 775 osd_data = &src->xattr.osd_data; 776 ceph_osdc_msg_data_add(req->r_request, osd_data); 777 request_data_len = osd_data->pagelist->length; 778 break; 779 case CEPH_OSD_OP_CREATE: 780 case CEPH_OSD_OP_DELETE: 781 break; 782 default: 783 
pr_err("unsupported osd opcode %s\n", 784 ceph_osd_op_name(src->op)); 785 WARN_ON(1); 786 787 return 0; 788 } 789 790 dst->op = cpu_to_le16(src->op); 791 dst->flags = cpu_to_le32(src->flags); 792 dst->payload_len = cpu_to_le32(src->indata_len); 793 794 return request_data_len; 795 } 796 797 /* 798 * build new request AND message, calculate layout, and adjust file 799 * extent as needed. 800 * 801 * if the file was recently truncated, we include information about its 802 * old and new size so that the object can be updated appropriately. (we 803 * avoid synchronously deleting truncated objects because it's slow.) 804 * 805 * if @do_sync, include a 'startsync' command so that the osd will flush 806 * data quickly. 807 */ 808 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, 809 struct ceph_file_layout *layout, 810 struct ceph_vino vino, 811 u64 off, u64 *plen, 812 unsigned int which, int num_ops, 813 int opcode, int flags, 814 struct ceph_snap_context *snapc, 815 u32 truncate_seq, 816 u64 truncate_size, 817 bool use_mempool) 818 { 819 struct ceph_osd_request *req; 820 u64 objnum = 0; 821 u64 objoff = 0; 822 u64 objlen = 0; 823 int r; 824 825 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && 826 opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE && 827 opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE); 828 829 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool, 830 GFP_NOFS); 831 if (!req) 832 return ERR_PTR(-ENOMEM); 833 834 req->r_flags = flags; 835 836 /* calculate max write size */ 837 r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen); 838 if (r < 0) { 839 ceph_osdc_put_request(req); 840 return ERR_PTR(r); 841 } 842 843 if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) { 844 osd_req_op_init(req, which, opcode, 0); 845 } else { 846 u32 object_size = le32_to_cpu(layout->fl_object_size); 847 u32 object_base = off - objoff; 848 if (!(truncate_seq == 1 && 
truncate_size == -1ULL)) { 849 if (truncate_size <= object_base) { 850 truncate_size = 0; 851 } else { 852 truncate_size -= object_base; 853 if (truncate_size > object_size) 854 truncate_size = object_size; 855 } 856 } 857 osd_req_op_extent_init(req, which, opcode, objoff, objlen, 858 truncate_size, truncate_seq); 859 } 860 861 req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout); 862 863 snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name), 864 "%llx.%08llx", vino.ino, objnum); 865 req->r_base_oid.name_len = strlen(req->r_base_oid.name); 866 867 return req; 868 } 869 EXPORT_SYMBOL(ceph_osdc_new_request); 870 871 /* 872 * We keep osd requests in an rbtree, sorted by ->r_tid. 873 */ 874 static void __insert_request(struct ceph_osd_client *osdc, 875 struct ceph_osd_request *new) 876 { 877 struct rb_node **p = &osdc->requests.rb_node; 878 struct rb_node *parent = NULL; 879 struct ceph_osd_request *req = NULL; 880 881 while (*p) { 882 parent = *p; 883 req = rb_entry(parent, struct ceph_osd_request, r_node); 884 if (new->r_tid < req->r_tid) 885 p = &(*p)->rb_left; 886 else if (new->r_tid > req->r_tid) 887 p = &(*p)->rb_right; 888 else 889 BUG(); 890 } 891 892 rb_link_node(&new->r_node, parent, p); 893 rb_insert_color(&new->r_node, &osdc->requests); 894 } 895 896 static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc, 897 u64 tid) 898 { 899 struct ceph_osd_request *req; 900 struct rb_node *n = osdc->requests.rb_node; 901 902 while (n) { 903 req = rb_entry(n, struct ceph_osd_request, r_node); 904 if (tid < req->r_tid) 905 n = n->rb_left; 906 else if (tid > req->r_tid) 907 n = n->rb_right; 908 else 909 return req; 910 } 911 return NULL; 912 } 913 914 static struct ceph_osd_request * 915 __lookup_request_ge(struct ceph_osd_client *osdc, 916 u64 tid) 917 { 918 struct ceph_osd_request *req; 919 struct rb_node *n = osdc->requests.rb_node; 920 921 while (n) { 922 req = rb_entry(n, struct ceph_osd_request, r_node); 923 if (tid < req->r_tid) { 
924 if (!n->rb_left) 925 return req; 926 n = n->rb_left; 927 } else if (tid > req->r_tid) { 928 n = n->rb_right; 929 } else { 930 return req; 931 } 932 } 933 return NULL; 934 } 935 936 static void __kick_linger_request(struct ceph_osd_request *req) 937 { 938 struct ceph_osd_client *osdc = req->r_osdc; 939 struct ceph_osd *osd = req->r_osd; 940 941 /* 942 * Linger requests need to be resent with a new tid to avoid 943 * the dup op detection logic on the OSDs. Achieve this with 944 * a re-register dance instead of open-coding. 945 */ 946 ceph_osdc_get_request(req); 947 if (!list_empty(&req->r_linger_item)) 948 __unregister_linger_request(osdc, req); 949 else 950 __unregister_request(osdc, req); 951 __register_request(osdc, req); 952 ceph_osdc_put_request(req); 953 954 /* 955 * Unless request has been registered as both normal and 956 * lingering, __unregister{,_linger}_request clears r_osd. 957 * However, here we need to preserve r_osd to make sure we 958 * requeue on the same OSD. 959 */ 960 WARN_ON(req->r_osd || !osd); 961 req->r_osd = osd; 962 963 dout("%s requeueing %p tid %llu\n", __func__, req, req->r_tid); 964 __enqueue_request(req); 965 } 966 967 /* 968 * Resubmit requests pending on the given osd. 969 */ 970 static void __kick_osd_requests(struct ceph_osd_client *osdc, 971 struct ceph_osd *osd) 972 { 973 struct ceph_osd_request *req, *nreq; 974 LIST_HEAD(resend); 975 LIST_HEAD(resend_linger); 976 int err; 977 978 dout("%s osd%d\n", __func__, osd->o_osd); 979 err = __reset_osd(osdc, osd); 980 if (err) 981 return; 982 983 /* 984 * Build up a list of requests to resend by traversing the 985 * osd's list of requests. Requests for a given object are 986 * sent in tid order, and that is also the order they're 987 * kept on this list. Therefore all requests that are in 988 * flight will be found first, followed by all requests that 989 * have not yet been sent. 
And to resend requests while 990 * preserving this order we will want to put any sent 991 * requests back on the front of the osd client's unsent 992 * list. 993 * 994 * So we build a separate ordered list of already-sent 995 * requests for the affected osd and splice it onto the 996 * front of the osd client's unsent list. Once we've seen a 997 * request that has not yet been sent we're done. Those 998 * requests are already sitting right where they belong. 999 */ 1000 list_for_each_entry(req, &osd->o_requests, r_osd_item) { 1001 if (!req->r_sent) 1002 break; 1003 1004 if (!req->r_linger) { 1005 dout("%s requeueing %p tid %llu\n", __func__, req, 1006 req->r_tid); 1007 list_move_tail(&req->r_req_lru_item, &resend); 1008 req->r_flags |= CEPH_OSD_FLAG_RETRY; 1009 } else { 1010 list_move_tail(&req->r_req_lru_item, &resend_linger); 1011 } 1012 } 1013 list_splice(&resend, &osdc->req_unsent); 1014 1015 /* 1016 * Both registered and not yet registered linger requests are 1017 * enqueued with a new tid on the same OSD. We add/move them 1018 * to req_unsent/o_requests at the end to keep things in tid 1019 * order. 1020 */ 1021 list_for_each_entry_safe(req, nreq, &osd->o_linger_requests, 1022 r_linger_osd_item) { 1023 WARN_ON(!list_empty(&req->r_req_lru_item)); 1024 __kick_linger_request(req); 1025 } 1026 1027 list_for_each_entry_safe(req, nreq, &resend_linger, r_req_lru_item) 1028 __kick_linger_request(req); 1029 } 1030 1031 /* 1032 * If the osd connection drops, we need to resubmit all requests. 
 */
/*
 * Connection reset callback (installed via osd_con_ops, presumably
 * invoked by the messenger on a transport fault — confirm against
 * messenger.c).  Rekicks every request pending on this osd.
 */
static void osd_reset(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;

	if (!osd)
		return;
	dout("osd_reset osd%d\n", osd->o_osd);
	osdc = osd->o_osdc;
	/* lock order: map_sem (read) outside request_mutex, as elsewhere */
	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);
	__kick_osd_requests(osdc, osd);
	__send_queued(osdc);
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
}

/*
 * Track open sessions with osds.
 */
/*
 * Allocate and initialize a ceph_osd session for osd number @onum.
 * Starts with a single reference; returns NULL on allocation failure.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
{
	struct ceph_osd *osd;

	osd = kzalloc(sizeof(*osd), GFP_NOFS);
	if (!osd)
		return NULL;

	atomic_set(&osd->o_ref, 1);
	osd->o_osdc = osdc;
	osd->o_osd = onum;
	RB_CLEAR_NODE(&osd->o_node);
	INIT_LIST_HEAD(&osd->o_requests);
	INIT_LIST_HEAD(&osd->o_linger_requests);
	INIT_LIST_HEAD(&osd->o_osd_lru);
	osd->o_incarnation = 1;

	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);

	INIT_LIST_HEAD(&osd->o_keepalive_item);
	return osd;
}

/*
 * Take a reference on @osd unless its refcount has already hit zero.
 * Returns @osd on success, NULL if the osd is already being torn down.
 */
static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
	if (atomic_inc_not_zero(&osd->o_ref)) {
		dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
		     atomic_read(&osd->o_ref));
		return osd;
	} else {
		dout("get_osd %p FAIL\n", osd);
		return NULL;
	}
}

/*
 * Drop a reference; on the last put, destroy the auth authorizer
 * (if any) and free the osd.
 */
static void put_osd(struct ceph_osd *osd)
{
	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
	     atomic_read(&osd->o_ref) - 1);
	if (atomic_dec_and_test(&osd->o_ref)) {
		if (osd->o_auth.authorizer)
			ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
		kfree(osd);
	}
}

/*
 * remove an osd from our map
 */
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
	/* must only be removed once no requests reference this osd */
	WARN_ON(!list_empty(&osd->o_requests));
	WARN_ON(!list_empty(&osd->o_linger_requests));

	list_del_init(&osd->o_osd_lru);
	rb_erase(&osd->o_node, &osdc->osds);
	/* mark as unlinked so remove_osd() below is idempotent */
	RB_CLEAR_NODE(&osd->o_node);
}

/*
 * Close the connection and unlink + drop the osd, but only if it is
 * still in the osds tree (RB_EMPTY_NODE guards against double removal).
 */
static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	dout("%s %p osd%d\n", __func__, osd, osd->o_osd);

	if (!RB_EMPTY_NODE(&osd->o_node)) {
		ceph_con_close(&osd->o_con);
		__remove_osd(osdc, osd);
		put_osd(osd);
	}
}

/*
 * Tear down every osd session.  Takes request_mutex itself.
 */
static void remove_all_osds(struct ceph_osd_client *osdc)
{
	dout("%s %p\n", __func__, osdc);
	mutex_lock(&osdc->request_mutex);
	while (!RB_EMPTY_ROOT(&osdc->osds)) {
		struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
						struct ceph_osd, o_node);
		remove_osd(osdc, osd);
	}
	mutex_unlock(&osdc->request_mutex);
}

/*
 * Put an idle osd on the LRU with an expiry of now + osd_idle_ttl.
 * The osd must not already be on the LRU.
 */
static void __move_osd_to_lru(struct ceph_osd_client *osdc,
			      struct ceph_osd *osd)
{
	dout("%s %p\n", __func__, osd);
	BUG_ON(!list_empty(&osd->o_osd_lru));

	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
}

/*
 * LRU the osd only if it has neither normal nor linger requests left.
 */
static void maybe_move_osd_to_lru(struct ceph_osd_client *osdc,
				  struct ceph_osd *osd)
{
	dout("%s %p\n", __func__, osd);

	if (list_empty(&osd->o_requests) &&
	    list_empty(&osd->o_linger_requests))
		__move_osd_to_lru(osdc, osd);
}

/* Take the osd off the idle LRU, if it is on it. */
static void __remove_osd_from_lru(struct ceph_osd *osd)
{
	dout("__remove_osd_from_lru %p\n", osd);
	if (!list_empty(&osd->o_osd_lru))
		list_del_init(&osd->o_osd_lru);
}

/*
 * Reap LRU'd osds whose idle ttl has expired.  The LRU is kept in
 * expiry order, so we can stop at the first entry still in the future.
 */
static void remove_old_osds(struct ceph_osd_client *osdc)
{
	struct ceph_osd *osd, *nosd;

	dout("__remove_old_osds %p\n", osdc);
	mutex_lock(&osdc->request_mutex);
	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
		if (time_before(jiffies, osd->lru_ttl))
			break;
		remove_osd(osdc, osd);
	}
	mutex_unlock(&osdc->request_mutex);
}

/*
 * reset osd connect
 */
/*
 * Returns -ENODEV if the idle osd was simply removed, -EAGAIN if the
 * peer address is unchanged and the connection was never opened (msgr
 * will keep retrying), 0 after actually closing and reopening the
 * connection (bumping o_incarnation so stale acks are ignored).
 */
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	struct ceph_entity_addr *peer_addr;

	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
	if (list_empty(&osd->o_requests) &&
	    list_empty(&osd->o_linger_requests)) {
		remove_osd(osdc, osd);
		return -ENODEV;
	}

	peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
	if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
	    !ceph_con_opened(&osd->o_con)) {
		struct ceph_osd_request *req;

		dout("osd addr hasn't changed and connection never opened, "
		     "letting msgr retry\n");
		/* touch each r_stamp for handle_timeout()'s benefit */
		list_for_each_entry(req, &osd->o_requests, r_osd_item)
			req->r_stamp = jiffies;

		return -EAGAIN;
	}

	ceph_con_close(&osd->o_con);
	ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
	osd->o_incarnation++;

	return 0;
}

/*
 * Insert @new into the osds rbtree, keyed by osd number.  BUGs on a
 * duplicate entry.  Caller holds request_mutex.
 */
static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
{
	struct rb_node **p = &osdc->osds.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd *osd = NULL;

	dout("__insert_osd %p osd%d\n", new, new->o_osd);
	while (*p) {
		parent = *p;
		osd = rb_entry(parent, struct ceph_osd, o_node);
		if (new->o_osd < osd->o_osd)
			p = &(*p)->rb_left;
		else if (new->o_osd > osd->o_osd)
			p = &(*p)->rb_right;
		else
			BUG();	/* an osd may be in the tree only once */
	}

	rb_link_node(&new->o_node, parent, p);
	rb_insert_color(&new->o_node, &osdc->osds);
}

/*
 * Look up the session for osd number @o; NULL if we have none.
 */
static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
{
	struct ceph_osd *osd;
	struct rb_node *n = osdc->osds.rb_node;

	while (n) {
		osd = rb_entry(n, struct ceph_osd, o_node);
		if (o < osd->o_osd)
			n =
			    n->rb_left;
		else if (o > osd->o_osd)
			n = n->rb_right;
		else
			return osd;
	}
	return NULL;
}

/* Arm the keepalive/timeout work after osd_keepalive_timeout. */
static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
{
	schedule_delayed_work(&osdc->timeout_work,
			      osdc->client->options->osd_keepalive_timeout);
}

/* Cancel pending timeout work (does not wait for a running instance). */
static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
{
	cancel_delayed_work(&osdc->timeout_work);
}

/*
 * Register request, assign tid.  If this is the first request, set up
 * the timeout event.
 */
static void __register_request(struct ceph_osd_client *osdc,
			       struct ceph_osd_request *req)
{
	req->r_tid = ++osdc->last_tid;
	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
	dout("__register_request %p tid %lld\n", req, req->r_tid);
	__insert_request(osdc, req);
	ceph_osdc_get_request(req);	/* ref held by the requests tree */
	osdc->num_requests++;
	if (osdc->num_requests == 1) {
		dout(" first request, scheduling timeout\n");
		__schedule_osd_timeout(osdc);
	}
}

/*
 * called under osdc->request_mutex
 */
static void __unregister_request(struct ceph_osd_client *osdc,
				 struct ceph_osd_request *req)
{
	/* RB_EMPTY_NODE means it was never (or already un-) registered */
	if (RB_EMPTY_NODE(&req->r_node)) {
		dout("__unregister_request %p tid %lld not registered\n",
		     req, req->r_tid);
		return;
	}

	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
	rb_erase(&req->r_node, &osdc->requests);
	RB_CLEAR_NODE(&req->r_node);
	osdc->num_requests--;

	if (req->r_osd) {
		/* make sure the original request isn't in flight.
		 */
		ceph_msg_revoke(req->r_request);

		list_del_init(&req->r_osd_item);
		maybe_move_osd_to_lru(osdc, req->r_osd);
		/* keep r_osd if the request is still linger-registered */
		if (list_empty(&req->r_linger_osd_item))
			req->r_osd = NULL;
	}

	list_del_init(&req->r_req_lru_item);
	ceph_osdc_put_request(req);	/* drop the registration ref */

	if (osdc->num_requests == 0) {
		dout(" no requests, canceling timeout\n");
		__cancel_osd_timeout(osdc);
	}
}

/*
 * Cancel a previously queued request message
 */
static void __cancel_request(struct ceph_osd_request *req)
{
	if (req->r_sent && req->r_osd) {
		ceph_msg_revoke(req->r_request);
		req->r_sent = 0;
	}
}

/*
 * Put the request on the osd client's linger list (and the osd's, if
 * it is currently mapped to one).  Takes its own ref; released by
 * __unregister_linger_request().  Caller holds request_mutex.
 */
static void __register_linger_request(struct ceph_osd_client *osdc,
				      struct ceph_osd_request *req)
{
	dout("%s %p tid %llu\n", __func__, req, req->r_tid);
	WARN_ON(!req->r_linger);

	ceph_osdc_get_request(req);
	list_add_tail(&req->r_linger_item, &osdc->req_linger);
	if (req->r_osd)
		list_add_tail(&req->r_linger_osd_item,
			      &req->r_osd->o_linger_requests);
}

/*
 * Undo __register_linger_request(); no-op if the request is not on
 * the linger list.  Caller holds request_mutex.
 */
static void __unregister_linger_request(struct ceph_osd_client *osdc,
					struct ceph_osd_request *req)
{
	WARN_ON(!req->r_linger);

	if (list_empty(&req->r_linger_item)) {
		dout("%s %p tid %llu not registered\n", __func__, req,
		     req->r_tid);
		return;
	}

	dout("%s %p tid %llu\n", __func__, req, req->r_tid);
	list_del_init(&req->r_linger_item);

	if (req->r_osd) {
		list_del_init(&req->r_linger_osd_item);
		maybe_move_osd_to_lru(osdc, req->r_osd);
		/* keep r_osd if the request is still normally registered */
		if (list_empty(&req->r_osd_item))
			req->r_osd = NULL;
	}
	ceph_osdc_put_request(req);
}

/*
 * Mark a request as lingering: it will be re-registered and resent
 * across osd/map changes until explicitly torn down.
 */
void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
				  struct ceph_osd_request *req)
{
	if (!req->r_linger) {
		dout("set_request_linger %p\n", req);
		req->r_linger = 1;
	}
}
EXPORT_SYMBOL(ceph_osdc_set_request_linger);

/*
 * Returns whether a request should be blocked from being sent
 * based on the current osdmap and osd_client settings.
 *
 * Caller should hold map_sem for read.
 */
static bool __req_should_be_paused(struct ceph_osd_client *osdc,
				   struct ceph_osd_request *req)
{
	bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
	/* a full cluster pauses writes as well as an explicit PAUSEWR */
	bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
		ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
	return (req->r_flags & CEPH_OSD_FLAG_READ && pauserd) ||
		(req->r_flags & CEPH_OSD_FLAG_WRITE && pausewr);
}

/*
 * Calculate mapping of a request to a PG.  Takes tiering into account.
 */
static int __calc_request_pg(struct ceph_osdmap *osdmap,
			     struct ceph_osd_request *req,
			     struct ceph_pg *pg_out)
{
	bool need_check_tiering;

	need_check_tiering = false;
	/* target oloc/oid start out unset; seed them from the base values */
	if (req->r_target_oloc.pool == -1) {
		req->r_target_oloc = req->r_base_oloc; /* struct */
		need_check_tiering = true;
	}
	if (req->r_target_oid.name_len == 0) {
		ceph_oid_copy(&req->r_target_oid, &req->r_base_oid);
		need_check_tiering = true;
	}

	if (need_check_tiering &&
	    (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
		struct ceph_pg_pool_info *pi;

		pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool);
		if (pi) {
			/* redirect reads/writes to the pool's cache tier */
			if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
			    pi->read_tier >= 0)
				req->r_target_oloc.pool = pi->read_tier;
			if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
			    pi->write_tier >= 0)
				req->r_target_oloc.pool = pi->write_tier;
		}
		/* !pi is caught in ceph_oloc_oid_to_pg() */
	}

	return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc,
				   &req->r_target_oid, pg_out);
}

/*
 * Queue the request according to its current target: on its osd's
 * request list and the unsent queue when mapped, otherwise on the
 * no-target queue.  Caller holds request_mutex.
 */
static void __enqueue_request(struct ceph_osd_request *req)
{
	struct ceph_osd_client *osdc = req->r_osdc;

	dout("%s %p tid %llu to osd%d\n", __func__, req, req->r_tid,
	     req->r_osd ? req->r_osd->o_osd : -1);

	if (req->r_osd) {
		__remove_osd_from_lru(req->r_osd);
		list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
		list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
	} else {
		list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
	}
}

/*
 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
 * (as needed), and set the request r_osd appropriately.  If there is
 * no up osd, set r_osd to NULL.  Move the request to the appropriate list
 * (unsent, homeless) or leave on in-flight lru.
 *
 * Return 0 if unchanged, 1 if changed, or negative on error.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static int __map_request(struct ceph_osd_client *osdc,
			 struct ceph_osd_request *req, int force_resend)
{
	struct ceph_pg pgid;
	int acting[CEPH_PG_MAX_SIZE];	/* PG's acting set of osd ids */
	int num, o;			/* acting set size, primary osd */
	int err;
	bool was_paused;

	dout("map_request %p tid %lld\n", req, req->r_tid);

	err = __calc_request_pg(osdc->osdmap, req, &pgid);
	if (err) {
		list_move(&req->r_req_lru_item, &osdc->req_notarget);
		return err;
	}
	req->r_pgid = pgid;

	num = ceph_calc_pg_acting(osdc->osdmap, pgid, acting, &o);
	if (num < 0)
		num = 0;

	/* a pause that just lifted forces a resend */
	was_paused = req->r_paused;
	req->r_paused = __req_should_be_paused(osdc, req);
	if (was_paused && !req->r_paused)
		force_resend = 1;

	if ((!force_resend &&
	     req->r_osd && req->r_osd->o_osd == o &&
	     req->r_sent >= req->r_osd->o_incarnation &&
	     req->r_num_pg_osds == num &&
	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
	    (req->r_osd == NULL && o == -1) ||
	    req->r_paused)
		return 0;  /* no change */

	dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
	     req->r_tid, pgid.pool, pgid.seed, o,
	     req->r_osd ?
	     req->r_osd->o_osd : -1);

	/* record full pg acting set */
	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
	req->r_num_pg_osds = num;

	/* detach from the old osd before mapping to the new one */
	if (req->r_osd) {
		__cancel_request(req);
		list_del_init(&req->r_osd_item);
		list_del_init(&req->r_linger_osd_item);
		req->r_osd = NULL;
	}

	req->r_osd = __lookup_osd(osdc, o);
	if (!req->r_osd && o >= 0) {
		/* no session for this osd yet — create and connect one */
		err = -ENOMEM;
		req->r_osd = create_osd(osdc, o);
		if (!req->r_osd) {
			list_move(&req->r_req_lru_item, &osdc->req_notarget);
			goto out;
		}

		dout("map_request osd %p is osd%d\n", req->r_osd, o);
		__insert_osd(osdc, req->r_osd);

		ceph_con_open(&req->r_osd->o_con,
			      CEPH_ENTITY_TYPE_OSD, o,
			      &osdc->osdmap->osd_addr[o]);
	}

	__enqueue_request(req);
	err = 1;   /* osd or pg changed */

out:
	return err;
}

/*
 * caller should hold map_sem (for read) and request_mutex
 */
static void __send_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	void *p;

	dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
	     req, req->r_tid, req->r_osd->o_osd, req->r_flags,
	     (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);

	/* fill in message content that changes each time we send it */
	put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
	put_unaligned_le32(req->r_flags, req->r_request_flags);
	put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool);
	p = req->r_request_pgid;
	ceph_encode_64(&p, req->r_pgid.pool);
	ceph_encode_32(&p, req->r_pgid.seed);
	put_unaligned_le64(1, req->r_request_attempts);  /* FIXME */
	memcpy(req->r_request_reassert_version, &req->r_reassert_version,
	       sizeof(req->r_reassert_version));

	req->r_stamp = jiffies;		/* watched by handle_timeout() */
	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);

	ceph_msg_get(req->r_request);
	/* send consumes a ref */

	/* record which connection incarnation this send belongs to */
	req->r_sent = req->r_osd->o_incarnation;

	ceph_con_send(&req->r_osd->o_con, req->r_request);
}

/*
 * Send any requests in the queue (req_unsent).
 */
static void __send_queued(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req, *tmp;

	dout("__send_queued\n");
	/* _safe: __send_request moves each entry onto req_lru */
	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
		__send_request(osdc, req);
}

/*
 * Caller should hold map_sem for read and request_mutex.
 */
/*
 * Register, map and (if a target osd exists) send @req.  With
 * @nofail, a mapping failure is swallowed and the request is left
 * registered to be retried when a new map arrives; otherwise it is
 * unregistered and the error returned.
 */
static int __ceph_osdc_start_request(struct ceph_osd_client *osdc,
				     struct ceph_osd_request *req,
				     bool nofail)
{
	int rc;

	__register_request(osdc, req);
	req->r_sent = 0;
	req->r_got_reply = 0;
	rc = __map_request(osdc, req, 0);
	if (rc < 0) {
		if (nofail) {
			dout("osdc_start_request failed map, "
				" will retry %lld\n", req->r_tid);
			rc = 0;
		} else {
			__unregister_request(osdc, req);
		}
		return rc;
	}

	if (req->r_osd == NULL) {
		/* no up osd for this pg — ask the monitor for a newer map */
		dout("send_request %p no up osds in pg\n", req);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
	} else {
		__send_queued(osdc);
	}

	return 0;
}

/*
 * Timeout callback, called every N seconds when 1 or more osd
 * requests has been active for more than N seconds.  When this
 * happens, we ping all OSDs with requests who have timed out to
 * ensure any communications channel reset is detected.  Reset the
 * request timeouts another N seconds in the future as we go.
 * Reschedule the timeout event another N seconds in future (unless
 * there are no open requests).
 */
static void handle_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client, timeout_work.work);
	struct ceph_options *opts = osdc->client->options;
	struct ceph_osd_request *req;
	struct ceph_osd *osd;
	struct list_head slow_osds;
	dout("timeout\n");
	down_read(&osdc->map_sem);

	ceph_monc_request_next_osdmap(&osdc->client->monc);

	mutex_lock(&osdc->request_mutex);

	/*
	 * ping osds that are a bit slow.  this ensures that if there
	 * is a break in the TCP connection we will notice, and reopen
	 * a connection with that osd (from the fault callback).
	 */
	INIT_LIST_HEAD(&slow_osds);
	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
		/* req_lru is in r_stamp order; stop at the first fresh one */
		if (time_before(jiffies,
				req->r_stamp + opts->osd_keepalive_timeout))
			break;

		osd = req->r_osd;
		BUG_ON(!osd);
		dout(" tid %llu is slow, will send keepalive on osd%d\n",
		     req->r_tid, osd->o_osd);
		list_move_tail(&osd->o_keepalive_item, &slow_osds);
	}
	while (!list_empty(&slow_osds)) {
		osd = list_entry(slow_osds.next, struct ceph_osd,
				 o_keepalive_item);
		list_del_init(&osd->o_keepalive_item);
		ceph_con_keepalive(&osd->o_con);
	}

	__schedule_osd_timeout(osdc);
	__send_queued(osdc);
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
}

/*
 * Periodic work that reaps idle osd sessions, rescheduling itself at
 * a quarter of the idle ttl.
 */
static void handle_osds_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client,
			     osds_timeout_work.work);
	unsigned long delay = osdc->client->options->osd_idle_ttl / 4;

	dout("osds timeout\n");
	down_read(&osdc->map_sem);
	remove_old_osds(osdc);
	up_read(&osdc->map_sem);

	schedule_delayed_work(&osdc->osds_timeout_work,
			      round_jiffies_relative(delay));
}

/*
 * Decode an on-wire ceph_object_locator into @oloc, rejecting any
 * encoding feature (key, namespace, hash) this client doesn't support.
 * Returns 0 or -EINVAL.
 */
static int ceph_oloc_decode(void **p, void *end,
			    struct ceph_object_locator *oloc)
{
	u8 struct_v, struct_cv;	/* encoding version / compat version */
	u32 len;
	void *struct_end;
	int ret = 0;

	ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
	struct_v = ceph_decode_8(p);
	struct_cv = ceph_decode_8(p);
	if (struct_v < 3) {
		pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
			struct_v, struct_cv);
		goto e_inval;
	}
	if (struct_cv > 6) {
		pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
			struct_v, struct_cv);
		goto e_inval;
	}
	len = ceph_decode_32(p);
	ceph_decode_need(p, end, len, e_inval);
	struct_end = *p + len;

	/*
	 * NOTE(review): the field reads below are only bounded by the
	 * overall buffer check above (len bytes against end), not
	 * individually against struct_end — a short advertised len
	 * relies on the final *p = struct_end rewind.  TODO confirm
	 * this matches the wire format guarantees.
	 */
	oloc->pool = ceph_decode_64(p);
	*p += 4;	/* skip preferred */

	len = ceph_decode_32(p);
	if (len > 0) {
		pr_warn("ceph_object_locator::key is set\n");
		goto e_inval;
	}

	if (struct_v >= 5) {
		len = ceph_decode_32(p);
		if (len > 0) {
			pr_warn("ceph_object_locator::nspace is set\n");
			goto e_inval;
		}
	}

	if (struct_v >= 6) {
		s64 hash = ceph_decode_64(p);
		if (hash != -1) {
			pr_warn("ceph_object_locator::hash is set\n");
			goto e_inval;
		}
	}

	/* skip the rest */
	*p = struct_end;
out:
	return ret;

e_inval:
	ret = -EINVAL;
	goto out;
}

/*
 * Decode an on-wire ceph_request_redirect into @redir.  Only the
 * object locator is honoured; a redirected object name is rejected.
 * Returns 0 or -EINVAL.
 */
static int ceph_redirect_decode(void **p, void *end,
				struct ceph_request_redirect *redir)
{
	u8 struct_v, struct_cv;
	u32 len;
	void *struct_end;
	int ret;

	ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
	struct_v = ceph_decode_8(p);
	struct_cv = ceph_decode_8(p);
	if (struct_cv > 1) {
		pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
			struct_v, struct_cv);
		goto e_inval;
	}
	len = ceph_decode_32(p);
	ceph_decode_need(p, end, len, e_inval);
	struct_end = *p + len;

	ret = ceph_oloc_decode(p, end, &redir->oloc);
	if (ret)
		goto out;

	len =
	      ceph_decode_32(p);
	if (len > 0) {
		pr_warn("ceph_request_redirect::object_name is set\n");
		goto e_inval;
	}

	len = ceph_decode_32(p);
	*p += len;	/* skip osd_instructions */

	/* skip the rest */
	*p = struct_end;
out:
	return ret;

e_inval:
	ret = -EINVAL;
	goto out;
}

static void complete_request(struct ceph_osd_request *req)
{
	complete_all(&req->r_safe_completion);  /* fsync waiter */
}

/*
 * handle osd op reply.  either call the callback if it is specified,
 * or do the completion to wake up the waiting thread.
 */
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
	void *p, *end;
	struct ceph_osd_request *req;
	struct ceph_request_redirect redir;
	u64 tid;
	int object_len;
	unsigned int numops;
	int payload_len, flags;
	s32 result;
	s32 retry_attempt;
	struct ceph_pg pg;
	int err;
	u32 reassert_epoch;
	u64 reassert_version;
	u32 osdmap_epoch;
	int already_completed;
	u32 bytes;
	u8 decode_redir;
	unsigned int i;

	tid = le64_to_cpu(msg->hdr.tid);
	dout("handle_reply %p tid %llu\n", msg, tid);

	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	/* object name is echoed back first; we only skip over it */
	ceph_decode_need(&p, end, 4, bad);
	object_len = ceph_decode_32(&p);
	ceph_decode_need(&p, end, object_len, bad);
	p += object_len;

	err = ceph_decode_pgid(&p, end, &pg);
	if (err)
		goto bad;

	ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad);
	flags = ceph_decode_64(&p);
	result = ceph_decode_32(&p);
	reassert_epoch = ceph_decode_32(&p);
	reassert_version = ceph_decode_64(&p);
	osdmap_epoch = ceph_decode_32(&p);

	/* lookup */
	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (req == NULL) {
		dout("handle_reply tid %llu dne\n", tid);
		goto bad_mutex;
	}
	ceph_osdc_get_request(req);	/* hold ref while processing */

	dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
	     req, result);

	/* per-op payload lengths; op count must match what we sent */
	ceph_decode_need(&p, end, 4, bad_put);
	numops = ceph_decode_32(&p);
	if (numops > CEPH_OSD_MAX_OPS)
		goto bad_put;
	if (numops != req->r_num_ops)
		goto bad_put;
	payload_len = 0;
	ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad_put);
	for (i = 0; i < numops; i++) {
		struct ceph_osd_op *op = p;
		int len;

		len = le32_to_cpu(op->payload_len);
		req->r_ops[i].outdata_len = len;
		dout(" op %d has %d bytes\n", i, len);
		payload_len += len;
		p += sizeof(*op);
	}
	bytes = le32_to_cpu(msg->hdr.data_len);
	if (payload_len != bytes) {
		pr_warn("sum of op payload lens %d != data_len %d\n",
			payload_len, bytes);
		goto bad_put;
	}

	/* per-op return values */
	ceph_decode_need(&p, end, 4 + numops * 4, bad_put);
	retry_attempt = ceph_decode_32(&p);
	for (i = 0; i < numops; i++)
		req->r_ops[i].rval = ceph_decode_32(&p);

	/* redirect info only present from reply version 6/7 onward */
	if (le16_to_cpu(msg->hdr.version) >= 6) {
		p += 8 + 4; /* skip replay_version */
		p += 8; /* skip user_version */

		if (le16_to_cpu(msg->hdr.version) >= 7)
			ceph_decode_8_safe(&p, end, decode_redir, bad_put);
		else
			decode_redir = 1;
	} else {
		decode_redir = 0;
	}

	if (decode_redir) {
		err = ceph_redirect_decode(&p, end, &redir);
		if (err)
			goto bad_put;
	} else {
		redir.oloc.pool = -1;	/* no redirect */
	}

	if (redir.oloc.pool != -1) {
		dout("redirect pool %lld\n", redir.oloc.pool);

		__unregister_request(osdc, req);

		req->r_target_oloc = redir.oloc;	/* struct */

		/*
		 * Start redirect requests with nofail=true.
		 * If mapping fails, request will end up on the notarget
		 * list, waiting for the new osdmap (which can take
		 * a while), even though the original request mapped
		 * successfully.  In the future we might want to follow
		 * original request's nofail setting here.
		 */
		err = __ceph_osdc_start_request(osdc, req, true);
		BUG_ON(err);

		goto out_unlock;
	}

	already_completed = req->r_got_reply;
	if (!req->r_got_reply) {
		/* first reply for this request: record the result */
		req->r_result = result;
		dout("handle_reply result %d bytes %d\n", req->r_result,
		     bytes);
		if (req->r_result == 0)
			req->r_result = bytes;	/* success reports byte count */

		/* in case this is a write and we need to replay, */
		req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch);
		req->r_reassert_version.version = cpu_to_le64(reassert_version);

		req->r_got_reply = 1;
	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
		/* second ack with no new on-disk info — ignore */
		dout("handle_reply tid %llu dup ack\n", tid);
		goto out_unlock;
	}

	dout("handle_reply tid %llu flags %d\n", tid, flags);

	if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK))
		__register_linger_request(osdc, req);

	/* either this is a read, or we got the safe response */
	if (result < 0 ||
	    (flags & CEPH_OSD_FLAG_ONDISK) ||
	    ((flags & CEPH_OSD_FLAG_WRITE) == 0))
		__unregister_request(osdc, req);

	/* callbacks/completions run without the locks held */
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);

	if (!already_completed) {
		if (req->r_unsafe_callback &&
		    result >= 0 && !(flags & CEPH_OSD_FLAG_ONDISK))
			req->r_unsafe_callback(req, true);
		if (req->r_callback)
			req->r_callback(req, msg);
		else
			complete_all(&req->r_completion);
	}

	if (flags & CEPH_OSD_FLAG_ONDISK) {
		if (req->r_unsafe_callback && already_completed)
			req->r_unsafe_callback(req, false);
		complete_request(req);	/* wake fsync waiters */
	}

out:
	dout("req=%p req->r_linger=%d\n", req, req->r_linger);
	ceph_osdc_put_request(req);
	return;
out_unlock:
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
	goto out;

bad_put:
	/* decode failed after the request was looked up */
	req->r_result = -EIO;
	__unregister_request(osdc, req);
	if (req->r_callback)
		req->r_callback(req, msg);
	else
		complete_all(&req->r_completion);
	complete_request(req);
	ceph_osdc_put_request(req);
bad_mutex:
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
bad:
	pr_err("corrupt osd_op_reply got %d %d\n",
	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
	ceph_msg_dump(msg);
}

/*
 * Reset every osd whose entry in the new map says it is down or has
 * a different address than our open connection.
 */
static void reset_changed_osds(struct ceph_osd_client *osdc)
{
	struct rb_node *p, *n;

	dout("%s %p\n", __func__, osdc);
	/* fetch the next node first: __reset_osd() may remove this one */
	for (p = rb_first(&osdc->osds); p; p = n) {
		struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);

		n = rb_next(p);
		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
		    memcmp(&osd->o_con.peer_addr,
			   ceph_osd_addr(osdc->osdmap,
					 osd->o_osd),
			   sizeof(struct ceph_entity_addr)) != 0)
			__reset_osd(osdc, osd);
	}
}

/*
 * Requeue requests whose mapping to an OSD has changed.  If requests map to
 * no osd, request a new map.
 *
 * Caller should hold map_sem for read.
 */
static void kick_requests(struct ceph_osd_client *osdc, bool force_resend,
			  bool force_resend_writes)
{
	struct ceph_osd_request *req, *nreq;
	struct rb_node *p;
	int needmap = 0;	/* count of requests with no usable osd */
	int err;
	bool force_resend_req;

	dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "",
		force_resend_writes ?
			" (force resend writes)" : "");
	mutex_lock(&osdc->request_mutex);
	/* advance p before the body: the entry may be unregistered below */
	for (p = rb_first(&osdc->requests); p; ) {
		req = rb_entry(p, struct ceph_osd_request, r_node);
		p = rb_next(p);

		/*
		 * For linger requests that have not yet been
		 * registered, move them to the linger list; they'll
		 * be sent to the osd in the loop below.  Unregister
		 * the request before re-registering it as a linger
		 * request to ensure the __map_request() below
		 * will decide it needs to be sent.
		 */
		if (req->r_linger && list_empty(&req->r_linger_item)) {
			dout("%p tid %llu restart on osd%d\n",
			     req, req->r_tid,
			     req->r_osd ? req->r_osd->o_osd : -1);
			ceph_osdc_get_request(req);
			__unregister_request(osdc, req);
			__register_linger_request(osdc, req);
			ceph_osdc_put_request(req);
			continue;
		}

		force_resend_req = force_resend ||
			(force_resend_writes &&
				req->r_flags & CEPH_OSD_FLAG_WRITE);
		err = __map_request(osdc, req, force_resend_req);
		if (err < 0)
			continue;	/* error */
		if (req->r_osd == NULL) {
			dout("%p tid %llu maps to no osd\n", req, req->r_tid);
			needmap++;	/* request a newer map */
		} else if (err > 0) {
			if (!req->r_linger) {
				dout("%p tid %llu requeued on osd%d\n", req,
				     req->r_tid,
				     req->r_osd ? req->r_osd->o_osd : -1);
				req->r_flags |= CEPH_OSD_FLAG_RETRY;
			}
		}
	}

	/* now remap/kick the registered linger requests */
	list_for_each_entry_safe(req, nreq, &osdc->req_linger,
				 r_linger_item) {
		dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);

		err = __map_request(osdc, req,
				    force_resend || force_resend_writes);
		dout("__map_request returned %d\n", err);
		if (err < 0)
			continue;	/* hrm!
					 */
		if (req->r_osd == NULL || err > 0) {
			if (req->r_osd == NULL) {
				dout("lingering %p tid %llu maps to no osd\n",
				     req, req->r_tid);
				/*
				 * A homeless lingering request makes
				 * no sense, as its job is to keep
				 * a particular OSD connection open.
				 * Request a newer map and kick the
				 * request, knowing that it won't be
				 * resent until we actually get a map
				 * that can tell us where to send it.
				 */
				needmap++;
			}

			dout("kicking lingering %p tid %llu osd%d\n", req,
			     req->r_tid, req->r_osd ? req->r_osd->o_osd : -1);
			/* re-register as normal so it gets (re)sent */
			__register_request(osdc, req);
			__unregister_linger_request(osdc, req);
		}
	}
	reset_changed_osds(osdc);
	mutex_unlock(&osdc->request_mutex);

	if (needmap) {
		dout("%d requests for down osds, need new map\n", needmap);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
	}
}


/*
 * Process updated osd map.
 *
 * The message contains any number of incremental and full maps, normally
 * indicating some sort of topology change in the cluster.  Kick requests
 * off to different OSDs as needed.
 */
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
	void *p, *end, *next;
	u32 nr_maps, maplen;
	u32 epoch;
	struct ceph_osdmap *newmap = NULL, *oldmap;
	int err;
	struct ceph_fsid fsid;
	bool was_full;

	dout("handle_map have %u\n", osdc->osdmap ?
osdc->osdmap->epoch : 0); 2131 p = msg->front.iov_base; 2132 end = p + msg->front.iov_len; 2133 2134 /* verify fsid */ 2135 ceph_decode_need(&p, end, sizeof(fsid), bad); 2136 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 2137 if (ceph_check_fsid(osdc->client, &fsid) < 0) 2138 return; 2139 2140 down_write(&osdc->map_sem); 2141 2142 was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL); 2143 2144 /* incremental maps */ 2145 ceph_decode_32_safe(&p, end, nr_maps, bad); 2146 dout(" %d inc maps\n", nr_maps); 2147 while (nr_maps > 0) { 2148 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 2149 epoch = ceph_decode_32(&p); 2150 maplen = ceph_decode_32(&p); 2151 ceph_decode_need(&p, end, maplen, bad); 2152 next = p + maplen; 2153 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) { 2154 dout("applying incremental map %u len %d\n", 2155 epoch, maplen); 2156 newmap = osdmap_apply_incremental(&p, next, 2157 osdc->osdmap, 2158 &osdc->client->msgr); 2159 if (IS_ERR(newmap)) { 2160 err = PTR_ERR(newmap); 2161 goto bad; 2162 } 2163 BUG_ON(!newmap); 2164 if (newmap != osdc->osdmap) { 2165 ceph_osdmap_destroy(osdc->osdmap); 2166 osdc->osdmap = newmap; 2167 } 2168 was_full = was_full || 2169 ceph_osdmap_flag(osdc->osdmap, 2170 CEPH_OSDMAP_FULL); 2171 kick_requests(osdc, 0, was_full); 2172 } else { 2173 dout("ignoring incremental map %u len %d\n", 2174 epoch, maplen); 2175 } 2176 p = next; 2177 nr_maps--; 2178 } 2179 if (newmap) 2180 goto done; 2181 2182 /* full maps */ 2183 ceph_decode_32_safe(&p, end, nr_maps, bad); 2184 dout(" %d full maps\n", nr_maps); 2185 while (nr_maps) { 2186 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 2187 epoch = ceph_decode_32(&p); 2188 maplen = ceph_decode_32(&p); 2189 ceph_decode_need(&p, end, maplen, bad); 2190 if (nr_maps > 1) { 2191 dout("skipping non-latest full map %u len %d\n", 2192 epoch, maplen); 2193 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) { 2194 dout("skipping full map %u len %d, " 2195 "older than our %u\n", epoch, maplen, 
2196 osdc->osdmap->epoch); 2197 } else { 2198 int skipped_map = 0; 2199 2200 dout("taking full map %u len %d\n", epoch, maplen); 2201 newmap = ceph_osdmap_decode(&p, p+maplen); 2202 if (IS_ERR(newmap)) { 2203 err = PTR_ERR(newmap); 2204 goto bad; 2205 } 2206 BUG_ON(!newmap); 2207 oldmap = osdc->osdmap; 2208 osdc->osdmap = newmap; 2209 if (oldmap) { 2210 if (oldmap->epoch + 1 < newmap->epoch) 2211 skipped_map = 1; 2212 ceph_osdmap_destroy(oldmap); 2213 } 2214 was_full = was_full || 2215 ceph_osdmap_flag(osdc->osdmap, 2216 CEPH_OSDMAP_FULL); 2217 kick_requests(osdc, skipped_map, was_full); 2218 } 2219 p += maplen; 2220 nr_maps--; 2221 } 2222 2223 if (!osdc->osdmap) 2224 goto bad; 2225 done: 2226 downgrade_write(&osdc->map_sem); 2227 ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP, 2228 osdc->osdmap->epoch); 2229 2230 /* 2231 * subscribe to subsequent osdmap updates if full to ensure 2232 * we find out when we are no longer full and stop returning 2233 * ENOSPC. 2234 */ 2235 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) || 2236 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) || 2237 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) 2238 ceph_monc_request_next_osdmap(&osdc->client->monc); 2239 2240 mutex_lock(&osdc->request_mutex); 2241 __send_queued(osdc); 2242 mutex_unlock(&osdc->request_mutex); 2243 up_read(&osdc->map_sem); 2244 wake_up_all(&osdc->client->auth_wq); 2245 return; 2246 2247 bad: 2248 pr_err("osdc handle_map corrupt msg\n"); 2249 ceph_msg_dump(msg); 2250 up_write(&osdc->map_sem); 2251 } 2252 2253 /* 2254 * watch/notify callback event infrastructure 2255 * 2256 * These callbacks are used both for watch and notify operations. 
 */

/* Final kref release: free the event once the last reference is dropped. */
static void __release_event(struct kref *kref)
{
	struct ceph_osd_event *event =
		container_of(kref, struct ceph_osd_event, kref);

	dout("__release_event %p\n", event);
	kfree(event);
}

/* Take an additional reference on @event. */
static void get_event(struct ceph_osd_event *event)
{
	kref_get(&event->kref);
}

/* Drop a reference on @event; the event is freed when the count hits zero. */
void ceph_osdc_put_event(struct ceph_osd_event *event)
{
	kref_put(&event->kref, __release_event);
}
EXPORT_SYMBOL(ceph_osdc_put_event);

/*
 * Insert @new into the cookie-ordered event tree.
 * Caller must hold osdc->event_lock.  A duplicate cookie is a bug.
 */
static void __insert_event(struct ceph_osd_client *osdc,
			   struct ceph_osd_event *new)
{
	struct rb_node **p = &osdc->event_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_event *event = NULL;

	while (*p) {
		parent = *p;
		event = rb_entry(parent, struct ceph_osd_event, node);
		if (new->cookie < event->cookie)
			p = &(*p)->rb_left;
		else if (new->cookie > event->cookie)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->node, parent, p);
	rb_insert_color(&new->node, &osdc->event_tree);
}

/*
 * Look up the event registered under @cookie, or NULL if none.
 * Caller must hold osdc->event_lock.
 */
static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
					   u64 cookie)
{
	struct rb_node **p = &osdc->event_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_event *event = NULL;

	while (*p) {
		parent = *p;
		event = rb_entry(parent, struct ceph_osd_event, node);
		if (cookie < event->cookie)
			p = &(*p)->rb_left;
		else if (cookie > event->cookie)
			p = &(*p)->rb_right;
		else
			return event;
	}
	return NULL;
}

/*
 * Unlink @event from the tree (if it is still linked) and drop the
 * tree's reference.  Caller must hold osdc->event_lock.
 */
static void __remove_event(struct ceph_osd_event *event)
{
	struct ceph_osd_client *osdc = event->osdc;

	if (!RB_EMPTY_NODE(&event->node)) {
		dout("__remove_event removed %p\n", event);
		rb_erase(&event->node, &osdc->event_tree);
		ceph_osdc_put_event(event);	/* drop the tree's ref */
	} else {
		dout("__remove_event didn't remove %p\n", event);
	}
}

/*
 * Register a new watch/notify event with callback @event_cb and opaque
 * @data.  On success *pevent holds an event carrying two references:
 * one owned by the event tree and one by the caller (released via
 * ceph_osdc_cancel_event()).  Returns 0 or -ENOMEM.
 */
int ceph_osdc_create_event(struct ceph_osd_client *osdc,
			   void (*event_cb)(u64, u64, u8, void *),
			   void *data, struct ceph_osd_event **pevent)
{
	struct ceph_osd_event *event;

	event = kmalloc(sizeof(*event), GFP_NOIO);
	if (!event)
		return -ENOMEM;

	dout("create_event %p\n", event);
	event->cb = event_cb;
	event->one_shot = 0;
	event->data = data;
	event->osdc = osdc;
	INIT_LIST_HEAD(&event->osd_node);
	RB_CLEAR_NODE(&event->node);
	kref_init(&event->kref);   /* one ref for us */
	kref_get(&event->kref);    /* one ref for the caller */

	spin_lock(&osdc->event_lock);
	event->cookie = ++osdc->event_count;	/* cookies are never reused */
	__insert_event(osdc, event);
	spin_unlock(&osdc->event_lock);

	*pevent = event;
	return 0;
}
EXPORT_SYMBOL(ceph_osdc_create_event);

/* Unregister @event from the tree and drop the caller's reference. */
void ceph_osdc_cancel_event(struct ceph_osd_event *event)
{
	struct ceph_osd_client *osdc = event->osdc;

	dout("cancel_event %p\n", event);
	spin_lock(&osdc->event_lock);
	__remove_event(event);
	spin_unlock(&osdc->event_lock);
	ceph_osdc_put_event(event); /* caller's */
}
EXPORT_SYMBOL(ceph_osdc_cancel_event);


/*
 * Workqueue callback: deliver one notification to the event's callback,
 * then drop the reference taken in handle_watch_notify() and free the
 * work item.
 */
static void do_event_work(struct work_struct *work)
{
	struct ceph_osd_event_work *event_work =
		container_of(work, struct ceph_osd_event_work, work);
	struct ceph_osd_event *event = event_work->event;
	u64 ver = event_work->ver;
	u64 notify_id = event_work->notify_id;
	u8 opcode = event_work->opcode;

	dout("do_event_work completing %p\n", event);
	event->cb(ver, notify_id, opcode, event->data);
	dout("do_event_work completed %p\n", event);
	ceph_osdc_put_event(event);
	kfree(event_work);
}


/*
 * Process osd watch notifications
 */
static void handle_watch_notify(struct ceph_osd_client *osdc,
				struct ceph_msg *msg)
{
	void *p,
	     *end;
	u8 proto_ver;
	u64 cookie, ver, notify_id;
	u8 opcode;
	struct ceph_osd_event *event;
	struct ceph_osd_event_work *event_work;

	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	/* decode the notification header */
	ceph_decode_8_safe(&p, end, proto_ver, bad);
	ceph_decode_8_safe(&p, end, opcode, bad);
	ceph_decode_64_safe(&p, end, cookie, bad);
	ceph_decode_64_safe(&p, end, ver, bad);
	ceph_decode_64_safe(&p, end, notify_id, bad);

	/* take a ref so the event survives until do_event_work() runs */
	spin_lock(&osdc->event_lock);
	event = __find_event(osdc, cookie);
	if (event) {
		BUG_ON(event->one_shot);
		get_event(event);
	}
	spin_unlock(&osdc->event_lock);
	dout("handle_watch_notify cookie %lld ver %lld event %p\n",
	     cookie, ver, event);
	if (event) {
		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
		if (!event_work) {
			pr_err("couldn't allocate event_work\n");
			ceph_osdc_put_event(event);
			return;
		}
		INIT_WORK(&event_work->work, do_event_work);
		event_work->event = event;
		event_work->ver = ver;
		event_work->notify_id = notify_id;
		event_work->opcode = opcode;

		/* callback runs asynchronously on the notify workqueue */
		queue_work(osdc->notify_wq, &event_work->work);
	}

	return;

bad:
	pr_err("osdc handle_watch_notify corrupt msg\n");
}

/*
 * build new request AND message
 *
 * Encode the request into its preallocated message (r_request):
 * osdmap epoch, flags, mtime, oloc, pgid, oid, the ops array, the
 * snap context and attempt count.  Records the addresses of fields
 * that are (re)filled at send time (r_request_osdmap_epoch etc.) and
 * sets front_len/data_len on the message header.
 */
void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off,
			     struct ceph_snap_context *snapc, u64 snap_id,
			     struct timespec *mtime)
{
	struct ceph_msg *msg = req->r_request;
	void *p;
	size_t msg_size;
	int flags = req->r_flags;
	u64 data_len;
	unsigned int i;

	req->r_snapid = snap_id;
	WARN_ON(snapc != req->r_snapc);

	/* encode request */
	msg->hdr.version = cpu_to_le16(4);

	p = msg->front.iov_base;
	ceph_encode_32(&p, 1);   /* client_inc is always 1 */
	req->r_request_osdmap_epoch = p;
	p += 4;
	req->r_request_flags = p;
	p += 4;
	if (req->r_flags & CEPH_OSD_FLAG_WRITE)
		ceph_encode_timespec(p, mtime);
	p += sizeof(struct ceph_timespec);
	req->r_request_reassert_version = p;
	p += sizeof(struct ceph_eversion); /* will get filled in */

	/* oloc */
	ceph_encode_8(&p, 4);
	ceph_encode_8(&p, 4);
	ceph_encode_32(&p, 8 + 4 + 4);
	req->r_request_pool = p;
	p += 8;
	ceph_encode_32(&p, -1);  /* preferred */
	ceph_encode_32(&p, 0);   /* key len */

	ceph_encode_8(&p, 1);
	req->r_request_pgid = p;
	p += 8 + 4;
	ceph_encode_32(&p, -1);  /* preferred */

	/* oid */
	ceph_encode_32(&p, req->r_base_oid.name_len);
	memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len);
	dout("oid '%.*s' len %d\n", req->r_base_oid.name_len,
	     req->r_base_oid.name, req->r_base_oid.name_len);
	p += req->r_base_oid.name_len;

	/* ops--can imply data */
	ceph_encode_16(&p, (u16)req->r_num_ops);
	data_len = 0;
	for (i = 0; i < req->r_num_ops; i++) {
		data_len += osd_req_encode_op(req, p, i);
		p += sizeof(struct ceph_osd_op);
	}

	/* snaps */
	ceph_encode_64(&p, req->r_snapid);
	ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0);
	ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0);
	if (req->r_snapc) {
		for (i = 0; i < req->r_snapc->num_snaps; i++) {
			ceph_encode_64(&p, req->r_snapc->snaps[i]);
		}
	}

	req->r_request_attempts = p;
	p += 4;

	/* data */
	if (flags & CEPH_OSD_FLAG_WRITE) {
		u16 data_off;

		/*
		 * The header "data_off" is a hint to the receiver
		 * allowing it to align received data into its
		 * buffers such that there's no need to re-copy
		 * it before writing it to disk (direct I/O).
		 */
		data_off = (u16) (off & 0xffff);
		req->r_request->hdr.data_off = cpu_to_le16(data_off);
	}
	req->r_request->hdr.data_len = cpu_to_le32(data_len);

	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
	msg_size = p - msg->front.iov_base;
	msg->front.iov_len = msg_size;
	msg->hdr.front_len = cpu_to_le32(msg_size);

	dout("build_request msg_size was %d\n", (int)msg_size);
}
EXPORT_SYMBOL(ceph_osdc_build_request);

/*
 * Register request, send initial attempt.
 */
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
			    struct ceph_osd_request *req,
			    bool nofail)
{
	int rc;

	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);

	rc = __ceph_osdc_start_request(osdc, req, nofail);

	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);

	return rc;
}
EXPORT_SYMBOL(ceph_osdc_start_request);

/*
 * Unregister a registered request.  The request is not completed (i.e.
 * no callbacks or wakeups) - higher layers are supposed to know what
 * they are canceling.
 */
void ceph_osdc_cancel_request(struct ceph_osd_request *req)
{
	struct ceph_osd_client *osdc = req->r_osdc;

	mutex_lock(&osdc->request_mutex);
	if (req->r_linger)
		__unregister_linger_request(osdc, req);
	__unregister_request(osdc, req);
	mutex_unlock(&osdc->request_mutex);

	dout("%s %p tid %llu canceled\n", __func__, req, req->r_tid);
}
EXPORT_SYMBOL(ceph_osdc_cancel_request);

/*
 * wait for a request to complete
 *
 * Returns the request's result, or a negative error if the wait was
 * interrupted (in which case the request is canceled).
 */
int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	int rc;

	dout("%s %p tid %llu\n", __func__, req, req->r_tid);

	rc = wait_for_completion_interruptible(&req->r_completion);
	if (rc < 0) {
		dout("%s %p tid %llu interrupted\n", __func__, req, req->r_tid);
		/* interrupted: pull the request back and wake any waiters */
		ceph_osdc_cancel_request(req);
		complete_request(req);
		return rc;
	}

	dout("%s %p tid %llu result %d\n", __func__, req, req->r_tid,
	     req->r_result);
	return req->r_result;
}
EXPORT_SYMBOL(ceph_osdc_wait_request);

/*
 * sync - wait for all in-flight requests to flush.  avoid starvation.
 */
void ceph_osdc_sync(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req;
	u64 last_tid, next_tid = 0;

	mutex_lock(&osdc->request_mutex);
	last_tid = osdc->last_tid;	/* only wait for tids issued so far */
	while (1) {
		req = __lookup_request_ge(osdc, next_tid);
		if (!req)
			break;
		if (req->r_tid > last_tid)
			break;

		next_tid = req->r_tid + 1;
		if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
			continue;	/* only writes are waited on */

		/* hold a ref across the unlocked wait */
		ceph_osdc_get_request(req);
		mutex_unlock(&osdc->request_mutex);
		dout("sync waiting on tid %llu (last is %llu)\n",
		     req->r_tid, last_tid);
		wait_for_completion(&req->r_safe_completion);
		mutex_lock(&osdc->request_mutex);
		ceph_osdc_put_request(req);
	}
	mutex_unlock(&osdc->request_mutex);
	dout("sync done (thru tid %llu)\n", last_tid);
}
EXPORT_SYMBOL(ceph_osdc_sync);

/*
 * Call all pending notify callbacks - for use after a watch is
 * unregistered, to make sure no more callbacks for it will be invoked
 */
void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
{
	flush_workqueue(osdc->notify_wq);
}
EXPORT_SYMBOL(ceph_osdc_flush_notifies);


/*
 * init, shutdown
 */
int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
{
	int err;

	dout("init\n");
	osdc->client = client;
	osdc->osdmap = NULL;
	init_rwsem(&osdc->map_sem);
	init_completion(&osdc->map_waiters);
	osdc->last_requested_map = 0;
	mutex_init(&osdc->request_mutex);
	osdc->last_tid = 0;
	osdc->osds = RB_ROOT;
	INIT_LIST_HEAD(&osdc->osd_lru);
	osdc->requests = RB_ROOT;
	INIT_LIST_HEAD(&osdc->req_lru);
	INIT_LIST_HEAD(&osdc->req_unsent);
	INIT_LIST_HEAD(&osdc->req_notarget);
	INIT_LIST_HEAD(&osdc->req_linger);
	osdc->num_requests = 0;
	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
	spin_lock_init(&osdc->event_lock);
	osdc->event_tree = RB_ROOT;
	osdc->event_count = 0;

	/*
	 * NOTE(review): the idle-osd timer is armed before the
	 * allocations below succeed, and the error paths do not
	 * cancel it -- confirm callers never free osdc after a
	 * failed init without cancelling this work.
	 */
	schedule_delayed_work(&osdc->osds_timeout_work,
	    round_jiffies_relative(osdc->client->options->osd_idle_ttl));

	err = -ENOMEM;
	osdc->req_mempool = mempool_create_slab_pool(10,
						     ceph_osd_request_cache);
	if (!osdc->req_mempool)
		goto out;

	err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
				OSD_OP_FRONT_LEN, 10, true,
				"osd_op");
	if (err < 0)
		goto out_mempool;
	err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
				OSD_OPREPLY_FRONT_LEN, 10, true,
				"osd_op_reply");
	if (err < 0)
		goto out_msgpool;

	err = -ENOMEM;
	osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
	if (!osdc->notify_wq)
		goto out_msgpool_reply;

	return 0;

out_msgpool_reply:
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
out_msgpool:
	ceph_msgpool_destroy(&osdc->msgpool_op);
out_mempool:
	mempool_destroy(osdc->req_mempool);
out:
	return err;
}

/* Tear down an osd client set up by ceph_osdc_init(). */
void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
	flush_workqueue(osdc->notify_wq);
	destroy_workqueue(osdc->notify_wq);
	cancel_delayed_work_sync(&osdc->timeout_work);
	cancel_delayed_work_sync(&osdc->osds_timeout_work);
	if (osdc->osdmap) {
		ceph_osdmap_destroy(osdc->osdmap);
		osdc->osdmap = NULL;
	}
	remove_all_osds(osdc);
	mempool_destroy(osdc->req_mempool);
	ceph_msgpool_destroy(&osdc->msgpool_op);
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}

/*
 * Read some contiguous pages.  If we cross a stripe boundary, shorten
 * *plen.  Return number of bytes read, or error.
 */
int ceph_osdc_readpages(struct ceph_osd_client *osdc,
			struct ceph_vino vino, struct ceph_file_layout *layout,
			u64 off, u64 *plen,
			u32 truncate_seq, u64 truncate_size,
			struct page **pages, int num_pages, int page_align)
{
	struct ceph_osd_request *req;
	int rc = 0;

	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
	     vino.snap, off, *plen);
	req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
				    NULL, truncate_seq, truncate_size,
				    false);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* it may be a short read due to an object boundary */

	osd_req_op_extent_osd_data_pages(req, 0,
				pages, *plen, page_align, false, false);

	dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n",
	     off, *plen, *plen, page_align);

	ceph_osdc_build_request(req, off, NULL, vino.snap, NULL);

	rc = ceph_osdc_start_request(osdc, req, false);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	dout("readpages result %d\n", rc);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_readpages);

/*
 * do a synchronous write on N pages
 */
int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
			 struct ceph_file_layout *layout,
			 struct ceph_snap_context *snapc,
			 u64 off, u64 len,
			 u32 truncate_seq, u64 truncate_size,
			 struct timespec *mtime,
			 struct page **pages, int num_pages)
{
	struct ceph_osd_request *req;
	int rc = 0;
	int page_align = off & ~PAGE_MASK;

	BUG_ON(vino.snap != CEPH_NOSNAP);	/* snapshots aren't writeable */
	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
				    CEPH_OSD_OP_WRITE,
				    CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE,
				    snapc, truncate_seq, truncate_size,
				    true);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* it may be a short write due to an object boundary */
	osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
				false, false);
	dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);

	ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime);

	rc = ceph_osdc_start_request(osdc, req, true);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	if (rc == 0)
		rc = len;	/* success: report bytes written */
	dout("writepages result %d\n", rc);
	return rc;
}
EXPORT_SYMBOL(ceph_osdc_writepages);

/* Create the slab cache used for ceph_osd_request allocation. */
int ceph_osdc_setup(void)
{
	size_t size = sizeof(struct ceph_osd_request) +
	    CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);

	BUG_ON(ceph_osd_request_cache);
	ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
						   0, 0, NULL);

	return ceph_osd_request_cache ? 0 : -ENOMEM;
}
EXPORT_SYMBOL(ceph_osdc_setup);

/* Destroy the request slab cache created by ceph_osdc_setup(). */
void ceph_osdc_cleanup(void)
{
	BUG_ON(!ceph_osd_request_cache);
	kmem_cache_destroy(ceph_osd_request_cache);
	ceph_osd_request_cache = NULL;
}
EXPORT_SYMBOL(ceph_osdc_cleanup);

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;
	int type = le16_to_cpu(msg->hdr.type);

	if (!osd)
		goto out;
	osdc = osd->o_osdc;

	switch (type) {
	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(osdc, msg);
		break;
	case CEPH_MSG_OSD_OPREPLY:
		handle_reply(osdc, msg);
		break;
	case CEPH_MSG_WATCH_NOTIFY:
		handle_watch_notify(osdc, msg);
		break;

	default:
		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
out:
	ceph_msg_put(msg);
}

/*
 * Lookup and return message for incoming
 * reply.  Don't try to do
 * anything about a larger than preallocated data portion of the
 * message at the moment - for now, just skip the message.
 */
static struct ceph_msg *get_reply(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct ceph_msg *m;
	struct ceph_osd_request *req;
	int front_len = le32_to_cpu(hdr->front_len);
	int data_len = le32_to_cpu(hdr->data_len);
	u64 tid;

	tid = le64_to_cpu(hdr->tid);
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (!req) {
		/* no matching in-flight request; drop the reply */
		dout("%s osd%d tid %llu unknown, skipping\n", __func__,
		     osd->o_osd, tid);
		m = NULL;
		*skip = 1;
		goto out;
	}

	ceph_msg_revoke_incoming(req->r_reply);

	if (front_len > req->r_reply->front_alloc_len) {
		/* preallocated front is too small; allocate a bigger one */
		pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
			__func__, osd->o_osd, req->r_tid, front_len,
			req->r_reply->front_alloc_len);
		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
				 false);
		if (!m)
			goto out;
		ceph_msg_put(req->r_reply);
		req->r_reply = m;
	}

	if (data_len > req->r_reply->data_length) {
		pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
			__func__, osd->o_osd, req->r_tid, data_len,
			req->r_reply->data_length);
		m = NULL;
		*skip = 1;
		goto out;
	}

	m = ceph_msg_get(req->r_reply);
	dout("get_reply tid %lld %p\n", tid, m);

out:
	mutex_unlock(&osdc->request_mutex);
	return m;
}

/* Allocate (or look up) a message buffer for an incoming frame. */
static struct ceph_msg *alloc_msg(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	int type = le16_to_cpu(hdr->type);
	int front = le32_to_cpu(hdr->front_len);

	*skip = 0;
	switch (type) {
	case CEPH_MSG_OSD_MAP:
	case CEPH_MSG_WATCH_NOTIFY:
		return ceph_msg_new(type, front, GFP_NOFS, false);
	case CEPH_MSG_OSD_OPREPLY:
		return get_reply(con, hdr, skip);
	default:
		pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
			osd->o_osd);
		*skip = 1;
		return NULL;
	}
}

/*
 * Wrappers to refcount containing ceph_osd struct
 */
static struct ceph_connection *get_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	if (get_osd(osd))
		return con;
	return NULL;
}

static void put_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	put_osd(osd);
}

/*
 * authentication
 */

/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
 */
static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
						  int *proto, int force_new)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;
	struct ceph_auth_handshake *auth = &o->o_auth;

	if (force_new && auth->authorizer) {
		/* caller wants a fresh authorizer; discard the cached one */
		ceph_auth_destroy_authorizer(auth->authorizer);
		auth->authorizer = NULL;
	}
	if (!auth->authorizer) {
		int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
						      auth);
		if (ret)
			return ERR_PTR(ret);
	} else {
		int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
						      auth);
		if (ret)
			return ERR_PTR(ret);
	}
	*proto = ac->protocol;

	return auth;
}


static int verify_authorizer_reply(struct ceph_connection *con, int len)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len);
}

static int invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
	return ceph_monc_validate_auth(&osdc->client->monc);
}

/* Sign an outgoing message using this connection's auth handshake. */
static int osd_sign_message(struct ceph_msg *msg)
{
	struct ceph_osd *o = msg->con->private;
	struct ceph_auth_handshake *auth = &o->o_auth;

	return ceph_auth_sign_message(auth, msg);
}

/* Verify the signature on an incoming message. */
static int osd_check_message_signature(struct ceph_msg *msg)
{
	struct ceph_osd *o = msg->con->private;
	struct ceph_auth_handshake *auth = &o->o_auth;

	return ceph_auth_check_message_signature(auth, msg);
}

/* Connection callbacks wired into the ceph messenger for OSD sessions. */
static const struct ceph_connection_operations osd_con_ops = {
	.get = get_osd_con,
	.put = put_osd_con,
	.dispatch = dispatch,
	.get_authorizer = get_authorizer,
	.verify_authorizer_reply = verify_authorizer_reply,
	.invalidate_authorizer = invalidate_authorizer,
	.alloc_msg = alloc_msg,
	.sign_message = osd_sign_message,
	.check_message_signature = osd_check_message_signature,
	.fault = osd_reset,
};