#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>

/* Base front size of a reply message; see ceph_osdc_alloc_messages(). */
#define OSD_OPREPLY_FRONT_LEN	512

static struct kmem_cache	*ceph_osd_request_cache;

static const struct ceph_connection_operations osd_con_ops;

static void __send_queued(struct ceph_osd_client *osdc);
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
static void __register_request(struct ceph_osd_client *osdc,
			       struct ceph_osd_request *req);
static void __unregister_request(struct ceph_osd_client *osdc,
				 struct ceph_osd_request *req);
static void __unregister_linger_request(struct ceph_osd_client *osdc,
					struct ceph_osd_request *req);
static void __enqueue_request(struct ceph_osd_request *req);
static void __send_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req);

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */

/*
 * Map the file extent @off~*@plen onto a single object, as described by
 * @layout.
 *
 * On success, *@objnum, *@objoff and *@objlen receive the object number,
 * the offset within that object and the length of the mapped piece.  If
 * the extent crosses an object boundary, *@plen is shortened to *@objlen.
 * This helper does not touch any request or message.
 *
 * Returns 0, or the negative error from ceph_calc_file_object_mapping().
 */
static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
			u64 *objnum, u64 *objoff, u64 *objlen)
{
	u64 orig_len = *plen;
	int r;

	/* map the file extent onto (the start of) one object extent */
	r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
					  objoff, objlen);
	if (r < 0)
		return r;
	if (*objlen < orig_len) {
		*plen = *objlen;
		dout(" skipping last %llu, final file extent %llu~%llu\n",
		     orig_len - *plen, off, *plen);
	}

	dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);

	return 0;
}

/* Reset @osd_data to an empty descriptor of type NONE. */
static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
{
	memset(osd_data, 0, sizeof (*osd_data));
	osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
}

/*
 * Describe a page vector.  @own_pages controls whether
 * ceph_osd_data_release() frees the vector.
 */
static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
			struct page **pages, u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
	osd_data->pages = pages;
	osd_data->length = length;
	osd_data->alignment = alignment;
	osd_data->pages_from_pool = pages_from_pool;
	osd_data->own_pages = own_pages;
}

/* Describe a pagelist; its length is read from the pagelist itself. */
static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
			struct ceph_pagelist *pagelist)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
	osd_data->pagelist = pagelist;
}

#ifdef CONFIG_BLOCK
/* Describe @bio_length bytes of a bio chain. */
static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
			struct bio *bio, size_t bio_length)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
	osd_data->bio = bio;
	osd_data->bio_length = bio_length;
}
#endif /* CONFIG_BLOCK */

/*
 * Evaluate to &oreq->r_ops[whch].typ.fld, BUGing if @whch is not below
 * oreq->r_num_ops.  Arguments are evaluated once (GNU statement expression).
 */
#define osd_req_op_data(oreq, whch, typ, fld)				\
({									\
	struct ceph_osd_request *__oreq = (oreq);			\
	unsigned int __whch = (whch);					\
	BUG_ON(__whch >= __oreq->r_num_ops);				\
	&__oreq->r_ops[__whch].typ.fld;					\
})

/* Return the raw_data_in descriptor of op @which (bounds-checked). */
static struct ceph_osd_data *
osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
{
	BUG_ON(which >= osd_req->r_num_ops);

	return &osd_req->r_ops[which].raw_data_in;
}

/* Return the extent data descriptor of op @which (bounds-checked). */
struct ceph_osd_data *
osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
			unsigned int which)
{
	return osd_req_op_data(osd_req, which, extent, osd_data);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data);

/* Attach a page vector as the raw_data_in buffer of op @which. */
void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages,
			u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_raw_data_in(osd_req, which);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);

/* Attach a page vector as the extent data of op @which. */
void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages,
			u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);

/* Attach a pagelist as the extent data of op @which. */
void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);

#ifdef CONFIG_BLOCK
/* Attach a bio chain as the extent data of op @which. */
void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
			unsigned int which, struct bio *bio, size_t bio_length)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_bio_init(osd_data, bio, bio_length);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
#endif /* CONFIG_BLOCK */

/*
 * Attach the class/method-name pagelist of a CALL op
 * (used by osd_req_op_cls_init()).
 */
static void osd_req_op_cls_request_info_pagelist(
			struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_info);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}

/* Attach a pagelist as the input data of CALL op @which. */
void osd_req_op_cls_request_data_pagelist(
			struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);

/* Attach a page vector as the input data of CALL op @which. */
void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages, u64 length,
			u32 alignment, bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);

/* Attach a page vector to receive the output of CALL op @which. */
void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages, u64 length,
			u32 alignment, bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, response_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);

/*
 * Return the byte length described by @osd_data (0 for NONE).  WARNs
 * and returns 0 for an unknown type.
 */
static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
{
	switch (osd_data->type) {
	case CEPH_OSD_DATA_TYPE_NONE:
		return 0;
	case CEPH_OSD_DATA_TYPE_PAGES:
		return osd_data->length;
	case CEPH_OSD_DATA_TYPE_PAGELIST:
		return (u64)osd_data->pagelist->length;
#ifdef CONFIG_BLOCK
	case CEPH_OSD_DATA_TYPE_BIO:
		return (u64)osd_data->bio_length;
#endif /* CONFIG_BLOCK */
	default:
		WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
		return 0;
	}
}

/*
 * Release @osd_data and reset it to type NONE.
 *
 * Only a page vector marked own_pages is freed here.  Pagelists, bios and
 * non-owned pages are merely forgotten; their lifetime is managed
 * elsewhere.
 */
static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
{
	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
		int num_pages;

		num_pages = calc_pages_for((u64)osd_data->alignment,
						(u64)osd_data->length);
		ceph_release_page_vector(osd_data->pages, num_pages);
	}
	ceph_osd_data_init(osd_data);
}

/*
 * Release every data descriptor attached to op @which, according to the
 * op's opcode.  Opcodes without data are ignored.
 */
static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
			unsigned int which)
{
	struct ceph_osd_req_op *op;

	BUG_ON(which >= osd_req->r_num_ops);
	op = &osd_req->r_ops[which];

	switch (op->op) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
		ceph_osd_data_release(&op->extent.osd_data);
		break;
	case CEPH_OSD_OP_CALL:
		ceph_osd_data_release(&op->cls.request_info);
		ceph_osd_data_release(&op->cls.request_data);
		ceph_osd_data_release(&op->cls.response_data);
		break;
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		ceph_osd_data_release(&op->xattr.osd_data);
		break;
	case CEPH_OSD_OP_STAT:
		ceph_osd_data_release(&op->raw_data_in);
		break;
	default:
		break;
	}
}

/*
 * requests
 */

/*
 * kref release callback for a request.
 *
 * WARNs if the request is still linked into any tree or list, then:
 * - drops both messages (revoking an in-progress incoming reply),
 * - releases the per-op data, the oids and the snap context,
 * - frees the request with the allocator matching
 *   ceph_osdc_alloc_request().
 */
static void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req = container_of(kref,
					    struct ceph_osd_request, r_kref);
	unsigned int which;

	dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
	     req->r_request, req->r_reply);
	WARN_ON(!RB_EMPTY_NODE(&req->r_node));
	WARN_ON(!list_empty(&req->r_req_lru_item));
	WARN_ON(!list_empty(&req->r_osd_item));
	WARN_ON(!list_empty(&req->r_linger_item));
	WARN_ON(!list_empty(&req->r_linger_osd_item));
	WARN_ON(req->r_osd);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply) {
		ceph_msg_revoke_incoming(req->r_reply);
		ceph_msg_put(req->r_reply);
	}

	/*
	 * NOTE(review): this walks all r_num_ops ops, but
	 * ceph_osdc_alloc_request() does not zero them.  If a request is
	 * released before every op went through _osd_req_op_init(), stale
	 * op contents are inspected here.  Confirm callers always
	 * initialize all ops first.
	 */
	for (which = 0; which < req->r_num_ops; which++)
		osd_req_op_data_release(req, which);

	ceph_oid_destroy(&req->r_base_oid);
	ceph_oid_destroy(&req->r_target_oid);
	ceph_put_snap_context(req->r_snapc);

	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
		kmem_cache_free(ceph_osd_request_cache, req);
	else
		kfree(req);
}

/* Take a reference on @req. */
void ceph_osdc_get_request(struct ceph_osd_request *req)
{
	dout("%s %p (was %d)\n", __func__, req,
	     atomic_read(&req->r_kref.refcount));
	kref_get(&req->r_kref);
}
EXPORT_SYMBOL(ceph_osdc_get_request);

/*
 * Drop a reference on @req (NULL is allowed).  The last put frees it via
 * ceph_osdc_release_request().
 */
void ceph_osdc_put_request(struct ceph_osd_request *req)
{
	if (req) {
		dout("%s %p (was %d)\n", __func__, req,
		     atomic_read(&req->r_kref.refcount));
		kref_put(&req->r_kref, ceph_osdc_release_request);
	}
}
EXPORT_SYMBOL(ceph_osdc_put_request);

/*
 * Allocate a request with room for @num_ops ops.
 *
 * Memory comes from one of three places:
 * - osdc->req_mempool if @use_mempool (BUG if num_ops > CEPH_OSD_SLAB_OPS);
 * - the request slab cache if num_ops <= CEPH_OSD_SLAB_OPS;
 * - kmalloc() otherwise (BUG if num_ops > CEPH_OSD_MAX_OPS).
 *
 * The header is zeroed and initialized, the refcount starts at 1 and a
 * reference to @snapc is taken.  The ops themselves are not zeroed here;
 * each must be set up with one of the osd_req_op_*_init() helpers.
 *
 * Returns NULL on allocation failure.
 */
struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
					       struct ceph_snap_context *snapc,
					       unsigned int num_ops,
					       bool use_mempool,
					       gfp_t gfp_flags)
{
	struct ceph_osd_request *req;

	if (use_mempool) {
		BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
		req = mempool_alloc(osdc->req_mempool, gfp_flags);
	} else if (num_ops <= CEPH_OSD_SLAB_OPS) {
		req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
	} else {
		BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
		req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]),
			      gfp_flags);
	}
	if (unlikely(!req))
		return NULL;

	/* req only, each op is zeroed in _osd_req_op_init() */
	memset(req, 0, sizeof(*req));

	req->r_osdc = osdc;
	req->r_mempool = use_mempool;
	req->r_num_ops = num_ops;
	req->r_snapid = CEPH_NOSNAP;
	req->r_snapc = ceph_get_snap_context(snapc);

	kref_init(&req->r_kref);
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	RB_CLEAR_NODE(&req->r_node);
	INIT_LIST_HEAD(&req->r_unsafe_item);
	INIT_LIST_HEAD(&req->r_linger_item);
	INIT_LIST_HEAD(&req->r_linger_osd_item);
	INIT_LIST_HEAD(&req->r_req_lru_item);
	INIT_LIST_HEAD(&req->r_osd_item);

	ceph_oid_init(&req->r_base_oid);
	req->r_base_oloc.pool = -1;
	ceph_oid_init(&req->r_target_oid);
	req->r_target_oloc.pool = -1;

	dout("%s req %p\n", __func__, req);
	return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);

/*
 * Allocate the request (CEPH_MSG_OSD_OP) and reply (CEPH_MSG_OSD_OPREPLY)
 * messages for @req.
 *
 * Front sizes are computed from r_base_oid, r_num_ops and the snap
 * context, so those must be set first; a WARN fires if the base oid is
 * still empty.
 *
 * Returns 0 or -ENOMEM.  If the reply allocation fails, the request
 * message stays attached to @req and is dropped when @req is released.
 */
int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	struct ceph_msg *msg;
	int msg_size;

	WARN_ON(ceph_oid_empty(&req->r_base_oid));

	/* create request message */
	msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
	msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
	msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
	msg_size += 1 + 8 + 4 + 4; /* pgid */
	msg_size += 4 + req->r_base_oid.name_len; /* oid */
	/* ops array; the extra 2 bytes are presumably the op count */
	msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
	msg_size += 8; /* snapid */
	msg_size += 8; /* snap_seq */
	/* 4 bytes plus 8 per snapshot in r_snapc (none if no snapc) */
	msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
	msg_size += 4; /* retry_attempt */

	/*
	 * NOTE(review): the mempool path passes 0 rather than msg_size, so
	 * the computed size is never checked against the pool's message
	 * size.  Confirm the pool messages are always large enough.
	 */
	if (req->r_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true);
	if (!msg)
		return -ENOMEM;

	memset(msg->front.iov_base, 0, msg->front.iov_len);
	req->r_request = msg;

	/* create reply message */
	msg_size = OSD_OPREPLY_FRONT_LEN;
	msg_size += req->r_base_oid.name_len;
	msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);

	if (req->r_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true);
	if (!msg)
		return -ENOMEM;

	req->r_reply = msg;

	return 0;
}
EXPORT_SYMBOL(ceph_osdc_alloc_messages);

/*
 * Return true iff @opcode is one of the opcodes enumerated by
 * __CEPH_FORALL_OSD_OPS.
 */
static bool osd_req_opcode_valid(u16 opcode)
{
	switch (opcode) {
#define GENERATE_CASE(op, opcode, str)	case CEPH_OSD_OP_##op: return true;
__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
#undef GENERATE_CASE
	default:
		return false;
	}
}

/*
 * This is an osd op init function for opcodes that have no data or
 * other information associated with them.  It also serves as a
 * common init routine for all the other init functions, below.
472 */ 473 static struct ceph_osd_req_op * 474 _osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which, 475 u16 opcode, u32 flags) 476 { 477 struct ceph_osd_req_op *op; 478 479 BUG_ON(which >= osd_req->r_num_ops); 480 BUG_ON(!osd_req_opcode_valid(opcode)); 481 482 op = &osd_req->r_ops[which]; 483 memset(op, 0, sizeof (*op)); 484 op->op = opcode; 485 op->flags = flags; 486 487 return op; 488 } 489 490 void osd_req_op_init(struct ceph_osd_request *osd_req, 491 unsigned int which, u16 opcode, u32 flags) 492 { 493 (void)_osd_req_op_init(osd_req, which, opcode, flags); 494 } 495 EXPORT_SYMBOL(osd_req_op_init); 496 497 void osd_req_op_extent_init(struct ceph_osd_request *osd_req, 498 unsigned int which, u16 opcode, 499 u64 offset, u64 length, 500 u64 truncate_size, u32 truncate_seq) 501 { 502 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 503 opcode, 0); 504 size_t payload_len = 0; 505 506 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && 507 opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO && 508 opcode != CEPH_OSD_OP_TRUNCATE); 509 510 op->extent.offset = offset; 511 op->extent.length = length; 512 op->extent.truncate_size = truncate_size; 513 op->extent.truncate_seq = truncate_seq; 514 if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL) 515 payload_len += length; 516 517 op->indata_len = payload_len; 518 } 519 EXPORT_SYMBOL(osd_req_op_extent_init); 520 521 void osd_req_op_extent_update(struct ceph_osd_request *osd_req, 522 unsigned int which, u64 length) 523 { 524 struct ceph_osd_req_op *op; 525 u64 previous; 526 527 BUG_ON(which >= osd_req->r_num_ops); 528 op = &osd_req->r_ops[which]; 529 previous = op->extent.length; 530 531 if (length == previous) 532 return; /* Nothing to do */ 533 BUG_ON(length > previous); 534 535 op->extent.length = length; 536 op->indata_len -= previous - length; 537 } 538 EXPORT_SYMBOL(osd_req_op_extent_update); 539 540 void osd_req_op_extent_dup_last(struct 
ceph_osd_request *osd_req, 541 unsigned int which, u64 offset_inc) 542 { 543 struct ceph_osd_req_op *op, *prev_op; 544 545 BUG_ON(which + 1 >= osd_req->r_num_ops); 546 547 prev_op = &osd_req->r_ops[which]; 548 op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags); 549 /* dup previous one */ 550 op->indata_len = prev_op->indata_len; 551 op->outdata_len = prev_op->outdata_len; 552 op->extent = prev_op->extent; 553 /* adjust offset */ 554 op->extent.offset += offset_inc; 555 op->extent.length -= offset_inc; 556 557 if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL) 558 op->indata_len -= offset_inc; 559 } 560 EXPORT_SYMBOL(osd_req_op_extent_dup_last); 561 562 void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, 563 u16 opcode, const char *class, const char *method) 564 { 565 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 566 opcode, 0); 567 struct ceph_pagelist *pagelist; 568 size_t payload_len = 0; 569 size_t size; 570 571 BUG_ON(opcode != CEPH_OSD_OP_CALL); 572 573 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); 574 BUG_ON(!pagelist); 575 ceph_pagelist_init(pagelist); 576 577 op->cls.class_name = class; 578 size = strlen(class); 579 BUG_ON(size > (size_t) U8_MAX); 580 op->cls.class_len = size; 581 ceph_pagelist_append(pagelist, class, size); 582 payload_len += size; 583 584 op->cls.method_name = method; 585 size = strlen(method); 586 BUG_ON(size > (size_t) U8_MAX); 587 op->cls.method_len = size; 588 ceph_pagelist_append(pagelist, method, size); 589 payload_len += size; 590 591 osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist); 592 593 op->cls.argc = 0; /* currently unused */ 594 595 op->indata_len = payload_len; 596 } 597 EXPORT_SYMBOL(osd_req_op_cls_init); 598 599 int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which, 600 u16 opcode, const char *name, const void *value, 601 size_t size, u8 cmp_op, u8 cmp_mode) 602 { 603 struct ceph_osd_req_op *op = 
_osd_req_op_init(osd_req, which, 604 opcode, 0); 605 struct ceph_pagelist *pagelist; 606 size_t payload_len; 607 608 BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR); 609 610 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); 611 if (!pagelist) 612 return -ENOMEM; 613 614 ceph_pagelist_init(pagelist); 615 616 payload_len = strlen(name); 617 op->xattr.name_len = payload_len; 618 ceph_pagelist_append(pagelist, name, payload_len); 619 620 op->xattr.value_len = size; 621 ceph_pagelist_append(pagelist, value, size); 622 payload_len += size; 623 624 op->xattr.cmp_op = cmp_op; 625 op->xattr.cmp_mode = cmp_mode; 626 627 ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist); 628 op->indata_len = payload_len; 629 return 0; 630 } 631 EXPORT_SYMBOL(osd_req_op_xattr_init); 632 633 void osd_req_op_watch_init(struct ceph_osd_request *osd_req, 634 unsigned int which, u16 opcode, 635 u64 cookie, u64 version, int flag) 636 { 637 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 638 opcode, 0); 639 640 BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH); 641 642 op->watch.cookie = cookie; 643 op->watch.ver = version; 644 if (opcode == CEPH_OSD_OP_WATCH && flag) 645 op->watch.flag = (u8)1; 646 } 647 EXPORT_SYMBOL(osd_req_op_watch_init); 648 649 void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req, 650 unsigned int which, 651 u64 expected_object_size, 652 u64 expected_write_size) 653 { 654 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 655 CEPH_OSD_OP_SETALLOCHINT, 656 0); 657 658 op->alloc_hint.expected_object_size = expected_object_size; 659 op->alloc_hint.expected_write_size = expected_write_size; 660 661 /* 662 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed 663 * not worth a feature bit. Set FAILOK per-op flag to make 664 * sure older osds don't trip over an unsupported opcode. 
665 */ 666 op->flags |= CEPH_OSD_OP_FLAG_FAILOK; 667 } 668 EXPORT_SYMBOL(osd_req_op_alloc_hint_init); 669 670 static void ceph_osdc_msg_data_add(struct ceph_msg *msg, 671 struct ceph_osd_data *osd_data) 672 { 673 u64 length = ceph_osd_data_length(osd_data); 674 675 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { 676 BUG_ON(length > (u64) SIZE_MAX); 677 if (length) 678 ceph_msg_data_add_pages(msg, osd_data->pages, 679 length, osd_data->alignment); 680 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) { 681 BUG_ON(!length); 682 ceph_msg_data_add_pagelist(msg, osd_data->pagelist); 683 #ifdef CONFIG_BLOCK 684 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) { 685 ceph_msg_data_add_bio(msg, osd_data->bio, length); 686 #endif 687 } else { 688 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE); 689 } 690 } 691 692 static u64 osd_req_encode_op(struct ceph_osd_request *req, 693 struct ceph_osd_op *dst, unsigned int which) 694 { 695 struct ceph_osd_req_op *src; 696 struct ceph_osd_data *osd_data; 697 u64 request_data_len = 0; 698 u64 data_length; 699 700 BUG_ON(which >= req->r_num_ops); 701 src = &req->r_ops[which]; 702 if (WARN_ON(!osd_req_opcode_valid(src->op))) { 703 pr_err("unrecognized osd opcode %d\n", src->op); 704 705 return 0; 706 } 707 708 switch (src->op) { 709 case CEPH_OSD_OP_STAT: 710 osd_data = &src->raw_data_in; 711 ceph_osdc_msg_data_add(req->r_reply, osd_data); 712 break; 713 case CEPH_OSD_OP_READ: 714 case CEPH_OSD_OP_WRITE: 715 case CEPH_OSD_OP_WRITEFULL: 716 case CEPH_OSD_OP_ZERO: 717 case CEPH_OSD_OP_TRUNCATE: 718 if (src->op == CEPH_OSD_OP_WRITE || 719 src->op == CEPH_OSD_OP_WRITEFULL) 720 request_data_len = src->extent.length; 721 dst->extent.offset = cpu_to_le64(src->extent.offset); 722 dst->extent.length = cpu_to_le64(src->extent.length); 723 dst->extent.truncate_size = 724 cpu_to_le64(src->extent.truncate_size); 725 dst->extent.truncate_seq = 726 cpu_to_le32(src->extent.truncate_seq); 727 osd_data = &src->extent.osd_data; 728 if 
(src->op == CEPH_OSD_OP_WRITE || 729 src->op == CEPH_OSD_OP_WRITEFULL) 730 ceph_osdc_msg_data_add(req->r_request, osd_data); 731 else 732 ceph_osdc_msg_data_add(req->r_reply, osd_data); 733 break; 734 case CEPH_OSD_OP_CALL: 735 dst->cls.class_len = src->cls.class_len; 736 dst->cls.method_len = src->cls.method_len; 737 osd_data = &src->cls.request_info; 738 ceph_osdc_msg_data_add(req->r_request, osd_data); 739 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST); 740 request_data_len = osd_data->pagelist->length; 741 742 osd_data = &src->cls.request_data; 743 data_length = ceph_osd_data_length(osd_data); 744 if (data_length) { 745 BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE); 746 dst->cls.indata_len = cpu_to_le32(data_length); 747 ceph_osdc_msg_data_add(req->r_request, osd_data); 748 src->indata_len += data_length; 749 request_data_len += data_length; 750 } 751 osd_data = &src->cls.response_data; 752 ceph_osdc_msg_data_add(req->r_reply, osd_data); 753 break; 754 case CEPH_OSD_OP_STARTSYNC: 755 break; 756 case CEPH_OSD_OP_NOTIFY_ACK: 757 case CEPH_OSD_OP_WATCH: 758 dst->watch.cookie = cpu_to_le64(src->watch.cookie); 759 dst->watch.ver = cpu_to_le64(src->watch.ver); 760 dst->watch.flag = src->watch.flag; 761 break; 762 case CEPH_OSD_OP_SETALLOCHINT: 763 dst->alloc_hint.expected_object_size = 764 cpu_to_le64(src->alloc_hint.expected_object_size); 765 dst->alloc_hint.expected_write_size = 766 cpu_to_le64(src->alloc_hint.expected_write_size); 767 break; 768 case CEPH_OSD_OP_SETXATTR: 769 case CEPH_OSD_OP_CMPXATTR: 770 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len); 771 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len); 772 dst->xattr.cmp_op = src->xattr.cmp_op; 773 dst->xattr.cmp_mode = src->xattr.cmp_mode; 774 osd_data = &src->xattr.osd_data; 775 ceph_osdc_msg_data_add(req->r_request, osd_data); 776 request_data_len = osd_data->pagelist->length; 777 break; 778 case CEPH_OSD_OP_CREATE: 779 case CEPH_OSD_OP_DELETE: 780 break; 781 default: 782 
pr_err("unsupported osd opcode %s\n", 783 ceph_osd_op_name(src->op)); 784 WARN_ON(1); 785 786 return 0; 787 } 788 789 dst->op = cpu_to_le16(src->op); 790 dst->flags = cpu_to_le32(src->flags); 791 dst->payload_len = cpu_to_le32(src->indata_len); 792 793 return request_data_len; 794 } 795 796 /* 797 * build new request AND message, calculate layout, and adjust file 798 * extent as needed. 799 * 800 * if the file was recently truncated, we include information about its 801 * old and new size so that the object can be updated appropriately. (we 802 * avoid synchronously deleting truncated objects because it's slow.) 803 * 804 * if @do_sync, include a 'startsync' command so that the osd will flush 805 * data quickly. 806 */ 807 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, 808 struct ceph_file_layout *layout, 809 struct ceph_vino vino, 810 u64 off, u64 *plen, 811 unsigned int which, int num_ops, 812 int opcode, int flags, 813 struct ceph_snap_context *snapc, 814 u32 truncate_seq, 815 u64 truncate_size, 816 bool use_mempool) 817 { 818 struct ceph_osd_request *req; 819 u64 objnum = 0; 820 u64 objoff = 0; 821 u64 objlen = 0; 822 int r; 823 824 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && 825 opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE && 826 opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE); 827 828 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool, 829 GFP_NOFS); 830 if (!req) { 831 r = -ENOMEM; 832 goto fail; 833 } 834 835 req->r_flags = flags; 836 837 /* calculate max write size */ 838 r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen); 839 if (r) 840 goto fail; 841 842 if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) { 843 osd_req_op_init(req, which, opcode, 0); 844 } else { 845 u32 object_size = le32_to_cpu(layout->fl_object_size); 846 u32 object_base = off - objoff; 847 if (!(truncate_seq == 1 && truncate_size == -1ULL)) { 848 if (truncate_size 
<= object_base) { 849 truncate_size = 0; 850 } else { 851 truncate_size -= object_base; 852 if (truncate_size > object_size) 853 truncate_size = object_size; 854 } 855 } 856 osd_req_op_extent_init(req, which, opcode, objoff, objlen, 857 truncate_size, truncate_seq); 858 } 859 860 req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout); 861 ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum); 862 863 r = ceph_osdc_alloc_messages(req, GFP_NOFS); 864 if (r) 865 goto fail; 866 867 return req; 868 869 fail: 870 ceph_osdc_put_request(req); 871 return ERR_PTR(r); 872 } 873 EXPORT_SYMBOL(ceph_osdc_new_request); 874 875 /* 876 * We keep osd requests in an rbtree, sorted by ->r_tid. 877 */ 878 DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node) 879 880 static struct ceph_osd_request * 881 __lookup_request_ge(struct ceph_osd_client *osdc, 882 u64 tid) 883 { 884 struct ceph_osd_request *req; 885 struct rb_node *n = osdc->requests.rb_node; 886 887 while (n) { 888 req = rb_entry(n, struct ceph_osd_request, r_node); 889 if (tid < req->r_tid) { 890 if (!n->rb_left) 891 return req; 892 n = n->rb_left; 893 } else if (tid > req->r_tid) { 894 n = n->rb_right; 895 } else { 896 return req; 897 } 898 } 899 return NULL; 900 } 901 902 static void __kick_linger_request(struct ceph_osd_request *req) 903 { 904 struct ceph_osd_client *osdc = req->r_osdc; 905 struct ceph_osd *osd = req->r_osd; 906 907 /* 908 * Linger requests need to be resent with a new tid to avoid 909 * the dup op detection logic on the OSDs. Achieve this with 910 * a re-register dance instead of open-coding. 911 */ 912 ceph_osdc_get_request(req); 913 if (!list_empty(&req->r_linger_item)) 914 __unregister_linger_request(osdc, req); 915 else 916 __unregister_request(osdc, req); 917 __register_request(osdc, req); 918 ceph_osdc_put_request(req); 919 920 /* 921 * Unless request has been registered as both normal and 922 * lingering, __unregister{,_linger}_request clears r_osd. 
923 * However, here we need to preserve r_osd to make sure we 924 * requeue on the same OSD. 925 */ 926 WARN_ON(req->r_osd || !osd); 927 req->r_osd = osd; 928 929 dout("%s requeueing %p tid %llu\n", __func__, req, req->r_tid); 930 __enqueue_request(req); 931 } 932 933 /* 934 * Resubmit requests pending on the given osd. 935 */ 936 static void __kick_osd_requests(struct ceph_osd_client *osdc, 937 struct ceph_osd *osd) 938 { 939 struct ceph_osd_request *req, *nreq; 940 LIST_HEAD(resend); 941 LIST_HEAD(resend_linger); 942 int err; 943 944 dout("%s osd%d\n", __func__, osd->o_osd); 945 err = __reset_osd(osdc, osd); 946 if (err) 947 return; 948 949 /* 950 * Build up a list of requests to resend by traversing the 951 * osd's list of requests. Requests for a given object are 952 * sent in tid order, and that is also the order they're 953 * kept on this list. Therefore all requests that are in 954 * flight will be found first, followed by all requests that 955 * have not yet been sent. And to resend requests while 956 * preserving this order we will want to put any sent 957 * requests back on the front of the osd client's unsent 958 * list. 959 * 960 * So we build a separate ordered list of already-sent 961 * requests for the affected osd and splice it onto the 962 * front of the osd client's unsent list. Once we've seen a 963 * request that has not yet been sent we're done. Those 964 * requests are already sitting right where they belong. 
965 */ 966 list_for_each_entry(req, &osd->o_requests, r_osd_item) { 967 if (!req->r_sent) 968 break; 969 970 if (!req->r_linger) { 971 dout("%s requeueing %p tid %llu\n", __func__, req, 972 req->r_tid); 973 list_move_tail(&req->r_req_lru_item, &resend); 974 req->r_flags |= CEPH_OSD_FLAG_RETRY; 975 } else { 976 list_move_tail(&req->r_req_lru_item, &resend_linger); 977 } 978 } 979 list_splice(&resend, &osdc->req_unsent); 980 981 /* 982 * Both registered and not yet registered linger requests are 983 * enqueued with a new tid on the same OSD. We add/move them 984 * to req_unsent/o_requests at the end to keep things in tid 985 * order. 986 */ 987 list_for_each_entry_safe(req, nreq, &osd->o_linger_requests, 988 r_linger_osd_item) { 989 WARN_ON(!list_empty(&req->r_req_lru_item)); 990 __kick_linger_request(req); 991 } 992 993 list_for_each_entry_safe(req, nreq, &resend_linger, r_req_lru_item) 994 __kick_linger_request(req); 995 } 996 997 /* 998 * If the osd connection drops, we need to resubmit all requests. 999 */ 1000 static void osd_reset(struct ceph_connection *con) 1001 { 1002 struct ceph_osd *osd = con->private; 1003 struct ceph_osd_client *osdc; 1004 1005 if (!osd) 1006 return; 1007 dout("osd_reset osd%d\n", osd->o_osd); 1008 osdc = osd->o_osdc; 1009 down_read(&osdc->map_sem); 1010 mutex_lock(&osdc->request_mutex); 1011 __kick_osd_requests(osdc, osd); 1012 __send_queued(osdc); 1013 mutex_unlock(&osdc->request_mutex); 1014 up_read(&osdc->map_sem); 1015 } 1016 1017 /* 1018 * Track open sessions with osds. 
 */

/*
 * Allocate a session for OSD number @onum with a refcount of 1.
 *
 * Lists and the rb node start out empty, o_incarnation is 1, and the
 * connection is initialized (ceph_con_init()) but not opened.  Returns
 * NULL on allocation failure.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
{
	struct ceph_osd *osd;

	osd = kzalloc(sizeof(*osd), GFP_NOFS);
	if (!osd)
		return NULL;

	atomic_set(&osd->o_ref, 1);
	osd->o_osdc = osdc;
	osd->o_osd = onum;
	RB_CLEAR_NODE(&osd->o_node);
	INIT_LIST_HEAD(&osd->o_requests);
	INIT_LIST_HEAD(&osd->o_linger_requests);
	INIT_LIST_HEAD(&osd->o_osd_lru);
	osd->o_incarnation = 1;

	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);

	INIT_LIST_HEAD(&osd->o_keepalive_item);
	return osd;
}

/*
 * Take a reference on @osd unless its refcount has already reached zero.
 * Returns @osd on success, NULL otherwise.
 */
static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
	if (atomic_inc_not_zero(&osd->o_ref)) {
		dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
		     atomic_read(&osd->o_ref));
		return osd;
	} else {
		dout("get_osd %p FAIL\n", osd);
		return NULL;
	}
}

/*
 * Drop a reference on @osd.  The last reference destroys its authorizer
 * (if any) and frees it.
 */
static void put_osd(struct ceph_osd *osd)
{
	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
	     atomic_read(&osd->o_ref) - 1);
	if (atomic_dec_and_test(&osd->o_ref)) {
		if (osd->o_auth.authorizer)
			ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
		kfree(osd);
	}
}

/* Sessions live in osdc->osds, an rbtree keyed by ->o_osd. */
DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)

/*
 * remove an osd from our map
 *
 * Unlinks @osd from the idle LRU and from osdc->osds.  It neither closes
 * the connection nor drops a reference; see remove_osd().  WARNs if the
 * osd still has requests attached.
 */
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
	WARN_ON(!list_empty(&osd->o_requests));
	WARN_ON(!list_empty(&osd->o_linger_requests));

	list_del_init(&osd->o_osd_lru);
	erase_osd(&osdc->osds, osd);
}

/*
 * If @osd is still in osdc->osds: close its connection, unlink it and drop
 * a reference (presumably the one held by the tree — confirm against the
 * insertion site).  A no-op otherwise.
 */
static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	dout("%s %p osd%d\n", __func__, osd, osd->o_osd);

	if (!RB_EMPTY_NODE(&osd->o_node)) {
		ceph_con_close(&osd->o_con);
		__remove_osd(osdc, osd);
		put_osd(osd);
	}
}

/*
 * Put an idle @osd at the tail of osdc->osd_lru and set its expiry to now
 * plus the osd_idle_ttl option.  BUGs if it is already on the LRU.
 */
static void __move_osd_to_lru(struct ceph_osd_client *osdc,
			      struct ceph_osd *osd)
{
	dout("%s %p\n", __func__, osd);
	BUG_ON(!list_empty(&osd->o_osd_lru));

	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
}

/* Move @osd to the LRU if it has neither normal nor linger requests. */
static void maybe_move_osd_to_lru(struct ceph_osd_client *osdc,
				  struct ceph_osd *osd)
{
	dout("%s %p\n", __func__, osd);

	if (list_empty(&osd->o_requests) &&
	    list_empty(&osd->o_linger_requests))
		__move_osd_to_lru(osdc, osd);
}

/* Take @osd off the idle LRU if it is on it. */
static void __remove_osd_from_lru(struct ceph_osd *osd)
{
	dout("__remove_osd_from_lru %p\n", osd);
	if (!list_empty(&osd->o_osd_lru))
		list_del_init(&osd->o_osd_lru);
}

/*
 * reset osd connect
 *
 * Returns:
 *  -ENODEV  the osd had no requests and has been removed;
 *  -EAGAIN  the address in the current osdmap is unchanged and the
 *           connection was never opened, so the messenger is left to
 *           retry (request timestamps are refreshed);
 *  0        the connection was closed and reopened to the osdmap address,
 *           and o_incarnation was bumped.
 */
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	struct ceph_entity_addr *peer_addr;

	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
	if (list_empty(&osd->o_requests) &&
	    list_empty(&osd->o_linger_requests)) {
		remove_osd(osdc, osd);
		return -ENODEV;
	}

	peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
	if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
			!ceph_con_opened(&osd->o_con)) {
		struct ceph_osd_request *req;

		dout("osd addr hasn't changed and connection never opened, "
		     "letting msgr retry\n");
		/* touch each r_stamp for handle_timeout()'s benefit */
		list_for_each_entry(req, &osd->o_requests, r_osd_item)
			req->r_stamp = jiffies;

		return -EAGAIN;
	}

	ceph_con_close(&osd->o_con);
	ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
	osd->o_incarnation++;

	return 0;
}

/* Arm timeout_work to fire after the osd_keepalive_timeout option. */
static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
{
	schedule_delayed_work(&osdc->timeout_work,
			      osdc->client->options->osd_keepalive_timeout);
}

/* Cancel a pending timeout_work (does not wait for a running one). */
static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
{
	cancel_delayed_work(&osdc->timeout_work);
}

/*
 * Register request, assign tid.  If this is the first request, set up
 * the timeout event.
 *
 * The new tid is also stamped into the request message header, and the
 * requests tree takes its own reference on @req.
 * NOTE(review): r_tid is printed with %lld although it appears to be
 * u64; %llu would match.
 */
static void __register_request(struct ceph_osd_client *osdc,
			       struct ceph_osd_request *req)
{
	req->r_tid = ++osdc->last_tid;
	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
	dout("__register_request %p tid %lld\n", req, req->r_tid);
	insert_request(&osdc->requests, req);
	ceph_osdc_get_request(req);
	osdc->num_requests++;
	if (osdc->num_requests == 1) {
		dout(" first request, scheduling timeout\n");
		__schedule_osd_timeout(osdc);
	}
}

/*
 * called under osdc->request_mutex
 */
static void __unregister_request(struct ceph_osd_client *osdc,
				 struct ceph_osd_request *req)
{
	if (RB_EMPTY_NODE(&req->r_node)) {
		dout("__unregister_request %p tid %lld not registered\n",
			req, req->r_tid);
		return;
	}

	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
	erase_request(&osdc->requests, req);
	osdc->num_requests--;

	if (req->r_osd) {
		/* make sure the original request isn't in flight.
*/ 1202 ceph_msg_revoke(req->r_request); 1203 1204 list_del_init(&req->r_osd_item); 1205 maybe_move_osd_to_lru(osdc, req->r_osd); 1206 if (list_empty(&req->r_linger_osd_item)) 1207 req->r_osd = NULL; 1208 } 1209 1210 list_del_init(&req->r_req_lru_item); 1211 ceph_osdc_put_request(req); 1212 1213 if (osdc->num_requests == 0) { 1214 dout(" no requests, canceling timeout\n"); 1215 __cancel_osd_timeout(osdc); 1216 } 1217 } 1218 1219 /* 1220 * Cancel a previously queued request message 1221 */ 1222 static void __cancel_request(struct ceph_osd_request *req) 1223 { 1224 if (req->r_sent && req->r_osd) { 1225 ceph_msg_revoke(req->r_request); 1226 req->r_sent = 0; 1227 } 1228 } 1229 1230 static void __register_linger_request(struct ceph_osd_client *osdc, 1231 struct ceph_osd_request *req) 1232 { 1233 dout("%s %p tid %llu\n", __func__, req, req->r_tid); 1234 WARN_ON(!req->r_linger); 1235 1236 ceph_osdc_get_request(req); 1237 list_add_tail(&req->r_linger_item, &osdc->req_linger); 1238 if (req->r_osd) 1239 list_add_tail(&req->r_linger_osd_item, 1240 &req->r_osd->o_linger_requests); 1241 } 1242 1243 static void __unregister_linger_request(struct ceph_osd_client *osdc, 1244 struct ceph_osd_request *req) 1245 { 1246 WARN_ON(!req->r_linger); 1247 1248 if (list_empty(&req->r_linger_item)) { 1249 dout("%s %p tid %llu not registered\n", __func__, req, 1250 req->r_tid); 1251 return; 1252 } 1253 1254 dout("%s %p tid %llu\n", __func__, req, req->r_tid); 1255 list_del_init(&req->r_linger_item); 1256 1257 if (req->r_osd) { 1258 list_del_init(&req->r_linger_osd_item); 1259 maybe_move_osd_to_lru(osdc, req->r_osd); 1260 if (list_empty(&req->r_osd_item)) 1261 req->r_osd = NULL; 1262 } 1263 ceph_osdc_put_request(req); 1264 } 1265 1266 void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, 1267 struct ceph_osd_request *req) 1268 { 1269 if (!req->r_linger) { 1270 dout("set_request_linger %p\n", req); 1271 req->r_linger = 1; 1272 } 1273 } 1274 EXPORT_SYMBOL(ceph_osdc_set_request_linger); 
1275 1276 /* 1277 * Returns whether a request should be blocked from being sent 1278 * based on the current osdmap and osd_client settings. 1279 * 1280 * Caller should hold map_sem for read. 1281 */ 1282 static bool __req_should_be_paused(struct ceph_osd_client *osdc, 1283 struct ceph_osd_request *req) 1284 { 1285 bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD); 1286 bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) || 1287 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL); 1288 return (req->r_flags & CEPH_OSD_FLAG_READ && pauserd) || 1289 (req->r_flags & CEPH_OSD_FLAG_WRITE && pausewr); 1290 } 1291 1292 /* 1293 * Calculate mapping of a request to a PG. Takes tiering into account. 1294 */ 1295 static int __calc_request_pg(struct ceph_osdmap *osdmap, 1296 struct ceph_osd_request *req, 1297 struct ceph_pg *pg_out) 1298 { 1299 bool need_check_tiering; 1300 1301 need_check_tiering = false; 1302 if (req->r_target_oloc.pool == -1) { 1303 req->r_target_oloc = req->r_base_oloc; /* struct */ 1304 need_check_tiering = true; 1305 } 1306 if (ceph_oid_empty(&req->r_target_oid)) { 1307 ceph_oid_copy(&req->r_target_oid, &req->r_base_oid); 1308 need_check_tiering = true; 1309 } 1310 1311 if (need_check_tiering && 1312 (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { 1313 struct ceph_pg_pool_info *pi; 1314 1315 pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool); 1316 if (pi) { 1317 if ((req->r_flags & CEPH_OSD_FLAG_READ) && 1318 pi->read_tier >= 0) 1319 req->r_target_oloc.pool = pi->read_tier; 1320 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && 1321 pi->write_tier >= 0) 1322 req->r_target_oloc.pool = pi->write_tier; 1323 } 1324 /* !pi is caught in ceph_oloc_oid_to_pg() */ 1325 } 1326 1327 return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc, 1328 &req->r_target_oid, pg_out); 1329 } 1330 1331 static void __enqueue_request(struct ceph_osd_request *req) 1332 { 1333 struct ceph_osd_client *osdc = req->r_osdc; 1334 1335 dout("%s %p tid %llu 
to osd%d\n", __func__, req, req->r_tid, 1336 req->r_osd ? req->r_osd->o_osd : -1); 1337 1338 if (req->r_osd) { 1339 __remove_osd_from_lru(req->r_osd); 1340 list_add_tail(&req->r_osd_item, &req->r_osd->o_requests); 1341 list_move_tail(&req->r_req_lru_item, &osdc->req_unsent); 1342 } else { 1343 list_move_tail(&req->r_req_lru_item, &osdc->req_notarget); 1344 } 1345 } 1346 1347 /* 1348 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct 1349 * (as needed), and set the request r_osd appropriately. If there is 1350 * no up osd, set r_osd to NULL. Move the request to the appropriate list 1351 * (unsent, homeless) or leave on in-flight lru. 1352 * 1353 * Return 0 if unchanged, 1 if changed, or negative on error. 1354 * 1355 * Caller should hold map_sem for read and request_mutex. 1356 */ 1357 static int __map_request(struct ceph_osd_client *osdc, 1358 struct ceph_osd_request *req, int force_resend) 1359 { 1360 struct ceph_pg pgid; 1361 int acting[CEPH_PG_MAX_SIZE]; 1362 int num, o; 1363 int err; 1364 bool was_paused; 1365 1366 dout("map_request %p tid %lld\n", req, req->r_tid); 1367 1368 err = __calc_request_pg(osdc->osdmap, req, &pgid); 1369 if (err) { 1370 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1371 return err; 1372 } 1373 req->r_pgid = pgid; 1374 1375 num = ceph_calc_pg_acting(osdc->osdmap, pgid, acting, &o); 1376 if (num < 0) 1377 num = 0; 1378 1379 was_paused = req->r_paused; 1380 req->r_paused = __req_should_be_paused(osdc, req); 1381 if (was_paused && !req->r_paused) 1382 force_resend = 1; 1383 1384 if ((!force_resend && 1385 req->r_osd && req->r_osd->o_osd == o && 1386 req->r_sent >= req->r_osd->o_incarnation && 1387 req->r_num_pg_osds == num && 1388 memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || 1389 (req->r_osd == NULL && o == -1) || 1390 req->r_paused) 1391 return 0; /* no change */ 1392 1393 dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n", 1394 req->r_tid, pgid.pool, pgid.seed, o, 1395 req->r_osd ? 
req->r_osd->o_osd : -1); 1396 1397 /* record full pg acting set */ 1398 memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num); 1399 req->r_num_pg_osds = num; 1400 1401 if (req->r_osd) { 1402 __cancel_request(req); 1403 list_del_init(&req->r_osd_item); 1404 list_del_init(&req->r_linger_osd_item); 1405 req->r_osd = NULL; 1406 } 1407 1408 req->r_osd = lookup_osd(&osdc->osds, o); 1409 if (!req->r_osd && o >= 0) { 1410 err = -ENOMEM; 1411 req->r_osd = create_osd(osdc, o); 1412 if (!req->r_osd) { 1413 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1414 goto out; 1415 } 1416 1417 dout("map_request osd %p is osd%d\n", req->r_osd, o); 1418 insert_osd(&osdc->osds, req->r_osd); 1419 1420 ceph_con_open(&req->r_osd->o_con, 1421 CEPH_ENTITY_TYPE_OSD, o, 1422 &osdc->osdmap->osd_addr[o]); 1423 } 1424 1425 __enqueue_request(req); 1426 err = 1; /* osd or pg changed */ 1427 1428 out: 1429 return err; 1430 } 1431 1432 /* 1433 * caller should hold map_sem (for read) and request_mutex 1434 */ 1435 static void __send_request(struct ceph_osd_client *osdc, 1436 struct ceph_osd_request *req) 1437 { 1438 void *p; 1439 1440 dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n", 1441 req, req->r_tid, req->r_osd->o_osd, req->r_flags, 1442 (unsigned long long)req->r_pgid.pool, req->r_pgid.seed); 1443 1444 /* fill in message content that changes each time we send it */ 1445 put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); 1446 put_unaligned_le32(req->r_flags, req->r_request_flags); 1447 put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool); 1448 p = req->r_request_pgid; 1449 ceph_encode_64(&p, req->r_pgid.pool); 1450 ceph_encode_32(&p, req->r_pgid.seed); 1451 put_unaligned_le64(1, req->r_request_attempts); /* FIXME */ 1452 memcpy(req->r_request_reassert_version, &req->r_reassert_version, 1453 sizeof(req->r_reassert_version)); 1454 1455 req->r_stamp = jiffies; 1456 list_move_tail(&req->r_req_lru_item, &osdc->req_lru); 1457 1458 
ceph_msg_get(req->r_request); /* send consumes a ref */ 1459 1460 req->r_sent = req->r_osd->o_incarnation; 1461 1462 ceph_con_send(&req->r_osd->o_con, req->r_request); 1463 } 1464 1465 /* 1466 * Send any requests in the queue (req_unsent). 1467 */ 1468 static void __send_queued(struct ceph_osd_client *osdc) 1469 { 1470 struct ceph_osd_request *req, *tmp; 1471 1472 dout("__send_queued\n"); 1473 list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) 1474 __send_request(osdc, req); 1475 } 1476 1477 /* 1478 * Caller should hold map_sem for read and request_mutex. 1479 */ 1480 static int __ceph_osdc_start_request(struct ceph_osd_client *osdc, 1481 struct ceph_osd_request *req, 1482 bool nofail) 1483 { 1484 int rc; 1485 1486 __register_request(osdc, req); 1487 req->r_sent = 0; 1488 req->r_got_reply = 0; 1489 rc = __map_request(osdc, req, 0); 1490 if (rc < 0) { 1491 if (nofail) { 1492 dout("osdc_start_request failed map, " 1493 " will retry %lld\n", req->r_tid); 1494 rc = 0; 1495 } else { 1496 __unregister_request(osdc, req); 1497 } 1498 return rc; 1499 } 1500 1501 if (req->r_osd == NULL) { 1502 dout("send_request %p no up osds in pg\n", req); 1503 ceph_monc_request_next_osdmap(&osdc->client->monc); 1504 } else { 1505 __send_queued(osdc); 1506 } 1507 1508 return 0; 1509 } 1510 1511 /* 1512 * Timeout callback, called every N seconds when 1 or more osd 1513 * requests has been active for more than N seconds. When this 1514 * happens, we ping all OSDs with requests who have timed out to 1515 * ensure any communications channel reset is detected. Reset the 1516 * request timeouts another N seconds in the future as we go. 1517 * Reschedule the timeout event another N seconds in future (unless 1518 * there are no open requests). 
1519 */ 1520 static void handle_timeout(struct work_struct *work) 1521 { 1522 struct ceph_osd_client *osdc = 1523 container_of(work, struct ceph_osd_client, timeout_work.work); 1524 struct ceph_options *opts = osdc->client->options; 1525 struct ceph_osd_request *req; 1526 struct ceph_osd *osd; 1527 struct list_head slow_osds; 1528 dout("timeout\n"); 1529 down_read(&osdc->map_sem); 1530 1531 ceph_monc_request_next_osdmap(&osdc->client->monc); 1532 1533 mutex_lock(&osdc->request_mutex); 1534 1535 /* 1536 * ping osds that are a bit slow. this ensures that if there 1537 * is a break in the TCP connection we will notice, and reopen 1538 * a connection with that osd (from the fault callback). 1539 */ 1540 INIT_LIST_HEAD(&slow_osds); 1541 list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) { 1542 if (time_before(jiffies, 1543 req->r_stamp + opts->osd_keepalive_timeout)) 1544 break; 1545 1546 osd = req->r_osd; 1547 BUG_ON(!osd); 1548 dout(" tid %llu is slow, will send keepalive on osd%d\n", 1549 req->r_tid, osd->o_osd); 1550 list_move_tail(&osd->o_keepalive_item, &slow_osds); 1551 } 1552 while (!list_empty(&slow_osds)) { 1553 osd = list_entry(slow_osds.next, struct ceph_osd, 1554 o_keepalive_item); 1555 list_del_init(&osd->o_keepalive_item); 1556 ceph_con_keepalive(&osd->o_con); 1557 } 1558 1559 __schedule_osd_timeout(osdc); 1560 __send_queued(osdc); 1561 mutex_unlock(&osdc->request_mutex); 1562 up_read(&osdc->map_sem); 1563 } 1564 1565 static void handle_osds_timeout(struct work_struct *work) 1566 { 1567 struct ceph_osd_client *osdc = 1568 container_of(work, struct ceph_osd_client, 1569 osds_timeout_work.work); 1570 unsigned long delay = osdc->client->options->osd_idle_ttl / 4; 1571 struct ceph_osd *osd, *nosd; 1572 1573 dout("%s osdc %p\n", __func__, osdc); 1574 down_read(&osdc->map_sem); 1575 mutex_lock(&osdc->request_mutex); 1576 1577 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) { 1578 if (time_before(jiffies, osd->lru_ttl)) 1579 break; 1580 1581 
remove_osd(osdc, osd); 1582 } 1583 1584 mutex_unlock(&osdc->request_mutex); 1585 up_read(&osdc->map_sem); 1586 schedule_delayed_work(&osdc->osds_timeout_work, 1587 round_jiffies_relative(delay)); 1588 } 1589 1590 static int ceph_oloc_decode(void **p, void *end, 1591 struct ceph_object_locator *oloc) 1592 { 1593 u8 struct_v, struct_cv; 1594 u32 len; 1595 void *struct_end; 1596 int ret = 0; 1597 1598 ceph_decode_need(p, end, 1 + 1 + 4, e_inval); 1599 struct_v = ceph_decode_8(p); 1600 struct_cv = ceph_decode_8(p); 1601 if (struct_v < 3) { 1602 pr_warn("got v %d < 3 cv %d of ceph_object_locator\n", 1603 struct_v, struct_cv); 1604 goto e_inval; 1605 } 1606 if (struct_cv > 6) { 1607 pr_warn("got v %d cv %d > 6 of ceph_object_locator\n", 1608 struct_v, struct_cv); 1609 goto e_inval; 1610 } 1611 len = ceph_decode_32(p); 1612 ceph_decode_need(p, end, len, e_inval); 1613 struct_end = *p + len; 1614 1615 oloc->pool = ceph_decode_64(p); 1616 *p += 4; /* skip preferred */ 1617 1618 len = ceph_decode_32(p); 1619 if (len > 0) { 1620 pr_warn("ceph_object_locator::key is set\n"); 1621 goto e_inval; 1622 } 1623 1624 if (struct_v >= 5) { 1625 len = ceph_decode_32(p); 1626 if (len > 0) { 1627 pr_warn("ceph_object_locator::nspace is set\n"); 1628 goto e_inval; 1629 } 1630 } 1631 1632 if (struct_v >= 6) { 1633 s64 hash = ceph_decode_64(p); 1634 if (hash != -1) { 1635 pr_warn("ceph_object_locator::hash is set\n"); 1636 goto e_inval; 1637 } 1638 } 1639 1640 /* skip the rest */ 1641 *p = struct_end; 1642 out: 1643 return ret; 1644 1645 e_inval: 1646 ret = -EINVAL; 1647 goto out; 1648 } 1649 1650 static int ceph_redirect_decode(void **p, void *end, 1651 struct ceph_request_redirect *redir) 1652 { 1653 u8 struct_v, struct_cv; 1654 u32 len; 1655 void *struct_end; 1656 int ret; 1657 1658 ceph_decode_need(p, end, 1 + 1 + 4, e_inval); 1659 struct_v = ceph_decode_8(p); 1660 struct_cv = ceph_decode_8(p); 1661 if (struct_cv > 1) { 1662 pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n", 1663 
struct_v, struct_cv); 1664 goto e_inval; 1665 } 1666 len = ceph_decode_32(p); 1667 ceph_decode_need(p, end, len, e_inval); 1668 struct_end = *p + len; 1669 1670 ret = ceph_oloc_decode(p, end, &redir->oloc); 1671 if (ret) 1672 goto out; 1673 1674 len = ceph_decode_32(p); 1675 if (len > 0) { 1676 pr_warn("ceph_request_redirect::object_name is set\n"); 1677 goto e_inval; 1678 } 1679 1680 len = ceph_decode_32(p); 1681 *p += len; /* skip osd_instructions */ 1682 1683 /* skip the rest */ 1684 *p = struct_end; 1685 out: 1686 return ret; 1687 1688 e_inval: 1689 ret = -EINVAL; 1690 goto out; 1691 } 1692 1693 static void complete_request(struct ceph_osd_request *req) 1694 { 1695 complete_all(&req->r_safe_completion); /* fsync waiter */ 1696 } 1697 1698 /* 1699 * handle osd op reply. either call the callback if it is specified, 1700 * or do the completion to wake up the waiting thread. 1701 */ 1702 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) 1703 { 1704 void *p, *end; 1705 struct ceph_osd_request *req; 1706 struct ceph_request_redirect redir; 1707 u64 tid; 1708 int object_len; 1709 unsigned int numops; 1710 int payload_len, flags; 1711 s32 result; 1712 s32 retry_attempt; 1713 struct ceph_pg pg; 1714 int err; 1715 u32 reassert_epoch; 1716 u64 reassert_version; 1717 u32 osdmap_epoch; 1718 int already_completed; 1719 u32 bytes; 1720 u8 decode_redir; 1721 unsigned int i; 1722 1723 tid = le64_to_cpu(msg->hdr.tid); 1724 dout("handle_reply %p tid %llu\n", msg, tid); 1725 1726 p = msg->front.iov_base; 1727 end = p + msg->front.iov_len; 1728 1729 ceph_decode_need(&p, end, 4, bad); 1730 object_len = ceph_decode_32(&p); 1731 ceph_decode_need(&p, end, object_len, bad); 1732 p += object_len; 1733 1734 err = ceph_decode_pgid(&p, end, &pg); 1735 if (err) 1736 goto bad; 1737 1738 ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad); 1739 flags = ceph_decode_64(&p); 1740 result = ceph_decode_32(&p); 1741 reassert_epoch = ceph_decode_32(&p); 1742 reassert_version 
= ceph_decode_64(&p); 1743 osdmap_epoch = ceph_decode_32(&p); 1744 1745 /* lookup */ 1746 down_read(&osdc->map_sem); 1747 mutex_lock(&osdc->request_mutex); 1748 req = lookup_request(&osdc->requests, tid); 1749 if (req == NULL) { 1750 dout("handle_reply tid %llu dne\n", tid); 1751 goto bad_mutex; 1752 } 1753 ceph_osdc_get_request(req); 1754 1755 dout("handle_reply %p tid %llu req %p result %d\n", msg, tid, 1756 req, result); 1757 1758 ceph_decode_need(&p, end, 4, bad_put); 1759 numops = ceph_decode_32(&p); 1760 if (numops > CEPH_OSD_MAX_OPS) 1761 goto bad_put; 1762 if (numops != req->r_num_ops) 1763 goto bad_put; 1764 payload_len = 0; 1765 ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad_put); 1766 for (i = 0; i < numops; i++) { 1767 struct ceph_osd_op *op = p; 1768 int len; 1769 1770 len = le32_to_cpu(op->payload_len); 1771 req->r_ops[i].outdata_len = len; 1772 dout(" op %d has %d bytes\n", i, len); 1773 payload_len += len; 1774 p += sizeof(*op); 1775 } 1776 bytes = le32_to_cpu(msg->hdr.data_len); 1777 if (payload_len != bytes) { 1778 pr_warn("sum of op payload lens %d != data_len %d\n", 1779 payload_len, bytes); 1780 goto bad_put; 1781 } 1782 1783 ceph_decode_need(&p, end, 4 + numops * 4, bad_put); 1784 retry_attempt = ceph_decode_32(&p); 1785 for (i = 0; i < numops; i++) 1786 req->r_ops[i].rval = ceph_decode_32(&p); 1787 1788 if (le16_to_cpu(msg->hdr.version) >= 6) { 1789 p += 8 + 4; /* skip replay_version */ 1790 p += 8; /* skip user_version */ 1791 1792 if (le16_to_cpu(msg->hdr.version) >= 7) 1793 ceph_decode_8_safe(&p, end, decode_redir, bad_put); 1794 else 1795 decode_redir = 1; 1796 } else { 1797 decode_redir = 0; 1798 } 1799 1800 if (decode_redir) { 1801 err = ceph_redirect_decode(&p, end, &redir); 1802 if (err) 1803 goto bad_put; 1804 } else { 1805 redir.oloc.pool = -1; 1806 } 1807 1808 if (redir.oloc.pool != -1) { 1809 dout("redirect pool %lld\n", redir.oloc.pool); 1810 1811 __unregister_request(osdc, req); 1812 1813 req->r_target_oloc = 
redir.oloc; /* struct */ 1814 1815 /* 1816 * Start redirect requests with nofail=true. If 1817 * mapping fails, request will end up on the notarget 1818 * list, waiting for the new osdmap (which can take 1819 * a while), even though the original request mapped 1820 * successfully. In the future we might want to follow 1821 * original request's nofail setting here. 1822 */ 1823 err = __ceph_osdc_start_request(osdc, req, true); 1824 BUG_ON(err); 1825 1826 goto out_unlock; 1827 } 1828 1829 already_completed = req->r_got_reply; 1830 if (!req->r_got_reply) { 1831 req->r_result = result; 1832 dout("handle_reply result %d bytes %d\n", req->r_result, 1833 bytes); 1834 if (req->r_result == 0) 1835 req->r_result = bytes; 1836 1837 /* in case this is a write and we need to replay, */ 1838 req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch); 1839 req->r_reassert_version.version = cpu_to_le64(reassert_version); 1840 1841 req->r_got_reply = 1; 1842 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { 1843 dout("handle_reply tid %llu dup ack\n", tid); 1844 goto out_unlock; 1845 } 1846 1847 dout("handle_reply tid %llu flags %d\n", tid, flags); 1848 1849 if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK)) 1850 __register_linger_request(osdc, req); 1851 1852 /* either this is a read, or we got the safe response */ 1853 if (result < 0 || 1854 (flags & CEPH_OSD_FLAG_ONDISK) || 1855 ((flags & CEPH_OSD_FLAG_WRITE) == 0)) 1856 __unregister_request(osdc, req); 1857 1858 mutex_unlock(&osdc->request_mutex); 1859 up_read(&osdc->map_sem); 1860 1861 if (!already_completed) { 1862 if (req->r_unsafe_callback && 1863 result >= 0 && !(flags & CEPH_OSD_FLAG_ONDISK)) 1864 req->r_unsafe_callback(req, true); 1865 if (req->r_callback) 1866 req->r_callback(req, msg); 1867 else 1868 complete_all(&req->r_completion); 1869 } 1870 1871 if (flags & CEPH_OSD_FLAG_ONDISK) { 1872 if (req->r_unsafe_callback && already_completed) 1873 req->r_unsafe_callback(req, false); 1874 complete_request(req); 1875 } 
1876 1877 out: 1878 dout("req=%p req->r_linger=%d\n", req, req->r_linger); 1879 ceph_osdc_put_request(req); 1880 return; 1881 out_unlock: 1882 mutex_unlock(&osdc->request_mutex); 1883 up_read(&osdc->map_sem); 1884 goto out; 1885 1886 bad_put: 1887 req->r_result = -EIO; 1888 __unregister_request(osdc, req); 1889 if (req->r_callback) 1890 req->r_callback(req, msg); 1891 else 1892 complete_all(&req->r_completion); 1893 complete_request(req); 1894 ceph_osdc_put_request(req); 1895 bad_mutex: 1896 mutex_unlock(&osdc->request_mutex); 1897 up_read(&osdc->map_sem); 1898 bad: 1899 pr_err("corrupt osd_op_reply got %d %d\n", 1900 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len)); 1901 ceph_msg_dump(msg); 1902 } 1903 1904 static void reset_changed_osds(struct ceph_osd_client *osdc) 1905 { 1906 struct rb_node *p, *n; 1907 1908 dout("%s %p\n", __func__, osdc); 1909 for (p = rb_first(&osdc->osds); p; p = n) { 1910 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node); 1911 1912 n = rb_next(p); 1913 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || 1914 memcmp(&osd->o_con.peer_addr, 1915 ceph_osd_addr(osdc->osdmap, 1916 osd->o_osd), 1917 sizeof(struct ceph_entity_addr)) != 0) 1918 __reset_osd(osdc, osd); 1919 } 1920 } 1921 1922 /* 1923 * Requeue requests whose mapping to an OSD has changed. If requests map to 1924 * no osd, request a new map. 1925 * 1926 * Caller should hold map_sem for read. 1927 */ 1928 static void kick_requests(struct ceph_osd_client *osdc, bool force_resend, 1929 bool force_resend_writes) 1930 { 1931 struct ceph_osd_request *req, *nreq; 1932 struct rb_node *p; 1933 int needmap = 0; 1934 int err; 1935 bool force_resend_req; 1936 1937 dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "", 1938 force_resend_writes ? 
" (force resend writes)" : ""); 1939 mutex_lock(&osdc->request_mutex); 1940 for (p = rb_first(&osdc->requests); p; ) { 1941 req = rb_entry(p, struct ceph_osd_request, r_node); 1942 p = rb_next(p); 1943 1944 /* 1945 * For linger requests that have not yet been 1946 * registered, move them to the linger list; they'll 1947 * be sent to the osd in the loop below. Unregister 1948 * the request before re-registering it as a linger 1949 * request to ensure the __map_request() below 1950 * will decide it needs to be sent. 1951 */ 1952 if (req->r_linger && list_empty(&req->r_linger_item)) { 1953 dout("%p tid %llu restart on osd%d\n", 1954 req, req->r_tid, 1955 req->r_osd ? req->r_osd->o_osd : -1); 1956 ceph_osdc_get_request(req); 1957 __unregister_request(osdc, req); 1958 __register_linger_request(osdc, req); 1959 ceph_osdc_put_request(req); 1960 continue; 1961 } 1962 1963 force_resend_req = force_resend || 1964 (force_resend_writes && 1965 req->r_flags & CEPH_OSD_FLAG_WRITE); 1966 err = __map_request(osdc, req, force_resend_req); 1967 if (err < 0) 1968 continue; /* error */ 1969 if (req->r_osd == NULL) { 1970 dout("%p tid %llu maps to no osd\n", req, req->r_tid); 1971 needmap++; /* request a newer map */ 1972 } else if (err > 0) { 1973 if (!req->r_linger) { 1974 dout("%p tid %llu requeued on osd%d\n", req, 1975 req->r_tid, 1976 req->r_osd ? req->r_osd->o_osd : -1); 1977 req->r_flags |= CEPH_OSD_FLAG_RETRY; 1978 } 1979 } 1980 } 1981 1982 list_for_each_entry_safe(req, nreq, &osdc->req_linger, 1983 r_linger_item) { 1984 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); 1985 1986 err = __map_request(osdc, req, 1987 force_resend || force_resend_writes); 1988 dout("__map_request returned %d\n", err); 1989 if (err < 0) 1990 continue; /* hrm! 
*/ 1991 if (req->r_osd == NULL || err > 0) { 1992 if (req->r_osd == NULL) { 1993 dout("lingering %p tid %llu maps to no osd\n", 1994 req, req->r_tid); 1995 /* 1996 * A homeless lingering request makes 1997 * no sense, as it's job is to keep 1998 * a particular OSD connection open. 1999 * Request a newer map and kick the 2000 * request, knowing that it won't be 2001 * resent until we actually get a map 2002 * that can tell us where to send it. 2003 */ 2004 needmap++; 2005 } 2006 2007 dout("kicking lingering %p tid %llu osd%d\n", req, 2008 req->r_tid, req->r_osd ? req->r_osd->o_osd : -1); 2009 __register_request(osdc, req); 2010 __unregister_linger_request(osdc, req); 2011 } 2012 } 2013 reset_changed_osds(osdc); 2014 mutex_unlock(&osdc->request_mutex); 2015 2016 if (needmap) { 2017 dout("%d requests for down osds, need new map\n", needmap); 2018 ceph_monc_request_next_osdmap(&osdc->client->monc); 2019 } 2020 } 2021 2022 2023 /* 2024 * Process updated osd map. 2025 * 2026 * The message contains any number of incremental and full maps, normally 2027 * indicating some sort of topology change in the cluster. Kick requests 2028 * off to different OSDs as needed. 2029 */ 2030 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) 2031 { 2032 void *p, *end, *next; 2033 u32 nr_maps, maplen; 2034 u32 epoch; 2035 struct ceph_osdmap *newmap = NULL, *oldmap; 2036 int err; 2037 struct ceph_fsid fsid; 2038 bool was_full; 2039 2040 dout("handle_map have %u\n", osdc->osdmap ? 
osdc->osdmap->epoch : 0); 2041 p = msg->front.iov_base; 2042 end = p + msg->front.iov_len; 2043 2044 /* verify fsid */ 2045 ceph_decode_need(&p, end, sizeof(fsid), bad); 2046 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 2047 if (ceph_check_fsid(osdc->client, &fsid) < 0) 2048 return; 2049 2050 down_write(&osdc->map_sem); 2051 2052 was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL); 2053 2054 /* incremental maps */ 2055 ceph_decode_32_safe(&p, end, nr_maps, bad); 2056 dout(" %d inc maps\n", nr_maps); 2057 while (nr_maps > 0) { 2058 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 2059 epoch = ceph_decode_32(&p); 2060 maplen = ceph_decode_32(&p); 2061 ceph_decode_need(&p, end, maplen, bad); 2062 next = p + maplen; 2063 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) { 2064 dout("applying incremental map %u len %d\n", 2065 epoch, maplen); 2066 newmap = osdmap_apply_incremental(&p, next, 2067 osdc->osdmap); 2068 if (IS_ERR(newmap)) { 2069 err = PTR_ERR(newmap); 2070 goto bad; 2071 } 2072 BUG_ON(!newmap); 2073 if (newmap != osdc->osdmap) { 2074 ceph_osdmap_destroy(osdc->osdmap); 2075 osdc->osdmap = newmap; 2076 } 2077 was_full = was_full || 2078 ceph_osdmap_flag(osdc->osdmap, 2079 CEPH_OSDMAP_FULL); 2080 kick_requests(osdc, 0, was_full); 2081 } else { 2082 dout("ignoring incremental map %u len %d\n", 2083 epoch, maplen); 2084 } 2085 p = next; 2086 nr_maps--; 2087 } 2088 if (newmap) 2089 goto done; 2090 2091 /* full maps */ 2092 ceph_decode_32_safe(&p, end, nr_maps, bad); 2093 dout(" %d full maps\n", nr_maps); 2094 while (nr_maps) { 2095 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 2096 epoch = ceph_decode_32(&p); 2097 maplen = ceph_decode_32(&p); 2098 ceph_decode_need(&p, end, maplen, bad); 2099 if (nr_maps > 1) { 2100 dout("skipping non-latest full map %u len %d\n", 2101 epoch, maplen); 2102 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) { 2103 dout("skipping full map %u len %d, " 2104 "older than our %u\n", epoch, maplen, 2105 osdc->osdmap->epoch); 
2106 } else { 2107 int skipped_map = 0; 2108 2109 dout("taking full map %u len %d\n", epoch, maplen); 2110 newmap = ceph_osdmap_decode(&p, p+maplen); 2111 if (IS_ERR(newmap)) { 2112 err = PTR_ERR(newmap); 2113 goto bad; 2114 } 2115 BUG_ON(!newmap); 2116 oldmap = osdc->osdmap; 2117 osdc->osdmap = newmap; 2118 if (oldmap) { 2119 if (oldmap->epoch + 1 < newmap->epoch) 2120 skipped_map = 1; 2121 ceph_osdmap_destroy(oldmap); 2122 } 2123 was_full = was_full || 2124 ceph_osdmap_flag(osdc->osdmap, 2125 CEPH_OSDMAP_FULL); 2126 kick_requests(osdc, skipped_map, was_full); 2127 } 2128 p += maplen; 2129 nr_maps--; 2130 } 2131 2132 if (!osdc->osdmap) 2133 goto bad; 2134 done: 2135 downgrade_write(&osdc->map_sem); 2136 ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP, 2137 osdc->osdmap->epoch); 2138 2139 /* 2140 * subscribe to subsequent osdmap updates if full to ensure 2141 * we find out when we are no longer full and stop returning 2142 * ENOSPC. 2143 */ 2144 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) || 2145 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) || 2146 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) 2147 ceph_monc_request_next_osdmap(&osdc->client->monc); 2148 2149 mutex_lock(&osdc->request_mutex); 2150 __send_queued(osdc); 2151 mutex_unlock(&osdc->request_mutex); 2152 up_read(&osdc->map_sem); 2153 wake_up_all(&osdc->client->auth_wq); 2154 return; 2155 2156 bad: 2157 pr_err("osdc handle_map corrupt msg\n"); 2158 ceph_msg_dump(msg); 2159 up_write(&osdc->map_sem); 2160 } 2161 2162 /* 2163 * watch/notify callback event infrastructure 2164 * 2165 * These callbacks are used both for watch and notify operations. 
2166 */ 2167 static void __release_event(struct kref *kref) 2168 { 2169 struct ceph_osd_event *event = 2170 container_of(kref, struct ceph_osd_event, kref); 2171 2172 dout("__release_event %p\n", event); 2173 kfree(event); 2174 } 2175 2176 static void get_event(struct ceph_osd_event *event) 2177 { 2178 kref_get(&event->kref); 2179 } 2180 2181 void ceph_osdc_put_event(struct ceph_osd_event *event) 2182 { 2183 kref_put(&event->kref, __release_event); 2184 } 2185 EXPORT_SYMBOL(ceph_osdc_put_event); 2186 2187 static void __insert_event(struct ceph_osd_client *osdc, 2188 struct ceph_osd_event *new) 2189 { 2190 struct rb_node **p = &osdc->event_tree.rb_node; 2191 struct rb_node *parent = NULL; 2192 struct ceph_osd_event *event = NULL; 2193 2194 while (*p) { 2195 parent = *p; 2196 event = rb_entry(parent, struct ceph_osd_event, node); 2197 if (new->cookie < event->cookie) 2198 p = &(*p)->rb_left; 2199 else if (new->cookie > event->cookie) 2200 p = &(*p)->rb_right; 2201 else 2202 BUG(); 2203 } 2204 2205 rb_link_node(&new->node, parent, p); 2206 rb_insert_color(&new->node, &osdc->event_tree); 2207 } 2208 2209 static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc, 2210 u64 cookie) 2211 { 2212 struct rb_node **p = &osdc->event_tree.rb_node; 2213 struct rb_node *parent = NULL; 2214 struct ceph_osd_event *event = NULL; 2215 2216 while (*p) { 2217 parent = *p; 2218 event = rb_entry(parent, struct ceph_osd_event, node); 2219 if (cookie < event->cookie) 2220 p = &(*p)->rb_left; 2221 else if (cookie > event->cookie) 2222 p = &(*p)->rb_right; 2223 else 2224 return event; 2225 } 2226 return NULL; 2227 } 2228 2229 static void __remove_event(struct ceph_osd_event *event) 2230 { 2231 struct ceph_osd_client *osdc = event->osdc; 2232 2233 if (!RB_EMPTY_NODE(&event->node)) { 2234 dout("__remove_event removed %p\n", event); 2235 rb_erase(&event->node, &osdc->event_tree); 2236 ceph_osdc_put_event(event); 2237 } else { 2238 dout("__remove_event didn't remove %p\n", event); 
2239 } 2240 } 2241 2242 int ceph_osdc_create_event(struct ceph_osd_client *osdc, 2243 void (*event_cb)(u64, u64, u8, void *), 2244 void *data, struct ceph_osd_event **pevent) 2245 { 2246 struct ceph_osd_event *event; 2247 2248 event = kmalloc(sizeof(*event), GFP_NOIO); 2249 if (!event) 2250 return -ENOMEM; 2251 2252 dout("create_event %p\n", event); 2253 event->cb = event_cb; 2254 event->one_shot = 0; 2255 event->data = data; 2256 event->osdc = osdc; 2257 INIT_LIST_HEAD(&event->osd_node); 2258 RB_CLEAR_NODE(&event->node); 2259 kref_init(&event->kref); /* one ref for us */ 2260 kref_get(&event->kref); /* one ref for the caller */ 2261 2262 spin_lock(&osdc->event_lock); 2263 event->cookie = ++osdc->event_count; 2264 __insert_event(osdc, event); 2265 spin_unlock(&osdc->event_lock); 2266 2267 *pevent = event; 2268 return 0; 2269 } 2270 EXPORT_SYMBOL(ceph_osdc_create_event); 2271 2272 void ceph_osdc_cancel_event(struct ceph_osd_event *event) 2273 { 2274 struct ceph_osd_client *osdc = event->osdc; 2275 2276 dout("cancel_event %p\n", event); 2277 spin_lock(&osdc->event_lock); 2278 __remove_event(event); 2279 spin_unlock(&osdc->event_lock); 2280 ceph_osdc_put_event(event); /* caller's */ 2281 } 2282 EXPORT_SYMBOL(ceph_osdc_cancel_event); 2283 2284 2285 static void do_event_work(struct work_struct *work) 2286 { 2287 struct ceph_osd_event_work *event_work = 2288 container_of(work, struct ceph_osd_event_work, work); 2289 struct ceph_osd_event *event = event_work->event; 2290 u64 ver = event_work->ver; 2291 u64 notify_id = event_work->notify_id; 2292 u8 opcode = event_work->opcode; 2293 2294 dout("do_event_work completing %p\n", event); 2295 event->cb(ver, notify_id, opcode, event->data); 2296 dout("do_event_work completed %p\n", event); 2297 ceph_osdc_put_event(event); 2298 kfree(event_work); 2299 } 2300 2301 2302 /* 2303 * Process osd watch notifications 2304 */ 2305 static void handle_watch_notify(struct ceph_osd_client *osdc, 2306 struct ceph_msg *msg) 2307 { 2308 void *p, 
*end; 2309 u8 proto_ver; 2310 u64 cookie, ver, notify_id; 2311 u8 opcode; 2312 struct ceph_osd_event *event; 2313 struct ceph_osd_event_work *event_work; 2314 2315 p = msg->front.iov_base; 2316 end = p + msg->front.iov_len; 2317 2318 ceph_decode_8_safe(&p, end, proto_ver, bad); 2319 ceph_decode_8_safe(&p, end, opcode, bad); 2320 ceph_decode_64_safe(&p, end, cookie, bad); 2321 ceph_decode_64_safe(&p, end, ver, bad); 2322 ceph_decode_64_safe(&p, end, notify_id, bad); 2323 2324 spin_lock(&osdc->event_lock); 2325 event = __find_event(osdc, cookie); 2326 if (event) { 2327 BUG_ON(event->one_shot); 2328 get_event(event); 2329 } 2330 spin_unlock(&osdc->event_lock); 2331 dout("handle_watch_notify cookie %lld ver %lld event %p\n", 2332 cookie, ver, event); 2333 if (event) { 2334 event_work = kmalloc(sizeof(*event_work), GFP_NOIO); 2335 if (!event_work) { 2336 pr_err("couldn't allocate event_work\n"); 2337 ceph_osdc_put_event(event); 2338 return; 2339 } 2340 INIT_WORK(&event_work->work, do_event_work); 2341 event_work->event = event; 2342 event_work->ver = ver; 2343 event_work->notify_id = notify_id; 2344 event_work->opcode = opcode; 2345 2346 queue_work(osdc->notify_wq, &event_work->work); 2347 } 2348 2349 return; 2350 2351 bad: 2352 pr_err("osdc handle_watch_notify corrupt msg\n"); 2353 } 2354 2355 /* 2356 * build new request AND message 2357 * 2358 */ 2359 void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, 2360 struct ceph_snap_context *snapc, u64 snap_id, 2361 struct timespec *mtime) 2362 { 2363 struct ceph_msg *msg = req->r_request; 2364 void *p; 2365 size_t msg_size; 2366 int flags = req->r_flags; 2367 u64 data_len; 2368 unsigned int i; 2369 2370 req->r_snapid = snap_id; 2371 WARN_ON(snapc != req->r_snapc); 2372 2373 /* encode request */ 2374 msg->hdr.version = cpu_to_le16(4); 2375 2376 p = msg->front.iov_base; 2377 ceph_encode_32(&p, 1); /* client_inc is always 1 */ 2378 req->r_request_osdmap_epoch = p; 2379 p += 4; 2380 req->r_request_flags = p; 2381 
p += 4; 2382 if (req->r_flags & CEPH_OSD_FLAG_WRITE) 2383 ceph_encode_timespec(p, mtime); 2384 p += sizeof(struct ceph_timespec); 2385 req->r_request_reassert_version = p; 2386 p += sizeof(struct ceph_eversion); /* will get filled in */ 2387 2388 /* oloc */ 2389 ceph_encode_8(&p, 4); 2390 ceph_encode_8(&p, 4); 2391 ceph_encode_32(&p, 8 + 4 + 4); 2392 req->r_request_pool = p; 2393 p += 8; 2394 ceph_encode_32(&p, -1); /* preferred */ 2395 ceph_encode_32(&p, 0); /* key len */ 2396 2397 ceph_encode_8(&p, 1); 2398 req->r_request_pgid = p; 2399 p += 8 + 4; 2400 ceph_encode_32(&p, -1); /* preferred */ 2401 2402 /* oid */ 2403 ceph_encode_32(&p, req->r_base_oid.name_len); 2404 memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len); 2405 dout("oid %*pE len %d\n", req->r_base_oid.name_len, 2406 req->r_base_oid.name, req->r_base_oid.name_len); 2407 p += req->r_base_oid.name_len; 2408 2409 /* ops--can imply data */ 2410 ceph_encode_16(&p, (u16)req->r_num_ops); 2411 data_len = 0; 2412 for (i = 0; i < req->r_num_ops; i++) { 2413 data_len += osd_req_encode_op(req, p, i); 2414 p += sizeof(struct ceph_osd_op); 2415 } 2416 2417 /* snaps */ 2418 ceph_encode_64(&p, req->r_snapid); 2419 ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0); 2420 ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0); 2421 if (req->r_snapc) { 2422 for (i = 0; i < req->r_snapc->num_snaps; i++) { 2423 ceph_encode_64(&p, req->r_snapc->snaps[i]); 2424 } 2425 } 2426 2427 req->r_request_attempts = p; 2428 p += 4; 2429 2430 /* data */ 2431 if (flags & CEPH_OSD_FLAG_WRITE) { 2432 u16 data_off; 2433 2434 /* 2435 * The header "data_off" is a hint to the receiver 2436 * allowing it to align received data into its 2437 * buffers such that there's no need to re-copy 2438 * it before writing it to disk (direct I/O). 
2439 */ 2440 data_off = (u16) (off & 0xffff); 2441 req->r_request->hdr.data_off = cpu_to_le16(data_off); 2442 } 2443 req->r_request->hdr.data_len = cpu_to_le32(data_len); 2444 2445 BUG_ON(p > msg->front.iov_base + msg->front.iov_len); 2446 msg_size = p - msg->front.iov_base; 2447 msg->front.iov_len = msg_size; 2448 msg->hdr.front_len = cpu_to_le32(msg_size); 2449 2450 dout("build_request msg_size was %d\n", (int)msg_size); 2451 } 2452 EXPORT_SYMBOL(ceph_osdc_build_request); 2453 2454 /* 2455 * Register request, send initial attempt. 2456 */ 2457 int ceph_osdc_start_request(struct ceph_osd_client *osdc, 2458 struct ceph_osd_request *req, 2459 bool nofail) 2460 { 2461 int rc; 2462 2463 down_read(&osdc->map_sem); 2464 mutex_lock(&osdc->request_mutex); 2465 2466 rc = __ceph_osdc_start_request(osdc, req, nofail); 2467 2468 mutex_unlock(&osdc->request_mutex); 2469 up_read(&osdc->map_sem); 2470 2471 return rc; 2472 } 2473 EXPORT_SYMBOL(ceph_osdc_start_request); 2474 2475 /* 2476 * Unregister a registered request. The request is not completed (i.e. 2477 * no callbacks or wakeups) - higher layers are supposed to know what 2478 * they are canceling. 
2479 */ 2480 void ceph_osdc_cancel_request(struct ceph_osd_request *req) 2481 { 2482 struct ceph_osd_client *osdc = req->r_osdc; 2483 2484 mutex_lock(&osdc->request_mutex); 2485 if (req->r_linger) 2486 __unregister_linger_request(osdc, req); 2487 __unregister_request(osdc, req); 2488 mutex_unlock(&osdc->request_mutex); 2489 2490 dout("%s %p tid %llu canceled\n", __func__, req, req->r_tid); 2491 } 2492 EXPORT_SYMBOL(ceph_osdc_cancel_request); 2493 2494 /* 2495 * wait for a request to complete 2496 */ 2497 int ceph_osdc_wait_request(struct ceph_osd_client *osdc, 2498 struct ceph_osd_request *req) 2499 { 2500 int rc; 2501 2502 dout("%s %p tid %llu\n", __func__, req, req->r_tid); 2503 2504 rc = wait_for_completion_interruptible(&req->r_completion); 2505 if (rc < 0) { 2506 dout("%s %p tid %llu interrupted\n", __func__, req, req->r_tid); 2507 ceph_osdc_cancel_request(req); 2508 complete_request(req); 2509 return rc; 2510 } 2511 2512 dout("%s %p tid %llu result %d\n", __func__, req, req->r_tid, 2513 req->r_result); 2514 return req->r_result; 2515 } 2516 EXPORT_SYMBOL(ceph_osdc_wait_request); 2517 2518 /* 2519 * sync - wait for all in-flight requests to flush. avoid starvation. 
2520 */ 2521 void ceph_osdc_sync(struct ceph_osd_client *osdc) 2522 { 2523 struct ceph_osd_request *req; 2524 u64 last_tid, next_tid = 0; 2525 2526 mutex_lock(&osdc->request_mutex); 2527 last_tid = osdc->last_tid; 2528 while (1) { 2529 req = __lookup_request_ge(osdc, next_tid); 2530 if (!req) 2531 break; 2532 if (req->r_tid > last_tid) 2533 break; 2534 2535 next_tid = req->r_tid + 1; 2536 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0) 2537 continue; 2538 2539 ceph_osdc_get_request(req); 2540 mutex_unlock(&osdc->request_mutex); 2541 dout("sync waiting on tid %llu (last is %llu)\n", 2542 req->r_tid, last_tid); 2543 wait_for_completion(&req->r_safe_completion); 2544 mutex_lock(&osdc->request_mutex); 2545 ceph_osdc_put_request(req); 2546 } 2547 mutex_unlock(&osdc->request_mutex); 2548 dout("sync done (thru tid %llu)\n", last_tid); 2549 } 2550 EXPORT_SYMBOL(ceph_osdc_sync); 2551 2552 /* 2553 * Call all pending notify callbacks - for use after a watch is 2554 * unregistered, to make sure no more callbacks for it will be invoked 2555 */ 2556 void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc) 2557 { 2558 flush_workqueue(osdc->notify_wq); 2559 } 2560 EXPORT_SYMBOL(ceph_osdc_flush_notifies); 2561 2562 2563 /* 2564 * init, shutdown 2565 */ 2566 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) 2567 { 2568 int err; 2569 2570 dout("init\n"); 2571 osdc->client = client; 2572 osdc->osdmap = NULL; 2573 init_rwsem(&osdc->map_sem); 2574 mutex_init(&osdc->request_mutex); 2575 osdc->last_tid = 0; 2576 osdc->osds = RB_ROOT; 2577 INIT_LIST_HEAD(&osdc->osd_lru); 2578 osdc->requests = RB_ROOT; 2579 INIT_LIST_HEAD(&osdc->req_lru); 2580 INIT_LIST_HEAD(&osdc->req_unsent); 2581 INIT_LIST_HEAD(&osdc->req_notarget); 2582 INIT_LIST_HEAD(&osdc->req_linger); 2583 osdc->num_requests = 0; 2584 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 2585 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 2586 spin_lock_init(&osdc->event_lock); 2587 
osdc->event_tree = RB_ROOT; 2588 osdc->event_count = 0; 2589 2590 schedule_delayed_work(&osdc->osds_timeout_work, 2591 round_jiffies_relative(osdc->client->options->osd_idle_ttl)); 2592 2593 err = -ENOMEM; 2594 osdc->req_mempool = mempool_create_slab_pool(10, 2595 ceph_osd_request_cache); 2596 if (!osdc->req_mempool) 2597 goto out; 2598 2599 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, 2600 PAGE_SIZE, 10, true, "osd_op"); 2601 if (err < 0) 2602 goto out_mempool; 2603 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, 2604 PAGE_SIZE, 10, true, "osd_op_reply"); 2605 if (err < 0) 2606 goto out_msgpool; 2607 2608 err = -ENOMEM; 2609 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); 2610 if (!osdc->notify_wq) 2611 goto out_msgpool_reply; 2612 2613 return 0; 2614 2615 out_msgpool_reply: 2616 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 2617 out_msgpool: 2618 ceph_msgpool_destroy(&osdc->msgpool_op); 2619 out_mempool: 2620 mempool_destroy(osdc->req_mempool); 2621 out: 2622 return err; 2623 } 2624 2625 void ceph_osdc_stop(struct ceph_osd_client *osdc) 2626 { 2627 flush_workqueue(osdc->notify_wq); 2628 destroy_workqueue(osdc->notify_wq); 2629 cancel_delayed_work_sync(&osdc->timeout_work); 2630 cancel_delayed_work_sync(&osdc->osds_timeout_work); 2631 2632 mutex_lock(&osdc->request_mutex); 2633 while (!RB_EMPTY_ROOT(&osdc->osds)) { 2634 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), 2635 struct ceph_osd, o_node); 2636 remove_osd(osdc, osd); 2637 } 2638 mutex_unlock(&osdc->request_mutex); 2639 2640 if (osdc->osdmap) { 2641 ceph_osdmap_destroy(osdc->osdmap); 2642 osdc->osdmap = NULL; 2643 } 2644 mempool_destroy(osdc->req_mempool); 2645 ceph_msgpool_destroy(&osdc->msgpool_op); 2646 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 2647 } 2648 2649 /* 2650 * Read some contiguous pages. If we cross a stripe boundary, shorten 2651 * *plen. Return number of bytes read, or error. 
2652 */ 2653 int ceph_osdc_readpages(struct ceph_osd_client *osdc, 2654 struct ceph_vino vino, struct ceph_file_layout *layout, 2655 u64 off, u64 *plen, 2656 u32 truncate_seq, u64 truncate_size, 2657 struct page **pages, int num_pages, int page_align) 2658 { 2659 struct ceph_osd_request *req; 2660 int rc = 0; 2661 2662 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, 2663 vino.snap, off, *plen); 2664 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1, 2665 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 2666 NULL, truncate_seq, truncate_size, 2667 false); 2668 if (IS_ERR(req)) 2669 return PTR_ERR(req); 2670 2671 /* it may be a short read due to an object boundary */ 2672 2673 osd_req_op_extent_osd_data_pages(req, 0, 2674 pages, *plen, page_align, false, false); 2675 2676 dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", 2677 off, *plen, *plen, page_align); 2678 2679 ceph_osdc_build_request(req, off, NULL, vino.snap, NULL); 2680 2681 rc = ceph_osdc_start_request(osdc, req, false); 2682 if (!rc) 2683 rc = ceph_osdc_wait_request(osdc, req); 2684 2685 ceph_osdc_put_request(req); 2686 dout("readpages result %d\n", rc); 2687 return rc; 2688 } 2689 EXPORT_SYMBOL(ceph_osdc_readpages); 2690 2691 /* 2692 * do a synchronous write on N pages 2693 */ 2694 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, 2695 struct ceph_file_layout *layout, 2696 struct ceph_snap_context *snapc, 2697 u64 off, u64 len, 2698 u32 truncate_seq, u64 truncate_size, 2699 struct timespec *mtime, 2700 struct page **pages, int num_pages) 2701 { 2702 struct ceph_osd_request *req; 2703 int rc = 0; 2704 int page_align = off & ~PAGE_MASK; 2705 2706 BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */ 2707 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1, 2708 CEPH_OSD_OP_WRITE, 2709 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, 2710 snapc, truncate_seq, truncate_size, 2711 true); 2712 if (IS_ERR(req)) 2713 return 
PTR_ERR(req); 2714 2715 /* it may be a short write due to an object boundary */ 2716 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, 2717 false, false); 2718 dout("writepages %llu~%llu (%llu bytes)\n", off, len, len); 2719 2720 ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime); 2721 2722 rc = ceph_osdc_start_request(osdc, req, true); 2723 if (!rc) 2724 rc = ceph_osdc_wait_request(osdc, req); 2725 2726 ceph_osdc_put_request(req); 2727 if (rc == 0) 2728 rc = len; 2729 dout("writepages result %d\n", rc); 2730 return rc; 2731 } 2732 EXPORT_SYMBOL(ceph_osdc_writepages); 2733 2734 int ceph_osdc_setup(void) 2735 { 2736 size_t size = sizeof(struct ceph_osd_request) + 2737 CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op); 2738 2739 BUG_ON(ceph_osd_request_cache); 2740 ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size, 2741 0, 0, NULL); 2742 2743 return ceph_osd_request_cache ? 0 : -ENOMEM; 2744 } 2745 EXPORT_SYMBOL(ceph_osdc_setup); 2746 2747 void ceph_osdc_cleanup(void) 2748 { 2749 BUG_ON(!ceph_osd_request_cache); 2750 kmem_cache_destroy(ceph_osd_request_cache); 2751 ceph_osd_request_cache = NULL; 2752 } 2753 EXPORT_SYMBOL(ceph_osdc_cleanup); 2754 2755 /* 2756 * handle incoming message 2757 */ 2758 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 2759 { 2760 struct ceph_osd *osd = con->private; 2761 struct ceph_osd_client *osdc; 2762 int type = le16_to_cpu(msg->hdr.type); 2763 2764 if (!osd) 2765 goto out; 2766 osdc = osd->o_osdc; 2767 2768 switch (type) { 2769 case CEPH_MSG_OSD_MAP: 2770 ceph_osdc_handle_map(osdc, msg); 2771 break; 2772 case CEPH_MSG_OSD_OPREPLY: 2773 handle_reply(osdc, msg); 2774 break; 2775 case CEPH_MSG_WATCH_NOTIFY: 2776 handle_watch_notify(osdc, msg); 2777 break; 2778 2779 default: 2780 pr_err("received unknown message type %d %s\n", type, 2781 ceph_msg_type_name(type)); 2782 } 2783 out: 2784 ceph_msg_put(msg); 2785 } 2786 2787 /* 2788 * Lookup and return message for incoming 
reply. Don't try to do 2789 * anything about a larger than preallocated data portion of the 2790 * message at the moment - for now, just skip the message. 2791 */ 2792 static struct ceph_msg *get_reply(struct ceph_connection *con, 2793 struct ceph_msg_header *hdr, 2794 int *skip) 2795 { 2796 struct ceph_osd *osd = con->private; 2797 struct ceph_osd_client *osdc = osd->o_osdc; 2798 struct ceph_msg *m; 2799 struct ceph_osd_request *req; 2800 int front_len = le32_to_cpu(hdr->front_len); 2801 int data_len = le32_to_cpu(hdr->data_len); 2802 u64 tid; 2803 2804 tid = le64_to_cpu(hdr->tid); 2805 mutex_lock(&osdc->request_mutex); 2806 req = lookup_request(&osdc->requests, tid); 2807 if (!req) { 2808 dout("%s osd%d tid %llu unknown, skipping\n", __func__, 2809 osd->o_osd, tid); 2810 m = NULL; 2811 *skip = 1; 2812 goto out; 2813 } 2814 2815 ceph_msg_revoke_incoming(req->r_reply); 2816 2817 if (front_len > req->r_reply->front_alloc_len) { 2818 pr_warn("%s osd%d tid %llu front %d > preallocated %d\n", 2819 __func__, osd->o_osd, req->r_tid, front_len, 2820 req->r_reply->front_alloc_len); 2821 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS, 2822 false); 2823 if (!m) 2824 goto out; 2825 ceph_msg_put(req->r_reply); 2826 req->r_reply = m; 2827 } 2828 2829 if (data_len > req->r_reply->data_length) { 2830 pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n", 2831 __func__, osd->o_osd, req->r_tid, data_len, 2832 req->r_reply->data_length); 2833 m = NULL; 2834 *skip = 1; 2835 goto out; 2836 } 2837 2838 m = ceph_msg_get(req->r_reply); 2839 dout("get_reply tid %lld %p\n", tid, m); 2840 2841 out: 2842 mutex_unlock(&osdc->request_mutex); 2843 return m; 2844 } 2845 2846 static struct ceph_msg *alloc_msg(struct ceph_connection *con, 2847 struct ceph_msg_header *hdr, 2848 int *skip) 2849 { 2850 struct ceph_osd *osd = con->private; 2851 int type = le16_to_cpu(hdr->type); 2852 int front = le32_to_cpu(hdr->front_len); 2853 2854 *skip = 0; 2855 switch (type) { 2856 case 
CEPH_MSG_OSD_MAP: 2857 case CEPH_MSG_WATCH_NOTIFY: 2858 return ceph_msg_new(type, front, GFP_NOFS, false); 2859 case CEPH_MSG_OSD_OPREPLY: 2860 return get_reply(con, hdr, skip); 2861 default: 2862 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type, 2863 osd->o_osd); 2864 *skip = 1; 2865 return NULL; 2866 } 2867 } 2868 2869 /* 2870 * Wrappers to refcount containing ceph_osd struct 2871 */ 2872 static struct ceph_connection *get_osd_con(struct ceph_connection *con) 2873 { 2874 struct ceph_osd *osd = con->private; 2875 if (get_osd(osd)) 2876 return con; 2877 return NULL; 2878 } 2879 2880 static void put_osd_con(struct ceph_connection *con) 2881 { 2882 struct ceph_osd *osd = con->private; 2883 put_osd(osd); 2884 } 2885 2886 /* 2887 * authentication 2888 */ 2889 /* 2890 * Note: returned pointer is the address of a structure that's 2891 * managed separately. Caller must *not* attempt to free it. 2892 */ 2893 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 2894 int *proto, int force_new) 2895 { 2896 struct ceph_osd *o = con->private; 2897 struct ceph_osd_client *osdc = o->o_osdc; 2898 struct ceph_auth_client *ac = osdc->client->monc.auth; 2899 struct ceph_auth_handshake *auth = &o->o_auth; 2900 2901 if (force_new && auth->authorizer) { 2902 ceph_auth_destroy_authorizer(auth->authorizer); 2903 auth->authorizer = NULL; 2904 } 2905 if (!auth->authorizer) { 2906 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 2907 auth); 2908 if (ret) 2909 return ERR_PTR(ret); 2910 } else { 2911 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 2912 auth); 2913 if (ret) 2914 return ERR_PTR(ret); 2915 } 2916 *proto = ac->protocol; 2917 2918 return auth; 2919 } 2920 2921 2922 static int verify_authorizer_reply(struct ceph_connection *con, int len) 2923 { 2924 struct ceph_osd *o = con->private; 2925 struct ceph_osd_client *osdc = o->o_osdc; 2926 struct ceph_auth_client *ac = osdc->client->monc.auth; 2927 2928 return 
ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len); 2929 } 2930 2931 static int invalidate_authorizer(struct ceph_connection *con) 2932 { 2933 struct ceph_osd *o = con->private; 2934 struct ceph_osd_client *osdc = o->o_osdc; 2935 struct ceph_auth_client *ac = osdc->client->monc.auth; 2936 2937 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); 2938 return ceph_monc_validate_auth(&osdc->client->monc); 2939 } 2940 2941 static int osd_sign_message(struct ceph_msg *msg) 2942 { 2943 struct ceph_osd *o = msg->con->private; 2944 struct ceph_auth_handshake *auth = &o->o_auth; 2945 2946 return ceph_auth_sign_message(auth, msg); 2947 } 2948 2949 static int osd_check_message_signature(struct ceph_msg *msg) 2950 { 2951 struct ceph_osd *o = msg->con->private; 2952 struct ceph_auth_handshake *auth = &o->o_auth; 2953 2954 return ceph_auth_check_message_signature(auth, msg); 2955 } 2956 2957 static const struct ceph_connection_operations osd_con_ops = { 2958 .get = get_osd_con, 2959 .put = put_osd_con, 2960 .dispatch = dispatch, 2961 .get_authorizer = get_authorizer, 2962 .verify_authorizer_reply = verify_authorizer_reply, 2963 .invalidate_authorizer = invalidate_authorizer, 2964 .alloc_msg = alloc_msg, 2965 .sign_message = osd_sign_message, 2966 .check_message_signature = osd_check_message_signature, 2967 .fault = osd_reset, 2968 }; 2969