1 2 #include <linux/ceph/ceph_debug.h> 3 4 #include <linux/module.h> 5 #include <linux/err.h> 6 #include <linux/highmem.h> 7 #include <linux/mm.h> 8 #include <linux/pagemap.h> 9 #include <linux/slab.h> 10 #include <linux/uaccess.h> 11 #ifdef CONFIG_BLOCK 12 #include <linux/bio.h> 13 #endif 14 15 #include <linux/ceph/libceph.h> 16 #include <linux/ceph/osd_client.h> 17 #include <linux/ceph/messenger.h> 18 #include <linux/ceph/decode.h> 19 #include <linux/ceph/auth.h> 20 #include <linux/ceph/pagelist.h> 21 22 #define OSD_OPREPLY_FRONT_LEN 512 23 24 static struct kmem_cache *ceph_osd_request_cache; 25 26 static const struct ceph_connection_operations osd_con_ops; 27 28 static void __send_queued(struct ceph_osd_client *osdc); 29 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); 30 static void __register_request(struct ceph_osd_client *osdc, 31 struct ceph_osd_request *req); 32 static void __unregister_request(struct ceph_osd_client *osdc, 33 struct ceph_osd_request *req); 34 static void __unregister_linger_request(struct ceph_osd_client *osdc, 35 struct ceph_osd_request *req); 36 static void __enqueue_request(struct ceph_osd_request *req); 37 static void __send_request(struct ceph_osd_client *osdc, 38 struct ceph_osd_request *req); 39 40 /* 41 * Implement client access to distributed object storage cluster. 42 * 43 * All data objects are stored within a cluster/cloud of OSDs, or 44 * "object storage devices." (Note that Ceph OSDs have _nothing_ to 45 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply 46 * remote daemons serving up and coordinating consistent and safe 47 * access to storage. 48 * 49 * Cluster membership and the mapping of data objects onto storage devices 50 * are described by the osd map. 51 * 52 * We keep track of pending OSD requests (read, write), resubmit 53 * requests to different OSDs when the cluster topology/data layout 54 * change, or retry the affected requests when the communications 55 * channel with an OSD is reset. 56 */ 57 58 /* 59 * calculate the mapping of a file extent onto an object, and fill out the 60 * request accordingly. shorten extent as necessary if it crosses an 61 * object boundary. 62 * 63 * fill osd op in request message. 64 */ 65 static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen, 66 u64 *objnum, u64 *objoff, u64 *objlen) 67 { 68 u64 orig_len = *plen; 69 int r; 70 71 /* object extent? */ 72 r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum, 73 objoff, objlen); 74 if (r < 0) 75 return r; 76 if (*objlen < orig_len) { 77 *plen = *objlen; 78 dout(" skipping last %llu, final file extent %llu~%llu\n", 79 orig_len - *plen, off, *plen); 80 } 81 82 dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen); 83 84 return 0; 85 } 86 87 static void ceph_osd_data_init(struct ceph_osd_data *osd_data) 88 { 89 memset(osd_data, 0, sizeof (*osd_data)); 90 osd_data->type = CEPH_OSD_DATA_TYPE_NONE; 91 } 92 93 static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data, 94 struct page **pages, u64 length, u32 alignment, 95 bool pages_from_pool, bool own_pages) 96 { 97 osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; 98 osd_data->pages = pages; 99 osd_data->length = length; 100 osd_data->alignment = alignment; 101 osd_data->pages_from_pool = pages_from_pool; 102 osd_data->own_pages = own_pages; 103 } 104 105 static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data, 106 struct ceph_pagelist *pagelist) 107 { 108 osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST; 109 osd_data->pagelist = pagelist; 110 } 111 112 #ifdef CONFIG_BLOCK 113 static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data, 114 struct bio *bio, size_t bio_length) 115 { 116 osd_data->type = CEPH_OSD_DATA_TYPE_BIO; 117 osd_data->bio = bio; 118 osd_data->bio_length = bio_length; 119 } 120 #endif /* CONFIG_BLOCK */ 121 122 #define osd_req_op_data(oreq, whch, typ, fld) \ 123 ({ \ 124 struct ceph_osd_request *__oreq = (oreq); \ 125 unsigned int __whch = (whch); \ 126 BUG_ON(__whch >= __oreq->r_num_ops); \ 127 &__oreq->r_ops[__whch].typ.fld; \ 128 }) 129 130 static struct ceph_osd_data * 131 osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which) 132 { 133 BUG_ON(which >= osd_req->r_num_ops); 134 135 return &osd_req->r_ops[which].raw_data_in; 136 } 137 138 struct ceph_osd_data * 139 osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req, 140 unsigned int which) 141 { 142 return osd_req_op_data(osd_req, which, extent, osd_data); 143 } 144 EXPORT_SYMBOL(osd_req_op_extent_osd_data); 145 146 void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req, 147 unsigned int which, struct page **pages, 148 u64 length, u32 alignment, 149 bool pages_from_pool, bool own_pages) 150 { 151 struct ceph_osd_data *osd_data; 152 153 osd_data = osd_req_op_raw_data_in(osd_req, which); 154 ceph_osd_data_pages_init(osd_data, pages, length, alignment, 155 pages_from_pool, own_pages); 156 } 157 EXPORT_SYMBOL(osd_req_op_raw_data_in_pages); 158 159 void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req, 160 unsigned int which, struct page **pages, 161 u64 length, u32 alignment, 162 bool pages_from_pool, bool own_pages) 163 { 164 struct ceph_osd_data *osd_data; 165 166 osd_data = osd_req_op_data(osd_req, which, extent, osd_data); 167 ceph_osd_data_pages_init(osd_data, pages, length, alignment, 168 pages_from_pool, own_pages); 169 } 170 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages); 171 172 void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req, 173 unsigned int which, struct ceph_pagelist *pagelist) 174 { 175 struct ceph_osd_data *osd_data; 176 177 osd_data = osd_req_op_data(osd_req, which, extent, osd_data); 178 ceph_osd_data_pagelist_init(osd_data, pagelist); 179 } 180 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist); 181 182 #ifdef CONFIG_BLOCK 183 void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req, 184 unsigned int which, struct bio *bio, size_t bio_length) 185 { 186 struct ceph_osd_data *osd_data; 187 188 osd_data = osd_req_op_data(osd_req, which, extent, osd_data); 189 ceph_osd_data_bio_init(osd_data, bio, bio_length); 190 } 191 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio); 192 #endif /* CONFIG_BLOCK */ 193 194 static void osd_req_op_cls_request_info_pagelist( 195 struct ceph_osd_request *osd_req, 196 unsigned int which, struct ceph_pagelist *pagelist) 197 { 198 struct ceph_osd_data *osd_data; 199 200 osd_data = osd_req_op_data(osd_req, which, cls, request_info); 201 ceph_osd_data_pagelist_init(osd_data, pagelist); 202 } 203 204 void osd_req_op_cls_request_data_pagelist( 205 struct ceph_osd_request *osd_req, 206 unsigned int which, struct ceph_pagelist *pagelist) 207 { 208 struct ceph_osd_data *osd_data; 209 210 osd_data = osd_req_op_data(osd_req, which, cls, request_data); 211 ceph_osd_data_pagelist_init(osd_data, pagelist); 212 } 213 EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist); 214 215 void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req, 216 unsigned int which, struct page **pages, u64 length, 217 u32 alignment, bool pages_from_pool, bool own_pages) 218 { 219 struct ceph_osd_data *osd_data; 220 221 osd_data = osd_req_op_data(osd_req, which, cls, request_data); 222 ceph_osd_data_pages_init(osd_data, pages, length, alignment, 223 pages_from_pool, own_pages); 224 } 225 EXPORT_SYMBOL(osd_req_op_cls_request_data_pages); 226 227 void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req, 228 unsigned int which, struct page **pages, u64 length, 229 u32 alignment, bool pages_from_pool, bool own_pages) 230 { 231 struct ceph_osd_data *osd_data; 232 233 osd_data = osd_req_op_data(osd_req, which, cls, response_data); 234 ceph_osd_data_pages_init(osd_data, pages, length, alignment, 235 pages_from_pool, own_pages); 236 } 237 EXPORT_SYMBOL(osd_req_op_cls_response_data_pages); 238 239 static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data) 240 { 241 switch (osd_data->type) { 242 case CEPH_OSD_DATA_TYPE_NONE: 243 return 0; 244 case CEPH_OSD_DATA_TYPE_PAGES: 245 return osd_data->length; 246 case CEPH_OSD_DATA_TYPE_PAGELIST: 247 return (u64)osd_data->pagelist->length; 248 #ifdef CONFIG_BLOCK 249 case CEPH_OSD_DATA_TYPE_BIO: 250 return (u64)osd_data->bio_length; 251 #endif /* CONFIG_BLOCK */ 252 default: 253 WARN(true, "unrecognized data type %d\n", (int)osd_data->type); 254 return 0; 255 } 256 } 257 258 static void ceph_osd_data_release(struct ceph_osd_data *osd_data) 259 { 260 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) { 261 int num_pages; 262 263 num_pages = calc_pages_for((u64)osd_data->alignment, 264 (u64)osd_data->length); 265 ceph_release_page_vector(osd_data->pages, num_pages); 266 } 267 ceph_osd_data_init(osd_data); 268 } 269 270 static void osd_req_op_data_release(struct ceph_osd_request *osd_req, 271 unsigned int which) 272 { 273 struct ceph_osd_req_op *op; 274 275 BUG_ON(which >= osd_req->r_num_ops); 276 op = &osd_req->r_ops[which]; 277 278 switch (op->op) { 279 case CEPH_OSD_OP_READ: 280 case CEPH_OSD_OP_WRITE: 281 case CEPH_OSD_OP_WRITEFULL: 282 ceph_osd_data_release(&op->extent.osd_data); 283 break; 284 case CEPH_OSD_OP_CALL: 285 ceph_osd_data_release(&op->cls.request_info); 286 ceph_osd_data_release(&op->cls.request_data); 287 ceph_osd_data_release(&op->cls.response_data); 288 break; 289 case CEPH_OSD_OP_SETXATTR: 290 case CEPH_OSD_OP_CMPXATTR: 291 ceph_osd_data_release(&op->xattr.osd_data); 292 break; 293 case CEPH_OSD_OP_STAT: 294 ceph_osd_data_release(&op->raw_data_in); 295 break; 296 default: 297 break; 298 } 299 } 300 301 /* 302 * Assumes @t is zero-initialized. 303 */ 304 static void target_init(struct ceph_osd_request_target *t) 305 { 306 ceph_oid_init(&t->base_oid); 307 ceph_oloc_init(&t->base_oloc); 308 ceph_oid_init(&t->target_oid); 309 ceph_oloc_init(&t->target_oloc); 310 311 ceph_osds_init(&t->acting); 312 ceph_osds_init(&t->up); 313 t->size = -1; 314 t->min_size = -1; 315 316 t->osd = CEPH_HOMELESS_OSD; 317 } 318 319 static void target_destroy(struct ceph_osd_request_target *t) 320 { 321 ceph_oid_destroy(&t->base_oid); 322 ceph_oid_destroy(&t->target_oid); 323 } 324 325 /* 326 * requests 327 */ 328 static void ceph_osdc_release_request(struct kref *kref) 329 { 330 struct ceph_osd_request *req = container_of(kref, 331 struct ceph_osd_request, r_kref); 332 unsigned int which; 333 334 dout("%s %p (r_request %p r_reply %p)\n", __func__, req, 335 req->r_request, req->r_reply); 336 WARN_ON(!RB_EMPTY_NODE(&req->r_node)); 337 WARN_ON(!list_empty(&req->r_req_lru_item)); 338 WARN_ON(!list_empty(&req->r_osd_item)); 339 WARN_ON(!list_empty(&req->r_linger_item)); 340 WARN_ON(!list_empty(&req->r_linger_osd_item)); 341 WARN_ON(req->r_osd); 342 343 if (req->r_request) 344 ceph_msg_put(req->r_request); 345 if (req->r_reply) { 346 ceph_msg_revoke_incoming(req->r_reply); 347 ceph_msg_put(req->r_reply); 348 } 349 350 for (which = 0; which < req->r_num_ops; which++) 351 osd_req_op_data_release(req, which); 352 353 target_destroy(&req->r_t); 354 ceph_put_snap_context(req->r_snapc); 355 356 if (req->r_mempool) 357 mempool_free(req, req->r_osdc->req_mempool); 358 else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS) 359 kmem_cache_free(ceph_osd_request_cache, req); 360 else 361 kfree(req); 362 } 363 364 void ceph_osdc_get_request(struct ceph_osd_request *req) 365 { 366 dout("%s %p (was %d)\n", __func__, req, 367 atomic_read(&req->r_kref.refcount)); 368 kref_get(&req->r_kref); 369 } 370 EXPORT_SYMBOL(ceph_osdc_get_request); 371 372 void ceph_osdc_put_request(struct ceph_osd_request *req) 373 { 374 if (req) { 375 dout("%s %p (was %d)\n", __func__, req, 376 atomic_read(&req->r_kref.refcount)); 377 kref_put(&req->r_kref, ceph_osdc_release_request); 378 } 379 } 380 EXPORT_SYMBOL(ceph_osdc_put_request); 381 382 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 383 struct ceph_snap_context *snapc, 384 unsigned int num_ops, 385 bool use_mempool, 386 gfp_t gfp_flags) 387 { 388 struct ceph_osd_request *req; 389 390 if (use_mempool) { 391 BUG_ON(num_ops > CEPH_OSD_SLAB_OPS); 392 req = mempool_alloc(osdc->req_mempool, gfp_flags); 393 } else if (num_ops <= CEPH_OSD_SLAB_OPS) { 394 req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags); 395 } else { 396 BUG_ON(num_ops > CEPH_OSD_MAX_OPS); 397 req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]), 398 gfp_flags); 399 } 400 if (unlikely(!req)) 401 return NULL; 402 403 /* req only, each op is zeroed in _osd_req_op_init() */ 404 memset(req, 0, sizeof(*req)); 405 406 req->r_osdc = osdc; 407 req->r_mempool = use_mempool; 408 req->r_num_ops = num_ops; 409 req->r_snapid = CEPH_NOSNAP; 410 req->r_snapc = ceph_get_snap_context(snapc); 411 412 kref_init(&req->r_kref); 413 init_completion(&req->r_completion); 414 init_completion(&req->r_safe_completion); 415 RB_CLEAR_NODE(&req->r_node); 416 INIT_LIST_HEAD(&req->r_unsafe_item); 417 INIT_LIST_HEAD(&req->r_linger_item); 418 INIT_LIST_HEAD(&req->r_linger_osd_item); 419 INIT_LIST_HEAD(&req->r_req_lru_item); 420 INIT_LIST_HEAD(&req->r_osd_item); 421 422 target_init(&req->r_t); 423 424 dout("%s req %p\n", __func__, req); 425 return req; 426 } 427 EXPORT_SYMBOL(ceph_osdc_alloc_request); 428 429 int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) 430 { 431 struct ceph_osd_client *osdc = req->r_osdc; 432 struct ceph_msg *msg; 433 int msg_size; 434 435 WARN_ON(ceph_oid_empty(&req->r_base_oid)); 436 437 /* create request message */ 438 msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */ 439 msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */ 440 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ 441 msg_size += 1 + 8 + 4 + 4; /* pgid */ 442 msg_size += 4 + req->r_base_oid.name_len; /* oid */ 443 msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op); 444 msg_size += 8; /* snapid */ 445 msg_size += 8; /* snap_seq */ 446 msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0); 447 msg_size += 4; /* retry_attempt */ 448 449 if (req->r_mempool) 450 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 451 else 452 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true); 453 if (!msg) 454 return -ENOMEM; 455 456 memset(msg->front.iov_base, 0, msg->front.iov_len); 457 req->r_request = msg; 458 459 /* create reply message */ 460 msg_size = OSD_OPREPLY_FRONT_LEN; 461 msg_size += req->r_base_oid.name_len; 462 msg_size += req->r_num_ops * sizeof(struct ceph_osd_op); 463 464 if (req->r_mempool) 465 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 466 else 467 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true); 468 if (!msg) 469 return -ENOMEM; 470 471 req->r_reply = msg; 472 473 return 0; 474 } 475 EXPORT_SYMBOL(ceph_osdc_alloc_messages); 476 477 static bool osd_req_opcode_valid(u16 opcode) 478 { 479 switch (opcode) { 480 #define GENERATE_CASE(op, opcode, str) case CEPH_OSD_OP_##op: return true; 481 __CEPH_FORALL_OSD_OPS(GENERATE_CASE) 482 #undef GENERATE_CASE 483 default: 484 return false; 485 } 486 } 487 488 /* 489 * This is an osd op init function for opcodes that have no data or 490 * other information associated with them. It also serves as a 491 * common init routine for all the other init functions, below. 492 */ 493 static struct ceph_osd_req_op * 494 _osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which, 495 u16 opcode, u32 flags) 496 { 497 struct ceph_osd_req_op *op; 498 499 BUG_ON(which >= osd_req->r_num_ops); 500 BUG_ON(!osd_req_opcode_valid(opcode)); 501 502 op = &osd_req->r_ops[which]; 503 memset(op, 0, sizeof (*op)); 504 op->op = opcode; 505 op->flags = flags; 506 507 return op; 508 } 509 510 void osd_req_op_init(struct ceph_osd_request *osd_req, 511 unsigned int which, u16 opcode, u32 flags) 512 { 513 (void)_osd_req_op_init(osd_req, which, opcode, flags); 514 } 515 EXPORT_SYMBOL(osd_req_op_init); 516 517 void osd_req_op_extent_init(struct ceph_osd_request *osd_req, 518 unsigned int which, u16 opcode, 519 u64 offset, u64 length, 520 u64 truncate_size, u32 truncate_seq) 521 { 522 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 523 opcode, 0); 524 size_t payload_len = 0; 525 526 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && 527 opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO && 528 opcode != CEPH_OSD_OP_TRUNCATE); 529 530 op->extent.offset = offset; 531 op->extent.length = length; 532 op->extent.truncate_size = truncate_size; 533 op->extent.truncate_seq = truncate_seq; 534 if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL) 535 payload_len += length; 536 537 op->indata_len = payload_len; 538 } 539 EXPORT_SYMBOL(osd_req_op_extent_init); 540 541 void osd_req_op_extent_update(struct ceph_osd_request *osd_req, 542 unsigned int which, u64 length) 543 { 544 struct ceph_osd_req_op *op; 545 u64 previous; 546 547 BUG_ON(which >= osd_req->r_num_ops); 548 op = &osd_req->r_ops[which]; 549 previous = op->extent.length; 550 551 if (length == previous) 552 return; /* Nothing to do */ 553 BUG_ON(length > previous); 554 555 op->extent.length = length; 556 op->indata_len -= previous - length; 557 } 558 EXPORT_SYMBOL(osd_req_op_extent_update); 559 560 void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req, 561 unsigned int which, u64 offset_inc) 562 { 563 struct ceph_osd_req_op *op, *prev_op; 564 565 BUG_ON(which + 1 >= osd_req->r_num_ops); 566 567 prev_op = &osd_req->r_ops[which]; 568 op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags); 569 /* dup previous one */ 570 op->indata_len = prev_op->indata_len; 571 op->outdata_len = prev_op->outdata_len; 572 op->extent = prev_op->extent; 573 /* adjust offset */ 574 op->extent.offset += offset_inc; 575 op->extent.length -= offset_inc; 576 577 if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL) 578 op->indata_len -= offset_inc; 579 } 580 EXPORT_SYMBOL(osd_req_op_extent_dup_last); 581 582 void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, 583 u16 opcode, const char *class, const char *method) 584 { 585 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 586 opcode, 0); 587 struct ceph_pagelist *pagelist; 588 size_t payload_len = 0; 589 size_t size; 590 591 BUG_ON(opcode != CEPH_OSD_OP_CALL); 592 593 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); 594 BUG_ON(!pagelist); 595 ceph_pagelist_init(pagelist); 596 597 op->cls.class_name = class; 598 size = strlen(class); 599 BUG_ON(size > (size_t) U8_MAX); 600 op->cls.class_len = size; 601 ceph_pagelist_append(pagelist, class, size); 602 payload_len += size; 603 604 op->cls.method_name = method; 605 size = strlen(method); 606 BUG_ON(size > (size_t) U8_MAX); 607 op->cls.method_len = size; 608 ceph_pagelist_append(pagelist, method, size); 609 payload_len += size; 610 611 osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist); 612 613 op->cls.argc = 0; /* currently unused */ 614 615 op->indata_len = payload_len; 616 } 617 EXPORT_SYMBOL(osd_req_op_cls_init); 618 619 int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which, 620 u16 opcode, const char *name, const void *value, 621 size_t size, u8 cmp_op, u8 cmp_mode) 622 { 623 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 624 opcode, 0); 625 struct ceph_pagelist *pagelist; 626 size_t payload_len; 627 628 BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR); 629 630 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); 631 if (!pagelist) 632 return -ENOMEM; 633 634 ceph_pagelist_init(pagelist); 635 636 payload_len = strlen(name); 637 op->xattr.name_len = payload_len; 638 ceph_pagelist_append(pagelist, name, payload_len); 639 640 op->xattr.value_len = size; 641 ceph_pagelist_append(pagelist, value, size); 642 payload_len += size; 643 644 op->xattr.cmp_op = cmp_op; 645 op->xattr.cmp_mode = cmp_mode; 646 647 ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist); 648 op->indata_len = payload_len; 649 return 0; 650 } 651 EXPORT_SYMBOL(osd_req_op_xattr_init); 652 653 void osd_req_op_watch_init(struct ceph_osd_request *osd_req, 654 unsigned int which, u16 opcode, 655 u64 cookie, u64 version, int flag) 656 { 657 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 658 opcode, 0); 659 660 BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH); 661 662 op->watch.cookie = cookie; 663 op->watch.ver = version; 664 if (opcode == CEPH_OSD_OP_WATCH && flag) 665 op->watch.flag = (u8)1; 666 } 667 EXPORT_SYMBOL(osd_req_op_watch_init); 668 669 void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req, 670 unsigned int which, 671 u64 expected_object_size, 672 u64 expected_write_size) 673 { 674 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 675 CEPH_OSD_OP_SETALLOCHINT, 676 0); 677 678 op->alloc_hint.expected_object_size = expected_object_size; 679 op->alloc_hint.expected_write_size = expected_write_size; 680 681 /* 682 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed 683 * not worth a feature bit. Set FAILOK per-op flag to make 684 * sure older osds don't trip over an unsupported opcode. 685 */ 686 op->flags |= CEPH_OSD_OP_FLAG_FAILOK; 687 } 688 EXPORT_SYMBOL(osd_req_op_alloc_hint_init); 689 690 static void ceph_osdc_msg_data_add(struct ceph_msg *msg, 691 struct ceph_osd_data *osd_data) 692 { 693 u64 length = ceph_osd_data_length(osd_data); 694 695 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { 696 BUG_ON(length > (u64) SIZE_MAX); 697 if (length) 698 ceph_msg_data_add_pages(msg, osd_data->pages, 699 length, osd_data->alignment); 700 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) { 701 BUG_ON(!length); 702 ceph_msg_data_add_pagelist(msg, osd_data->pagelist); 703 #ifdef CONFIG_BLOCK 704 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) { 705 ceph_msg_data_add_bio(msg, osd_data->bio, length); 706 #endif 707 } else { 708 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE); 709 } 710 } 711 712 static u64 osd_req_encode_op(struct ceph_osd_request *req, 713 struct ceph_osd_op *dst, unsigned int which) 714 { 715 struct ceph_osd_req_op *src; 716 struct ceph_osd_data *osd_data; 717 u64 request_data_len = 0; 718 u64 data_length; 719 720 BUG_ON(which >= req->r_num_ops); 721 src = &req->r_ops[which]; 722 if (WARN_ON(!osd_req_opcode_valid(src->op))) { 723 pr_err("unrecognized osd opcode %d\n", src->op); 724 725 return 0; 726 } 727 728 switch (src->op) { 729 case CEPH_OSD_OP_STAT: 730 osd_data = &src->raw_data_in; 731 ceph_osdc_msg_data_add(req->r_reply, osd_data); 732 break; 733 case CEPH_OSD_OP_READ: 734 case CEPH_OSD_OP_WRITE: 735 case CEPH_OSD_OP_WRITEFULL: 736 case CEPH_OSD_OP_ZERO: 737 case CEPH_OSD_OP_TRUNCATE: 738 if (src->op == CEPH_OSD_OP_WRITE || 739 src->op == CEPH_OSD_OP_WRITEFULL) 740 request_data_len = src->extent.length; 741 dst->extent.offset = cpu_to_le64(src->extent.offset); 742 dst->extent.length = cpu_to_le64(src->extent.length); 743 dst->extent.truncate_size = 744 cpu_to_le64(src->extent.truncate_size); 745 dst->extent.truncate_seq = 746 cpu_to_le32(src->extent.truncate_seq); 747 osd_data = &src->extent.osd_data; 748 if (src->op == CEPH_OSD_OP_WRITE || 749 src->op == CEPH_OSD_OP_WRITEFULL) 750 ceph_osdc_msg_data_add(req->r_request, osd_data); 751 else 752 ceph_osdc_msg_data_add(req->r_reply, osd_data); 753 break; 754 case CEPH_OSD_OP_CALL: 755 dst->cls.class_len = src->cls.class_len; 756 dst->cls.method_len = src->cls.method_len; 757 osd_data = &src->cls.request_info; 758 ceph_osdc_msg_data_add(req->r_request, osd_data); 759 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST); 760 request_data_len = osd_data->pagelist->length; 761 762 osd_data = &src->cls.request_data; 763 data_length = ceph_osd_data_length(osd_data); 764 if (data_length) { 765 BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE); 766 dst->cls.indata_len = cpu_to_le32(data_length); 767 ceph_osdc_msg_data_add(req->r_request, osd_data); 768 src->indata_len += data_length; 769 request_data_len += data_length; 770 } 771 osd_data = &src->cls.response_data; 772 ceph_osdc_msg_data_add(req->r_reply, osd_data); 773 break; 774 case CEPH_OSD_OP_STARTSYNC: 775 break; 776 case CEPH_OSD_OP_NOTIFY_ACK: 777 case CEPH_OSD_OP_WATCH: 778 dst->watch.cookie = cpu_to_le64(src->watch.cookie); 779 dst->watch.ver = cpu_to_le64(src->watch.ver); 780 dst->watch.flag = src->watch.flag; 781 break; 782 case CEPH_OSD_OP_SETALLOCHINT: 783 dst->alloc_hint.expected_object_size = 784 cpu_to_le64(src->alloc_hint.expected_object_size); 785 dst->alloc_hint.expected_write_size = 786 cpu_to_le64(src->alloc_hint.expected_write_size); 787 break; 788 case CEPH_OSD_OP_SETXATTR: 789 case CEPH_OSD_OP_CMPXATTR: 790 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len); 791 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len); 792 dst->xattr.cmp_op = src->xattr.cmp_op; 793 dst->xattr.cmp_mode = src->xattr.cmp_mode; 794 osd_data = &src->xattr.osd_data; 795 ceph_osdc_msg_data_add(req->r_request, osd_data); 796 request_data_len = osd_data->pagelist->length; 797 break; 798 case CEPH_OSD_OP_CREATE: 799 case CEPH_OSD_OP_DELETE: 800 break; 801 default: 802 pr_err("unsupported osd opcode %s\n", 803 ceph_osd_op_name(src->op)); 804 WARN_ON(1); 805 806 return 0; 807 } 808 809 dst->op = cpu_to_le16(src->op); 810 dst->flags = cpu_to_le32(src->flags); 811 dst->payload_len = cpu_to_le32(src->indata_len); 812 813 return request_data_len; 814 } 815 816 /* 817 * build new request AND message, calculate layout, and adjust file 818 * extent as needed. 819 * 820 * if the file was recently truncated, we include information about its 821 * old and new size so that the object can be updated appropriately. (we 822 * avoid synchronously deleting truncated objects because it's slow.) 823 * 824 * if @do_sync, include a 'startsync' command so that the osd will flush 825 * data quickly. 826 */ 827 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, 828 struct ceph_file_layout *layout, 829 struct ceph_vino vino, 830 u64 off, u64 *plen, 831 unsigned int which, int num_ops, 832 int opcode, int flags, 833 struct ceph_snap_context *snapc, 834 u32 truncate_seq, 835 u64 truncate_size, 836 bool use_mempool) 837 { 838 struct ceph_osd_request *req; 839 u64 objnum = 0; 840 u64 objoff = 0; 841 u64 objlen = 0; 842 int r; 843 844 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && 845 opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE && 846 opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE); 847 848 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool, 849 GFP_NOFS); 850 if (!req) { 851 r = -ENOMEM; 852 goto fail; 853 } 854 855 req->r_flags = flags; 856 857 /* calculate max write size */ 858 r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen); 859 if (r) 860 goto fail; 861 862 if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) { 863 osd_req_op_init(req, which, opcode, 0); 864 } else { 865 u32 object_size = le32_to_cpu(layout->fl_object_size); 866 u32 object_base = off - objoff; 867 if (!(truncate_seq == 1 && truncate_size == -1ULL)) { 868 if (truncate_size <= object_base) { 869 truncate_size = 0; 870 } else { 871 truncate_size -= object_base; 872 if (truncate_size > object_size) 873 truncate_size = object_size; 874 } 875 } 876 osd_req_op_extent_init(req, which, opcode, objoff, objlen, 877 truncate_size, truncate_seq); 878 } 879 880 req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout); 881 ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum); 882 883 r = ceph_osdc_alloc_messages(req, GFP_NOFS); 884 if (r) 885 goto fail; 886 887 return req; 888 889 fail: 890 ceph_osdc_put_request(req); 891 return ERR_PTR(r); 892 } 893 EXPORT_SYMBOL(ceph_osdc_new_request); 894 895 /* 896 * We keep osd requests in an rbtree, sorted by ->r_tid. 897 */ 898 DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node) 899 900 static struct ceph_osd_request * 901 __lookup_request_ge(struct ceph_osd_client *osdc, 902 u64 tid) 903 { 904 struct ceph_osd_request *req; 905 struct rb_node *n = osdc->requests.rb_node; 906 907 while (n) { 908 req = rb_entry(n, struct ceph_osd_request, r_node); 909 if (tid < req->r_tid) { 910 if (!n->rb_left) 911 return req; 912 n = n->rb_left; 913 } else if (tid > req->r_tid) { 914 n = n->rb_right; 915 } else { 916 return req; 917 } 918 } 919 return NULL; 920 } 921 922 static void __kick_linger_request(struct ceph_osd_request *req) 923 { 924 struct ceph_osd_client *osdc = req->r_osdc; 925 struct ceph_osd *osd = req->r_osd; 926 927 /* 928 * Linger requests need to be resent with a new tid to avoid 929 * the dup op detection logic on the OSDs. Achieve this with 930 * a re-register dance instead of open-coding. 931 */ 932 ceph_osdc_get_request(req); 933 if (!list_empty(&req->r_linger_item)) 934 __unregister_linger_request(osdc, req); 935 else 936 __unregister_request(osdc, req); 937 __register_request(osdc, req); 938 ceph_osdc_put_request(req); 939 940 /* 941 * Unless request has been registered as both normal and 942 * lingering, __unregister{,_linger}_request clears r_osd. 943 * However, here we need to preserve r_osd to make sure we 944 * requeue on the same OSD. 945 */ 946 WARN_ON(req->r_osd || !osd); 947 req->r_osd = osd; 948 949 dout("%s requeueing %p tid %llu\n", __func__, req, req->r_tid); 950 __enqueue_request(req); 951 } 952 953 /* 954 * Resubmit requests pending on the given osd. 955 */ 956 static void __kick_osd_requests(struct ceph_osd_client *osdc, 957 struct ceph_osd *osd) 958 { 959 struct ceph_osd_request *req, *nreq; 960 LIST_HEAD(resend); 961 LIST_HEAD(resend_linger); 962 int err; 963 964 dout("%s osd%d\n", __func__, osd->o_osd); 965 err = __reset_osd(osdc, osd); 966 if (err) 967 return; 968 969 /* 970 * Build up a list of requests to resend by traversing the 971 * osd's list of requests. Requests for a given object are 972 * sent in tid order, and that is also the order they're 973 * kept on this list. Therefore all requests that are in 974 * flight will be found first, followed by all requests that 975 * have not yet been sent. And to resend requests while 976 * preserving this order we will want to put any sent 977 * requests back on the front of the osd client's unsent 978 * list. 979 * 980 * So we build a separate ordered list of already-sent 981 * requests for the affected osd and splice it onto the 982 * front of the osd client's unsent list. Once we've seen a 983 * request that has not yet been sent we're done. Those 984 * requests are already sitting right where they belong. 985 */ 986 list_for_each_entry(req, &osd->o_requests, r_osd_item) { 987 if (!req->r_sent) 988 break; 989 990 if (!req->r_linger) { 991 dout("%s requeueing %p tid %llu\n", __func__, req, 992 req->r_tid); 993 list_move_tail(&req->r_req_lru_item, &resend); 994 req->r_flags |= CEPH_OSD_FLAG_RETRY; 995 } else { 996 list_move_tail(&req->r_req_lru_item, &resend_linger); 997 } 998 } 999 list_splice(&resend, &osdc->req_unsent); 1000 1001 /* 1002 * Both registered and not yet registered linger requests are 1003 * enqueued with a new tid on the same OSD. We add/move them 1004 * to req_unsent/o_requests at the end to keep things in tid 1005 * order. 1006 */ 1007 list_for_each_entry_safe(req, nreq, &osd->o_linger_requests, 1008 r_linger_osd_item) { 1009 WARN_ON(!list_empty(&req->r_req_lru_item)); 1010 __kick_linger_request(req); 1011 } 1012 1013 list_for_each_entry_safe(req, nreq, &resend_linger, r_req_lru_item) 1014 __kick_linger_request(req); 1015 } 1016 1017 /* 1018 * If the osd connection drops, we need to resubmit all requests. 1019 */ 1020 static void osd_reset(struct ceph_connection *con) 1021 { 1022 struct ceph_osd *osd = con->private; 1023 struct ceph_osd_client *osdc; 1024 1025 if (!osd) 1026 return; 1027 dout("osd_reset osd%d\n", osd->o_osd); 1028 osdc = osd->o_osdc; 1029 down_read(&osdc->map_sem); 1030 mutex_lock(&osdc->request_mutex); 1031 __kick_osd_requests(osdc, osd); 1032 __send_queued(osdc); 1033 mutex_unlock(&osdc->request_mutex); 1034 up_read(&osdc->map_sem); 1035 } 1036 1037 /* 1038 * Track open sessions with osds. 1039 */ 1040 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum) 1041 { 1042 struct ceph_osd *osd; 1043 1044 osd = kzalloc(sizeof(*osd), GFP_NOFS); 1045 if (!osd) 1046 return NULL; 1047 1048 atomic_set(&osd->o_ref, 1); 1049 osd->o_osdc = osdc; 1050 osd->o_osd = onum; 1051 RB_CLEAR_NODE(&osd->o_node); 1052 INIT_LIST_HEAD(&osd->o_requests); 1053 INIT_LIST_HEAD(&osd->o_linger_requests); 1054 INIT_LIST_HEAD(&osd->o_osd_lru); 1055 osd->o_incarnation = 1; 1056 1057 ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr); 1058 1059 INIT_LIST_HEAD(&osd->o_keepalive_item); 1060 return osd; 1061 } 1062 1063 static struct ceph_osd *get_osd(struct ceph_osd *osd) 1064 { 1065 if (atomic_inc_not_zero(&osd->o_ref)) { 1066 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1, 1067 atomic_read(&osd->o_ref)); 1068 return osd; 1069 } else { 1070 dout("get_osd %p FAIL\n", osd); 1071 return NULL; 1072 } 1073 } 1074 1075 static void put_osd(struct ceph_osd *osd) 1076 { 1077 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), 1078 atomic_read(&osd->o_ref) - 1); 1079 if (atomic_dec_and_test(&osd->o_ref)) { 1080 if (osd->o_auth.authorizer) 1081 ceph_auth_destroy_authorizer(osd->o_auth.authorizer); 1082 kfree(osd); 1083 } 1084 } 1085 1086 DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node) 1087 1088 /* 1089 * remove an osd from our map 1090 */ 1091 static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 1092 { 1093 dout("%s %p osd%d\n", __func__, osd, osd->o_osd); 1094 WARN_ON(!list_empty(&osd->o_requests)); 1095 WARN_ON(!list_empty(&osd->o_linger_requests)); 1096 1097 list_del_init(&osd->o_osd_lru); 1098 erase_osd(&osdc->osds, osd); 1099 } 1100 1101 static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 1102 { 1103 dout("%s %p osd%d\n", __func__, osd, osd->o_osd); 1104 1105 if (!RB_EMPTY_NODE(&osd->o_node)) { 1106 ceph_con_close(&osd->o_con); 1107 __remove_osd(osdc, osd); 1108 put_osd(osd); 1109 } 1110 } 1111 1112 static void __move_osd_to_lru(struct ceph_osd_client *osdc, 1113 struct ceph_osd *osd) 1114 { 1115 dout("%s %p\n", __func__, osd); 1116 BUG_ON(!list_empty(&osd->o_osd_lru)); 1117 1118 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru); 1119 osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl; 1120 } 1121 1122 static void maybe_move_osd_to_lru(struct ceph_osd_client *osdc, 1123 struct ceph_osd *osd) 1124 { 1125 dout("%s %p\n", __func__, osd); 1126 1127 if (list_empty(&osd->o_requests) && 1128 list_empty(&osd->o_linger_requests)) 1129 __move_osd_to_lru(osdc, osd); 1130 } 1131 1132 static void __remove_osd_from_lru(struct ceph_osd *osd) 1133 { 1134 dout("__remove_osd_from_lru %p\n", osd); 1135 if (!list_empty(&osd->o_osd_lru)) 1136 list_del_init(&osd->o_osd_lru); 1137 } 1138 1139 /* 1140 * reset osd connect 1141 */ 1142 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 1143 { 1144 struct ceph_entity_addr *peer_addr; 1145 1146 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 1147 if (list_empty(&osd->o_requests) && 1148 list_empty(&osd->o_linger_requests)) { 1149 remove_osd(osdc, osd); 1150 return -ENODEV; 1151 } 1152 1153 peer_addr = &osdc->osdmap->osd_addr[osd->o_osd]; 1154 if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) && 1155 !ceph_con_opened(&osd->o_con)) { 1156 struct ceph_osd_request *req; 1157 1158 dout("osd addr hasn't changed and connection never opened, " 1159 "letting msgr retry\n"); 1160 /* touch each r_stamp for handle_timeout()'s benfit */ 1161 list_for_each_entry(req, &osd->o_requests, r_osd_item) 1162 req->r_stamp = jiffies; 1163 1164 return -EAGAIN; 1165 } 1166 1167 ceph_con_close(&osd->o_con); 1168 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr); 1169 osd->o_incarnation++; 1170 1171 return 0; 1172 } 1173 1174 static void __schedule_osd_timeout(struct ceph_osd_client *osdc) 1175 { 1176 schedule_delayed_work(&osdc->timeout_work, 1177 osdc->client->options->osd_keepalive_timeout); 1178 } 1179 1180 static void __cancel_osd_timeout(struct ceph_osd_client *osdc) 1181 { 1182 cancel_delayed_work(&osdc->timeout_work); 1183 } 1184 1185 /* 1186 * Register request, assign tid. If this is the first request, set up 1187 * the timeout event. 1188 */ 1189 static void __register_request(struct ceph_osd_client *osdc, 1190 struct ceph_osd_request *req) 1191 { 1192 req->r_tid = ++osdc->last_tid; 1193 req->r_request->hdr.tid = cpu_to_le64(req->r_tid); 1194 dout("__register_request %p tid %lld\n", req, req->r_tid); 1195 insert_request(&osdc->requests, req); 1196 ceph_osdc_get_request(req); 1197 osdc->num_requests++; 1198 if (osdc->num_requests == 1) { 1199 dout(" first request, scheduling timeout\n"); 1200 __schedule_osd_timeout(osdc); 1201 } 1202 } 1203 1204 /* 1205 * called under osdc->request_mutex 1206 */ 1207 static void __unregister_request(struct ceph_osd_client *osdc, 1208 struct ceph_osd_request *req) 1209 { 1210 if (RB_EMPTY_NODE(&req->r_node)) { 1211 dout("__unregister_request %p tid %lld not registered\n", 1212 req, req->r_tid); 1213 return; 1214 } 1215 1216 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 1217 erase_request(&osdc->requests, req); 1218 osdc->num_requests--; 1219 1220 if (req->r_osd) { 1221 /* make sure the original request isn't in flight. */ 1222 ceph_msg_revoke(req->r_request); 1223 1224 list_del_init(&req->r_osd_item); 1225 maybe_move_osd_to_lru(osdc, req->r_osd); 1226 if (list_empty(&req->r_linger_osd_item)) 1227 req->r_osd = NULL; 1228 } 1229 1230 list_del_init(&req->r_req_lru_item); 1231 ceph_osdc_put_request(req); 1232 1233 if (osdc->num_requests == 0) { 1234 dout(" no requests, canceling timeout\n"); 1235 __cancel_osd_timeout(osdc); 1236 } 1237 } 1238 1239 /* 1240 * Cancel a previously queued request message 1241 */ 1242 static void __cancel_request(struct ceph_osd_request *req) 1243 { 1244 if (req->r_sent && req->r_osd) { 1245 ceph_msg_revoke(req->r_request); 1246 req->r_sent = 0; 1247 } 1248 } 1249 1250 static void __register_linger_request(struct ceph_osd_client *osdc, 1251 struct ceph_osd_request *req) 1252 { 1253 dout("%s %p tid %llu\n", __func__, req, req->r_tid); 1254 WARN_ON(!req->r_linger); 1255 1256 ceph_osdc_get_request(req); 1257 list_add_tail(&req->r_linger_item, &osdc->req_linger); 1258 if (req->r_osd) 1259 list_add_tail(&req->r_linger_osd_item, 1260 &req->r_osd->o_linger_requests); 1261 } 1262 1263 static void __unregister_linger_request(struct ceph_osd_client *osdc, 1264 struct ceph_osd_request *req) 1265 { 1266 WARN_ON(!req->r_linger); 1267 1268 if (list_empty(&req->r_linger_item)) { 1269 dout("%s %p tid %llu not registered\n", __func__, req, 1270 req->r_tid); 1271 return; 1272 } 1273 1274 dout("%s %p tid %llu\n", __func__, req, req->r_tid); 1275 list_del_init(&req->r_linger_item); 1276 1277 if (req->r_osd) { 1278 list_del_init(&req->r_linger_osd_item); 1279 maybe_move_osd_to_lru(osdc, req->r_osd); 1280 if (list_empty(&req->r_osd_item)) 1281 req->r_osd = NULL; 1282 } 1283 ceph_osdc_put_request(req); 1284 } 1285 1286 void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, 1287 struct ceph_osd_request *req) 1288 { 1289 if (!req->r_linger) { 1290 dout("set_request_linger %p\n", req); 1291 req->r_linger = 1; 1292 } 1293 } 1294 EXPORT_SYMBOL(ceph_osdc_set_request_linger); 1295 1296 static bool __pool_full(struct ceph_pg_pool_info *pi) 1297 { 1298 return pi->flags & CEPH_POOL_FLAG_FULL; 1299 } 1300 1301 /* 1302 * Returns whether a request should be blocked from being sent 1303 * based on the current osdmap and osd_client settings. 1304 * 1305 * Caller should hold map_sem for read. 1306 */ 1307 static bool target_should_be_paused(struct ceph_osd_client *osdc, 1308 const struct ceph_osd_request_target *t, 1309 struct ceph_pg_pool_info *pi) 1310 { 1311 bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD); 1312 bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) || 1313 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) || 1314 __pool_full(pi); 1315 1316 WARN_ON(pi->id != t->base_oloc.pool); 1317 return (t->flags & CEPH_OSD_FLAG_READ && pauserd) || 1318 (t->flags & CEPH_OSD_FLAG_WRITE && pausewr); 1319 } 1320 1321 enum calc_target_result { 1322 CALC_TARGET_NO_ACTION = 0, 1323 CALC_TARGET_NEED_RESEND, 1324 CALC_TARGET_POOL_DNE, 1325 }; 1326 1327 static enum calc_target_result calc_target(struct ceph_osd_client *osdc, 1328 struct ceph_osd_request_target *t, 1329 u32 *last_force_resend, 1330 bool any_change) 1331 { 1332 struct ceph_pg_pool_info *pi; 1333 struct ceph_pg pgid, last_pgid; 1334 struct ceph_osds up, acting; 1335 bool force_resend = false; 1336 bool need_check_tiering = false; 1337 bool need_resend = false; 1338 bool sort_bitwise = ceph_osdmap_flag(osdc->osdmap, 1339 CEPH_OSDMAP_SORTBITWISE); 1340 enum calc_target_result ct_res; 1341 int ret; 1342 1343 pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool); 1344 if (!pi) { 1345 t->osd = CEPH_HOMELESS_OSD; 1346 ct_res = CALC_TARGET_POOL_DNE; 1347 goto out; 1348 } 1349 1350 if (osdc->osdmap->epoch == pi->last_force_request_resend) { 1351 if (last_force_resend && 1352 *last_force_resend < pi->last_force_request_resend) { 1353 *last_force_resend = pi->last_force_request_resend; 1354 force_resend = true; 1355 } else if (!last_force_resend) { 1356 force_resend = true; 1357 } 1358 } 1359 if (ceph_oid_empty(&t->target_oid) || force_resend) { 1360 ceph_oid_copy(&t->target_oid, &t->base_oid); 1361 need_check_tiering = true; 1362 } 1363 if (ceph_oloc_empty(&t->target_oloc) || force_resend) { 1364 ceph_oloc_copy(&t->target_oloc, &t->base_oloc); 1365 need_check_tiering = true; 1366 } 1367 1368 if (need_check_tiering && 1369 (t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { 1370 if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0) 1371 t->target_oloc.pool = pi->read_tier; 1372 if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0) 1373 t->target_oloc.pool = pi->write_tier; 1374 } 1375 1376 ret = ceph_object_locator_to_pg(osdc->osdmap, &t->target_oid, 1377 &t->target_oloc, &pgid); 1378 if (ret) { 1379 WARN_ON(ret != -ENOENT); 1380 t->osd = CEPH_HOMELESS_OSD; 1381 ct_res = CALC_TARGET_POOL_DNE; 1382 goto out; 1383 } 1384 last_pgid.pool = pgid.pool; 1385 last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask); 1386 1387 ceph_pg_to_up_acting_osds(osdc->osdmap, &pgid, &up, &acting); 1388 if (any_change && 1389 ceph_is_new_interval(&t->acting, 1390 &acting, 1391 &t->up, 1392 &up, 1393 t->size, 1394 pi->size, 1395 t->min_size, 1396 pi->min_size, 1397 t->pg_num, 1398 pi->pg_num, 1399 t->sort_bitwise, 1400 sort_bitwise, 1401 &last_pgid)) 1402 force_resend = true; 1403 1404 if (t->paused && !target_should_be_paused(osdc, t, pi)) { 1405 t->paused = false; 1406 need_resend = true; 1407 } 1408 1409 if (ceph_pg_compare(&t->pgid, &pgid) || 1410 ceph_osds_changed(&t->acting, &acting, any_change) || 1411 force_resend) { 1412 t->pgid = pgid; /* struct */ 1413 ceph_osds_copy(&t->acting, &acting); 1414 ceph_osds_copy(&t->up, &up); 1415 t->size = pi->size; 1416 t->min_size = pi->min_size; 1417 t->pg_num = pi->pg_num; 1418 t->pg_num_mask = pi->pg_num_mask; 1419 t->sort_bitwise = sort_bitwise; 1420 1421 t->osd = acting.primary; 1422 need_resend = true; 1423 } 1424 1425 ct_res = need_resend ? CALC_TARGET_NEED_RESEND : CALC_TARGET_NO_ACTION; 1426 out: 1427 dout("%s t %p -> ct_res %d osd %d\n", __func__, t, ct_res, t->osd); 1428 return ct_res; 1429 } 1430 1431 static void __enqueue_request(struct ceph_osd_request *req) 1432 { 1433 struct ceph_osd_client *osdc = req->r_osdc; 1434 1435 dout("%s %p tid %llu to osd%d\n", __func__, req, req->r_tid, 1436 req->r_osd ? req->r_osd->o_osd : -1); 1437 1438 if (req->r_osd) { 1439 __remove_osd_from_lru(req->r_osd); 1440 list_add_tail(&req->r_osd_item, &req->r_osd->o_requests); 1441 list_move_tail(&req->r_req_lru_item, &osdc->req_unsent); 1442 } else { 1443 list_move_tail(&req->r_req_lru_item, &osdc->req_notarget); 1444 } 1445 } 1446 1447 /* 1448 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct 1449 * (as needed), and set the request r_osd appropriately. If there is 1450 * no up osd, set r_osd to NULL. Move the request to the appropriate list 1451 * (unsent, homeless) or leave on in-flight lru. 1452 * 1453 * Return 0 if unchanged, 1 if changed, or negative on error. 1454 * 1455 * Caller should hold map_sem for read and request_mutex. 1456 */ 1457 static int __map_request(struct ceph_osd_client *osdc, 1458 struct ceph_osd_request *req, int force_resend) 1459 { 1460 enum calc_target_result ct_res; 1461 int err; 1462 1463 dout("map_request %p tid %lld\n", req, req->r_tid); 1464 1465 ct_res = calc_target(osdc, &req->r_t, NULL, force_resend); 1466 switch (ct_res) { 1467 case CALC_TARGET_POOL_DNE: 1468 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1469 return -EIO; 1470 case CALC_TARGET_NO_ACTION: 1471 return 0; /* no change */ 1472 default: 1473 BUG_ON(ct_res != CALC_TARGET_NEED_RESEND); 1474 } 1475 1476 dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n", 1477 req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed, req->r_t.osd, 1478 req->r_osd ? req->r_osd->o_osd : -1); 1479 1480 if (req->r_osd) { 1481 __cancel_request(req); 1482 list_del_init(&req->r_osd_item); 1483 list_del_init(&req->r_linger_osd_item); 1484 req->r_osd = NULL; 1485 } 1486 1487 req->r_osd = lookup_osd(&osdc->osds, req->r_t.osd); 1488 if (!req->r_osd && req->r_t.osd >= 0) { 1489 err = -ENOMEM; 1490 req->r_osd = create_osd(osdc, req->r_t.osd); 1491 if (!req->r_osd) { 1492 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1493 goto out; 1494 } 1495 1496 dout("map_request osd %p is osd%d\n", req->r_osd, 1497 req->r_osd->o_osd); 1498 insert_osd(&osdc->osds, req->r_osd); 1499 1500 ceph_con_open(&req->r_osd->o_con, 1501 CEPH_ENTITY_TYPE_OSD, req->r_osd->o_osd, 1502 &osdc->osdmap->osd_addr[req->r_osd->o_osd]); 1503 } 1504 1505 __enqueue_request(req); 1506 err = 1; /* osd or pg changed */ 1507 1508 out: 1509 return err; 1510 } 1511 1512 /* 1513 * caller should hold map_sem (for read) and request_mutex 1514 */ 1515 static void __send_request(struct ceph_osd_client *osdc, 1516 struct ceph_osd_request *req) 1517 { 1518 void *p; 1519 1520 dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n", 1521 req, req->r_tid, req->r_osd->o_osd, req->r_flags, 1522 req->r_t.pgid.pool, req->r_t.pgid.seed); 1523 1524 /* fill in message content that changes each time we send it */ 1525 put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); 1526 put_unaligned_le32(req->r_flags, req->r_request_flags); 1527 put_unaligned_le64(req->r_t.target_oloc.pool, req->r_request_pool); 1528 p = req->r_request_pgid; 1529 ceph_encode_64(&p, req->r_t.pgid.pool); 1530 ceph_encode_32(&p, req->r_t.pgid.seed); 1531 put_unaligned_le64(1, req->r_request_attempts); /* FIXME */ 1532 memcpy(req->r_request_reassert_version, &req->r_reassert_version, 1533 sizeof(req->r_reassert_version)); 1534 1535 req->r_stamp = jiffies; 1536 list_move_tail(&req->r_req_lru_item, &osdc->req_lru); 1537 1538 ceph_msg_get(req->r_request); /* send consumes a ref */ 1539 1540 req->r_sent = req->r_osd->o_incarnation; 1541 1542 ceph_con_send(&req->r_osd->o_con, req->r_request); 1543 } 1544 1545 /* 1546 * Send any requests in the queue (req_unsent). 1547 */ 1548 static void __send_queued(struct ceph_osd_client *osdc) 1549 { 1550 struct ceph_osd_request *req, *tmp; 1551 1552 dout("__send_queued\n"); 1553 list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) 1554 __send_request(osdc, req); 1555 } 1556 1557 /* 1558 * Caller should hold map_sem for read and request_mutex. 1559 */ 1560 static int __ceph_osdc_start_request(struct ceph_osd_client *osdc, 1561 struct ceph_osd_request *req, 1562 bool nofail) 1563 { 1564 int rc; 1565 1566 __register_request(osdc, req); 1567 req->r_sent = 0; 1568 req->r_got_reply = 0; 1569 rc = __map_request(osdc, req, 0); 1570 if (rc < 0) { 1571 if (nofail) { 1572 dout("osdc_start_request failed map, " 1573 " will retry %lld\n", req->r_tid); 1574 rc = 0; 1575 } else { 1576 __unregister_request(osdc, req); 1577 } 1578 return rc; 1579 } 1580 1581 if (req->r_osd == NULL) { 1582 dout("send_request %p no up osds in pg\n", req); 1583 ceph_monc_request_next_osdmap(&osdc->client->monc); 1584 } else { 1585 __send_queued(osdc); 1586 } 1587 1588 return 0; 1589 } 1590 1591 /* 1592 * Timeout callback, called every N seconds when 1 or more osd 1593 * requests has been active for more than N seconds. When this 1594 * happens, we ping all OSDs with requests who have timed out to 1595 * ensure any communications channel reset is detected. Reset the 1596 * request timeouts another N seconds in the future as we go. 1597 * Reschedule the timeout event another N seconds in future (unless 1598 * there are no open requests). 1599 */ 1600 static void handle_timeout(struct work_struct *work) 1601 { 1602 struct ceph_osd_client *osdc = 1603 container_of(work, struct ceph_osd_client, timeout_work.work); 1604 struct ceph_options *opts = osdc->client->options; 1605 struct ceph_osd_request *req; 1606 struct ceph_osd *osd; 1607 struct list_head slow_osds; 1608 dout("timeout\n"); 1609 down_read(&osdc->map_sem); 1610 1611 ceph_monc_request_next_osdmap(&osdc->client->monc); 1612 1613 mutex_lock(&osdc->request_mutex); 1614 1615 /* 1616 * ping osds that are a bit slow. this ensures that if there 1617 * is a break in the TCP connection we will notice, and reopen 1618 * a connection with that osd (from the fault callback). 1619 */ 1620 INIT_LIST_HEAD(&slow_osds); 1621 list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) { 1622 if (time_before(jiffies, 1623 req->r_stamp + opts->osd_keepalive_timeout)) 1624 break; 1625 1626 osd = req->r_osd; 1627 BUG_ON(!osd); 1628 dout(" tid %llu is slow, will send keepalive on osd%d\n", 1629 req->r_tid, osd->o_osd); 1630 list_move_tail(&osd->o_keepalive_item, &slow_osds); 1631 } 1632 while (!list_empty(&slow_osds)) { 1633 osd = list_entry(slow_osds.next, struct ceph_osd, 1634 o_keepalive_item); 1635 list_del_init(&osd->o_keepalive_item); 1636 ceph_con_keepalive(&osd->o_con); 1637 } 1638 1639 __schedule_osd_timeout(osdc); 1640 __send_queued(osdc); 1641 mutex_unlock(&osdc->request_mutex); 1642 up_read(&osdc->map_sem); 1643 } 1644 1645 static void handle_osds_timeout(struct work_struct *work) 1646 { 1647 struct ceph_osd_client *osdc = 1648 container_of(work, struct ceph_osd_client, 1649 osds_timeout_work.work); 1650 unsigned long delay = osdc->client->options->osd_idle_ttl / 4; 1651 struct ceph_osd *osd, *nosd; 1652 1653 dout("%s osdc %p\n", __func__, osdc); 1654 down_read(&osdc->map_sem); 1655 mutex_lock(&osdc->request_mutex); 1656 1657 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) { 1658 if (time_before(jiffies, osd->lru_ttl)) 1659 break; 1660 1661 remove_osd(osdc, osd); 1662 } 1663 1664 mutex_unlock(&osdc->request_mutex); 1665 up_read(&osdc->map_sem); 1666 schedule_delayed_work(&osdc->osds_timeout_work, 1667 round_jiffies_relative(delay)); 1668 } 1669 1670 static int ceph_oloc_decode(void **p, void *end, 1671 struct ceph_object_locator *oloc) 1672 { 1673 u8 struct_v, struct_cv; 1674 u32 len; 1675 void *struct_end; 1676 int ret = 0; 1677 1678 ceph_decode_need(p, end, 1 + 1 + 4, e_inval); 1679 struct_v = ceph_decode_8(p); 1680 struct_cv = ceph_decode_8(p); 1681 if (struct_v < 3) { 1682 pr_warn("got v %d < 3 cv %d of ceph_object_locator\n", 1683 struct_v, struct_cv); 1684 goto e_inval; 1685 } 1686 if (struct_cv > 6) { 1687 pr_warn("got v %d cv %d > 6 of ceph_object_locator\n", 1688 struct_v, struct_cv); 1689 goto e_inval; 1690 } 1691 len = ceph_decode_32(p); 1692 ceph_decode_need(p, end, len, e_inval); 1693 struct_end = *p + len; 1694 1695 oloc->pool = ceph_decode_64(p); 1696 *p += 4; /* skip preferred */ 1697 1698 len = ceph_decode_32(p); 1699 if (len > 0) { 1700 pr_warn("ceph_object_locator::key is set\n"); 1701 goto e_inval; 1702 } 1703 1704 if (struct_v >= 5) { 1705 len = ceph_decode_32(p); 1706 if (len > 0) { 1707 pr_warn("ceph_object_locator::nspace is set\n"); 1708 goto e_inval; 1709 } 1710 } 1711 1712 if (struct_v >= 6) { 1713 s64 hash = ceph_decode_64(p); 1714 if (hash != -1) { 1715 pr_warn("ceph_object_locator::hash is set\n"); 1716 goto e_inval; 1717 } 1718 } 1719 1720 /* skip the rest */ 1721 *p = struct_end; 1722 out: 1723 return ret; 1724 1725 e_inval: 1726 ret = -EINVAL; 1727 goto out; 1728 } 1729 1730 static int ceph_redirect_decode(void **p, void *end, 1731 struct ceph_request_redirect *redir) 1732 { 1733 u8 struct_v, struct_cv; 1734 u32 len; 1735 void *struct_end; 1736 int ret; 1737 1738 ceph_decode_need(p, end, 1 + 1 + 4, e_inval); 1739 struct_v = ceph_decode_8(p); 1740 struct_cv = ceph_decode_8(p); 1741 if (struct_cv > 1) { 1742 pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n", 1743 struct_v, struct_cv); 1744 goto e_inval; 1745 } 1746 len = ceph_decode_32(p); 1747 ceph_decode_need(p, end, len, e_inval); 1748 struct_end = *p + len; 1749 1750 ret = ceph_oloc_decode(p, end, &redir->oloc); 1751 if (ret) 1752 goto out; 1753 1754 len = ceph_decode_32(p); 1755 if (len > 0) { 1756 pr_warn("ceph_request_redirect::object_name is set\n"); 1757 goto e_inval; 1758 } 1759 1760 len = ceph_decode_32(p); 1761 *p += len; /* skip osd_instructions */ 1762 1763 /* skip the rest */ 1764 *p = struct_end; 1765 out: 1766 return ret; 1767 1768 e_inval: 1769 ret = -EINVAL; 1770 goto out; 1771 } 1772 1773 static void complete_request(struct ceph_osd_request *req) 1774 { 1775 complete_all(&req->r_safe_completion); /* fsync waiter */ 1776 } 1777 1778 /* 1779 * handle osd op reply. either call the callback if it is specified, 1780 * or do the completion to wake up the waiting thread. 1781 */ 1782 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) 1783 { 1784 void *p, *end; 1785 struct ceph_osd_request *req; 1786 struct ceph_request_redirect redir; 1787 u64 tid; 1788 int object_len; 1789 unsigned int numops; 1790 int payload_len, flags; 1791 s32 result; 1792 s32 retry_attempt; 1793 struct ceph_pg pg; 1794 int err; 1795 u32 reassert_epoch; 1796 u64 reassert_version; 1797 u32 osdmap_epoch; 1798 int already_completed; 1799 u32 bytes; 1800 u8 decode_redir; 1801 unsigned int i; 1802 1803 tid = le64_to_cpu(msg->hdr.tid); 1804 dout("handle_reply %p tid %llu\n", msg, tid); 1805 1806 p = msg->front.iov_base; 1807 end = p + msg->front.iov_len; 1808 1809 ceph_decode_need(&p, end, 4, bad); 1810 object_len = ceph_decode_32(&p); 1811 ceph_decode_need(&p, end, object_len, bad); 1812 p += object_len; 1813 1814 err = ceph_decode_pgid(&p, end, &pg); 1815 if (err) 1816 goto bad; 1817 1818 ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad); 1819 flags = ceph_decode_64(&p); 1820 result = ceph_decode_32(&p); 1821 reassert_epoch = ceph_decode_32(&p); 1822 reassert_version = ceph_decode_64(&p); 1823 osdmap_epoch = ceph_decode_32(&p); 1824 1825 /* lookup */ 1826 down_read(&osdc->map_sem); 1827 mutex_lock(&osdc->request_mutex); 1828 req = lookup_request(&osdc->requests, tid); 1829 if (req == NULL) { 1830 dout("handle_reply tid %llu dne\n", tid); 1831 goto bad_mutex; 1832 } 1833 ceph_osdc_get_request(req); 1834 1835 dout("handle_reply %p tid %llu req %p result %d\n", msg, tid, 1836 req, result); 1837 1838 ceph_decode_need(&p, end, 4, bad_put); 1839 numops = ceph_decode_32(&p); 1840 if (numops > CEPH_OSD_MAX_OPS) 1841 goto bad_put; 1842 if (numops != req->r_num_ops) 1843 goto bad_put; 1844 payload_len = 0; 1845 ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad_put); 1846 for (i = 0; i < numops; i++) { 1847 struct ceph_osd_op *op = p; 1848 int len; 1849 1850 len = le32_to_cpu(op->payload_len); 1851 req->r_ops[i].outdata_len = len; 1852 dout(" op %d has %d bytes\n", i, len); 1853 payload_len += len; 1854 p += sizeof(*op); 1855 } 1856 bytes = le32_to_cpu(msg->hdr.data_len); 1857 if (payload_len != bytes) { 1858 pr_warn("sum of op payload lens %d != data_len %d\n", 1859 payload_len, bytes); 1860 goto bad_put; 1861 } 1862 1863 ceph_decode_need(&p, end, 4 + numops * 4, bad_put); 1864 retry_attempt = ceph_decode_32(&p); 1865 for (i = 0; i < numops; i++) 1866 req->r_ops[i].rval = ceph_decode_32(&p); 1867 1868 if (le16_to_cpu(msg->hdr.version) >= 6) { 1869 p += 8 + 4; /* skip replay_version */ 1870 p += 8; /* skip user_version */ 1871 1872 if (le16_to_cpu(msg->hdr.version) >= 7) 1873 ceph_decode_8_safe(&p, end, decode_redir, bad_put); 1874 else 1875 decode_redir = 1; 1876 } else { 1877 decode_redir = 0; 1878 } 1879 1880 if (decode_redir) { 1881 err = ceph_redirect_decode(&p, end, &redir); 1882 if (err) 1883 goto bad_put; 1884 } else { 1885 redir.oloc.pool = -1; 1886 } 1887 1888 if (!ceph_oloc_empty(&redir.oloc)) { 1889 dout("redirect pool %lld\n", redir.oloc.pool); 1890 1891 __unregister_request(osdc, req); 1892 1893 ceph_oloc_copy(&req->r_t.target_oloc, &redir.oloc); 1894 1895 /* 1896 * Start redirect requests with nofail=true. If 1897 * mapping fails, request will end up on the notarget 1898 * list, waiting for the new osdmap (which can take 1899 * a while), even though the original request mapped 1900 * successfully. In the future we might want to follow 1901 * original request's nofail setting here. 1902 */ 1903 err = __ceph_osdc_start_request(osdc, req, true); 1904 BUG_ON(err); 1905 1906 goto out_unlock; 1907 } 1908 1909 already_completed = req->r_got_reply; 1910 if (!req->r_got_reply) { 1911 req->r_result = result; 1912 dout("handle_reply result %d bytes %d\n", req->r_result, 1913 bytes); 1914 if (req->r_result == 0) 1915 req->r_result = bytes; 1916 1917 /* in case this is a write and we need to replay, */ 1918 req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch); 1919 req->r_reassert_version.version = cpu_to_le64(reassert_version); 1920 1921 req->r_got_reply = 1; 1922 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { 1923 dout("handle_reply tid %llu dup ack\n", tid); 1924 goto out_unlock; 1925 } 1926 1927 dout("handle_reply tid %llu flags %d\n", tid, flags); 1928 1929 if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK)) 1930 __register_linger_request(osdc, req); 1931 1932 /* either this is a read, or we got the safe response */ 1933 if (result < 0 || 1934 (flags & CEPH_OSD_FLAG_ONDISK) || 1935 ((flags & CEPH_OSD_FLAG_WRITE) == 0)) 1936 __unregister_request(osdc, req); 1937 1938 mutex_unlock(&osdc->request_mutex); 1939 up_read(&osdc->map_sem); 1940 1941 if (!already_completed) { 1942 if (req->r_unsafe_callback && 1943 result >= 0 && !(flags & CEPH_OSD_FLAG_ONDISK)) 1944 req->r_unsafe_callback(req, true); 1945 if (req->r_callback) 1946 req->r_callback(req, msg); 1947 else 1948 complete_all(&req->r_completion); 1949 } 1950 1951 if (flags & CEPH_OSD_FLAG_ONDISK) { 1952 if (req->r_unsafe_callback && already_completed) 1953 req->r_unsafe_callback(req, false); 1954 complete_request(req); 1955 } 1956 1957 out: 1958 dout("req=%p req->r_linger=%d\n", req, req->r_linger); 1959 ceph_osdc_put_request(req); 1960 return; 1961 out_unlock: 1962 mutex_unlock(&osdc->request_mutex); 1963 up_read(&osdc->map_sem); 1964 goto out; 1965 1966 bad_put: 1967 req->r_result = -EIO; 1968 __unregister_request(osdc, req); 1969 if (req->r_callback) 1970 req->r_callback(req, msg); 1971 else 1972 complete_all(&req->r_completion); 1973 complete_request(req); 1974 ceph_osdc_put_request(req); 1975 bad_mutex: 1976 mutex_unlock(&osdc->request_mutex); 1977 up_read(&osdc->map_sem); 1978 bad: 1979 pr_err("corrupt osd_op_reply got %d %d\n", 1980 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len)); 1981 ceph_msg_dump(msg); 1982 } 1983 1984 static void reset_changed_osds(struct ceph_osd_client *osdc) 1985 { 1986 struct rb_node *p, *n; 1987 1988 dout("%s %p\n", __func__, osdc); 1989 for (p = rb_first(&osdc->osds); p; p = n) { 1990 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node); 1991 1992 n = rb_next(p); 1993 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || 1994 memcmp(&osd->o_con.peer_addr, 1995 ceph_osd_addr(osdc->osdmap, 1996 osd->o_osd), 1997 sizeof(struct ceph_entity_addr)) != 0) 1998 __reset_osd(osdc, osd); 1999 } 2000 } 2001 2002 /* 2003 * Requeue requests whose mapping to an OSD has changed. If requests map to 2004 * no osd, request a new map. 2005 * 2006 * Caller should hold map_sem for read. 2007 */ 2008 static void kick_requests(struct ceph_osd_client *osdc, bool force_resend, 2009 bool force_resend_writes) 2010 { 2011 struct ceph_osd_request *req, *nreq; 2012 struct rb_node *p; 2013 int needmap = 0; 2014 int err; 2015 bool force_resend_req; 2016 2017 dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "", 2018 force_resend_writes ? " (force resend writes)" : ""); 2019 mutex_lock(&osdc->request_mutex); 2020 for (p = rb_first(&osdc->requests); p; ) { 2021 req = rb_entry(p, struct ceph_osd_request, r_node); 2022 p = rb_next(p); 2023 2024 /* 2025 * For linger requests that have not yet been 2026 * registered, move them to the linger list; they'll 2027 * be sent to the osd in the loop below. Unregister 2028 * the request before re-registering it as a linger 2029 * request to ensure the __map_request() below 2030 * will decide it needs to be sent. 2031 */ 2032 if (req->r_linger && list_empty(&req->r_linger_item)) { 2033 dout("%p tid %llu restart on osd%d\n", 2034 req, req->r_tid, 2035 req->r_osd ? req->r_osd->o_osd : -1); 2036 ceph_osdc_get_request(req); 2037 __unregister_request(osdc, req); 2038 __register_linger_request(osdc, req); 2039 ceph_osdc_put_request(req); 2040 continue; 2041 } 2042 2043 force_resend_req = force_resend || 2044 (force_resend_writes && 2045 req->r_flags & CEPH_OSD_FLAG_WRITE); 2046 err = __map_request(osdc, req, force_resend_req); 2047 if (err < 0) 2048 continue; /* error */ 2049 if (req->r_osd == NULL) { 2050 dout("%p tid %llu maps to no osd\n", req, req->r_tid); 2051 needmap++; /* request a newer map */ 2052 } else if (err > 0) { 2053 if (!req->r_linger) { 2054 dout("%p tid %llu requeued on osd%d\n", req, 2055 req->r_tid, 2056 req->r_osd ? req->r_osd->o_osd : -1); 2057 req->r_flags |= CEPH_OSD_FLAG_RETRY; 2058 } 2059 } 2060 } 2061 2062 list_for_each_entry_safe(req, nreq, &osdc->req_linger, 2063 r_linger_item) { 2064 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); 2065 2066 err = __map_request(osdc, req, 2067 force_resend || force_resend_writes); 2068 dout("__map_request returned %d\n", err); 2069 if (err < 0) 2070 continue; /* hrm! */ 2071 if (req->r_osd == NULL || err > 0) { 2072 if (req->r_osd == NULL) { 2073 dout("lingering %p tid %llu maps to no osd\n", 2074 req, req->r_tid); 2075 /* 2076 * A homeless lingering request makes 2077 * no sense, as it's job is to keep 2078 * a particular OSD connection open. 2079 * Request a newer map and kick the 2080 * request, knowing that it won't be 2081 * resent until we actually get a map 2082 * that can tell us where to send it. 2083 */ 2084 needmap++; 2085 } 2086 2087 dout("kicking lingering %p tid %llu osd%d\n", req, 2088 req->r_tid, req->r_osd ? req->r_osd->o_osd : -1); 2089 __register_request(osdc, req); 2090 __unregister_linger_request(osdc, req); 2091 } 2092 } 2093 reset_changed_osds(osdc); 2094 mutex_unlock(&osdc->request_mutex); 2095 2096 if (needmap) { 2097 dout("%d requests for down osds, need new map\n", needmap); 2098 ceph_monc_request_next_osdmap(&osdc->client->monc); 2099 } 2100 } 2101 2102 2103 /* 2104 * Process updated osd map. 2105 * 2106 * The message contains any number of incremental and full maps, normally 2107 * indicating some sort of topology change in the cluster. Kick requests 2108 * off to different OSDs as needed. 2109 */ 2110 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) 2111 { 2112 void *p, *end, *next; 2113 u32 nr_maps, maplen; 2114 u32 epoch; 2115 struct ceph_osdmap *newmap = NULL, *oldmap; 2116 int err; 2117 struct ceph_fsid fsid; 2118 bool was_full; 2119 2120 dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0); 2121 p = msg->front.iov_base; 2122 end = p + msg->front.iov_len; 2123 2124 /* verify fsid */ 2125 ceph_decode_need(&p, end, sizeof(fsid), bad); 2126 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 2127 if (ceph_check_fsid(osdc->client, &fsid) < 0) 2128 return; 2129 2130 down_write(&osdc->map_sem); 2131 2132 was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL); 2133 2134 /* incremental maps */ 2135 ceph_decode_32_safe(&p, end, nr_maps, bad); 2136 dout(" %d inc maps\n", nr_maps); 2137 while (nr_maps > 0) { 2138 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 2139 epoch = ceph_decode_32(&p); 2140 maplen = ceph_decode_32(&p); 2141 ceph_decode_need(&p, end, maplen, bad); 2142 next = p + maplen; 2143 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) { 2144 dout("applying incremental map %u len %d\n", 2145 epoch, maplen); 2146 newmap = osdmap_apply_incremental(&p, next, 2147 osdc->osdmap); 2148 if (IS_ERR(newmap)) { 2149 err = PTR_ERR(newmap); 2150 goto bad; 2151 } 2152 BUG_ON(!newmap); 2153 if (newmap != osdc->osdmap) { 2154 ceph_osdmap_destroy(osdc->osdmap); 2155 osdc->osdmap = newmap; 2156 } 2157 was_full = was_full || 2158 ceph_osdmap_flag(osdc->osdmap, 2159 CEPH_OSDMAP_FULL); 2160 kick_requests(osdc, 0, was_full); 2161 } else { 2162 dout("ignoring incremental map %u len %d\n", 2163 epoch, maplen); 2164 } 2165 p = next; 2166 nr_maps--; 2167 } 2168 if (newmap) 2169 goto done; 2170 2171 /* full maps */ 2172 ceph_decode_32_safe(&p, end, nr_maps, bad); 2173 dout(" %d full maps\n", nr_maps); 2174 while (nr_maps) { 2175 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 2176 epoch = ceph_decode_32(&p); 2177 maplen = ceph_decode_32(&p); 2178 ceph_decode_need(&p, end, maplen, bad); 2179 if (nr_maps > 1) { 2180 dout("skipping non-latest full map %u len %d\n", 2181 epoch, maplen); 2182 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) { 2183 dout("skipping full map %u len %d, " 2184 "older than our %u\n", epoch, maplen, 2185 osdc->osdmap->epoch); 2186 } else { 2187 int skipped_map = 0; 2188 2189 dout("taking full map %u len %d\n", epoch, maplen); 2190 newmap = ceph_osdmap_decode(&p, p+maplen); 2191 if (IS_ERR(newmap)) { 2192 err = PTR_ERR(newmap); 2193 goto bad; 2194 } 2195 BUG_ON(!newmap); 2196 oldmap = osdc->osdmap; 2197 osdc->osdmap = newmap; 2198 if (oldmap) { 2199 if (oldmap->epoch + 1 < newmap->epoch) 2200 skipped_map = 1; 2201 ceph_osdmap_destroy(oldmap); 2202 } 2203 was_full = was_full || 2204 ceph_osdmap_flag(osdc->osdmap, 2205 CEPH_OSDMAP_FULL); 2206 kick_requests(osdc, skipped_map, was_full); 2207 } 2208 p += maplen; 2209 nr_maps--; 2210 } 2211 2212 if (!osdc->osdmap) 2213 goto bad; 2214 done: 2215 downgrade_write(&osdc->map_sem); 2216 ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP, 2217 osdc->osdmap->epoch); 2218 2219 /* 2220 * subscribe to subsequent osdmap updates if full to ensure 2221 * we find out when we are no longer full and stop returning 2222 * ENOSPC. 2223 */ 2224 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) || 2225 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) || 2226 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) 2227 ceph_monc_request_next_osdmap(&osdc->client->monc); 2228 2229 mutex_lock(&osdc->request_mutex); 2230 __send_queued(osdc); 2231 mutex_unlock(&osdc->request_mutex); 2232 up_read(&osdc->map_sem); 2233 wake_up_all(&osdc->client->auth_wq); 2234 return; 2235 2236 bad: 2237 pr_err("osdc handle_map corrupt msg\n"); 2238 ceph_msg_dump(msg); 2239 up_write(&osdc->map_sem); 2240 } 2241 2242 /* 2243 * watch/notify callback event infrastructure 2244 * 2245 * These callbacks are used both for watch and notify operations. 2246 */ 2247 static void __release_event(struct kref *kref) 2248 { 2249 struct ceph_osd_event *event = 2250 container_of(kref, struct ceph_osd_event, kref); 2251 2252 dout("__release_event %p\n", event); 2253 kfree(event); 2254 } 2255 2256 static void get_event(struct ceph_osd_event *event) 2257 { 2258 kref_get(&event->kref); 2259 } 2260 2261 void ceph_osdc_put_event(struct ceph_osd_event *event) 2262 { 2263 kref_put(&event->kref, __release_event); 2264 } 2265 EXPORT_SYMBOL(ceph_osdc_put_event); 2266 2267 static void __insert_event(struct ceph_osd_client *osdc, 2268 struct ceph_osd_event *new) 2269 { 2270 struct rb_node **p = &osdc->event_tree.rb_node; 2271 struct rb_node *parent = NULL; 2272 struct ceph_osd_event *event = NULL; 2273 2274 while (*p) { 2275 parent = *p; 2276 event = rb_entry(parent, struct ceph_osd_event, node); 2277 if (new->cookie < event->cookie) 2278 p = &(*p)->rb_left; 2279 else if (new->cookie > event->cookie) 2280 p = &(*p)->rb_right; 2281 else 2282 BUG(); 2283 } 2284 2285 rb_link_node(&new->node, parent, p); 2286 rb_insert_color(&new->node, &osdc->event_tree); 2287 } 2288 2289 static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc, 2290 u64 cookie) 2291 { 2292 struct rb_node **p = &osdc->event_tree.rb_node; 2293 struct rb_node *parent = NULL; 2294 struct ceph_osd_event *event = NULL; 2295 2296 while (*p) { 2297 parent = *p; 2298 event = rb_entry(parent, struct ceph_osd_event, node); 2299 if (cookie < event->cookie) 2300 p = &(*p)->rb_left; 2301 else if (cookie > event->cookie) 2302 p = &(*p)->rb_right; 2303 else 2304 return event; 2305 } 2306 return NULL; 2307 } 2308 2309 static void __remove_event(struct ceph_osd_event *event) 2310 { 2311 struct ceph_osd_client *osdc = event->osdc; 2312 2313 if (!RB_EMPTY_NODE(&event->node)) { 2314 dout("__remove_event removed %p\n", event); 2315 rb_erase(&event->node, &osdc->event_tree); 2316 ceph_osdc_put_event(event); 2317 } else { 2318 dout("__remove_event didn't remove %p\n", event); 2319 } 2320 } 2321 2322 int ceph_osdc_create_event(struct ceph_osd_client *osdc, 2323 void (*event_cb)(u64, u64, u8, void *), 2324 void *data, struct ceph_osd_event **pevent) 2325 { 2326 struct ceph_osd_event *event; 2327 2328 event = kmalloc(sizeof(*event), GFP_NOIO); 2329 if (!event) 2330 return -ENOMEM; 2331 2332 dout("create_event %p\n", event); 2333 event->cb = event_cb; 2334 event->one_shot = 0; 2335 event->data = data; 2336 event->osdc = osdc; 2337 INIT_LIST_HEAD(&event->osd_node); 2338 RB_CLEAR_NODE(&event->node); 2339 kref_init(&event->kref); /* one ref for us */ 2340 kref_get(&event->kref); /* one ref for the caller */ 2341 2342 spin_lock(&osdc->event_lock); 2343 event->cookie = ++osdc->event_count; 2344 __insert_event(osdc, event); 2345 spin_unlock(&osdc->event_lock); 2346 2347 *pevent = event; 2348 return 0; 2349 } 2350 EXPORT_SYMBOL(ceph_osdc_create_event); 2351 2352 void ceph_osdc_cancel_event(struct ceph_osd_event *event) 2353 { 2354 struct ceph_osd_client *osdc = event->osdc; 2355 2356 dout("cancel_event %p\n", event); 2357 spin_lock(&osdc->event_lock); 2358 __remove_event(event); 2359 spin_unlock(&osdc->event_lock); 2360 ceph_osdc_put_event(event); /* caller's */ 2361 } 2362 EXPORT_SYMBOL(ceph_osdc_cancel_event); 2363 2364 2365 static void do_event_work(struct work_struct *work) 2366 { 2367 struct ceph_osd_event_work *event_work = 2368 container_of(work, struct ceph_osd_event_work, work); 2369 struct ceph_osd_event *event = event_work->event; 2370 u64 ver = event_work->ver; 2371 u64 notify_id = event_work->notify_id; 2372 u8 opcode = event_work->opcode; 2373 2374 dout("do_event_work completing %p\n", event); 2375 event->cb(ver, notify_id, opcode, event->data); 2376 dout("do_event_work completed %p\n", event); 2377 ceph_osdc_put_event(event); 2378 kfree(event_work); 2379 } 2380 2381 2382 /* 2383 * Process osd watch notifications 2384 */ 2385 static void handle_watch_notify(struct ceph_osd_client *osdc, 2386 struct ceph_msg *msg) 2387 { 2388 void *p, *end; 2389 u8 proto_ver; 2390 u64 cookie, ver, notify_id; 2391 u8 opcode; 2392 struct ceph_osd_event *event; 2393 struct ceph_osd_event_work *event_work; 2394 2395 p = msg->front.iov_base; 2396 end = p + msg->front.iov_len; 2397 2398 ceph_decode_8_safe(&p, end, proto_ver, bad); 2399 ceph_decode_8_safe(&p, end, opcode, bad); 2400 ceph_decode_64_safe(&p, end, cookie, bad); 2401 ceph_decode_64_safe(&p, end, ver, bad); 2402 ceph_decode_64_safe(&p, end, notify_id, bad); 2403 2404 spin_lock(&osdc->event_lock); 2405 event = __find_event(osdc, cookie); 2406 if (event) { 2407 BUG_ON(event->one_shot); 2408 get_event(event); 2409 } 2410 spin_unlock(&osdc->event_lock); 2411 dout("handle_watch_notify cookie %lld ver %lld event %p\n", 2412 cookie, ver, event); 2413 if (event) { 2414 event_work = kmalloc(sizeof(*event_work), GFP_NOIO); 2415 if (!event_work) { 2416 pr_err("couldn't allocate event_work\n"); 2417 ceph_osdc_put_event(event); 2418 return; 2419 } 2420 INIT_WORK(&event_work->work, do_event_work); 2421 event_work->event = event; 2422 event_work->ver = ver; 2423 event_work->notify_id = notify_id; 2424 event_work->opcode = opcode; 2425 2426 queue_work(osdc->notify_wq, &event_work->work); 2427 } 2428 2429 return; 2430 2431 bad: 2432 pr_err("osdc handle_watch_notify corrupt msg\n"); 2433 } 2434 2435 /* 2436 * build new request AND message 2437 * 2438 */ 2439 void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, 2440 struct ceph_snap_context *snapc, u64 snap_id, 2441 struct timespec *mtime) 2442 { 2443 struct ceph_msg *msg = req->r_request; 2444 void *p; 2445 size_t msg_size; 2446 int flags = req->r_flags; 2447 u64 data_len; 2448 unsigned int i; 2449 2450 req->r_snapid = snap_id; 2451 WARN_ON(snapc != req->r_snapc); 2452 2453 /* encode request */ 2454 msg->hdr.version = cpu_to_le16(4); 2455 2456 p = msg->front.iov_base; 2457 ceph_encode_32(&p, 1); /* client_inc is always 1 */ 2458 req->r_request_osdmap_epoch = p; 2459 p += 4; 2460 req->r_request_flags = p; 2461 p += 4; 2462 if (req->r_flags & CEPH_OSD_FLAG_WRITE) 2463 ceph_encode_timespec(p, mtime); 2464 p += sizeof(struct ceph_timespec); 2465 req->r_request_reassert_version = p; 2466 p += sizeof(struct ceph_eversion); /* will get filled in */ 2467 2468 /* oloc */ 2469 ceph_encode_8(&p, 4); 2470 ceph_encode_8(&p, 4); 2471 ceph_encode_32(&p, 8 + 4 + 4); 2472 req->r_request_pool = p; 2473 p += 8; 2474 ceph_encode_32(&p, -1); /* preferred */ 2475 ceph_encode_32(&p, 0); /* key len */ 2476 2477 ceph_encode_8(&p, 1); 2478 req->r_request_pgid = p; 2479 p += 8 + 4; 2480 ceph_encode_32(&p, -1); /* preferred */ 2481 2482 /* oid */ 2483 ceph_encode_32(&p, req->r_base_oid.name_len); 2484 memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len); 2485 dout("oid %*pE len %d\n", req->r_base_oid.name_len, 2486 req->r_base_oid.name, req->r_base_oid.name_len); 2487 p += req->r_base_oid.name_len; 2488 2489 /* ops--can imply data */ 2490 ceph_encode_16(&p, (u16)req->r_num_ops); 2491 data_len = 0; 2492 for (i = 0; i < req->r_num_ops; i++) { 2493 data_len += osd_req_encode_op(req, p, i); 2494 p += sizeof(struct ceph_osd_op); 2495 } 2496 2497 /* snaps */ 2498 ceph_encode_64(&p, req->r_snapid); 2499 ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0); 2500 ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0); 2501 if (req->r_snapc) { 2502 for (i = 0; i < req->r_snapc->num_snaps; i++) { 2503 ceph_encode_64(&p, req->r_snapc->snaps[i]); 2504 } 2505 } 2506 2507 req->r_request_attempts = p; 2508 p += 4; 2509 2510 /* data */ 2511 if (flags & CEPH_OSD_FLAG_WRITE) { 2512 u16 data_off; 2513 2514 /* 2515 * The header "data_off" is a hint to the receiver 2516 * allowing it to align received data into its 2517 * buffers such that there's no need to re-copy 2518 * it before writing it to disk (direct I/O). 2519 */ 2520 data_off = (u16) (off & 0xffff); 2521 req->r_request->hdr.data_off = cpu_to_le16(data_off); 2522 } 2523 req->r_request->hdr.data_len = cpu_to_le32(data_len); 2524 2525 BUG_ON(p > msg->front.iov_base + msg->front.iov_len); 2526 msg_size = p - msg->front.iov_base; 2527 msg->front.iov_len = msg_size; 2528 msg->hdr.front_len = cpu_to_le32(msg_size); 2529 2530 dout("build_request msg_size was %d\n", (int)msg_size); 2531 } 2532 EXPORT_SYMBOL(ceph_osdc_build_request); 2533 2534 /* 2535 * Register request, send initial attempt. 2536 */ 2537 int ceph_osdc_start_request(struct ceph_osd_client *osdc, 2538 struct ceph_osd_request *req, 2539 bool nofail) 2540 { 2541 int rc; 2542 2543 down_read(&osdc->map_sem); 2544 mutex_lock(&osdc->request_mutex); 2545 2546 rc = __ceph_osdc_start_request(osdc, req, nofail); 2547 2548 mutex_unlock(&osdc->request_mutex); 2549 up_read(&osdc->map_sem); 2550 2551 return rc; 2552 } 2553 EXPORT_SYMBOL(ceph_osdc_start_request); 2554 2555 /* 2556 * Unregister a registered request. The request is not completed (i.e. 2557 * no callbacks or wakeups) - higher layers are supposed to know what 2558 * they are canceling. 2559 */ 2560 void ceph_osdc_cancel_request(struct ceph_osd_request *req) 2561 { 2562 struct ceph_osd_client *osdc = req->r_osdc; 2563 2564 mutex_lock(&osdc->request_mutex); 2565 if (req->r_linger) 2566 __unregister_linger_request(osdc, req); 2567 __unregister_request(osdc, req); 2568 mutex_unlock(&osdc->request_mutex); 2569 2570 dout("%s %p tid %llu canceled\n", __func__, req, req->r_tid); 2571 } 2572 EXPORT_SYMBOL(ceph_osdc_cancel_request); 2573 2574 /* 2575 * wait for a request to complete 2576 */ 2577 int ceph_osdc_wait_request(struct ceph_osd_client *osdc, 2578 struct ceph_osd_request *req) 2579 { 2580 int rc; 2581 2582 dout("%s %p tid %llu\n", __func__, req, req->r_tid); 2583 2584 rc = wait_for_completion_interruptible(&req->r_completion); 2585 if (rc < 0) { 2586 dout("%s %p tid %llu interrupted\n", __func__, req, req->r_tid); 2587 ceph_osdc_cancel_request(req); 2588 complete_request(req); 2589 return rc; 2590 } 2591 2592 dout("%s %p tid %llu result %d\n", __func__, req, req->r_tid, 2593 req->r_result); 2594 return req->r_result; 2595 } 2596 EXPORT_SYMBOL(ceph_osdc_wait_request); 2597 2598 /* 2599 * sync - wait for all in-flight requests to flush. avoid starvation. 2600 */ 2601 void ceph_osdc_sync(struct ceph_osd_client *osdc) 2602 { 2603 struct ceph_osd_request *req; 2604 u64 last_tid, next_tid = 0; 2605 2606 mutex_lock(&osdc->request_mutex); 2607 last_tid = osdc->last_tid; 2608 while (1) { 2609 req = __lookup_request_ge(osdc, next_tid); 2610 if (!req) 2611 break; 2612 if (req->r_tid > last_tid) 2613 break; 2614 2615 next_tid = req->r_tid + 1; 2616 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0) 2617 continue; 2618 2619 ceph_osdc_get_request(req); 2620 mutex_unlock(&osdc->request_mutex); 2621 dout("sync waiting on tid %llu (last is %llu)\n", 2622 req->r_tid, last_tid); 2623 wait_for_completion(&req->r_safe_completion); 2624 mutex_lock(&osdc->request_mutex); 2625 ceph_osdc_put_request(req); 2626 } 2627 mutex_unlock(&osdc->request_mutex); 2628 dout("sync done (thru tid %llu)\n", last_tid); 2629 } 2630 EXPORT_SYMBOL(ceph_osdc_sync); 2631 2632 /* 2633 * Call all pending notify callbacks - for use after a watch is 2634 * unregistered, to make sure no more callbacks for it will be invoked 2635 */ 2636 void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc) 2637 { 2638 flush_workqueue(osdc->notify_wq); 2639 } 2640 EXPORT_SYMBOL(ceph_osdc_flush_notifies); 2641 2642 2643 /* 2644 * init, shutdown 2645 */ 2646 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) 2647 { 2648 int err; 2649 2650 dout("init\n"); 2651 osdc->client = client; 2652 osdc->osdmap = NULL; 2653 init_rwsem(&osdc->map_sem); 2654 mutex_init(&osdc->request_mutex); 2655 osdc->last_tid = 0; 2656 osdc->osds = RB_ROOT; 2657 INIT_LIST_HEAD(&osdc->osd_lru); 2658 osdc->requests = RB_ROOT; 2659 INIT_LIST_HEAD(&osdc->req_lru); 2660 INIT_LIST_HEAD(&osdc->req_unsent); 2661 INIT_LIST_HEAD(&osdc->req_notarget); 2662 INIT_LIST_HEAD(&osdc->req_linger); 2663 osdc->num_requests = 0; 2664 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 2665 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 2666 spin_lock_init(&osdc->event_lock); 2667 osdc->event_tree = RB_ROOT; 2668 osdc->event_count = 0; 2669 2670 schedule_delayed_work(&osdc->osds_timeout_work, 2671 round_jiffies_relative(osdc->client->options->osd_idle_ttl)); 2672 2673 err = -ENOMEM; 2674 osdc->req_mempool = mempool_create_slab_pool(10, 2675 ceph_osd_request_cache); 2676 if (!osdc->req_mempool) 2677 goto out; 2678 2679 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, 2680 PAGE_SIZE, 10, true, "osd_op"); 2681 if (err < 0) 2682 goto out_mempool; 2683 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, 2684 PAGE_SIZE, 10, true, "osd_op_reply"); 2685 if (err < 0) 2686 goto out_msgpool; 2687 2688 err = -ENOMEM; 2689 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); 2690 if (!osdc->notify_wq) 2691 goto out_msgpool_reply; 2692 2693 return 0; 2694 2695 out_msgpool_reply: 2696 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 2697 out_msgpool: 2698 ceph_msgpool_destroy(&osdc->msgpool_op); 2699 out_mempool: 2700 mempool_destroy(osdc->req_mempool); 2701 out: 2702 return err; 2703 } 2704 2705 void ceph_osdc_stop(struct ceph_osd_client *osdc) 2706 { 2707 flush_workqueue(osdc->notify_wq); 2708 destroy_workqueue(osdc->notify_wq); 2709 cancel_delayed_work_sync(&osdc->timeout_work); 2710 cancel_delayed_work_sync(&osdc->osds_timeout_work); 2711 2712 mutex_lock(&osdc->request_mutex); 2713 while (!RB_EMPTY_ROOT(&osdc->osds)) { 2714 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), 2715 struct ceph_osd, o_node); 2716 remove_osd(osdc, osd); 2717 } 2718 mutex_unlock(&osdc->request_mutex); 2719 2720 if (osdc->osdmap) { 2721 ceph_osdmap_destroy(osdc->osdmap); 2722 osdc->osdmap = NULL; 2723 } 2724 mempool_destroy(osdc->req_mempool); 2725 ceph_msgpool_destroy(&osdc->msgpool_op); 2726 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 2727 } 2728 2729 /* 2730 * Read some contiguous pages. If we cross a stripe boundary, shorten 2731 * *plen. Return number of bytes read, or error. 2732 */ 2733 int ceph_osdc_readpages(struct ceph_osd_client *osdc, 2734 struct ceph_vino vino, struct ceph_file_layout *layout, 2735 u64 off, u64 *plen, 2736 u32 truncate_seq, u64 truncate_size, 2737 struct page **pages, int num_pages, int page_align) 2738 { 2739 struct ceph_osd_request *req; 2740 int rc = 0; 2741 2742 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, 2743 vino.snap, off, *plen); 2744 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1, 2745 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 2746 NULL, truncate_seq, truncate_size, 2747 false); 2748 if (IS_ERR(req)) 2749 return PTR_ERR(req); 2750 2751 /* it may be a short read due to an object boundary */ 2752 2753 osd_req_op_extent_osd_data_pages(req, 0, 2754 pages, *plen, page_align, false, false); 2755 2756 dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", 2757 off, *plen, *plen, page_align); 2758 2759 ceph_osdc_build_request(req, off, NULL, vino.snap, NULL); 2760 2761 rc = ceph_osdc_start_request(osdc, req, false); 2762 if (!rc) 2763 rc = ceph_osdc_wait_request(osdc, req); 2764 2765 ceph_osdc_put_request(req); 2766 dout("readpages result %d\n", rc); 2767 return rc; 2768 } 2769 EXPORT_SYMBOL(ceph_osdc_readpages); 2770 2771 /* 2772 * do a synchronous write on N pages 2773 */ 2774 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, 2775 struct ceph_file_layout *layout, 2776 struct ceph_snap_context *snapc, 2777 u64 off, u64 len, 2778 u32 truncate_seq, u64 truncate_size, 2779 struct timespec *mtime, 2780 struct page **pages, int num_pages) 2781 { 2782 struct ceph_osd_request *req; 2783 int rc = 0; 2784 int page_align = off & ~PAGE_MASK; 2785 2786 BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */ 2787 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1, 2788 CEPH_OSD_OP_WRITE, 2789 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, 2790 snapc, truncate_seq, truncate_size, 2791 true); 2792 if (IS_ERR(req)) 2793 return PTR_ERR(req); 2794 2795 /* it may be a short write due to an object boundary */ 2796 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, 2797 false, false); 2798 dout("writepages %llu~%llu (%llu bytes)\n", off, len, len); 2799 2800 ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime); 2801 2802 rc = ceph_osdc_start_request(osdc, req, true); 2803 if (!rc) 2804 rc = ceph_osdc_wait_request(osdc, req); 2805 2806 ceph_osdc_put_request(req); 2807 if (rc == 0) 2808 rc = len; 2809 dout("writepages result %d\n", rc); 2810 return rc; 2811 } 2812 EXPORT_SYMBOL(ceph_osdc_writepages); 2813 2814 int ceph_osdc_setup(void) 2815 { 2816 size_t size = sizeof(struct ceph_osd_request) + 2817 CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op); 2818 2819 BUG_ON(ceph_osd_request_cache); 2820 ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size, 2821 0, 0, NULL); 2822 2823 return ceph_osd_request_cache ? 0 : -ENOMEM; 2824 } 2825 EXPORT_SYMBOL(ceph_osdc_setup); 2826 2827 void ceph_osdc_cleanup(void) 2828 { 2829 BUG_ON(!ceph_osd_request_cache); 2830 kmem_cache_destroy(ceph_osd_request_cache); 2831 ceph_osd_request_cache = NULL; 2832 } 2833 EXPORT_SYMBOL(ceph_osdc_cleanup); 2834 2835 /* 2836 * handle incoming message 2837 */ 2838 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 2839 { 2840 struct ceph_osd *osd = con->private; 2841 struct ceph_osd_client *osdc; 2842 int type = le16_to_cpu(msg->hdr.type); 2843 2844 if (!osd) 2845 goto out; 2846 osdc = osd->o_osdc; 2847 2848 switch (type) { 2849 case CEPH_MSG_OSD_MAP: 2850 ceph_osdc_handle_map(osdc, msg); 2851 break; 2852 case CEPH_MSG_OSD_OPREPLY: 2853 handle_reply(osdc, msg); 2854 break; 2855 case CEPH_MSG_WATCH_NOTIFY: 2856 handle_watch_notify(osdc, msg); 2857 break; 2858 2859 default: 2860 pr_err("received unknown message type %d %s\n", type, 2861 ceph_msg_type_name(type)); 2862 } 2863 out: 2864 ceph_msg_put(msg); 2865 } 2866 2867 /* 2868 * Lookup and return message for incoming reply. Don't try to do 2869 * anything about a larger than preallocated data portion of the 2870 * message at the moment - for now, just skip the message. 2871 */ 2872 static struct ceph_msg *get_reply(struct ceph_connection *con, 2873 struct ceph_msg_header *hdr, 2874 int *skip) 2875 { 2876 struct ceph_osd *osd = con->private; 2877 struct ceph_osd_client *osdc = osd->o_osdc; 2878 struct ceph_msg *m; 2879 struct ceph_osd_request *req; 2880 int front_len = le32_to_cpu(hdr->front_len); 2881 int data_len = le32_to_cpu(hdr->data_len); 2882 u64 tid; 2883 2884 tid = le64_to_cpu(hdr->tid); 2885 mutex_lock(&osdc->request_mutex); 2886 req = lookup_request(&osdc->requests, tid); 2887 if (!req) { 2888 dout("%s osd%d tid %llu unknown, skipping\n", __func__, 2889 osd->o_osd, tid); 2890 m = NULL; 2891 *skip = 1; 2892 goto out; 2893 } 2894 2895 ceph_msg_revoke_incoming(req->r_reply); 2896 2897 if (front_len > req->r_reply->front_alloc_len) { 2898 pr_warn("%s osd%d tid %llu front %d > preallocated %d\n", 2899 __func__, osd->o_osd, req->r_tid, front_len, 2900 req->r_reply->front_alloc_len); 2901 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS, 2902 false); 2903 if (!m) 2904 goto out; 2905 ceph_msg_put(req->r_reply); 2906 req->r_reply = m; 2907 } 2908 2909 if (data_len > req->r_reply->data_length) { 2910 pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n", 2911 __func__, osd->o_osd, req->r_tid, data_len, 2912 req->r_reply->data_length); 2913 m = NULL; 2914 *skip = 1; 2915 goto out; 2916 } 2917 2918 m = ceph_msg_get(req->r_reply); 2919 dout("get_reply tid %lld %p\n", tid, m); 2920 2921 out: 2922 mutex_unlock(&osdc->request_mutex); 2923 return m; 2924 } 2925 2926 static struct ceph_msg *alloc_msg(struct ceph_connection *con, 2927 struct ceph_msg_header *hdr, 2928 int *skip) 2929 { 2930 struct ceph_osd *osd = con->private; 2931 int type = le16_to_cpu(hdr->type); 2932 int front = le32_to_cpu(hdr->front_len); 2933 2934 *skip = 0; 2935 switch (type) { 2936 case CEPH_MSG_OSD_MAP: 2937 case CEPH_MSG_WATCH_NOTIFY: 2938 return ceph_msg_new(type, front, GFP_NOFS, false); 2939 case CEPH_MSG_OSD_OPREPLY: 2940 return get_reply(con, hdr, skip); 2941 default: 2942 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type, 2943 osd->o_osd); 2944 *skip = 1; 2945 return NULL; 2946 } 2947 } 2948 2949 /* 2950 * Wrappers to refcount containing ceph_osd struct 2951 */ 2952 static struct ceph_connection *get_osd_con(struct ceph_connection *con) 2953 { 2954 struct ceph_osd *osd = con->private; 2955 if (get_osd(osd)) 2956 return con; 2957 return NULL; 2958 } 2959 2960 static void put_osd_con(struct ceph_connection *con) 2961 { 2962 struct ceph_osd *osd = con->private; 2963 put_osd(osd); 2964 } 2965 2966 /* 2967 * authentication 2968 */ 2969 /* 2970 * Note: returned pointer is the address of a structure that's 2971 * managed separately. Caller must *not* attempt to free it. 2972 */ 2973 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 2974 int *proto, int force_new) 2975 { 2976 struct ceph_osd *o = con->private; 2977 struct ceph_osd_client *osdc = o->o_osdc; 2978 struct ceph_auth_client *ac = osdc->client->monc.auth; 2979 struct ceph_auth_handshake *auth = &o->o_auth; 2980 2981 if (force_new && auth->authorizer) { 2982 ceph_auth_destroy_authorizer(auth->authorizer); 2983 auth->authorizer = NULL; 2984 } 2985 if (!auth->authorizer) { 2986 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 2987 auth); 2988 if (ret) 2989 return ERR_PTR(ret); 2990 } else { 2991 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 2992 auth); 2993 if (ret) 2994 return ERR_PTR(ret); 2995 } 2996 *proto = ac->protocol; 2997 2998 return auth; 2999 } 3000 3001 3002 static int verify_authorizer_reply(struct ceph_connection *con, int len) 3003 { 3004 struct ceph_osd *o = con->private; 3005 struct ceph_osd_client *osdc = o->o_osdc; 3006 struct ceph_auth_client *ac = osdc->client->monc.auth; 3007 3008 return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len); 3009 } 3010 3011 static int invalidate_authorizer(struct ceph_connection *con) 3012 { 3013 struct ceph_osd *o = con->private; 3014 struct ceph_osd_client *osdc = o->o_osdc; 3015 struct ceph_auth_client *ac = osdc->client->monc.auth; 3016 3017 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); 3018 return ceph_monc_validate_auth(&osdc->client->monc); 3019 } 3020 3021 static int osd_sign_message(struct ceph_msg *msg) 3022 { 3023 struct ceph_osd *o = msg->con->private; 3024 struct ceph_auth_handshake *auth = &o->o_auth; 3025 3026 return ceph_auth_sign_message(auth, msg); 3027 } 3028 3029 static int osd_check_message_signature(struct ceph_msg *msg) 3030 { 3031 struct ceph_osd *o = msg->con->private; 3032 struct ceph_auth_handshake *auth = &o->o_auth; 3033 3034 return ceph_auth_check_message_signature(auth, msg); 3035 } 3036 3037 static const struct ceph_connection_operations osd_con_ops = { 3038 .get = get_osd_con, 3039 .put = put_osd_con, 3040 .dispatch = dispatch, 3041 .get_authorizer = get_authorizer, 3042 .verify_authorizer_reply = verify_authorizer_reply, 3043 .invalidate_authorizer = invalidate_authorizer, 3044 .alloc_msg = alloc_msg, 3045 .sign_message = osd_sign_message, 3046 .check_message_signature = osd_check_message_signature, 3047 .fault = osd_reset, 3048 }; 3049