1 2 #include <linux/ceph/ceph_debug.h> 3 4 #include <linux/module.h> 5 #include <linux/err.h> 6 #include <linux/highmem.h> 7 #include <linux/mm.h> 8 #include <linux/pagemap.h> 9 #include <linux/slab.h> 10 #include <linux/uaccess.h> 11 #ifdef CONFIG_BLOCK 12 #include <linux/bio.h> 13 #endif 14 15 #include <linux/ceph/libceph.h> 16 #include <linux/ceph/osd_client.h> 17 #include <linux/ceph/messenger.h> 18 #include <linux/ceph/decode.h> 19 #include <linux/ceph/auth.h> 20 #include <linux/ceph/pagelist.h> 21 22 #define OSD_OP_FRONT_LEN 4096 23 #define OSD_OPREPLY_FRONT_LEN 512 24 25 static struct kmem_cache *ceph_osd_request_cache; 26 27 static const struct ceph_connection_operations osd_con_ops; 28 29 static void __send_queued(struct ceph_osd_client *osdc); 30 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd); 31 static void __register_request(struct ceph_osd_client *osdc, 32 struct ceph_osd_request *req); 33 static void __unregister_request(struct ceph_osd_client *osdc, 34 struct ceph_osd_request *req); 35 static void __unregister_linger_request(struct ceph_osd_client *osdc, 36 struct ceph_osd_request *req); 37 static void __enqueue_request(struct ceph_osd_request *req); 38 static void __send_request(struct ceph_osd_client *osdc, 39 struct ceph_osd_request *req); 40 41 /* 42 * Implement client access to distributed object storage cluster. 43 * 44 * All data objects are stored within a cluster/cloud of OSDs, or 45 * "object storage devices." (Note that Ceph OSDs have _nothing_ to 46 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply 47 * remote daemons serving up and coordinating consistent and safe 48 * access to storage. 49 * 50 * Cluster membership and the mapping of data objects onto storage devices 51 * are described by the osd map. 
52 * 53 * We keep track of pending OSD requests (read, write), resubmit 54 * requests to different OSDs when the cluster topology/data layout 55 * change, or retry the affected requests when the communications 56 * channel with an OSD is reset. 57 */ 58 59 /* 60 * calculate the mapping of a file extent onto an object, and fill out the 61 * request accordingly. shorten extent as necessary if it crosses an 62 * object boundary. 63 * 64 * fill osd op in request message. 65 */ 66 static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen, 67 u64 *objnum, u64 *objoff, u64 *objlen) 68 { 69 u64 orig_len = *plen; 70 int r; 71 72 /* object extent? */ 73 r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum, 74 objoff, objlen); 75 if (r < 0) 76 return r; 77 if (*objlen < orig_len) { 78 *plen = *objlen; 79 dout(" skipping last %llu, final file extent %llu~%llu\n", 80 orig_len - *plen, off, *plen); 81 } 82 83 dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen); 84 85 return 0; 86 } 87 88 static void ceph_osd_data_init(struct ceph_osd_data *osd_data) 89 { 90 memset(osd_data, 0, sizeof (*osd_data)); 91 osd_data->type = CEPH_OSD_DATA_TYPE_NONE; 92 } 93 94 static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data, 95 struct page **pages, u64 length, u32 alignment, 96 bool pages_from_pool, bool own_pages) 97 { 98 osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; 99 osd_data->pages = pages; 100 osd_data->length = length; 101 osd_data->alignment = alignment; 102 osd_data->pages_from_pool = pages_from_pool; 103 osd_data->own_pages = own_pages; 104 } 105 106 static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data, 107 struct ceph_pagelist *pagelist) 108 { 109 osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST; 110 osd_data->pagelist = pagelist; 111 } 112 113 #ifdef CONFIG_BLOCK 114 static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data, 115 struct bio *bio, size_t bio_length) 116 { 117 osd_data->type = 
CEPH_OSD_DATA_TYPE_BIO; 118 osd_data->bio = bio; 119 osd_data->bio_length = bio_length; 120 } 121 #endif /* CONFIG_BLOCK */ 122 123 #define osd_req_op_data(oreq, whch, typ, fld) \ 124 ({ \ 125 BUG_ON(whch >= (oreq)->r_num_ops); \ 126 &(oreq)->r_ops[whch].typ.fld; \ 127 }) 128 129 static struct ceph_osd_data * 130 osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which) 131 { 132 BUG_ON(which >= osd_req->r_num_ops); 133 134 return &osd_req->r_ops[which].raw_data_in; 135 } 136 137 struct ceph_osd_data * 138 osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req, 139 unsigned int which) 140 { 141 return osd_req_op_data(osd_req, which, extent, osd_data); 142 } 143 EXPORT_SYMBOL(osd_req_op_extent_osd_data); 144 145 struct ceph_osd_data * 146 osd_req_op_cls_response_data(struct ceph_osd_request *osd_req, 147 unsigned int which) 148 { 149 return osd_req_op_data(osd_req, which, cls, response_data); 150 } 151 EXPORT_SYMBOL(osd_req_op_cls_response_data); /* ??? */ 152 153 void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req, 154 unsigned int which, struct page **pages, 155 u64 length, u32 alignment, 156 bool pages_from_pool, bool own_pages) 157 { 158 struct ceph_osd_data *osd_data; 159 160 osd_data = osd_req_op_raw_data_in(osd_req, which); 161 ceph_osd_data_pages_init(osd_data, pages, length, alignment, 162 pages_from_pool, own_pages); 163 } 164 EXPORT_SYMBOL(osd_req_op_raw_data_in_pages); 165 166 void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req, 167 unsigned int which, struct page **pages, 168 u64 length, u32 alignment, 169 bool pages_from_pool, bool own_pages) 170 { 171 struct ceph_osd_data *osd_data; 172 173 osd_data = osd_req_op_data(osd_req, which, extent, osd_data); 174 ceph_osd_data_pages_init(osd_data, pages, length, alignment, 175 pages_from_pool, own_pages); 176 } 177 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages); 178 179 void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req, 180 
unsigned int which, struct ceph_pagelist *pagelist) 181 { 182 struct ceph_osd_data *osd_data; 183 184 osd_data = osd_req_op_data(osd_req, which, extent, osd_data); 185 ceph_osd_data_pagelist_init(osd_data, pagelist); 186 } 187 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist); 188 189 #ifdef CONFIG_BLOCK 190 void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req, 191 unsigned int which, struct bio *bio, size_t bio_length) 192 { 193 struct ceph_osd_data *osd_data; 194 195 osd_data = osd_req_op_data(osd_req, which, extent, osd_data); 196 ceph_osd_data_bio_init(osd_data, bio, bio_length); 197 } 198 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio); 199 #endif /* CONFIG_BLOCK */ 200 201 static void osd_req_op_cls_request_info_pagelist( 202 struct ceph_osd_request *osd_req, 203 unsigned int which, struct ceph_pagelist *pagelist) 204 { 205 struct ceph_osd_data *osd_data; 206 207 osd_data = osd_req_op_data(osd_req, which, cls, request_info); 208 ceph_osd_data_pagelist_init(osd_data, pagelist); 209 } 210 211 void osd_req_op_cls_request_data_pagelist( 212 struct ceph_osd_request *osd_req, 213 unsigned int which, struct ceph_pagelist *pagelist) 214 { 215 struct ceph_osd_data *osd_data; 216 217 osd_data = osd_req_op_data(osd_req, which, cls, request_data); 218 ceph_osd_data_pagelist_init(osd_data, pagelist); 219 } 220 EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist); 221 222 void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req, 223 unsigned int which, struct page **pages, u64 length, 224 u32 alignment, bool pages_from_pool, bool own_pages) 225 { 226 struct ceph_osd_data *osd_data; 227 228 osd_data = osd_req_op_data(osd_req, which, cls, request_data); 229 ceph_osd_data_pages_init(osd_data, pages, length, alignment, 230 pages_from_pool, own_pages); 231 } 232 EXPORT_SYMBOL(osd_req_op_cls_request_data_pages); 233 234 void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req, 235 unsigned int which, struct page **pages, u64 length, 
236 u32 alignment, bool pages_from_pool, bool own_pages) 237 { 238 struct ceph_osd_data *osd_data; 239 240 osd_data = osd_req_op_data(osd_req, which, cls, response_data); 241 ceph_osd_data_pages_init(osd_data, pages, length, alignment, 242 pages_from_pool, own_pages); 243 } 244 EXPORT_SYMBOL(osd_req_op_cls_response_data_pages); 245 246 static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data) 247 { 248 switch (osd_data->type) { 249 case CEPH_OSD_DATA_TYPE_NONE: 250 return 0; 251 case CEPH_OSD_DATA_TYPE_PAGES: 252 return osd_data->length; 253 case CEPH_OSD_DATA_TYPE_PAGELIST: 254 return (u64)osd_data->pagelist->length; 255 #ifdef CONFIG_BLOCK 256 case CEPH_OSD_DATA_TYPE_BIO: 257 return (u64)osd_data->bio_length; 258 #endif /* CONFIG_BLOCK */ 259 default: 260 WARN(true, "unrecognized data type %d\n", (int)osd_data->type); 261 return 0; 262 } 263 } 264 265 static void ceph_osd_data_release(struct ceph_osd_data *osd_data) 266 { 267 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) { 268 int num_pages; 269 270 num_pages = calc_pages_for((u64)osd_data->alignment, 271 (u64)osd_data->length); 272 ceph_release_page_vector(osd_data->pages, num_pages); 273 } 274 ceph_osd_data_init(osd_data); 275 } 276 277 static void osd_req_op_data_release(struct ceph_osd_request *osd_req, 278 unsigned int which) 279 { 280 struct ceph_osd_req_op *op; 281 282 BUG_ON(which >= osd_req->r_num_ops); 283 op = &osd_req->r_ops[which]; 284 285 switch (op->op) { 286 case CEPH_OSD_OP_READ: 287 case CEPH_OSD_OP_WRITE: 288 ceph_osd_data_release(&op->extent.osd_data); 289 break; 290 case CEPH_OSD_OP_CALL: 291 ceph_osd_data_release(&op->cls.request_info); 292 ceph_osd_data_release(&op->cls.request_data); 293 ceph_osd_data_release(&op->cls.response_data); 294 break; 295 case CEPH_OSD_OP_SETXATTR: 296 case CEPH_OSD_OP_CMPXATTR: 297 ceph_osd_data_release(&op->xattr.osd_data); 298 break; 299 default: 300 break; 301 } 302 } 303 304 /* 305 * requests 306 */ 307 static void 
ceph_osdc_release_request(struct kref *kref) 308 { 309 struct ceph_osd_request *req = container_of(kref, 310 struct ceph_osd_request, r_kref); 311 unsigned int which; 312 313 dout("%s %p (r_request %p r_reply %p)\n", __func__, req, 314 req->r_request, req->r_reply); 315 WARN_ON(!RB_EMPTY_NODE(&req->r_node)); 316 WARN_ON(!list_empty(&req->r_req_lru_item)); 317 WARN_ON(!list_empty(&req->r_osd_item)); 318 WARN_ON(!list_empty(&req->r_linger_item)); 319 WARN_ON(!list_empty(&req->r_linger_osd_item)); 320 WARN_ON(req->r_osd); 321 322 if (req->r_request) 323 ceph_msg_put(req->r_request); 324 if (req->r_reply) { 325 ceph_msg_revoke_incoming(req->r_reply); 326 ceph_msg_put(req->r_reply); 327 } 328 329 for (which = 0; which < req->r_num_ops; which++) 330 osd_req_op_data_release(req, which); 331 332 ceph_put_snap_context(req->r_snapc); 333 if (req->r_mempool) 334 mempool_free(req, req->r_osdc->req_mempool); 335 else 336 kmem_cache_free(ceph_osd_request_cache, req); 337 338 } 339 340 void ceph_osdc_get_request(struct ceph_osd_request *req) 341 { 342 dout("%s %p (was %d)\n", __func__, req, 343 atomic_read(&req->r_kref.refcount)); 344 kref_get(&req->r_kref); 345 } 346 EXPORT_SYMBOL(ceph_osdc_get_request); 347 348 void ceph_osdc_put_request(struct ceph_osd_request *req) 349 { 350 dout("%s %p (was %d)\n", __func__, req, 351 atomic_read(&req->r_kref.refcount)); 352 kref_put(&req->r_kref, ceph_osdc_release_request); 353 } 354 EXPORT_SYMBOL(ceph_osdc_put_request); 355 356 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 357 struct ceph_snap_context *snapc, 358 unsigned int num_ops, 359 bool use_mempool, 360 gfp_t gfp_flags) 361 { 362 struct ceph_osd_request *req; 363 struct ceph_msg *msg; 364 size_t msg_size; 365 366 BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX); 367 BUG_ON(num_ops > CEPH_OSD_MAX_OP); 368 369 msg_size = 4 + 4 + 8 + 8 + 4+8; 370 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ 371 msg_size += 1 + 8 + 4 + 4; /* pg_t */ 372 msg_size += 4 + 
CEPH_MAX_OID_NAME_LEN; /* oid */ 373 msg_size += 2 + num_ops*sizeof(struct ceph_osd_op); 374 msg_size += 8; /* snapid */ 375 msg_size += 8; /* snap_seq */ 376 msg_size += 8 * (snapc ? snapc->num_snaps : 0); /* snaps */ 377 msg_size += 4; 378 379 if (use_mempool) { 380 req = mempool_alloc(osdc->req_mempool, gfp_flags); 381 memset(req, 0, sizeof(*req)); 382 } else { 383 req = kmem_cache_zalloc(ceph_osd_request_cache, gfp_flags); 384 } 385 if (req == NULL) 386 return NULL; 387 388 req->r_osdc = osdc; 389 req->r_mempool = use_mempool; 390 req->r_num_ops = num_ops; 391 392 kref_init(&req->r_kref); 393 init_completion(&req->r_completion); 394 init_completion(&req->r_safe_completion); 395 RB_CLEAR_NODE(&req->r_node); 396 INIT_LIST_HEAD(&req->r_unsafe_item); 397 INIT_LIST_HEAD(&req->r_linger_item); 398 INIT_LIST_HEAD(&req->r_linger_osd_item); 399 INIT_LIST_HEAD(&req->r_req_lru_item); 400 INIT_LIST_HEAD(&req->r_osd_item); 401 402 req->r_base_oloc.pool = -1; 403 req->r_target_oloc.pool = -1; 404 405 /* create reply message */ 406 if (use_mempool) 407 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 408 else 409 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, 410 OSD_OPREPLY_FRONT_LEN, gfp_flags, true); 411 if (!msg) { 412 ceph_osdc_put_request(req); 413 return NULL; 414 } 415 req->r_reply = msg; 416 417 /* create request message; allow space for oid */ 418 if (use_mempool) 419 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 420 else 421 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true); 422 if (!msg) { 423 ceph_osdc_put_request(req); 424 return NULL; 425 } 426 427 memset(msg->front.iov_base, 0, msg->front.iov_len); 428 429 req->r_request = msg; 430 431 return req; 432 } 433 EXPORT_SYMBOL(ceph_osdc_alloc_request); 434 435 static bool osd_req_opcode_valid(u16 opcode) 436 { 437 switch (opcode) { 438 #define GENERATE_CASE(op, opcode, str) case CEPH_OSD_OP_##op: return true; 439 __CEPH_FORALL_OSD_OPS(GENERATE_CASE) 440 #undef GENERATE_CASE 441 default: 442 return false; 
443 } 444 } 445 446 /* 447 * This is an osd op init function for opcodes that have no data or 448 * other information associated with them. It also serves as a 449 * common init routine for all the other init functions, below. 450 */ 451 static struct ceph_osd_req_op * 452 _osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which, 453 u16 opcode) 454 { 455 struct ceph_osd_req_op *op; 456 457 BUG_ON(which >= osd_req->r_num_ops); 458 BUG_ON(!osd_req_opcode_valid(opcode)); 459 460 op = &osd_req->r_ops[which]; 461 memset(op, 0, sizeof (*op)); 462 op->op = opcode; 463 464 return op; 465 } 466 467 void osd_req_op_init(struct ceph_osd_request *osd_req, 468 unsigned int which, u16 opcode) 469 { 470 (void)_osd_req_op_init(osd_req, which, opcode); 471 } 472 EXPORT_SYMBOL(osd_req_op_init); 473 474 void osd_req_op_extent_init(struct ceph_osd_request *osd_req, 475 unsigned int which, u16 opcode, 476 u64 offset, u64 length, 477 u64 truncate_size, u32 truncate_seq) 478 { 479 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); 480 size_t payload_len = 0; 481 482 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && 483 opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE); 484 485 op->extent.offset = offset; 486 op->extent.length = length; 487 op->extent.truncate_size = truncate_size; 488 op->extent.truncate_seq = truncate_seq; 489 if (opcode == CEPH_OSD_OP_WRITE) 490 payload_len += length; 491 492 op->payload_len = payload_len; 493 } 494 EXPORT_SYMBOL(osd_req_op_extent_init); 495 496 void osd_req_op_extent_update(struct ceph_osd_request *osd_req, 497 unsigned int which, u64 length) 498 { 499 struct ceph_osd_req_op *op; 500 u64 previous; 501 502 BUG_ON(which >= osd_req->r_num_ops); 503 op = &osd_req->r_ops[which]; 504 previous = op->extent.length; 505 506 if (length == previous) 507 return; /* Nothing to do */ 508 BUG_ON(length > previous); 509 510 op->extent.length = length; 511 op->payload_len -= previous - length; 512 } 513 
EXPORT_SYMBOL(osd_req_op_extent_update); 514 515 void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, 516 u16 opcode, const char *class, const char *method) 517 { 518 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); 519 struct ceph_pagelist *pagelist; 520 size_t payload_len = 0; 521 size_t size; 522 523 BUG_ON(opcode != CEPH_OSD_OP_CALL); 524 525 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); 526 BUG_ON(!pagelist); 527 ceph_pagelist_init(pagelist); 528 529 op->cls.class_name = class; 530 size = strlen(class); 531 BUG_ON(size > (size_t) U8_MAX); 532 op->cls.class_len = size; 533 ceph_pagelist_append(pagelist, class, size); 534 payload_len += size; 535 536 op->cls.method_name = method; 537 size = strlen(method); 538 BUG_ON(size > (size_t) U8_MAX); 539 op->cls.method_len = size; 540 ceph_pagelist_append(pagelist, method, size); 541 payload_len += size; 542 543 osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist); 544 545 op->cls.argc = 0; /* currently unused */ 546 547 op->payload_len = payload_len; 548 } 549 EXPORT_SYMBOL(osd_req_op_cls_init); 550 551 int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which, 552 u16 opcode, const char *name, const void *value, 553 size_t size, u8 cmp_op, u8 cmp_mode) 554 { 555 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); 556 struct ceph_pagelist *pagelist; 557 size_t payload_len; 558 559 BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR); 560 561 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); 562 if (!pagelist) 563 return -ENOMEM; 564 565 ceph_pagelist_init(pagelist); 566 567 payload_len = strlen(name); 568 op->xattr.name_len = payload_len; 569 ceph_pagelist_append(pagelist, name, payload_len); 570 571 op->xattr.value_len = size; 572 ceph_pagelist_append(pagelist, value, size); 573 payload_len += size; 574 575 op->xattr.cmp_op = cmp_op; 576 op->xattr.cmp_mode = cmp_mode; 577 578 
ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist); 579 op->payload_len = payload_len; 580 return 0; 581 } 582 EXPORT_SYMBOL(osd_req_op_xattr_init); 583 584 void osd_req_op_watch_init(struct ceph_osd_request *osd_req, 585 unsigned int which, u16 opcode, 586 u64 cookie, u64 version, int flag) 587 { 588 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); 589 590 BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH); 591 592 op->watch.cookie = cookie; 593 op->watch.ver = version; 594 if (opcode == CEPH_OSD_OP_WATCH && flag) 595 op->watch.flag = (u8)1; 596 } 597 EXPORT_SYMBOL(osd_req_op_watch_init); 598 599 void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req, 600 unsigned int which, 601 u64 expected_object_size, 602 u64 expected_write_size) 603 { 604 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 605 CEPH_OSD_OP_SETALLOCHINT); 606 607 op->alloc_hint.expected_object_size = expected_object_size; 608 op->alloc_hint.expected_write_size = expected_write_size; 609 610 /* 611 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed 612 * not worth a feature bit. Set FAILOK per-op flag to make 613 * sure older osds don't trip over an unsupported opcode. 
614 */ 615 op->flags |= CEPH_OSD_OP_FLAG_FAILOK; 616 } 617 EXPORT_SYMBOL(osd_req_op_alloc_hint_init); 618 619 static void ceph_osdc_msg_data_add(struct ceph_msg *msg, 620 struct ceph_osd_data *osd_data) 621 { 622 u64 length = ceph_osd_data_length(osd_data); 623 624 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { 625 BUG_ON(length > (u64) SIZE_MAX); 626 if (length) 627 ceph_msg_data_add_pages(msg, osd_data->pages, 628 length, osd_data->alignment); 629 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) { 630 BUG_ON(!length); 631 ceph_msg_data_add_pagelist(msg, osd_data->pagelist); 632 #ifdef CONFIG_BLOCK 633 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) { 634 ceph_msg_data_add_bio(msg, osd_data->bio, length); 635 #endif 636 } else { 637 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE); 638 } 639 } 640 641 static u64 osd_req_encode_op(struct ceph_osd_request *req, 642 struct ceph_osd_op *dst, unsigned int which) 643 { 644 struct ceph_osd_req_op *src; 645 struct ceph_osd_data *osd_data; 646 u64 request_data_len = 0; 647 u64 data_length; 648 649 BUG_ON(which >= req->r_num_ops); 650 src = &req->r_ops[which]; 651 if (WARN_ON(!osd_req_opcode_valid(src->op))) { 652 pr_err("unrecognized osd opcode %d\n", src->op); 653 654 return 0; 655 } 656 657 switch (src->op) { 658 case CEPH_OSD_OP_STAT: 659 osd_data = &src->raw_data_in; 660 ceph_osdc_msg_data_add(req->r_reply, osd_data); 661 break; 662 case CEPH_OSD_OP_READ: 663 case CEPH_OSD_OP_WRITE: 664 case CEPH_OSD_OP_ZERO: 665 case CEPH_OSD_OP_TRUNCATE: 666 if (src->op == CEPH_OSD_OP_WRITE) 667 request_data_len = src->extent.length; 668 dst->extent.offset = cpu_to_le64(src->extent.offset); 669 dst->extent.length = cpu_to_le64(src->extent.length); 670 dst->extent.truncate_size = 671 cpu_to_le64(src->extent.truncate_size); 672 dst->extent.truncate_seq = 673 cpu_to_le32(src->extent.truncate_seq); 674 osd_data = &src->extent.osd_data; 675 if (src->op == CEPH_OSD_OP_WRITE) 676 ceph_osdc_msg_data_add(req->r_request, 
osd_data); 677 else 678 ceph_osdc_msg_data_add(req->r_reply, osd_data); 679 break; 680 case CEPH_OSD_OP_CALL: 681 dst->cls.class_len = src->cls.class_len; 682 dst->cls.method_len = src->cls.method_len; 683 osd_data = &src->cls.request_info; 684 ceph_osdc_msg_data_add(req->r_request, osd_data); 685 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST); 686 request_data_len = osd_data->pagelist->length; 687 688 osd_data = &src->cls.request_data; 689 data_length = ceph_osd_data_length(osd_data); 690 if (data_length) { 691 BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE); 692 dst->cls.indata_len = cpu_to_le32(data_length); 693 ceph_osdc_msg_data_add(req->r_request, osd_data); 694 src->payload_len += data_length; 695 request_data_len += data_length; 696 } 697 osd_data = &src->cls.response_data; 698 ceph_osdc_msg_data_add(req->r_reply, osd_data); 699 break; 700 case CEPH_OSD_OP_STARTSYNC: 701 break; 702 case CEPH_OSD_OP_NOTIFY_ACK: 703 case CEPH_OSD_OP_WATCH: 704 dst->watch.cookie = cpu_to_le64(src->watch.cookie); 705 dst->watch.ver = cpu_to_le64(src->watch.ver); 706 dst->watch.flag = src->watch.flag; 707 break; 708 case CEPH_OSD_OP_SETALLOCHINT: 709 dst->alloc_hint.expected_object_size = 710 cpu_to_le64(src->alloc_hint.expected_object_size); 711 dst->alloc_hint.expected_write_size = 712 cpu_to_le64(src->alloc_hint.expected_write_size); 713 break; 714 case CEPH_OSD_OP_SETXATTR: 715 case CEPH_OSD_OP_CMPXATTR: 716 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len); 717 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len); 718 dst->xattr.cmp_op = src->xattr.cmp_op; 719 dst->xattr.cmp_mode = src->xattr.cmp_mode; 720 osd_data = &src->xattr.osd_data; 721 ceph_osdc_msg_data_add(req->r_request, osd_data); 722 request_data_len = osd_data->pagelist->length; 723 break; 724 case CEPH_OSD_OP_CREATE: 725 case CEPH_OSD_OP_DELETE: 726 break; 727 default: 728 pr_err("unsupported osd opcode %s\n", 729 ceph_osd_op_name(src->op)); 730 WARN_ON(1); 731 732 return 0; 733 } 734 735 
dst->op = cpu_to_le16(src->op); 736 dst->flags = cpu_to_le32(src->flags); 737 dst->payload_len = cpu_to_le32(src->payload_len); 738 739 return request_data_len; 740 } 741 742 /* 743 * build new request AND message, calculate layout, and adjust file 744 * extent as needed. 745 * 746 * if the file was recently truncated, we include information about its 747 * old and new size so that the object can be updated appropriately. (we 748 * avoid synchronously deleting truncated objects because it's slow.) 749 * 750 * if @do_sync, include a 'startsync' command so that the osd will flush 751 * data quickly. 752 */ 753 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, 754 struct ceph_file_layout *layout, 755 struct ceph_vino vino, 756 u64 off, u64 *plen, int num_ops, 757 int opcode, int flags, 758 struct ceph_snap_context *snapc, 759 u32 truncate_seq, 760 u64 truncate_size, 761 bool use_mempool) 762 { 763 struct ceph_osd_request *req; 764 u64 objnum = 0; 765 u64 objoff = 0; 766 u64 objlen = 0; 767 int r; 768 769 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && 770 opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE && 771 opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE); 772 773 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool, 774 GFP_NOFS); 775 if (!req) 776 return ERR_PTR(-ENOMEM); 777 778 req->r_flags = flags; 779 780 /* calculate max write size */ 781 r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen); 782 if (r < 0) { 783 ceph_osdc_put_request(req); 784 return ERR_PTR(r); 785 } 786 787 if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) { 788 osd_req_op_init(req, 0, opcode); 789 } else { 790 u32 object_size = le32_to_cpu(layout->fl_object_size); 791 u32 object_base = off - objoff; 792 if (!(truncate_seq == 1 && truncate_size == -1ULL)) { 793 if (truncate_size <= object_base) { 794 truncate_size = 0; 795 } else { 796 truncate_size -= object_base; 797 if 
(truncate_size > object_size) 798 truncate_size = object_size; 799 } 800 } 801 802 osd_req_op_extent_init(req, 0, opcode, objoff, objlen, 803 truncate_size, truncate_seq); 804 } 805 /* 806 * A second op in the ops array means the caller wants to 807 * also issue a include a 'startsync' command so that the 808 * osd will flush data quickly. 809 */ 810 if (num_ops > 1) 811 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC); 812 813 req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout); 814 815 snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name), 816 "%llx.%08llx", vino.ino, objnum); 817 req->r_base_oid.name_len = strlen(req->r_base_oid.name); 818 819 return req; 820 } 821 EXPORT_SYMBOL(ceph_osdc_new_request); 822 823 /* 824 * We keep osd requests in an rbtree, sorted by ->r_tid. 825 */ 826 static void __insert_request(struct ceph_osd_client *osdc, 827 struct ceph_osd_request *new) 828 { 829 struct rb_node **p = &osdc->requests.rb_node; 830 struct rb_node *parent = NULL; 831 struct ceph_osd_request *req = NULL; 832 833 while (*p) { 834 parent = *p; 835 req = rb_entry(parent, struct ceph_osd_request, r_node); 836 if (new->r_tid < req->r_tid) 837 p = &(*p)->rb_left; 838 else if (new->r_tid > req->r_tid) 839 p = &(*p)->rb_right; 840 else 841 BUG(); 842 } 843 844 rb_link_node(&new->r_node, parent, p); 845 rb_insert_color(&new->r_node, &osdc->requests); 846 } 847 848 static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc, 849 u64 tid) 850 { 851 struct ceph_osd_request *req; 852 struct rb_node *n = osdc->requests.rb_node; 853 854 while (n) { 855 req = rb_entry(n, struct ceph_osd_request, r_node); 856 if (tid < req->r_tid) 857 n = n->rb_left; 858 else if (tid > req->r_tid) 859 n = n->rb_right; 860 else 861 return req; 862 } 863 return NULL; 864 } 865 866 static struct ceph_osd_request * 867 __lookup_request_ge(struct ceph_osd_client *osdc, 868 u64 tid) 869 { 870 struct ceph_osd_request *req; 871 struct rb_node *n = osdc->requests.rb_node; 872 
873 while (n) { 874 req = rb_entry(n, struct ceph_osd_request, r_node); 875 if (tid < req->r_tid) { 876 if (!n->rb_left) 877 return req; 878 n = n->rb_left; 879 } else if (tid > req->r_tid) { 880 n = n->rb_right; 881 } else { 882 return req; 883 } 884 } 885 return NULL; 886 } 887 888 static void __kick_linger_request(struct ceph_osd_request *req) 889 { 890 struct ceph_osd_client *osdc = req->r_osdc; 891 struct ceph_osd *osd = req->r_osd; 892 893 /* 894 * Linger requests need to be resent with a new tid to avoid 895 * the dup op detection logic on the OSDs. Achieve this with 896 * a re-register dance instead of open-coding. 897 */ 898 ceph_osdc_get_request(req); 899 if (!list_empty(&req->r_linger_item)) 900 __unregister_linger_request(osdc, req); 901 else 902 __unregister_request(osdc, req); 903 __register_request(osdc, req); 904 ceph_osdc_put_request(req); 905 906 /* 907 * Unless request has been registered as both normal and 908 * lingering, __unregister{,_linger}_request clears r_osd. 909 * However, here we need to preserve r_osd to make sure we 910 * requeue on the same OSD. 911 */ 912 WARN_ON(req->r_osd || !osd); 913 req->r_osd = osd; 914 915 dout("%s requeueing %p tid %llu\n", __func__, req, req->r_tid); 916 __enqueue_request(req); 917 } 918 919 /* 920 * Resubmit requests pending on the given osd. 921 */ 922 static void __kick_osd_requests(struct ceph_osd_client *osdc, 923 struct ceph_osd *osd) 924 { 925 struct ceph_osd_request *req, *nreq; 926 LIST_HEAD(resend); 927 LIST_HEAD(resend_linger); 928 int err; 929 930 dout("%s osd%d\n", __func__, osd->o_osd); 931 err = __reset_osd(osdc, osd); 932 if (err) 933 return; 934 935 /* 936 * Build up a list of requests to resend by traversing the 937 * osd's list of requests. Requests for a given object are 938 * sent in tid order, and that is also the order they're 939 * kept on this list. Therefore all requests that are in 940 * flight will be found first, followed by all requests that 941 * have not yet been sent. 
And to resend requests while 942 * preserving this order we will want to put any sent 943 * requests back on the front of the osd client's unsent 944 * list. 945 * 946 * So we build a separate ordered list of already-sent 947 * requests for the affected osd and splice it onto the 948 * front of the osd client's unsent list. Once we've seen a 949 * request that has not yet been sent we're done. Those 950 * requests are already sitting right where they belong. 951 */ 952 list_for_each_entry(req, &osd->o_requests, r_osd_item) { 953 if (!req->r_sent) 954 break; 955 956 if (!req->r_linger) { 957 dout("%s requeueing %p tid %llu\n", __func__, req, 958 req->r_tid); 959 list_move_tail(&req->r_req_lru_item, &resend); 960 req->r_flags |= CEPH_OSD_FLAG_RETRY; 961 } else { 962 list_move_tail(&req->r_req_lru_item, &resend_linger); 963 } 964 } 965 list_splice(&resend, &osdc->req_unsent); 966 967 /* 968 * Both registered and not yet registered linger requests are 969 * enqueued with a new tid on the same OSD. We add/move them 970 * to req_unsent/o_requests at the end to keep things in tid 971 * order. 972 */ 973 list_for_each_entry_safe(req, nreq, &osd->o_linger_requests, 974 r_linger_osd_item) { 975 WARN_ON(!list_empty(&req->r_req_lru_item)); 976 __kick_linger_request(req); 977 } 978 979 list_for_each_entry_safe(req, nreq, &resend_linger, r_req_lru_item) 980 __kick_linger_request(req); 981 } 982 983 /* 984 * If the osd connection drops, we need to resubmit all requests. 985 */ 986 static void osd_reset(struct ceph_connection *con) 987 { 988 struct ceph_osd *osd = con->private; 989 struct ceph_osd_client *osdc; 990 991 if (!osd) 992 return; 993 dout("osd_reset osd%d\n", osd->o_osd); 994 osdc = osd->o_osdc; 995 down_read(&osdc->map_sem); 996 mutex_lock(&osdc->request_mutex); 997 __kick_osd_requests(osdc, osd); 998 __send_queued(osdc); 999 mutex_unlock(&osdc->request_mutex); 1000 up_read(&osdc->map_sem); 1001 } 1002 1003 /* 1004 * Track open sessions with osds. 
1005 */ 1006 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum) 1007 { 1008 struct ceph_osd *osd; 1009 1010 osd = kzalloc(sizeof(*osd), GFP_NOFS); 1011 if (!osd) 1012 return NULL; 1013 1014 atomic_set(&osd->o_ref, 1); 1015 osd->o_osdc = osdc; 1016 osd->o_osd = onum; 1017 RB_CLEAR_NODE(&osd->o_node); 1018 INIT_LIST_HEAD(&osd->o_requests); 1019 INIT_LIST_HEAD(&osd->o_linger_requests); 1020 INIT_LIST_HEAD(&osd->o_osd_lru); 1021 osd->o_incarnation = 1; 1022 1023 ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr); 1024 1025 INIT_LIST_HEAD(&osd->o_keepalive_item); 1026 return osd; 1027 } 1028 1029 static struct ceph_osd *get_osd(struct ceph_osd *osd) 1030 { 1031 if (atomic_inc_not_zero(&osd->o_ref)) { 1032 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1, 1033 atomic_read(&osd->o_ref)); 1034 return osd; 1035 } else { 1036 dout("get_osd %p FAIL\n", osd); 1037 return NULL; 1038 } 1039 } 1040 1041 static void put_osd(struct ceph_osd *osd) 1042 { 1043 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), 1044 atomic_read(&osd->o_ref) - 1); 1045 if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) { 1046 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth; 1047 1048 ceph_auth_destroy_authorizer(ac, osd->o_auth.authorizer); 1049 kfree(osd); 1050 } 1051 } 1052 1053 /* 1054 * remove an osd from our map 1055 */ 1056 static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 1057 { 1058 dout("__remove_osd %p\n", osd); 1059 WARN_ON(!list_empty(&osd->o_requests)); 1060 WARN_ON(!list_empty(&osd->o_linger_requests)); 1061 1062 rb_erase(&osd->o_node, &osdc->osds); 1063 list_del_init(&osd->o_osd_lru); 1064 ceph_con_close(&osd->o_con); 1065 put_osd(osd); 1066 } 1067 1068 static void remove_all_osds(struct ceph_osd_client *osdc) 1069 { 1070 dout("%s %p\n", __func__, osdc); 1071 mutex_lock(&osdc->request_mutex); 1072 while (!RB_EMPTY_ROOT(&osdc->osds)) { 1073 struct ceph_osd *osd = 
rb_entry(rb_first(&osdc->osds), 1074 struct ceph_osd, o_node); 1075 __remove_osd(osdc, osd); 1076 } 1077 mutex_unlock(&osdc->request_mutex); 1078 } 1079 1080 static void __move_osd_to_lru(struct ceph_osd_client *osdc, 1081 struct ceph_osd *osd) 1082 { 1083 dout("%s %p\n", __func__, osd); 1084 BUG_ON(!list_empty(&osd->o_osd_lru)); 1085 1086 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru); 1087 osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ; 1088 } 1089 1090 static void maybe_move_osd_to_lru(struct ceph_osd_client *osdc, 1091 struct ceph_osd *osd) 1092 { 1093 dout("%s %p\n", __func__, osd); 1094 1095 if (list_empty(&osd->o_requests) && 1096 list_empty(&osd->o_linger_requests)) 1097 __move_osd_to_lru(osdc, osd); 1098 } 1099 1100 static void __remove_osd_from_lru(struct ceph_osd *osd) 1101 { 1102 dout("__remove_osd_from_lru %p\n", osd); 1103 if (!list_empty(&osd->o_osd_lru)) 1104 list_del_init(&osd->o_osd_lru); 1105 } 1106 1107 static void remove_old_osds(struct ceph_osd_client *osdc) 1108 { 1109 struct ceph_osd *osd, *nosd; 1110 1111 dout("__remove_old_osds %p\n", osdc); 1112 mutex_lock(&osdc->request_mutex); 1113 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) { 1114 if (time_before(jiffies, osd->lru_ttl)) 1115 break; 1116 __remove_osd(osdc, osd); 1117 } 1118 mutex_unlock(&osdc->request_mutex); 1119 } 1120 1121 /* 1122 * reset osd connect 1123 */ 1124 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 1125 { 1126 struct ceph_entity_addr *peer_addr; 1127 1128 dout("__reset_osd %p osd%d\n", osd, osd->o_osd); 1129 if (list_empty(&osd->o_requests) && 1130 list_empty(&osd->o_linger_requests)) { 1131 __remove_osd(osdc, osd); 1132 1133 return -ENODEV; 1134 } 1135 1136 peer_addr = &osdc->osdmap->osd_addr[osd->o_osd]; 1137 if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) && 1138 !ceph_con_opened(&osd->o_con)) { 1139 struct ceph_osd_request *req; 1140 1141 dout("osd addr hasn't changed and connection 
never opened, " 1142 "letting msgr retry\n"); 1143 /* touch each r_stamp for handle_timeout()'s benfit */ 1144 list_for_each_entry(req, &osd->o_requests, r_osd_item) 1145 req->r_stamp = jiffies; 1146 1147 return -EAGAIN; 1148 } 1149 1150 ceph_con_close(&osd->o_con); 1151 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr); 1152 osd->o_incarnation++; 1153 1154 return 0; 1155 } 1156 1157 static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new) 1158 { 1159 struct rb_node **p = &osdc->osds.rb_node; 1160 struct rb_node *parent = NULL; 1161 struct ceph_osd *osd = NULL; 1162 1163 dout("__insert_osd %p osd%d\n", new, new->o_osd); 1164 while (*p) { 1165 parent = *p; 1166 osd = rb_entry(parent, struct ceph_osd, o_node); 1167 if (new->o_osd < osd->o_osd) 1168 p = &(*p)->rb_left; 1169 else if (new->o_osd > osd->o_osd) 1170 p = &(*p)->rb_right; 1171 else 1172 BUG(); 1173 } 1174 1175 rb_link_node(&new->o_node, parent, p); 1176 rb_insert_color(&new->o_node, &osdc->osds); 1177 } 1178 1179 static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o) 1180 { 1181 struct ceph_osd *osd; 1182 struct rb_node *n = osdc->osds.rb_node; 1183 1184 while (n) { 1185 osd = rb_entry(n, struct ceph_osd, o_node); 1186 if (o < osd->o_osd) 1187 n = n->rb_left; 1188 else if (o > osd->o_osd) 1189 n = n->rb_right; 1190 else 1191 return osd; 1192 } 1193 return NULL; 1194 } 1195 1196 static void __schedule_osd_timeout(struct ceph_osd_client *osdc) 1197 { 1198 schedule_delayed_work(&osdc->timeout_work, 1199 osdc->client->options->osd_keepalive_timeout * HZ); 1200 } 1201 1202 static void __cancel_osd_timeout(struct ceph_osd_client *osdc) 1203 { 1204 cancel_delayed_work(&osdc->timeout_work); 1205 } 1206 1207 /* 1208 * Register request, assign tid. If this is the first request, set up 1209 * the timeout event. 
1210 */ 1211 static void __register_request(struct ceph_osd_client *osdc, 1212 struct ceph_osd_request *req) 1213 { 1214 req->r_tid = ++osdc->last_tid; 1215 req->r_request->hdr.tid = cpu_to_le64(req->r_tid); 1216 dout("__register_request %p tid %lld\n", req, req->r_tid); 1217 __insert_request(osdc, req); 1218 ceph_osdc_get_request(req); 1219 osdc->num_requests++; 1220 if (osdc->num_requests == 1) { 1221 dout(" first request, scheduling timeout\n"); 1222 __schedule_osd_timeout(osdc); 1223 } 1224 } 1225 1226 /* 1227 * called under osdc->request_mutex 1228 */ 1229 static void __unregister_request(struct ceph_osd_client *osdc, 1230 struct ceph_osd_request *req) 1231 { 1232 if (RB_EMPTY_NODE(&req->r_node)) { 1233 dout("__unregister_request %p tid %lld not registered\n", 1234 req, req->r_tid); 1235 return; 1236 } 1237 1238 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 1239 rb_erase(&req->r_node, &osdc->requests); 1240 RB_CLEAR_NODE(&req->r_node); 1241 osdc->num_requests--; 1242 1243 if (req->r_osd) { 1244 /* make sure the original request isn't in flight. 
*/
		ceph_msg_revoke(req->r_request);

		list_del_init(&req->r_osd_item);
		maybe_move_osd_to_lru(osdc, req->r_osd);
		/*
		 * A request that is still on its osd's linger list keeps
		 * r_osd; __unregister_linger_request() clears it then.
		 */
		if (list_empty(&req->r_linger_osd_item))
			req->r_osd = NULL;
	}

	list_del_init(&req->r_req_lru_item);
	/* drops the reference taken in __register_request() */
	ceph_osdc_put_request(req);

	if (osdc->num_requests == 0) {
		dout(" no requests, canceling timeout\n");
		__cancel_osd_timeout(osdc);
	}
}

/*
 * Cancel a previously queued request message
 *
 * Only a request that was sent (nonzero r_sent) and has a target osd is
 * affected: its message is revoked and r_sent is reset to 0.
 */
static void __cancel_request(struct ceph_osd_request *req)
{
	if (req->r_sent && req->r_osd) {
		ceph_msg_revoke(req->r_request);
		req->r_sent = 0;
	}
}

/*
 * Put @req on the client's linger list.  If it currently has a target
 * osd, also put it on that osd's linger list.  Takes a reference on @req,
 * which __unregister_linger_request() drops.
 */
static void __register_linger_request(struct ceph_osd_client *osdc,
				      struct ceph_osd_request *req)
{
	dout("%s %p tid %llu\n", __func__, req, req->r_tid);
	WARN_ON(!req->r_linger);

	ceph_osdc_get_request(req);
	list_add_tail(&req->r_linger_item, &osdc->req_linger);
	if (req->r_osd)
		list_add_tail(&req->r_linger_osd_item,
			      &req->r_osd->o_linger_requests);
}

/*
 * Undo __register_linger_request().  A request that is not on the linger
 * list is left alone.  r_osd is cleared only if the request is not on the
 * osd's regular request list either.
 */
static void __unregister_linger_request(struct ceph_osd_client *osdc,
					struct ceph_osd_request *req)
{
	WARN_ON(!req->r_linger);

	if (list_empty(&req->r_linger_item)) {
		dout("%s %p tid %llu not registered\n", __func__, req,
		     req->r_tid);
		return;
	}

	dout("%s %p tid %llu\n", __func__, req, req->r_tid);
	list_del_init(&req->r_linger_item);

	if (req->r_osd) {
		list_del_init(&req->r_linger_osd_item);
		maybe_move_osd_to_lru(osdc, req->r_osd);
		if (list_empty(&req->r_osd_item))
			req->r_osd = NULL;
	}

	list_del_init(&req->r_req_lru_item); /* can be on notarget */
	ceph_osdc_put_request(req);
}

/*
 * Mark @req as a linger request.  @osdc is not used.
 */
void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
				  struct ceph_osd_request *req)
{
	if (!req->r_linger) {
		dout("set_request_linger %p\n", req);
		req->r_linger = 1;
	}
}
EXPORT_SYMBOL(ceph_osdc_set_request_linger);

/*
 * Returns whether a request should be blocked from being sent
 * based on the current osdmap and osd_client settings.
 *
 * Reads are paused by PAUSERD.  Writes are paused by PAUSEWR or by a
 * FULL cluster.
 *
 * Caller should hold map_sem for read.
 */
static bool __req_should_be_paused(struct ceph_osd_client *osdc,
				   struct ceph_osd_request *req)
{
	bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
	bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
		ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
	return (req->r_flags & CEPH_OSD_FLAG_READ && pauserd) ||
		(req->r_flags & CEPH_OSD_FLAG_WRITE && pausewr);
}

/*
 * Calculate mapping of a request to a PG.  Takes tiering into account.
 *
 * An unset target locator (pool == -1) or target oid (empty name) is
 * first filled in from the base one.  Only when that happened, and
 * CEPH_OSD_FLAG_IGNORE_OVERLAY is not set, is the target pool replaced
 * by the pool's read tier (reads) or write tier (writes).  The write tier
 * is applied last, so it wins for a request flagged as both.
 *
 * Returns the result of ceph_oloc_oid_to_pg(); the PG is stored in
 * @pg_out.
 */
static int __calc_request_pg(struct ceph_osdmap *osdmap,
			     struct ceph_osd_request *req,
			     struct ceph_pg *pg_out)
{
	bool need_check_tiering;

	need_check_tiering = false;
	if (req->r_target_oloc.pool == -1) {
		req->r_target_oloc = req->r_base_oloc; /* struct */
		need_check_tiering = true;
	}
	if (req->r_target_oid.name_len == 0) {
		ceph_oid_copy(&req->r_target_oid, &req->r_base_oid);
		need_check_tiering = true;
	}

	if (need_check_tiering &&
	    (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
		struct ceph_pg_pool_info *pi;

		pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool);
		if (pi) {
			if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
			    pi->read_tier >= 0)
				req->r_target_oloc.pool = pi->read_tier;
			if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
			    pi->write_tier >= 0)
				req->r_target_oloc.pool = pi->write_tier;
		}
		/* !pi is caught in ceph_oloc_oid_to_pg() */
	}

	return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc,
				   &req->r_target_oid, pg_out);
}

/*
 * Queue @req for its target osd.  The request goes onto the osd's
 * request list and the client's unsent list, and the osd leaves the idle
 * lru.  Without a target osd, the request is parked on the client's
 * notarget list instead.
 */
static void __enqueue_request(struct ceph_osd_request *req)
{
	struct ceph_osd_client *osdc = req->r_osdc;

	dout("%s %p tid %llu to osd%d\n", __func__, req, req->r_tid,
	     req->r_osd ? req->r_osd->o_osd : -1);

	if (req->r_osd) {
		__remove_osd_from_lru(req->r_osd);
		list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
		list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
	} else {
		list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
	}
}

/*
 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
 * (as needed), and set the request r_osd appropriately.  If there is
 * no up osd, set r_osd to NULL.  Move the request to the appropriate list
 * (unsent, homeless) or leave on in-flight lru.
 *
 * Return 0 if unchanged, 1 if changed, or negative on error.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static int __map_request(struct ceph_osd_client *osdc,
			 struct ceph_osd_request *req, int force_resend)
{
	struct ceph_pg pgid;
	int acting[CEPH_PG_MAX_SIZE];
	int num, o;
	int err;
	bool was_paused;

	dout("map_request %p tid %lld\n", req, req->r_tid);

	err = __calc_request_pg(osdc->osdmap, req, &pgid);
	if (err) {
		/* no pg for this request: park it on the notarget list */
		list_move(&req->r_req_lru_item, &osdc->req_notarget);
		return err;
	}
	req->r_pgid = pgid;

	/* a negative count is treated as an empty acting set */
	num = ceph_calc_pg_acting(osdc->osdmap, pgid, acting, &o);
	if (num < 0)
		num = 0;

	/* a request that was paused and no longer is must be resent */
	was_paused = req->r_paused;
	req->r_paused = __req_should_be_paused(osdc, req);
	if (was_paused && !req->r_paused)
		force_resend = 1;

	/*
	 * Nothing to do if: same osd, already sent on its current
	 * incarnation and same acting set (unless forced); or still no
	 * osd at all; or the request is paused.
	 */
	if ((!force_resend &&
	     req->r_osd && req->r_osd->o_osd == o &&
	     req->r_sent >= req->r_osd->o_incarnation &&
	     req->r_num_pg_osds == num &&
	     memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
	    (req->r_osd == NULL && o == -1) ||
	    req->r_paused)
		return 0;   /* no change */

	dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
	     req->r_tid, pgid.pool, pgid.seed, o,
	     req->r_osd ? req->r_osd->o_osd : -1);

	/* record full pg acting set */
	memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
	req->r_num_pg_osds = num;

	/* detach from the previous osd, revoking any in-flight message */
	if (req->r_osd) {
		__cancel_request(req);
		list_del_init(&req->r_osd_item);
		list_del_init(&req->r_linger_osd_item);
		req->r_osd = NULL;
	}

	/* reuse an existing session to osd o, or create and open one */
	req->r_osd = __lookup_osd(osdc, o);
	if (!req->r_osd && o >= 0) {
		err = -ENOMEM;
		req->r_osd = create_osd(osdc, o);
		if (!req->r_osd) {
			list_move(&req->r_req_lru_item, &osdc->req_notarget);
			goto out;
		}

		dout("map_request osd %p is osd%d\n", req->r_osd, o);
		__insert_osd(osdc, req->r_osd);

		ceph_con_open(&req->r_osd->o_con,
			      CEPH_ENTITY_TYPE_OSD, o,
			      &osdc->osdmap->osd_addr[o]);
	}

	__enqueue_request(req);
	err = 1;   /* osd or pg changed */

out:
	return err;
}

/*
 * caller should hold map_sem (for read) and request_mutex
 *
 * Refresh the per-send fields of the request message, move @req to the
 * in-flight lru and hand the message to the osd connection.  r_sent
 * records the osd incarnation the message went out on; __map_request()
 * compares it against o_incarnation.
 */
static void __send_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	void *p;

	dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n",
	     req, req->r_tid, req->r_osd->o_osd, req->r_flags,
	     (unsigned long long)req->r_pgid.pool, req->r_pgid.seed);

	/* fill in message content that changes each time we send it */
	put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
	put_unaligned_le32(req->r_flags, req->r_request_flags);
	put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool);
	p = req->r_request_pgid;
	ceph_encode_64(&p, req->r_pgid.pool);
	ceph_encode_32(&p, req->r_pgid.seed);
	put_unaligned_le64(1, req->r_request_attempts);  /* FIXME */
	memcpy(req->r_request_reassert_version, &req->r_reassert_version,
	       sizeof(req->r_reassert_version));

	/* r_stamp is what handle_timeout() ages requests by */
	req->r_stamp = jiffies;
	list_move_tail(&req->r_req_lru_item, &osdc->req_lru);

	ceph_msg_get(req->r_request); /* send consumes a ref */

	req->r_sent = req->r_osd->o_incarnation;

	ceph_con_send(&req->r_osd->o_con, req->r_request);
}

/*
 * Send any requests in the queue (req_unsent).
 *
 * The _safe iterator is needed because __send_request() moves each
 * request off req_unsent onto req_lru.
 */
static void __send_queued(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req, *tmp;

	dout("__send_queued\n");
	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item)
		__send_request(osdc, req);
}

/*
 * Caller should hold map_sem for read and request_mutex.
 *
 * Register @req and try to map it to an osd.  If mapping fails, @nofail
 * keeps the request registered and returns 0 (it stays on the notarget
 * list for a later retry); otherwise the request is unregistered and the
 * error is returned.  A request with no up osd asks the monitor for a
 * newer osdmap; otherwise the unsent queue is flushed.
 */
static int __ceph_osdc_start_request(struct ceph_osd_client *osdc,
				     struct ceph_osd_request *req,
				     bool nofail)
{
	int rc;

	__register_request(osdc, req);
	req->r_sent = 0;
	req->r_got_reply = 0;
	rc = __map_request(osdc, req, 0);
	if (rc < 0) {
		if (nofail) {
			dout("osdc_start_request failed map, "
				" will retry %lld\n", req->r_tid);
			rc = 0;
		} else {
			__unregister_request(osdc, req);
		}
		return rc;
	}

	if (req->r_osd == NULL) {
		dout("send_request %p no up osds in pg\n", req);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
	} else {
		__send_queued(osdc);
	}

	return 0;
}

/*
 * Timeout callback, called every N seconds when 1 or more osd
 * requests has been active for more than N seconds.  When this
 * happens, we ping all OSDs with requests who have timed out to
 * ensure any communications channel reset is detected.  Reset the
 * request timeouts another N seconds in the future as we go.
 * Reschedule the timeout event another N seconds in future (unless
 * there are no open requests).
 */
static void handle_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client, timeout_work.work);
	struct ceph_osd_request *req;
	struct ceph_osd *osd;
	unsigned long keepalive =
		osdc->client->options->osd_keepalive_timeout * HZ;
	struct list_head slow_osds;
	dout("timeout\n");
	down_read(&osdc->map_sem);

	ceph_monc_request_next_osdmap(&osdc->client->monc);

	mutex_lock(&osdc->request_mutex);

	/*
	 * ping osds that are a bit slow.  this ensures that if there
	 * is a break in the TCP connection we will notice, and reopen
	 * a connection with that osd (from the fault callback).
	 */
	INIT_LIST_HEAD(&slow_osds);
	/* req_lru is in send order, so stop at the first fresh request */
	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
		if (time_before(jiffies, req->r_stamp + keepalive))
			break;

		osd = req->r_osd;
		BUG_ON(!osd);
		dout(" tid %llu is slow, will send keepalive on osd%d\n",
		     req->r_tid, osd->o_osd);
		/* list_move_tail dedups: each osd ends up on slow_osds once */
		list_move_tail(&osd->o_keepalive_item, &slow_osds);
	}
	while (!list_empty(&slow_osds)) {
		osd = list_entry(slow_osds.next, struct ceph_osd,
				 o_keepalive_item);
		list_del_init(&osd->o_keepalive_item);
		ceph_con_keepalive(&osd->o_con);
	}

	__schedule_osd_timeout(osdc);
	__send_queued(osdc);
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
}

/*
 * Periodic idle-osd reaper: drops lru osds whose ttl has expired, then
 * re-arms itself to run again after a quarter of osd_idle_ttl (rounded
 * by round_jiffies_relative()).
 */
static void handle_osds_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client,
			     osds_timeout_work.work);
	unsigned long delay =
		osdc->client->options->osd_idle_ttl * HZ >> 2;

	dout("osds timeout\n");
	down_read(&osdc->map_sem);
	remove_old_osds(osdc);
	up_read(&osdc->map_sem);

	schedule_delayed_work(&osdc->osds_timeout_work,
			      round_jiffies_relative(delay));
}

/*
 * Decode a ceph_object_locator at *p (bounded by end) into oloc.  Only a
 * plain pool is supported: struct_v < 3, compat version > 6, or a set
 * key/namespace/hash yield -EINVAL.  On success *p is left at the end of
 * the encoded struct.
 */
static int ceph_oloc_decode(void **p,
void *end, 1628 struct ceph_object_locator *oloc) 1629 { 1630 u8 struct_v, struct_cv; 1631 u32 len; 1632 void *struct_end; 1633 int ret = 0; 1634 1635 ceph_decode_need(p, end, 1 + 1 + 4, e_inval); 1636 struct_v = ceph_decode_8(p); 1637 struct_cv = ceph_decode_8(p); 1638 if (struct_v < 3) { 1639 pr_warn("got v %d < 3 cv %d of ceph_object_locator\n", 1640 struct_v, struct_cv); 1641 goto e_inval; 1642 } 1643 if (struct_cv > 6) { 1644 pr_warn("got v %d cv %d > 6 of ceph_object_locator\n", 1645 struct_v, struct_cv); 1646 goto e_inval; 1647 } 1648 len = ceph_decode_32(p); 1649 ceph_decode_need(p, end, len, e_inval); 1650 struct_end = *p + len; 1651 1652 oloc->pool = ceph_decode_64(p); 1653 *p += 4; /* skip preferred */ 1654 1655 len = ceph_decode_32(p); 1656 if (len > 0) { 1657 pr_warn("ceph_object_locator::key is set\n"); 1658 goto e_inval; 1659 } 1660 1661 if (struct_v >= 5) { 1662 len = ceph_decode_32(p); 1663 if (len > 0) { 1664 pr_warn("ceph_object_locator::nspace is set\n"); 1665 goto e_inval; 1666 } 1667 } 1668 1669 if (struct_v >= 6) { 1670 s64 hash = ceph_decode_64(p); 1671 if (hash != -1) { 1672 pr_warn("ceph_object_locator::hash is set\n"); 1673 goto e_inval; 1674 } 1675 } 1676 1677 /* skip the rest */ 1678 *p = struct_end; 1679 out: 1680 return ret; 1681 1682 e_inval: 1683 ret = -EINVAL; 1684 goto out; 1685 } 1686 1687 static int ceph_redirect_decode(void **p, void *end, 1688 struct ceph_request_redirect *redir) 1689 { 1690 u8 struct_v, struct_cv; 1691 u32 len; 1692 void *struct_end; 1693 int ret; 1694 1695 ceph_decode_need(p, end, 1 + 1 + 4, e_inval); 1696 struct_v = ceph_decode_8(p); 1697 struct_cv = ceph_decode_8(p); 1698 if (struct_cv > 1) { 1699 pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n", 1700 struct_v, struct_cv); 1701 goto e_inval; 1702 } 1703 len = ceph_decode_32(p); 1704 ceph_decode_need(p, end, len, e_inval); 1705 struct_end = *p + len; 1706 1707 ret = ceph_oloc_decode(p, end, &redir->oloc); 1708 if (ret) 1709 goto out; 1710 1711 len = 
ceph_decode_32(p); 1712 if (len > 0) { 1713 pr_warn("ceph_request_redirect::object_name is set\n"); 1714 goto e_inval; 1715 } 1716 1717 len = ceph_decode_32(p); 1718 *p += len; /* skip osd_instructions */ 1719 1720 /* skip the rest */ 1721 *p = struct_end; 1722 out: 1723 return ret; 1724 1725 e_inval: 1726 ret = -EINVAL; 1727 goto out; 1728 } 1729 1730 static void complete_request(struct ceph_osd_request *req) 1731 { 1732 complete_all(&req->r_safe_completion); /* fsync waiter */ 1733 } 1734 1735 /* 1736 * handle osd op reply. either call the callback if it is specified, 1737 * or do the completion to wake up the waiting thread. 1738 */ 1739 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, 1740 struct ceph_connection *con) 1741 { 1742 void *p, *end; 1743 struct ceph_osd_request *req; 1744 struct ceph_request_redirect redir; 1745 u64 tid; 1746 int object_len; 1747 unsigned int numops; 1748 int payload_len, flags; 1749 s32 result; 1750 s32 retry_attempt; 1751 struct ceph_pg pg; 1752 int err; 1753 u32 reassert_epoch; 1754 u64 reassert_version; 1755 u32 osdmap_epoch; 1756 int already_completed; 1757 u32 bytes; 1758 unsigned int i; 1759 1760 tid = le64_to_cpu(msg->hdr.tid); 1761 dout("handle_reply %p tid %llu\n", msg, tid); 1762 1763 p = msg->front.iov_base; 1764 end = p + msg->front.iov_len; 1765 1766 ceph_decode_need(&p, end, 4, bad); 1767 object_len = ceph_decode_32(&p); 1768 ceph_decode_need(&p, end, object_len, bad); 1769 p += object_len; 1770 1771 err = ceph_decode_pgid(&p, end, &pg); 1772 if (err) 1773 goto bad; 1774 1775 ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad); 1776 flags = ceph_decode_64(&p); 1777 result = ceph_decode_32(&p); 1778 reassert_epoch = ceph_decode_32(&p); 1779 reassert_version = ceph_decode_64(&p); 1780 osdmap_epoch = ceph_decode_32(&p); 1781 1782 /* lookup */ 1783 down_read(&osdc->map_sem); 1784 mutex_lock(&osdc->request_mutex); 1785 req = __lookup_request(osdc, tid); 1786 if (req == NULL) { 1787 
dout("handle_reply tid %llu dne\n", tid); 1788 goto bad_mutex; 1789 } 1790 ceph_osdc_get_request(req); 1791 1792 dout("handle_reply %p tid %llu req %p result %d\n", msg, tid, 1793 req, result); 1794 1795 ceph_decode_need(&p, end, 4, bad_put); 1796 numops = ceph_decode_32(&p); 1797 if (numops > CEPH_OSD_MAX_OP) 1798 goto bad_put; 1799 if (numops != req->r_num_ops) 1800 goto bad_put; 1801 payload_len = 0; 1802 ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad_put); 1803 for (i = 0; i < numops; i++) { 1804 struct ceph_osd_op *op = p; 1805 int len; 1806 1807 len = le32_to_cpu(op->payload_len); 1808 req->r_reply_op_len[i] = len; 1809 dout(" op %d has %d bytes\n", i, len); 1810 payload_len += len; 1811 p += sizeof(*op); 1812 } 1813 bytes = le32_to_cpu(msg->hdr.data_len); 1814 if (payload_len != bytes) { 1815 pr_warn("sum of op payload lens %d != data_len %d\n", 1816 payload_len, bytes); 1817 goto bad_put; 1818 } 1819 1820 ceph_decode_need(&p, end, 4 + numops * 4, bad_put); 1821 retry_attempt = ceph_decode_32(&p); 1822 for (i = 0; i < numops; i++) 1823 req->r_reply_op_result[i] = ceph_decode_32(&p); 1824 1825 if (le16_to_cpu(msg->hdr.version) >= 6) { 1826 p += 8 + 4; /* skip replay_version */ 1827 p += 8; /* skip user_version */ 1828 1829 err = ceph_redirect_decode(&p, end, &redir); 1830 if (err) 1831 goto bad_put; 1832 } else { 1833 redir.oloc.pool = -1; 1834 } 1835 1836 if (redir.oloc.pool != -1) { 1837 dout("redirect pool %lld\n", redir.oloc.pool); 1838 1839 __unregister_request(osdc, req); 1840 1841 req->r_target_oloc = redir.oloc; /* struct */ 1842 1843 /* 1844 * Start redirect requests with nofail=true. If 1845 * mapping fails, request will end up on the notarget 1846 * list, waiting for the new osdmap (which can take 1847 * a while), even though the original request mapped 1848 * successfully. In the future we might want to follow 1849 * original request's nofail setting here. 
1850 */ 1851 err = __ceph_osdc_start_request(osdc, req, true); 1852 BUG_ON(err); 1853 1854 goto out_unlock; 1855 } 1856 1857 already_completed = req->r_got_reply; 1858 if (!req->r_got_reply) { 1859 req->r_result = result; 1860 dout("handle_reply result %d bytes %d\n", req->r_result, 1861 bytes); 1862 if (req->r_result == 0) 1863 req->r_result = bytes; 1864 1865 /* in case this is a write and we need to replay, */ 1866 req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch); 1867 req->r_reassert_version.version = cpu_to_le64(reassert_version); 1868 1869 req->r_got_reply = 1; 1870 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { 1871 dout("handle_reply tid %llu dup ack\n", tid); 1872 goto out_unlock; 1873 } 1874 1875 dout("handle_reply tid %llu flags %d\n", tid, flags); 1876 1877 if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK)) 1878 __register_linger_request(osdc, req); 1879 1880 /* either this is a read, or we got the safe response */ 1881 if (result < 0 || 1882 (flags & CEPH_OSD_FLAG_ONDISK) || 1883 ((flags & CEPH_OSD_FLAG_WRITE) == 0)) 1884 __unregister_request(osdc, req); 1885 1886 mutex_unlock(&osdc->request_mutex); 1887 up_read(&osdc->map_sem); 1888 1889 if (!already_completed) { 1890 if (req->r_unsafe_callback && 1891 result >= 0 && !(flags & CEPH_OSD_FLAG_ONDISK)) 1892 req->r_unsafe_callback(req, true); 1893 if (req->r_callback) 1894 req->r_callback(req, msg); 1895 else 1896 complete_all(&req->r_completion); 1897 } 1898 1899 if (flags & CEPH_OSD_FLAG_ONDISK) { 1900 if (req->r_unsafe_callback && already_completed) 1901 req->r_unsafe_callback(req, false); 1902 complete_request(req); 1903 } 1904 1905 out: 1906 dout("req=%p req->r_linger=%d\n", req, req->r_linger); 1907 ceph_osdc_put_request(req); 1908 return; 1909 out_unlock: 1910 mutex_unlock(&osdc->request_mutex); 1911 up_read(&osdc->map_sem); 1912 goto out; 1913 1914 bad_put: 1915 req->r_result = -EIO; 1916 __unregister_request(osdc, req); 1917 if (req->r_callback) 1918 req->r_callback(req, msg); 
1919 else 1920 complete_all(&req->r_completion); 1921 complete_request(req); 1922 ceph_osdc_put_request(req); 1923 bad_mutex: 1924 mutex_unlock(&osdc->request_mutex); 1925 up_read(&osdc->map_sem); 1926 bad: 1927 pr_err("corrupt osd_op_reply got %d %d\n", 1928 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len)); 1929 ceph_msg_dump(msg); 1930 } 1931 1932 static void reset_changed_osds(struct ceph_osd_client *osdc) 1933 { 1934 struct rb_node *p, *n; 1935 1936 for (p = rb_first(&osdc->osds); p; p = n) { 1937 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node); 1938 1939 n = rb_next(p); 1940 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || 1941 memcmp(&osd->o_con.peer_addr, 1942 ceph_osd_addr(osdc->osdmap, 1943 osd->o_osd), 1944 sizeof(struct ceph_entity_addr)) != 0) 1945 __reset_osd(osdc, osd); 1946 } 1947 } 1948 1949 /* 1950 * Requeue requests whose mapping to an OSD has changed. If requests map to 1951 * no osd, request a new map. 1952 * 1953 * Caller should hold map_sem for read. 1954 */ 1955 static void kick_requests(struct ceph_osd_client *osdc, bool force_resend, 1956 bool force_resend_writes) 1957 { 1958 struct ceph_osd_request *req, *nreq; 1959 struct rb_node *p; 1960 int needmap = 0; 1961 int err; 1962 bool force_resend_req; 1963 1964 dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "", 1965 force_resend_writes ? " (force resend writes)" : ""); 1966 mutex_lock(&osdc->request_mutex); 1967 for (p = rb_first(&osdc->requests); p; ) { 1968 req = rb_entry(p, struct ceph_osd_request, r_node); 1969 p = rb_next(p); 1970 1971 /* 1972 * For linger requests that have not yet been 1973 * registered, move them to the linger list; they'll 1974 * be sent to the osd in the loop below. Unregister 1975 * the request before re-registering it as a linger 1976 * request to ensure the __map_request() below 1977 * will decide it needs to be sent. 
1978 */ 1979 if (req->r_linger && list_empty(&req->r_linger_item)) { 1980 dout("%p tid %llu restart on osd%d\n", 1981 req, req->r_tid, 1982 req->r_osd ? req->r_osd->o_osd : -1); 1983 ceph_osdc_get_request(req); 1984 __unregister_request(osdc, req); 1985 __register_linger_request(osdc, req); 1986 ceph_osdc_put_request(req); 1987 continue; 1988 } 1989 1990 force_resend_req = force_resend || 1991 (force_resend_writes && 1992 req->r_flags & CEPH_OSD_FLAG_WRITE); 1993 err = __map_request(osdc, req, force_resend_req); 1994 if (err < 0) 1995 continue; /* error */ 1996 if (req->r_osd == NULL) { 1997 dout("%p tid %llu maps to no osd\n", req, req->r_tid); 1998 needmap++; /* request a newer map */ 1999 } else if (err > 0) { 2000 if (!req->r_linger) { 2001 dout("%p tid %llu requeued on osd%d\n", req, 2002 req->r_tid, 2003 req->r_osd ? req->r_osd->o_osd : -1); 2004 req->r_flags |= CEPH_OSD_FLAG_RETRY; 2005 } 2006 } 2007 } 2008 2009 list_for_each_entry_safe(req, nreq, &osdc->req_linger, 2010 r_linger_item) { 2011 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); 2012 2013 err = __map_request(osdc, req, 2014 force_resend || force_resend_writes); 2015 dout("__map_request returned %d\n", err); 2016 if (err == 0) 2017 continue; /* no change and no osd was specified */ 2018 if (err < 0) 2019 continue; /* hrm! */ 2020 if (req->r_osd == NULL) { 2021 dout("tid %llu maps to no valid osd\n", req->r_tid); 2022 needmap++; /* request a newer map */ 2023 continue; 2024 } 2025 2026 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid, 2027 req->r_osd ? req->r_osd->o_osd : -1); 2028 __register_request(osdc, req); 2029 __unregister_linger_request(osdc, req); 2030 } 2031 reset_changed_osds(osdc); 2032 mutex_unlock(&osdc->request_mutex); 2033 2034 if (needmap) { 2035 dout("%d requests for down osds, need new map\n", needmap); 2036 ceph_monc_request_next_osdmap(&osdc->client->monc); 2037 } 2038 } 2039 2040 2041 /* 2042 * Process updated osd map. 
2043 * 2044 * The message contains any number of incremental and full maps, normally 2045 * indicating some sort of topology change in the cluster. Kick requests 2046 * off to different OSDs as needed. 2047 */ 2048 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) 2049 { 2050 void *p, *end, *next; 2051 u32 nr_maps, maplen; 2052 u32 epoch; 2053 struct ceph_osdmap *newmap = NULL, *oldmap; 2054 int err; 2055 struct ceph_fsid fsid; 2056 bool was_full; 2057 2058 dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0); 2059 p = msg->front.iov_base; 2060 end = p + msg->front.iov_len; 2061 2062 /* verify fsid */ 2063 ceph_decode_need(&p, end, sizeof(fsid), bad); 2064 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 2065 if (ceph_check_fsid(osdc->client, &fsid) < 0) 2066 return; 2067 2068 down_write(&osdc->map_sem); 2069 2070 was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL); 2071 2072 /* incremental maps */ 2073 ceph_decode_32_safe(&p, end, nr_maps, bad); 2074 dout(" %d inc maps\n", nr_maps); 2075 while (nr_maps > 0) { 2076 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 2077 epoch = ceph_decode_32(&p); 2078 maplen = ceph_decode_32(&p); 2079 ceph_decode_need(&p, end, maplen, bad); 2080 next = p + maplen; 2081 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) { 2082 dout("applying incremental map %u len %d\n", 2083 epoch, maplen); 2084 newmap = osdmap_apply_incremental(&p, next, 2085 osdc->osdmap, 2086 &osdc->client->msgr); 2087 if (IS_ERR(newmap)) { 2088 err = PTR_ERR(newmap); 2089 goto bad; 2090 } 2091 BUG_ON(!newmap); 2092 if (newmap != osdc->osdmap) { 2093 ceph_osdmap_destroy(osdc->osdmap); 2094 osdc->osdmap = newmap; 2095 } 2096 was_full = was_full || 2097 ceph_osdmap_flag(osdc->osdmap, 2098 CEPH_OSDMAP_FULL); 2099 kick_requests(osdc, 0, was_full); 2100 } else { 2101 dout("ignoring incremental map %u len %d\n", 2102 epoch, maplen); 2103 } 2104 p = next; 2105 nr_maps--; 2106 } 2107 if (newmap) 2108 goto done; 2109 2110 /* 
full maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d full maps\n", nr_maps);
	/*
	 * Only the last full map in the message is decoded, and only if it
	 * is newer than the map we already have.
	 */
	while (nr_maps) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		if (nr_maps > 1) {
			dout("skipping non-latest full map %u len %d\n",
			     epoch, maplen);
		} else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
			dout("skipping full map %u len %d, "
			     "older than our %u\n", epoch, maplen,
			     osdc->osdmap->epoch);
		} else {
			int skipped_map = 0;

			dout("taking full map %u len %d\n", epoch, maplen);
			newmap = ceph_osdmap_decode(&p, p+maplen);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			oldmap = osdc->osdmap;
			osdc->osdmap = newmap;
			/* a gap in epochs makes kick_requests() force resends */
			if (oldmap) {
				if (oldmap->epoch + 1 < newmap->epoch)
					skipped_map = 1;
				ceph_osdmap_destroy(oldmap);
			}
			was_full = was_full ||
				ceph_osdmap_flag(osdc->osdmap,
						 CEPH_OSDMAP_FULL);
			kick_requests(osdc, skipped_map, was_full);
		}
		/*
		 * NOTE(review): ceph_osdmap_decode() is given &p and may
		 * already have advanced p; that is harmless here only because
		 * the decoding branch runs for the last map (nr_maps == 1).
		 */
		p += maplen;
		nr_maps--;
	}

	if (!osdc->osdmap)
		goto bad;
done:
	/* keep map_sem for read while the queued requests are sent */
	downgrade_write(&osdc->map_sem);
	ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);

	/*
	 * subscribe to subsequent osdmap updates if full to ensure
	 * we find out when we are no longer full and stop returning
	 * ENOSPC.  The same applies while reads or writes are paused.
	 */
	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
		ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) ||
		ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR))
		ceph_monc_request_next_osdmap(&osdc->client->monc);

	mutex_lock(&osdc->request_mutex);
	__send_queued(osdc);
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
	wake_up_all(&osdc->client->auth_wq);
	return;

/*
 * Error exit: releases the write lock on map_sem.
 * NOTE(review): any goto bad taken before down_write() would release an
 * unheld lock.
 */
bad:
	pr_err("osdc handle_map corrupt msg\n");
	ceph_msg_dump(msg);
	up_write(&osdc->map_sem);
}

/*
 * watch/notify callback event infrastructure
 *
 * These callbacks are used both for watch and notify operations.
 */

/* kref release callback: frees the event once the last reference goes. */
static void __release_event(struct kref *kref)
{
	struct ceph_osd_event *event =
		container_of(kref, struct ceph_osd_event, kref);

	dout("__release_event %p\n", event);
	kfree(event);
}

/* Take a reference on @event. */
static void get_event(struct ceph_osd_event *event)
{
	kref_get(&event->kref);
}

/* Drop a reference on @event; the last one frees it. */
void ceph_osdc_put_event(struct ceph_osd_event *event)
{
	kref_put(&event->kref, __release_event);
}
EXPORT_SYMBOL(ceph_osdc_put_event);

/*
 * Link @new into osdc->event_tree, ordered by cookie; a duplicate cookie
 * is a bug.  Presumably called under osdc->event_lock, as
 * ceph_osdc_create_event() does.
 */
static void __insert_event(struct ceph_osd_client *osdc,
			   struct ceph_osd_event *new)
{
	struct rb_node **p = &osdc->event_tree.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_event *event = NULL;

	while (*p) {
		parent = *p;
		event = rb_entry(parent, struct ceph_osd_event, node);
		if (new->cookie < event->cookie)
			p = &(*p)->rb_left;
		else if (new->cookie > event->cookie)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->node, parent, p);
	rb_insert_color(&new->node, &osdc->event_tree);
}

/* Look up the event with @cookie in osdc->event_tree; NULL if absent. */
static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
					   u64 cookie)
{
	struct rb_node **p = &osdc->event_tree.rb_node;
	struct rb_node *parent
= NULL; 2232 struct ceph_osd_event *event = NULL; 2233 2234 while (*p) { 2235 parent = *p; 2236 event = rb_entry(parent, struct ceph_osd_event, node); 2237 if (cookie < event->cookie) 2238 p = &(*p)->rb_left; 2239 else if (cookie > event->cookie) 2240 p = &(*p)->rb_right; 2241 else 2242 return event; 2243 } 2244 return NULL; 2245 } 2246 2247 static void __remove_event(struct ceph_osd_event *event) 2248 { 2249 struct ceph_osd_client *osdc = event->osdc; 2250 2251 if (!RB_EMPTY_NODE(&event->node)) { 2252 dout("__remove_event removed %p\n", event); 2253 rb_erase(&event->node, &osdc->event_tree); 2254 ceph_osdc_put_event(event); 2255 } else { 2256 dout("__remove_event didn't remove %p\n", event); 2257 } 2258 } 2259 2260 int ceph_osdc_create_event(struct ceph_osd_client *osdc, 2261 void (*event_cb)(u64, u64, u8, void *), 2262 void *data, struct ceph_osd_event **pevent) 2263 { 2264 struct ceph_osd_event *event; 2265 2266 event = kmalloc(sizeof(*event), GFP_NOIO); 2267 if (!event) 2268 return -ENOMEM; 2269 2270 dout("create_event %p\n", event); 2271 event->cb = event_cb; 2272 event->one_shot = 0; 2273 event->data = data; 2274 event->osdc = osdc; 2275 INIT_LIST_HEAD(&event->osd_node); 2276 RB_CLEAR_NODE(&event->node); 2277 kref_init(&event->kref); /* one ref for us */ 2278 kref_get(&event->kref); /* one ref for the caller */ 2279 2280 spin_lock(&osdc->event_lock); 2281 event->cookie = ++osdc->event_count; 2282 __insert_event(osdc, event); 2283 spin_unlock(&osdc->event_lock); 2284 2285 *pevent = event; 2286 return 0; 2287 } 2288 EXPORT_SYMBOL(ceph_osdc_create_event); 2289 2290 void ceph_osdc_cancel_event(struct ceph_osd_event *event) 2291 { 2292 struct ceph_osd_client *osdc = event->osdc; 2293 2294 dout("cancel_event %p\n", event); 2295 spin_lock(&osdc->event_lock); 2296 __remove_event(event); 2297 spin_unlock(&osdc->event_lock); 2298 ceph_osdc_put_event(event); /* caller's */ 2299 } 2300 EXPORT_SYMBOL(ceph_osdc_cancel_event); 2301 2302 2303 static void do_event_work(struct 
work_struct *work) 2304 { 2305 struct ceph_osd_event_work *event_work = 2306 container_of(work, struct ceph_osd_event_work, work); 2307 struct ceph_osd_event *event = event_work->event; 2308 u64 ver = event_work->ver; 2309 u64 notify_id = event_work->notify_id; 2310 u8 opcode = event_work->opcode; 2311 2312 dout("do_event_work completing %p\n", event); 2313 event->cb(ver, notify_id, opcode, event->data); 2314 dout("do_event_work completed %p\n", event); 2315 ceph_osdc_put_event(event); 2316 kfree(event_work); 2317 } 2318 2319 2320 /* 2321 * Process osd watch notifications 2322 */ 2323 static void handle_watch_notify(struct ceph_osd_client *osdc, 2324 struct ceph_msg *msg) 2325 { 2326 void *p, *end; 2327 u8 proto_ver; 2328 u64 cookie, ver, notify_id; 2329 u8 opcode; 2330 struct ceph_osd_event *event; 2331 struct ceph_osd_event_work *event_work; 2332 2333 p = msg->front.iov_base; 2334 end = p + msg->front.iov_len; 2335 2336 ceph_decode_8_safe(&p, end, proto_ver, bad); 2337 ceph_decode_8_safe(&p, end, opcode, bad); 2338 ceph_decode_64_safe(&p, end, cookie, bad); 2339 ceph_decode_64_safe(&p, end, ver, bad); 2340 ceph_decode_64_safe(&p, end, notify_id, bad); 2341 2342 spin_lock(&osdc->event_lock); 2343 event = __find_event(osdc, cookie); 2344 if (event) { 2345 BUG_ON(event->one_shot); 2346 get_event(event); 2347 } 2348 spin_unlock(&osdc->event_lock); 2349 dout("handle_watch_notify cookie %lld ver %lld event %p\n", 2350 cookie, ver, event); 2351 if (event) { 2352 event_work = kmalloc(sizeof(*event_work), GFP_NOIO); 2353 if (!event_work) { 2354 pr_err("couldn't allocate event_work\n"); 2355 ceph_osdc_put_event(event); 2356 return; 2357 } 2358 INIT_WORK(&event_work->work, do_event_work); 2359 event_work->event = event; 2360 event_work->ver = ver; 2361 event_work->notify_id = notify_id; 2362 event_work->opcode = opcode; 2363 2364 queue_work(osdc->notify_wq, &event_work->work); 2365 } 2366 2367 return; 2368 2369 bad: 2370 pr_err("osdc handle_watch_notify corrupt msg\n"); 
2371 } 2372 2373 /* 2374 * build new request AND message 2375 * 2376 */ 2377 void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, 2378 struct ceph_snap_context *snapc, u64 snap_id, 2379 struct timespec *mtime) 2380 { 2381 struct ceph_msg *msg = req->r_request; 2382 void *p; 2383 size_t msg_size; 2384 int flags = req->r_flags; 2385 u64 data_len; 2386 unsigned int i; 2387 2388 req->r_snapid = snap_id; 2389 req->r_snapc = ceph_get_snap_context(snapc); 2390 2391 /* encode request */ 2392 msg->hdr.version = cpu_to_le16(4); 2393 2394 p = msg->front.iov_base; 2395 ceph_encode_32(&p, 1); /* client_inc is always 1 */ 2396 req->r_request_osdmap_epoch = p; 2397 p += 4; 2398 req->r_request_flags = p; 2399 p += 4; 2400 if (req->r_flags & CEPH_OSD_FLAG_WRITE) 2401 ceph_encode_timespec(p, mtime); 2402 p += sizeof(struct ceph_timespec); 2403 req->r_request_reassert_version = p; 2404 p += sizeof(struct ceph_eversion); /* will get filled in */ 2405 2406 /* oloc */ 2407 ceph_encode_8(&p, 4); 2408 ceph_encode_8(&p, 4); 2409 ceph_encode_32(&p, 8 + 4 + 4); 2410 req->r_request_pool = p; 2411 p += 8; 2412 ceph_encode_32(&p, -1); /* preferred */ 2413 ceph_encode_32(&p, 0); /* key len */ 2414 2415 ceph_encode_8(&p, 1); 2416 req->r_request_pgid = p; 2417 p += 8 + 4; 2418 ceph_encode_32(&p, -1); /* preferred */ 2419 2420 /* oid */ 2421 ceph_encode_32(&p, req->r_base_oid.name_len); 2422 memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len); 2423 dout("oid '%.*s' len %d\n", req->r_base_oid.name_len, 2424 req->r_base_oid.name, req->r_base_oid.name_len); 2425 p += req->r_base_oid.name_len; 2426 2427 /* ops--can imply data */ 2428 ceph_encode_16(&p, (u16)req->r_num_ops); 2429 data_len = 0; 2430 for (i = 0; i < req->r_num_ops; i++) { 2431 data_len += osd_req_encode_op(req, p, i); 2432 p += sizeof(struct ceph_osd_op); 2433 } 2434 2435 /* snaps */ 2436 ceph_encode_64(&p, req->r_snapid); 2437 ceph_encode_64(&p, req->r_snapc ? 
req->r_snapc->seq : 0); 2438 ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0); 2439 if (req->r_snapc) { 2440 for (i = 0; i < snapc->num_snaps; i++) { 2441 ceph_encode_64(&p, req->r_snapc->snaps[i]); 2442 } 2443 } 2444 2445 req->r_request_attempts = p; 2446 p += 4; 2447 2448 /* data */ 2449 if (flags & CEPH_OSD_FLAG_WRITE) { 2450 u16 data_off; 2451 2452 /* 2453 * The header "data_off" is a hint to the receiver 2454 * allowing it to align received data into its 2455 * buffers such that there's no need to re-copy 2456 * it before writing it to disk (direct I/O). 2457 */ 2458 data_off = (u16) (off & 0xffff); 2459 req->r_request->hdr.data_off = cpu_to_le16(data_off); 2460 } 2461 req->r_request->hdr.data_len = cpu_to_le32(data_len); 2462 2463 BUG_ON(p > msg->front.iov_base + msg->front.iov_len); 2464 msg_size = p - msg->front.iov_base; 2465 msg->front.iov_len = msg_size; 2466 msg->hdr.front_len = cpu_to_le32(msg_size); 2467 2468 dout("build_request msg_size was %d\n", (int)msg_size); 2469 } 2470 EXPORT_SYMBOL(ceph_osdc_build_request); 2471 2472 /* 2473 * Register request, send initial attempt. 2474 */ 2475 int ceph_osdc_start_request(struct ceph_osd_client *osdc, 2476 struct ceph_osd_request *req, 2477 bool nofail) 2478 { 2479 int rc; 2480 2481 down_read(&osdc->map_sem); 2482 mutex_lock(&osdc->request_mutex); 2483 2484 rc = __ceph_osdc_start_request(osdc, req, nofail); 2485 2486 mutex_unlock(&osdc->request_mutex); 2487 up_read(&osdc->map_sem); 2488 2489 return rc; 2490 } 2491 EXPORT_SYMBOL(ceph_osdc_start_request); 2492 2493 /* 2494 * Unregister a registered request. The request is not completed (i.e. 2495 * no callbacks or wakeups) - higher layers are supposed to know what 2496 * they are canceling. 
2497 */ 2498 void ceph_osdc_cancel_request(struct ceph_osd_request *req) 2499 { 2500 struct ceph_osd_client *osdc = req->r_osdc; 2501 2502 mutex_lock(&osdc->request_mutex); 2503 if (req->r_linger) 2504 __unregister_linger_request(osdc, req); 2505 __unregister_request(osdc, req); 2506 mutex_unlock(&osdc->request_mutex); 2507 2508 dout("%s %p tid %llu canceled\n", __func__, req, req->r_tid); 2509 } 2510 EXPORT_SYMBOL(ceph_osdc_cancel_request); 2511 2512 /* 2513 * wait for a request to complete 2514 */ 2515 int ceph_osdc_wait_request(struct ceph_osd_client *osdc, 2516 struct ceph_osd_request *req) 2517 { 2518 int rc; 2519 2520 dout("%s %p tid %llu\n", __func__, req, req->r_tid); 2521 2522 rc = wait_for_completion_interruptible(&req->r_completion); 2523 if (rc < 0) { 2524 dout("%s %p tid %llu interrupted\n", __func__, req, req->r_tid); 2525 ceph_osdc_cancel_request(req); 2526 complete_request(req); 2527 return rc; 2528 } 2529 2530 dout("%s %p tid %llu result %d\n", __func__, req, req->r_tid, 2531 req->r_result); 2532 return req->r_result; 2533 } 2534 EXPORT_SYMBOL(ceph_osdc_wait_request); 2535 2536 /* 2537 * sync - wait for all in-flight requests to flush. avoid starvation. 
2538 */ 2539 void ceph_osdc_sync(struct ceph_osd_client *osdc) 2540 { 2541 struct ceph_osd_request *req; 2542 u64 last_tid, next_tid = 0; 2543 2544 mutex_lock(&osdc->request_mutex); 2545 last_tid = osdc->last_tid; 2546 while (1) { 2547 req = __lookup_request_ge(osdc, next_tid); 2548 if (!req) 2549 break; 2550 if (req->r_tid > last_tid) 2551 break; 2552 2553 next_tid = req->r_tid + 1; 2554 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0) 2555 continue; 2556 2557 ceph_osdc_get_request(req); 2558 mutex_unlock(&osdc->request_mutex); 2559 dout("sync waiting on tid %llu (last is %llu)\n", 2560 req->r_tid, last_tid); 2561 wait_for_completion(&req->r_safe_completion); 2562 mutex_lock(&osdc->request_mutex); 2563 ceph_osdc_put_request(req); 2564 } 2565 mutex_unlock(&osdc->request_mutex); 2566 dout("sync done (thru tid %llu)\n", last_tid); 2567 } 2568 EXPORT_SYMBOL(ceph_osdc_sync); 2569 2570 /* 2571 * Call all pending notify callbacks - for use after a watch is 2572 * unregistered, to make sure no more callbacks for it will be invoked 2573 */ 2574 void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc) 2575 { 2576 flush_workqueue(osdc->notify_wq); 2577 } 2578 EXPORT_SYMBOL(ceph_osdc_flush_notifies); 2579 2580 2581 /* 2582 * init, shutdown 2583 */ 2584 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) 2585 { 2586 int err; 2587 2588 dout("init\n"); 2589 osdc->client = client; 2590 osdc->osdmap = NULL; 2591 init_rwsem(&osdc->map_sem); 2592 init_completion(&osdc->map_waiters); 2593 osdc->last_requested_map = 0; 2594 mutex_init(&osdc->request_mutex); 2595 osdc->last_tid = 0; 2596 osdc->osds = RB_ROOT; 2597 INIT_LIST_HEAD(&osdc->osd_lru); 2598 osdc->requests = RB_ROOT; 2599 INIT_LIST_HEAD(&osdc->req_lru); 2600 INIT_LIST_HEAD(&osdc->req_unsent); 2601 INIT_LIST_HEAD(&osdc->req_notarget); 2602 INIT_LIST_HEAD(&osdc->req_linger); 2603 osdc->num_requests = 0; 2604 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 2605 
INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 2606 spin_lock_init(&osdc->event_lock); 2607 osdc->event_tree = RB_ROOT; 2608 osdc->event_count = 0; 2609 2610 schedule_delayed_work(&osdc->osds_timeout_work, 2611 round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ)); 2612 2613 err = -ENOMEM; 2614 osdc->req_mempool = mempool_create_kmalloc_pool(10, 2615 sizeof(struct ceph_osd_request)); 2616 if (!osdc->req_mempool) 2617 goto out; 2618 2619 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, 2620 OSD_OP_FRONT_LEN, 10, true, 2621 "osd_op"); 2622 if (err < 0) 2623 goto out_mempool; 2624 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, 2625 OSD_OPREPLY_FRONT_LEN, 10, true, 2626 "osd_op_reply"); 2627 if (err < 0) 2628 goto out_msgpool; 2629 2630 err = -ENOMEM; 2631 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); 2632 if (!osdc->notify_wq) 2633 goto out_msgpool_reply; 2634 2635 return 0; 2636 2637 out_msgpool_reply: 2638 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 2639 out_msgpool: 2640 ceph_msgpool_destroy(&osdc->msgpool_op); 2641 out_mempool: 2642 mempool_destroy(osdc->req_mempool); 2643 out: 2644 return err; 2645 } 2646 2647 void ceph_osdc_stop(struct ceph_osd_client *osdc) 2648 { 2649 flush_workqueue(osdc->notify_wq); 2650 destroy_workqueue(osdc->notify_wq); 2651 cancel_delayed_work_sync(&osdc->timeout_work); 2652 cancel_delayed_work_sync(&osdc->osds_timeout_work); 2653 if (osdc->osdmap) { 2654 ceph_osdmap_destroy(osdc->osdmap); 2655 osdc->osdmap = NULL; 2656 } 2657 remove_all_osds(osdc); 2658 mempool_destroy(osdc->req_mempool); 2659 ceph_msgpool_destroy(&osdc->msgpool_op); 2660 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 2661 } 2662 2663 /* 2664 * Read some contiguous pages. If we cross a stripe boundary, shorten 2665 * *plen. Return number of bytes read, or error. 
2666 */ 2667 int ceph_osdc_readpages(struct ceph_osd_client *osdc, 2668 struct ceph_vino vino, struct ceph_file_layout *layout, 2669 u64 off, u64 *plen, 2670 u32 truncate_seq, u64 truncate_size, 2671 struct page **pages, int num_pages, int page_align) 2672 { 2673 struct ceph_osd_request *req; 2674 int rc = 0; 2675 2676 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, 2677 vino.snap, off, *plen); 2678 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1, 2679 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 2680 NULL, truncate_seq, truncate_size, 2681 false); 2682 if (IS_ERR(req)) 2683 return PTR_ERR(req); 2684 2685 /* it may be a short read due to an object boundary */ 2686 2687 osd_req_op_extent_osd_data_pages(req, 0, 2688 pages, *plen, page_align, false, false); 2689 2690 dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", 2691 off, *plen, *plen, page_align); 2692 2693 ceph_osdc_build_request(req, off, NULL, vino.snap, NULL); 2694 2695 rc = ceph_osdc_start_request(osdc, req, false); 2696 if (!rc) 2697 rc = ceph_osdc_wait_request(osdc, req); 2698 2699 ceph_osdc_put_request(req); 2700 dout("readpages result %d\n", rc); 2701 return rc; 2702 } 2703 EXPORT_SYMBOL(ceph_osdc_readpages); 2704 2705 /* 2706 * do a synchronous write on N pages 2707 */ 2708 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, 2709 struct ceph_file_layout *layout, 2710 struct ceph_snap_context *snapc, 2711 u64 off, u64 len, 2712 u32 truncate_seq, u64 truncate_size, 2713 struct timespec *mtime, 2714 struct page **pages, int num_pages) 2715 { 2716 struct ceph_osd_request *req; 2717 int rc = 0; 2718 int page_align = off & ~PAGE_MASK; 2719 2720 BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */ 2721 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1, 2722 CEPH_OSD_OP_WRITE, 2723 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, 2724 snapc, truncate_seq, truncate_size, 2725 true); 2726 if (IS_ERR(req)) 2727 return PTR_ERR(req); 
2728 2729 /* it may be a short write due to an object boundary */ 2730 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, 2731 false, false); 2732 dout("writepages %llu~%llu (%llu bytes)\n", off, len, len); 2733 2734 ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime); 2735 2736 rc = ceph_osdc_start_request(osdc, req, true); 2737 if (!rc) 2738 rc = ceph_osdc_wait_request(osdc, req); 2739 2740 ceph_osdc_put_request(req); 2741 if (rc == 0) 2742 rc = len; 2743 dout("writepages result %d\n", rc); 2744 return rc; 2745 } 2746 EXPORT_SYMBOL(ceph_osdc_writepages); 2747 2748 int ceph_osdc_setup(void) 2749 { 2750 BUG_ON(ceph_osd_request_cache); 2751 ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", 2752 sizeof (struct ceph_osd_request), 2753 __alignof__(struct ceph_osd_request), 2754 0, NULL); 2755 2756 return ceph_osd_request_cache ? 0 : -ENOMEM; 2757 } 2758 EXPORT_SYMBOL(ceph_osdc_setup); 2759 2760 void ceph_osdc_cleanup(void) 2761 { 2762 BUG_ON(!ceph_osd_request_cache); 2763 kmem_cache_destroy(ceph_osd_request_cache); 2764 ceph_osd_request_cache = NULL; 2765 } 2766 EXPORT_SYMBOL(ceph_osdc_cleanup); 2767 2768 /* 2769 * handle incoming message 2770 */ 2771 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 2772 { 2773 struct ceph_osd *osd = con->private; 2774 struct ceph_osd_client *osdc; 2775 int type = le16_to_cpu(msg->hdr.type); 2776 2777 if (!osd) 2778 goto out; 2779 osdc = osd->o_osdc; 2780 2781 switch (type) { 2782 case CEPH_MSG_OSD_MAP: 2783 ceph_osdc_handle_map(osdc, msg); 2784 break; 2785 case CEPH_MSG_OSD_OPREPLY: 2786 handle_reply(osdc, msg, con); 2787 break; 2788 case CEPH_MSG_WATCH_NOTIFY: 2789 handle_watch_notify(osdc, msg); 2790 break; 2791 2792 default: 2793 pr_err("received unknown message type %d %s\n", type, 2794 ceph_msg_type_name(type)); 2795 } 2796 out: 2797 ceph_msg_put(msg); 2798 } 2799 2800 /* 2801 * lookup and return message for incoming reply. set up reply message 2802 * pages. 
2803 */ 2804 static struct ceph_msg *get_reply(struct ceph_connection *con, 2805 struct ceph_msg_header *hdr, 2806 int *skip) 2807 { 2808 struct ceph_osd *osd = con->private; 2809 struct ceph_osd_client *osdc = osd->o_osdc; 2810 struct ceph_msg *m; 2811 struct ceph_osd_request *req; 2812 int front_len = le32_to_cpu(hdr->front_len); 2813 int data_len = le32_to_cpu(hdr->data_len); 2814 u64 tid; 2815 2816 tid = le64_to_cpu(hdr->tid); 2817 mutex_lock(&osdc->request_mutex); 2818 req = __lookup_request(osdc, tid); 2819 if (!req) { 2820 *skip = 1; 2821 m = NULL; 2822 dout("get_reply unknown tid %llu from osd%d\n", tid, 2823 osd->o_osd); 2824 goto out; 2825 } 2826 2827 if (req->r_reply->con) 2828 dout("%s revoking msg %p from old con %p\n", __func__, 2829 req->r_reply, req->r_reply->con); 2830 ceph_msg_revoke_incoming(req->r_reply); 2831 2832 if (front_len > req->r_reply->front_alloc_len) { 2833 pr_warn("get_reply front %d > preallocated %d (%u#%llu)\n", 2834 front_len, req->r_reply->front_alloc_len, 2835 (unsigned int)con->peer_name.type, 2836 le64_to_cpu(con->peer_name.num)); 2837 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS, 2838 false); 2839 if (!m) 2840 goto out; 2841 ceph_msg_put(req->r_reply); 2842 req->r_reply = m; 2843 } 2844 m = ceph_msg_get(req->r_reply); 2845 2846 if (data_len > 0) { 2847 struct ceph_osd_data *osd_data; 2848 2849 /* 2850 * XXX This is assuming there is only one op containing 2851 * XXX page data. Probably OK for reads, but this 2852 * XXX ought to be done more generally. 
2853 */ 2854 osd_data = osd_req_op_extent_osd_data(req, 0); 2855 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { 2856 if (osd_data->pages && 2857 unlikely(osd_data->length < data_len)) { 2858 2859 pr_warn("tid %lld reply has %d bytes we had only %llu bytes ready\n", 2860 tid, data_len, osd_data->length); 2861 *skip = 1; 2862 ceph_msg_put(m); 2863 m = NULL; 2864 goto out; 2865 } 2866 } 2867 } 2868 *skip = 0; 2869 dout("get_reply tid %lld %p\n", tid, m); 2870 2871 out: 2872 mutex_unlock(&osdc->request_mutex); 2873 return m; 2874 2875 } 2876 2877 static struct ceph_msg *alloc_msg(struct ceph_connection *con, 2878 struct ceph_msg_header *hdr, 2879 int *skip) 2880 { 2881 struct ceph_osd *osd = con->private; 2882 int type = le16_to_cpu(hdr->type); 2883 int front = le32_to_cpu(hdr->front_len); 2884 2885 *skip = 0; 2886 switch (type) { 2887 case CEPH_MSG_OSD_MAP: 2888 case CEPH_MSG_WATCH_NOTIFY: 2889 return ceph_msg_new(type, front, GFP_NOFS, false); 2890 case CEPH_MSG_OSD_OPREPLY: 2891 return get_reply(con, hdr, skip); 2892 default: 2893 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type, 2894 osd->o_osd); 2895 *skip = 1; 2896 return NULL; 2897 } 2898 } 2899 2900 /* 2901 * Wrappers to refcount containing ceph_osd struct 2902 */ 2903 static struct ceph_connection *get_osd_con(struct ceph_connection *con) 2904 { 2905 struct ceph_osd *osd = con->private; 2906 if (get_osd(osd)) 2907 return con; 2908 return NULL; 2909 } 2910 2911 static void put_osd_con(struct ceph_connection *con) 2912 { 2913 struct ceph_osd *osd = con->private; 2914 put_osd(osd); 2915 } 2916 2917 /* 2918 * authentication 2919 */ 2920 /* 2921 * Note: returned pointer is the address of a structure that's 2922 * managed separately. Caller must *not* attempt to free it. 
2923 */ 2924 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 2925 int *proto, int force_new) 2926 { 2927 struct ceph_osd *o = con->private; 2928 struct ceph_osd_client *osdc = o->o_osdc; 2929 struct ceph_auth_client *ac = osdc->client->monc.auth; 2930 struct ceph_auth_handshake *auth = &o->o_auth; 2931 2932 if (force_new && auth->authorizer) { 2933 ceph_auth_destroy_authorizer(ac, auth->authorizer); 2934 auth->authorizer = NULL; 2935 } 2936 if (!auth->authorizer) { 2937 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 2938 auth); 2939 if (ret) 2940 return ERR_PTR(ret); 2941 } else { 2942 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 2943 auth); 2944 if (ret) 2945 return ERR_PTR(ret); 2946 } 2947 *proto = ac->protocol; 2948 2949 return auth; 2950 } 2951 2952 2953 static int verify_authorizer_reply(struct ceph_connection *con, int len) 2954 { 2955 struct ceph_osd *o = con->private; 2956 struct ceph_osd_client *osdc = o->o_osdc; 2957 struct ceph_auth_client *ac = osdc->client->monc.auth; 2958 2959 return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len); 2960 } 2961 2962 static int invalidate_authorizer(struct ceph_connection *con) 2963 { 2964 struct ceph_osd *o = con->private; 2965 struct ceph_osd_client *osdc = o->o_osdc; 2966 struct ceph_auth_client *ac = osdc->client->monc.auth; 2967 2968 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); 2969 return ceph_monc_validate_auth(&osdc->client->monc); 2970 } 2971 2972 static int sign_message(struct ceph_connection *con, struct ceph_msg *msg) 2973 { 2974 struct ceph_osd *o = con->private; 2975 struct ceph_auth_handshake *auth = &o->o_auth; 2976 return ceph_auth_sign_message(auth, msg); 2977 } 2978 2979 static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg) 2980 { 2981 struct ceph_osd *o = con->private; 2982 struct ceph_auth_handshake *auth = &o->o_auth; 2983 return ceph_auth_check_message_signature(auth, 
msg); 2984 } 2985 2986 static const struct ceph_connection_operations osd_con_ops = { 2987 .get = get_osd_con, 2988 .put = put_osd_con, 2989 .dispatch = dispatch, 2990 .get_authorizer = get_authorizer, 2991 .verify_authorizer_reply = verify_authorizer_reply, 2992 .invalidate_authorizer = invalidate_authorizer, 2993 .alloc_msg = alloc_msg, 2994 .sign_message = sign_message, 2995 .check_message_signature = check_message_signature, 2996 .fault = osd_reset, 2997 }; 2998