#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>

#define OSD_OP_FRONT_LEN	4096
#define OSD_OPREPLY_FRONT_LEN	512

static struct kmem_cache *ceph_osd_request_cache;

static const struct ceph_connection_operations osd_con_ops;

static void __send_queued(struct ceph_osd_client *osdc);
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
static void __register_request(struct ceph_osd_client *osdc,
                               struct ceph_osd_request *req);
static void __unregister_linger_request(struct ceph_osd_client *osdc,
                                        struct ceph_osd_request *req);
static void __send_request(struct ceph_osd_client *osdc,
                           struct ceph_osd_request *req);

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices." (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */

/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly. shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */
static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
                       u64 *objnum, u64 *objoff, u64 *objlen)
{
        u64 orig_len = *plen;
        int r;

        /* object extent?
*/ 70 r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum, 71 objoff, objlen); 72 if (r < 0) 73 return r; 74 if (*objlen < orig_len) { 75 *plen = *objlen; 76 dout(" skipping last %llu, final file extent %llu~%llu\n", 77 orig_len - *plen, off, *plen); 78 } 79 80 dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen); 81 82 return 0; 83 } 84 85 static void ceph_osd_data_init(struct ceph_osd_data *osd_data) 86 { 87 memset(osd_data, 0, sizeof (*osd_data)); 88 osd_data->type = CEPH_OSD_DATA_TYPE_NONE; 89 } 90 91 static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data, 92 struct page **pages, u64 length, u32 alignment, 93 bool pages_from_pool, bool own_pages) 94 { 95 osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; 96 osd_data->pages = pages; 97 osd_data->length = length; 98 osd_data->alignment = alignment; 99 osd_data->pages_from_pool = pages_from_pool; 100 osd_data->own_pages = own_pages; 101 } 102 103 static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data, 104 struct ceph_pagelist *pagelist) 105 { 106 osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST; 107 osd_data->pagelist = pagelist; 108 } 109 110 #ifdef CONFIG_BLOCK 111 static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data, 112 struct bio *bio, size_t bio_length) 113 { 114 osd_data->type = CEPH_OSD_DATA_TYPE_BIO; 115 osd_data->bio = bio; 116 osd_data->bio_length = bio_length; 117 } 118 #endif /* CONFIG_BLOCK */ 119 120 #define osd_req_op_data(oreq, whch, typ, fld) \ 121 ({ \ 122 BUG_ON(whch >= (oreq)->r_num_ops); \ 123 &(oreq)->r_ops[whch].typ.fld; \ 124 }) 125 126 static struct ceph_osd_data * 127 osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which) 128 { 129 BUG_ON(which >= osd_req->r_num_ops); 130 131 return &osd_req->r_ops[which].raw_data_in; 132 } 133 134 struct ceph_osd_data * 135 osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req, 136 unsigned int which) 137 { 138 return osd_req_op_data(osd_req, which, extent, osd_data); 139 } 140 EXPORT_SYMBOL(osd_req_op_extent_osd_data); 141 142 struct ceph_osd_data * 143 osd_req_op_cls_response_data(struct ceph_osd_request *osd_req, 144 unsigned int which) 145 { 146 return osd_req_op_data(osd_req, which, cls, response_data); 147 } 148 EXPORT_SYMBOL(osd_req_op_cls_response_data); /* ??? 
*/ 149 150 void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req, 151 unsigned int which, struct page **pages, 152 u64 length, u32 alignment, 153 bool pages_from_pool, bool own_pages) 154 { 155 struct ceph_osd_data *osd_data; 156 157 osd_data = osd_req_op_raw_data_in(osd_req, which); 158 ceph_osd_data_pages_init(osd_data, pages, length, alignment, 159 pages_from_pool, own_pages); 160 } 161 EXPORT_SYMBOL(osd_req_op_raw_data_in_pages); 162 163 void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req, 164 unsigned int which, struct page **pages, 165 u64 length, u32 alignment, 166 bool pages_from_pool, bool own_pages) 167 { 168 struct ceph_osd_data *osd_data; 169 170 osd_data = osd_req_op_data(osd_req, which, extent, osd_data); 171 ceph_osd_data_pages_init(osd_data, pages, length, alignment, 172 pages_from_pool, own_pages); 173 } 174 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages); 175 176 void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req, 177 unsigned int which, struct ceph_pagelist *pagelist) 178 { 179 struct ceph_osd_data *osd_data; 180 181 osd_data = osd_req_op_data(osd_req, which, extent, osd_data); 182 ceph_osd_data_pagelist_init(osd_data, pagelist); 183 } 184 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist); 185 186 #ifdef CONFIG_BLOCK 187 void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req, 188 unsigned int which, struct bio *bio, size_t bio_length) 189 { 190 struct ceph_osd_data *osd_data; 191 192 osd_data = osd_req_op_data(osd_req, which, extent, osd_data); 193 ceph_osd_data_bio_init(osd_data, bio, bio_length); 194 } 195 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio); 196 #endif /* CONFIG_BLOCK */ 197 198 static void osd_req_op_cls_request_info_pagelist( 199 struct ceph_osd_request *osd_req, 200 unsigned int which, struct ceph_pagelist *pagelist) 201 { 202 struct ceph_osd_data *osd_data; 203 204 osd_data = osd_req_op_data(osd_req, which, cls, request_info); 205 ceph_osd_data_pagelist_init(osd_data, pagelist); 206 } 207 208 void osd_req_op_cls_request_data_pagelist( 209 struct ceph_osd_request *osd_req, 210 unsigned int which, struct ceph_pagelist *pagelist) 211 { 212 struct ceph_osd_data *osd_data; 213 214 osd_data = osd_req_op_data(osd_req, which, cls, request_data); 215 ceph_osd_data_pagelist_init(osd_data, pagelist); 216 } 217 EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist); 218 219 void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req, 220 unsigned int which, struct page **pages, u64 length, 221 u32 alignment, bool pages_from_pool, bool own_pages) 222 { 223 struct ceph_osd_data *osd_data; 224 225 osd_data = osd_req_op_data(osd_req, which, cls, request_data); 226 ceph_osd_data_pages_init(osd_data, pages, length, alignment, 227 pages_from_pool, own_pages); 228 } 229 EXPORT_SYMBOL(osd_req_op_cls_request_data_pages); 230 231 void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req, 232 unsigned int which, struct page **pages, u64 length, 233 u32 alignment, bool pages_from_pool, bool own_pages) 234 { 235 struct ceph_osd_data *osd_data; 236 237 osd_data = osd_req_op_data(osd_req, which, cls, response_data); 238 ceph_osd_data_pages_init(osd_data, pages, length, alignment, 239 pages_from_pool, own_pages); 240 } 241 EXPORT_SYMBOL(osd_req_op_cls_response_data_pages); 242 243 static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data) 244 { 245 switch (osd_data->type) { 246 case CEPH_OSD_DATA_TYPE_NONE: 247 return 0; 248 case CEPH_OSD_DATA_TYPE_PAGES: 249 return osd_data->length; 
250 case CEPH_OSD_DATA_TYPE_PAGELIST: 251 return (u64)osd_data->pagelist->length; 252 #ifdef CONFIG_BLOCK 253 case CEPH_OSD_DATA_TYPE_BIO: 254 return (u64)osd_data->bio_length; 255 #endif /* CONFIG_BLOCK */ 256 default: 257 WARN(true, "unrecognized data type %d\n", (int)osd_data->type); 258 return 0; 259 } 260 } 261 262 static void ceph_osd_data_release(struct ceph_osd_data *osd_data) 263 { 264 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) { 265 int num_pages; 266 267 num_pages = calc_pages_for((u64)osd_data->alignment, 268 (u64)osd_data->length); 269 ceph_release_page_vector(osd_data->pages, num_pages); 270 } 271 ceph_osd_data_init(osd_data); 272 } 273 274 static void osd_req_op_data_release(struct ceph_osd_request *osd_req, 275 unsigned int which) 276 { 277 struct ceph_osd_req_op *op; 278 279 BUG_ON(which >= osd_req->r_num_ops); 280 op = &osd_req->r_ops[which]; 281 282 switch (op->op) { 283 case CEPH_OSD_OP_READ: 284 case CEPH_OSD_OP_WRITE: 285 ceph_osd_data_release(&op->extent.osd_data); 286 break; 287 case CEPH_OSD_OP_CALL: 288 ceph_osd_data_release(&op->cls.request_info); 289 ceph_osd_data_release(&op->cls.request_data); 290 ceph_osd_data_release(&op->cls.response_data); 291 break; 292 default: 293 break; 294 } 295 } 296 297 /* 298 * requests 299 */ 300 void ceph_osdc_release_request(struct kref *kref) 301 { 302 struct ceph_osd_request *req; 303 unsigned int which; 304 305 req = container_of(kref, struct ceph_osd_request, r_kref); 306 if (req->r_request) 307 ceph_msg_put(req->r_request); 308 if (req->r_reply) { 309 ceph_msg_revoke_incoming(req->r_reply); 310 ceph_msg_put(req->r_reply); 311 } 312 313 for (which = 0; which < req->r_num_ops; which++) 314 osd_req_op_data_release(req, which); 315 316 ceph_put_snap_context(req->r_snapc); 317 if (req->r_mempool) 318 mempool_free(req, req->r_osdc->req_mempool); 319 else 320 kmem_cache_free(ceph_osd_request_cache, req); 321 322 } 323 EXPORT_SYMBOL(ceph_osdc_release_request); 324 325 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 326 struct ceph_snap_context *snapc, 327 unsigned int num_ops, 328 bool use_mempool, 329 gfp_t gfp_flags) 330 { 331 struct ceph_osd_request *req; 332 struct ceph_msg *msg; 333 size_t msg_size; 334 335 BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX); 336 BUG_ON(num_ops > CEPH_OSD_MAX_OP); 337 338 msg_size = 4 + 4 + 8 + 8 + 4+8; 339 msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */ 340 msg_size += 1 + 8 + 4 + 4; /* pg_t */ 341 msg_size += 4 + CEPH_MAX_OID_NAME_LEN; /* oid */ 342 msg_size += 2 + num_ops*sizeof(struct ceph_osd_op); 343 msg_size += 8; /* snapid */ 344 msg_size += 8; /* snap_seq */ 345 msg_size += 8 * (snapc ? 
snapc->num_snaps : 0); /* snaps */ 346 msg_size += 4; 347 348 if (use_mempool) { 349 req = mempool_alloc(osdc->req_mempool, gfp_flags); 350 memset(req, 0, sizeof(*req)); 351 } else { 352 req = kmem_cache_zalloc(ceph_osd_request_cache, gfp_flags); 353 } 354 if (req == NULL) 355 return NULL; 356 357 req->r_osdc = osdc; 358 req->r_mempool = use_mempool; 359 req->r_num_ops = num_ops; 360 361 kref_init(&req->r_kref); 362 init_completion(&req->r_completion); 363 init_completion(&req->r_safe_completion); 364 RB_CLEAR_NODE(&req->r_node); 365 INIT_LIST_HEAD(&req->r_unsafe_item); 366 INIT_LIST_HEAD(&req->r_linger_item); 367 INIT_LIST_HEAD(&req->r_linger_osd); 368 INIT_LIST_HEAD(&req->r_req_lru_item); 369 INIT_LIST_HEAD(&req->r_osd_item); 370 371 req->r_base_oloc.pool = -1; 372 req->r_target_oloc.pool = -1; 373 374 /* create reply message */ 375 if (use_mempool) 376 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 377 else 378 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, 379 OSD_OPREPLY_FRONT_LEN, gfp_flags, true); 380 if (!msg) { 381 ceph_osdc_put_request(req); 382 return NULL; 383 } 384 req->r_reply = msg; 385 386 /* create request message; allow space for oid */ 387 if (use_mempool) 388 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 389 else 390 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags, true); 391 if (!msg) { 392 ceph_osdc_put_request(req); 393 return NULL; 394 } 395 396 memset(msg->front.iov_base, 0, msg->front.iov_len); 397 398 req->r_request = msg; 399 400 return req; 401 } 402 EXPORT_SYMBOL(ceph_osdc_alloc_request); 403 404 static bool osd_req_opcode_valid(u16 opcode) 405 { 406 switch (opcode) { 407 case CEPH_OSD_OP_READ: 408 case CEPH_OSD_OP_STAT: 409 case CEPH_OSD_OP_MAPEXT: 410 case CEPH_OSD_OP_MASKTRUNC: 411 case CEPH_OSD_OP_SPARSE_READ: 412 case CEPH_OSD_OP_NOTIFY: 413 case CEPH_OSD_OP_NOTIFY_ACK: 414 case CEPH_OSD_OP_ASSERT_VER: 415 case CEPH_OSD_OP_WRITE: 416 case CEPH_OSD_OP_WRITEFULL: 417 case CEPH_OSD_OP_TRUNCATE: 418 case CEPH_OSD_OP_ZERO: 419 case CEPH_OSD_OP_DELETE: 420 case CEPH_OSD_OP_APPEND: 421 case CEPH_OSD_OP_STARTSYNC: 422 case CEPH_OSD_OP_SETTRUNC: 423 case CEPH_OSD_OP_TRIMTRUNC: 424 case CEPH_OSD_OP_TMAPUP: 425 case CEPH_OSD_OP_TMAPPUT: 426 case CEPH_OSD_OP_TMAPGET: 427 case CEPH_OSD_OP_CREATE: 428 case CEPH_OSD_OP_ROLLBACK: 429 case CEPH_OSD_OP_WATCH: 430 case CEPH_OSD_OP_OMAPGETKEYS: 431 case CEPH_OSD_OP_OMAPGETVALS: 432 case CEPH_OSD_OP_OMAPGETHEADER: 433 case CEPH_OSD_OP_OMAPGETVALSBYKEYS: 434 case CEPH_OSD_OP_OMAPSETVALS: 435 case CEPH_OSD_OP_OMAPSETHEADER: 436 case CEPH_OSD_OP_OMAPCLEAR: 437 case CEPH_OSD_OP_OMAPRMKEYS: 438 case CEPH_OSD_OP_OMAP_CMP: 439 case CEPH_OSD_OP_SETALLOCHINT: 440 case CEPH_OSD_OP_CLONERANGE: 441 case CEPH_OSD_OP_ASSERT_SRC_VERSION: 442 case CEPH_OSD_OP_SRC_CMPXATTR: 443 case CEPH_OSD_OP_GETXATTR: 444 case CEPH_OSD_OP_GETXATTRS: 445 case CEPH_OSD_OP_CMPXATTR: 446 case CEPH_OSD_OP_SETXATTR: 447 case CEPH_OSD_OP_SETXATTRS: 448 case CEPH_OSD_OP_RESETXATTRS: 449 case CEPH_OSD_OP_RMXATTR: 450 case CEPH_OSD_OP_PULL: 451 case CEPH_OSD_OP_PUSH: 452 case CEPH_OSD_OP_BALANCEREADS: 453 case CEPH_OSD_OP_UNBALANCEREADS: 454 case CEPH_OSD_OP_SCRUB: 455 case CEPH_OSD_OP_SCRUB_RESERVE: 456 case CEPH_OSD_OP_SCRUB_UNRESERVE: 457 case CEPH_OSD_OP_SCRUB_STOP: 458 case CEPH_OSD_OP_SCRUB_MAP: 459 case CEPH_OSD_OP_WRLOCK: 460 case CEPH_OSD_OP_WRUNLOCK: 461 case CEPH_OSD_OP_RDLOCK: 462 case CEPH_OSD_OP_RDUNLOCK: 463 case CEPH_OSD_OP_UPLOCK: 464 case CEPH_OSD_OP_DNLOCK: 465 case CEPH_OSD_OP_CALL: 466 case CEPH_OSD_OP_PGLS: 467 case 
CEPH_OSD_OP_PGLS_FILTER: 468 return true; 469 default: 470 return false; 471 } 472 } 473 474 /* 475 * This is an osd op init function for opcodes that have no data or 476 * other information associated with them. It also serves as a 477 * common init routine for all the other init functions, below. 478 */ 479 static struct ceph_osd_req_op * 480 _osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which, 481 u16 opcode) 482 { 483 struct ceph_osd_req_op *op; 484 485 BUG_ON(which >= osd_req->r_num_ops); 486 BUG_ON(!osd_req_opcode_valid(opcode)); 487 488 op = &osd_req->r_ops[which]; 489 memset(op, 0, sizeof (*op)); 490 op->op = opcode; 491 492 return op; 493 } 494 495 void osd_req_op_init(struct ceph_osd_request *osd_req, 496 unsigned int which, u16 opcode) 497 { 498 (void)_osd_req_op_init(osd_req, which, opcode); 499 } 500 EXPORT_SYMBOL(osd_req_op_init); 501 502 void osd_req_op_extent_init(struct ceph_osd_request *osd_req, 503 unsigned int which, u16 opcode, 504 u64 offset, u64 length, 505 u64 truncate_size, u32 truncate_seq) 506 { 507 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); 508 size_t payload_len = 0; 509 510 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && 511 opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO && 512 opcode != CEPH_OSD_OP_TRUNCATE); 513 514 op->extent.offset = offset; 515 op->extent.length = length; 516 op->extent.truncate_size = truncate_size; 517 op->extent.truncate_seq = truncate_seq; 518 if (opcode == CEPH_OSD_OP_WRITE) 519 payload_len += length; 520 521 op->payload_len = payload_len; 522 } 523 EXPORT_SYMBOL(osd_req_op_extent_init); 524 525 void osd_req_op_extent_update(struct ceph_osd_request *osd_req, 526 unsigned int which, u64 length) 527 { 528 struct ceph_osd_req_op *op; 529 u64 previous; 530 531 BUG_ON(which >= osd_req->r_num_ops); 532 op = &osd_req->r_ops[which]; 533 previous = op->extent.length; 534 535 if (length == previous) 536 return; /* Nothing to do */ 537 BUG_ON(length > previous); 538 539 op->extent.length = length; 540 op->payload_len -= previous - length; 541 } 542 EXPORT_SYMBOL(osd_req_op_extent_update); 543 544 void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, 545 u16 opcode, const char *class, const char *method) 546 { 547 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); 548 struct ceph_pagelist *pagelist; 549 size_t payload_len = 0; 550 size_t size; 551 552 BUG_ON(opcode != CEPH_OSD_OP_CALL); 553 554 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); 555 BUG_ON(!pagelist); 556 ceph_pagelist_init(pagelist); 557 558 op->cls.class_name = class; 559 size = strlen(class); 560 BUG_ON(size > (size_t) U8_MAX); 561 op->cls.class_len = size; 562 ceph_pagelist_append(pagelist, class, size); 563 payload_len += size; 564 565 op->cls.method_name = method; 566 size = strlen(method); 567 BUG_ON(size > (size_t) U8_MAX); 568 op->cls.method_len = size; 569 ceph_pagelist_append(pagelist, method, size); 570 payload_len += size; 571 572 osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist); 573 574 op->cls.argc = 0; /* currently unused */ 575 576 op->payload_len = payload_len; 577 } 578 EXPORT_SYMBOL(osd_req_op_cls_init); 579 580 void osd_req_op_watch_init(struct ceph_osd_request *osd_req, 581 unsigned int which, u16 opcode, 582 u64 cookie, u64 version, int flag) 583 { 584 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); 585 586 BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH); 587 588 
op->watch.cookie = cookie; 589 op->watch.ver = version; 590 if (opcode == CEPH_OSD_OP_WATCH && flag) 591 op->watch.flag = (u8)1; 592 } 593 EXPORT_SYMBOL(osd_req_op_watch_init); 594 595 void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req, 596 unsigned int which, 597 u64 expected_object_size, 598 u64 expected_write_size) 599 { 600 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 601 CEPH_OSD_OP_SETALLOCHINT); 602 603 op->alloc_hint.expected_object_size = expected_object_size; 604 op->alloc_hint.expected_write_size = expected_write_size; 605 606 /* 607 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed 608 * not worth a feature bit. Set FAILOK per-op flag to make 609 * sure older osds don't trip over an unsupported opcode. 610 */ 611 op->flags |= CEPH_OSD_OP_FLAG_FAILOK; 612 } 613 EXPORT_SYMBOL(osd_req_op_alloc_hint_init); 614 615 static void ceph_osdc_msg_data_add(struct ceph_msg *msg, 616 struct ceph_osd_data *osd_data) 617 { 618 u64 length = ceph_osd_data_length(osd_data); 619 620 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { 621 BUG_ON(length > (u64) SIZE_MAX); 622 if (length) 623 ceph_msg_data_add_pages(msg, osd_data->pages, 624 length, osd_data->alignment); 625 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) { 626 BUG_ON(!length); 627 ceph_msg_data_add_pagelist(msg, osd_data->pagelist); 628 #ifdef CONFIG_BLOCK 629 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) { 630 ceph_msg_data_add_bio(msg, osd_data->bio, length); 631 #endif 632 } else { 633 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE); 634 } 635 } 636 637 static u64 osd_req_encode_op(struct ceph_osd_request *req, 638 struct ceph_osd_op *dst, unsigned int which) 639 { 640 struct ceph_osd_req_op *src; 641 struct ceph_osd_data *osd_data; 642 u64 request_data_len = 0; 643 u64 data_length; 644 645 BUG_ON(which >= req->r_num_ops); 646 src = &req->r_ops[which]; 647 if (WARN_ON(!osd_req_opcode_valid(src->op))) { 648 pr_err("unrecognized osd opcode %d\n", src->op); 649 650 return 0; 651 } 652 653 switch (src->op) { 654 case CEPH_OSD_OP_STAT: 655 osd_data = &src->raw_data_in; 656 ceph_osdc_msg_data_add(req->r_reply, osd_data); 657 break; 658 case CEPH_OSD_OP_READ: 659 case CEPH_OSD_OP_WRITE: 660 case CEPH_OSD_OP_ZERO: 661 case CEPH_OSD_OP_DELETE: 662 case CEPH_OSD_OP_TRUNCATE: 663 if (src->op == CEPH_OSD_OP_WRITE) 664 request_data_len = src->extent.length; 665 dst->extent.offset = cpu_to_le64(src->extent.offset); 666 dst->extent.length = cpu_to_le64(src->extent.length); 667 dst->extent.truncate_size = 668 cpu_to_le64(src->extent.truncate_size); 669 dst->extent.truncate_seq = 670 cpu_to_le32(src->extent.truncate_seq); 671 osd_data = &src->extent.osd_data; 672 if (src->op == CEPH_OSD_OP_WRITE) 673 ceph_osdc_msg_data_add(req->r_request, osd_data); 674 else 675 ceph_osdc_msg_data_add(req->r_reply, osd_data); 676 break; 677 case CEPH_OSD_OP_CALL: 678 dst->cls.class_len = src->cls.class_len; 679 dst->cls.method_len = src->cls.method_len; 680 osd_data = &src->cls.request_info; 681 ceph_osdc_msg_data_add(req->r_request, osd_data); 682 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGELIST); 683 request_data_len = osd_data->pagelist->length; 684 685 osd_data = &src->cls.request_data; 686 data_length = ceph_osd_data_length(osd_data); 687 if (data_length) { 688 BUG_ON(osd_data->type == CEPH_OSD_DATA_TYPE_NONE); 689 dst->cls.indata_len = cpu_to_le32(data_length); 690 ceph_osdc_msg_data_add(req->r_request, osd_data); 691 src->payload_len += data_length; 692 request_data_len += data_length; 693 } 
                osd_data = &src->cls.response_data;
                ceph_osdc_msg_data_add(req->r_reply, osd_data);
                break;
        case CEPH_OSD_OP_STARTSYNC:
                break;
        case CEPH_OSD_OP_NOTIFY_ACK:
        case CEPH_OSD_OP_WATCH:
                dst->watch.cookie = cpu_to_le64(src->watch.cookie);
                dst->watch.ver = cpu_to_le64(src->watch.ver);
                dst->watch.flag = src->watch.flag;
                break;
        case CEPH_OSD_OP_SETALLOCHINT:
                dst->alloc_hint.expected_object_size =
                        cpu_to_le64(src->alloc_hint.expected_object_size);
                dst->alloc_hint.expected_write_size =
                        cpu_to_le64(src->alloc_hint.expected_write_size);
                break;
        default:
                pr_err("unsupported osd opcode %s\n",
                       ceph_osd_op_name(src->op));
                WARN_ON(1);

                return 0;
        }

        dst->op = cpu_to_le16(src->op);
        dst->flags = cpu_to_le32(src->flags);
        dst->payload_len = cpu_to_le32(src->payload_len);

        return request_data_len;
}

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately. (we
 * avoid synchronously deleting truncated objects because it's slow.)
 *
 * if @num_ops is greater than one, also include a 'startsync' command so
 * that the osd will flush data quickly.
 */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                                               struct ceph_file_layout *layout,
                                               struct ceph_vino vino,
                                               u64 off, u64 *plen, int num_ops,
                                               int opcode, int flags,
                                               struct ceph_snap_context *snapc,
                                               u32 truncate_seq,
                                               u64 truncate_size,
                                               bool use_mempool)
{
        struct ceph_osd_request *req;
        u64 objnum = 0;
        u64 objoff = 0;
        u64 objlen = 0;
        u32 object_size;
        u64 object_base;
        int r;

        BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
               opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO &&
               opcode != CEPH_OSD_OP_TRUNCATE);

        req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
                                      GFP_NOFS);
        if (!req)
                return ERR_PTR(-ENOMEM);

        req->r_flags = flags;

        /* calculate max write size */
        r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
        if (r < 0) {
                ceph_osdc_put_request(req);
                return ERR_PTR(r);
        }

        object_size = le32_to_cpu(layout->fl_object_size);
        object_base = off - objoff;
        if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
                if (truncate_size <= object_base) {
                        truncate_size = 0;
                } else {
                        truncate_size -= object_base;
                        if (truncate_size > object_size)
                                truncate_size = object_size;
                }
        }

        osd_req_op_extent_init(req, 0, opcode, objoff, objlen,
                               truncate_size, truncate_seq);

        /*
         * A second op in the ops array means the caller wants to
         * also include a 'startsync' command so that the
         * osd will flush data quickly.
         */
        if (num_ops > 1)
                osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);

        req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);

        snprintf(req->r_base_oid.name, sizeof(req->r_base_oid.name),
                 "%llx.%08llx", vino.ino, objnum);
        req->r_base_oid.name_len = strlen(req->r_base_oid.name);

        return req;
}
EXPORT_SYMBOL(ceph_osdc_new_request);

/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
808 */ 809 static void __insert_request(struct ceph_osd_client *osdc, 810 struct ceph_osd_request *new) 811 { 812 struct rb_node **p = &osdc->requests.rb_node; 813 struct rb_node *parent = NULL; 814 struct ceph_osd_request *req = NULL; 815 816 while (*p) { 817 parent = *p; 818 req = rb_entry(parent, struct ceph_osd_request, r_node); 819 if (new->r_tid < req->r_tid) 820 p = &(*p)->rb_left; 821 else if (new->r_tid > req->r_tid) 822 p = &(*p)->rb_right; 823 else 824 BUG(); 825 } 826 827 rb_link_node(&new->r_node, parent, p); 828 rb_insert_color(&new->r_node, &osdc->requests); 829 } 830 831 static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc, 832 u64 tid) 833 { 834 struct ceph_osd_request *req; 835 struct rb_node *n = osdc->requests.rb_node; 836 837 while (n) { 838 req = rb_entry(n, struct ceph_osd_request, r_node); 839 if (tid < req->r_tid) 840 n = n->rb_left; 841 else if (tid > req->r_tid) 842 n = n->rb_right; 843 else 844 return req; 845 } 846 return NULL; 847 } 848 849 static struct ceph_osd_request * 850 __lookup_request_ge(struct ceph_osd_client *osdc, 851 u64 tid) 852 { 853 struct ceph_osd_request *req; 854 struct rb_node *n = osdc->requests.rb_node; 855 856 while (n) { 857 req = rb_entry(n, struct ceph_osd_request, r_node); 858 if (tid < req->r_tid) { 859 if (!n->rb_left) 860 return req; 861 n = n->rb_left; 862 } else if (tid > req->r_tid) { 863 n = n->rb_right; 864 } else { 865 return req; 866 } 867 } 868 return NULL; 869 } 870 871 /* 872 * Resubmit requests pending on the given osd. 873 */ 874 static void __kick_osd_requests(struct ceph_osd_client *osdc, 875 struct ceph_osd *osd) 876 { 877 struct ceph_osd_request *req, *nreq; 878 LIST_HEAD(resend); 879 int err; 880 881 dout("__kick_osd_requests osd%d\n", osd->o_osd); 882 err = __reset_osd(osdc, osd); 883 if (err) 884 return; 885 /* 886 * Build up a list of requests to resend by traversing the 887 * osd's list of requests. Requests for a given object are 888 * sent in tid order, and that is also the order they're 889 * kept on this list. Therefore all requests that are in 890 * flight will be found first, followed by all requests that 891 * have not yet been sent. And to resend requests while 892 * preserving this order we will want to put any sent 893 * requests back on the front of the osd client's unsent 894 * list. 895 * 896 * So we build a separate ordered list of already-sent 897 * requests for the affected osd and splice it onto the 898 * front of the osd client's unsent list. Once we've seen a 899 * request that has not yet been sent we're done. Those 900 * requests are already sitting right where they belong. 901 */ 902 list_for_each_entry(req, &osd->o_requests, r_osd_item) { 903 if (!req->r_sent) 904 break; 905 list_move_tail(&req->r_req_lru_item, &resend); 906 dout("requeueing %p tid %llu osd%d\n", req, req->r_tid, 907 osd->o_osd); 908 if (!req->r_linger) 909 req->r_flags |= CEPH_OSD_FLAG_RETRY; 910 } 911 list_splice(&resend, &osdc->req_unsent); 912 913 /* 914 * Linger requests are re-registered before sending, which 915 * sets up a new tid for each. We add them to the unsent 916 * list at the end to keep things in tid order. 917 */ 918 list_for_each_entry_safe(req, nreq, &osd->o_linger_requests, 919 r_linger_osd) { 920 /* 921 * reregister request prior to unregistering linger so 922 * that r_osd is preserved. 
923 */ 924 BUG_ON(!list_empty(&req->r_req_lru_item)); 925 __register_request(osdc, req); 926 list_add_tail(&req->r_req_lru_item, &osdc->req_unsent); 927 list_add_tail(&req->r_osd_item, &req->r_osd->o_requests); 928 __unregister_linger_request(osdc, req); 929 dout("requeued lingering %p tid %llu osd%d\n", req, req->r_tid, 930 osd->o_osd); 931 } 932 } 933 934 /* 935 * If the osd connection drops, we need to resubmit all requests. 936 */ 937 static void osd_reset(struct ceph_connection *con) 938 { 939 struct ceph_osd *osd = con->private; 940 struct ceph_osd_client *osdc; 941 942 if (!osd) 943 return; 944 dout("osd_reset osd%d\n", osd->o_osd); 945 osdc = osd->o_osdc; 946 down_read(&osdc->map_sem); 947 mutex_lock(&osdc->request_mutex); 948 __kick_osd_requests(osdc, osd); 949 __send_queued(osdc); 950 mutex_unlock(&osdc->request_mutex); 951 up_read(&osdc->map_sem); 952 } 953 954 /* 955 * Track open sessions with osds. 956 */ 957 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum) 958 { 959 struct ceph_osd *osd; 960 961 osd = kzalloc(sizeof(*osd), GFP_NOFS); 962 if (!osd) 963 return NULL; 964 965 atomic_set(&osd->o_ref, 1); 966 osd->o_osdc = osdc; 967 osd->o_osd = onum; 968 RB_CLEAR_NODE(&osd->o_node); 969 INIT_LIST_HEAD(&osd->o_requests); 970 INIT_LIST_HEAD(&osd->o_linger_requests); 971 INIT_LIST_HEAD(&osd->o_osd_lru); 972 osd->o_incarnation = 1; 973 974 ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr); 975 976 INIT_LIST_HEAD(&osd->o_keepalive_item); 977 return osd; 978 } 979 980 static struct ceph_osd *get_osd(struct ceph_osd *osd) 981 { 982 if (atomic_inc_not_zero(&osd->o_ref)) { 983 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1, 984 atomic_read(&osd->o_ref)); 985 return osd; 986 } else { 987 dout("get_osd %p FAIL\n", osd); 988 return NULL; 989 } 990 } 991 992 static void put_osd(struct ceph_osd *osd) 993 { 994 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref), 995 atomic_read(&osd->o_ref) - 1); 996 if (atomic_dec_and_test(&osd->o_ref) && osd->o_auth.authorizer) { 997 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth; 998 999 ceph_auth_destroy_authorizer(ac, osd->o_auth.authorizer); 1000 kfree(osd); 1001 } 1002 } 1003 1004 /* 1005 * remove an osd from our map 1006 */ 1007 static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd) 1008 { 1009 dout("__remove_osd %p\n", osd); 1010 BUG_ON(!list_empty(&osd->o_requests)); 1011 rb_erase(&osd->o_node, &osdc->osds); 1012 list_del_init(&osd->o_osd_lru); 1013 ceph_con_close(&osd->o_con); 1014 put_osd(osd); 1015 } 1016 1017 static void remove_all_osds(struct ceph_osd_client *osdc) 1018 { 1019 dout("%s %p\n", __func__, osdc); 1020 mutex_lock(&osdc->request_mutex); 1021 while (!RB_EMPTY_ROOT(&osdc->osds)) { 1022 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), 1023 struct ceph_osd, o_node); 1024 __remove_osd(osdc, osd); 1025 } 1026 mutex_unlock(&osdc->request_mutex); 1027 } 1028 1029 static void __move_osd_to_lru(struct ceph_osd_client *osdc, 1030 struct ceph_osd *osd) 1031 { 1032 dout("__move_osd_to_lru %p\n", osd); 1033 BUG_ON(!list_empty(&osd->o_osd_lru)); 1034 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru); 1035 osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ; 1036 } 1037 1038 static void __remove_osd_from_lru(struct ceph_osd *osd) 1039 { 1040 dout("__remove_osd_from_lru %p\n", osd); 1041 if (!list_empty(&osd->o_osd_lru)) 1042 list_del_init(&osd->o_osd_lru); 1043 } 1044 1045 static void remove_old_osds(struct ceph_osd_client *osdc) 1046 
{
        struct ceph_osd *osd, *nosd;

        dout("remove_old_osds %p\n", osdc);
        mutex_lock(&osdc->request_mutex);
        list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
                if (time_before(jiffies, osd->lru_ttl))
                        break;
                __remove_osd(osdc, osd);
        }
        mutex_unlock(&osdc->request_mutex);
}

/*
 * reset osd connect
 */
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
        struct ceph_entity_addr *peer_addr;

        dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
        if (list_empty(&osd->o_requests) &&
            list_empty(&osd->o_linger_requests)) {
                __remove_osd(osdc, osd);

                return -ENODEV;
        }

        peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
        if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
            !ceph_con_opened(&osd->o_con)) {
                struct ceph_osd_request *req;

                dout("osd addr hasn't changed and connection never opened, "
                     "letting msgr retry\n");
                /* touch each r_stamp for handle_timeout()'s benefit */
                list_for_each_entry(req, &osd->o_requests, r_osd_item)
                        req->r_stamp = jiffies;

                return -EAGAIN;
        }

        ceph_con_close(&osd->o_con);
        ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
        osd->o_incarnation++;

        return 0;
}

static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
{
        struct rb_node **p = &osdc->osds.rb_node;
        struct rb_node *parent = NULL;
        struct ceph_osd *osd = NULL;

        dout("__insert_osd %p osd%d\n", new, new->o_osd);
        while (*p) {
                parent = *p;
                osd = rb_entry(parent, struct ceph_osd, o_node);
                if (new->o_osd < osd->o_osd)
                        p = &(*p)->rb_left;
                else if (new->o_osd > osd->o_osd)
                        p = &(*p)->rb_right;
                else
                        BUG();
        }

        rb_link_node(&new->o_node, parent, p);
        rb_insert_color(&new->o_node, &osdc->osds);
}

static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
{
        struct ceph_osd *osd;
        struct rb_node *n = osdc->osds.rb_node;

        while (n) {
                osd = rb_entry(n, struct ceph_osd, o_node);
                if (o < osd->o_osd)
                        n = n->rb_left;
                else if (o > osd->o_osd)
                        n = n->rb_right;
                else
                        return osd;
        }
        return NULL;
}

static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
{
        schedule_delayed_work(&osdc->timeout_work,
                              osdc->client->options->osd_keepalive_timeout * HZ);
}

static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
{
        cancel_delayed_work(&osdc->timeout_work);
}

/*
 * Register request, assign tid. If this is the first request, set up
 * the timeout event.
1148 */ 1149 static void __register_request(struct ceph_osd_client *osdc, 1150 struct ceph_osd_request *req) 1151 { 1152 req->r_tid = ++osdc->last_tid; 1153 req->r_request->hdr.tid = cpu_to_le64(req->r_tid); 1154 dout("__register_request %p tid %lld\n", req, req->r_tid); 1155 __insert_request(osdc, req); 1156 ceph_osdc_get_request(req); 1157 osdc->num_requests++; 1158 if (osdc->num_requests == 1) { 1159 dout(" first request, scheduling timeout\n"); 1160 __schedule_osd_timeout(osdc); 1161 } 1162 } 1163 1164 /* 1165 * called under osdc->request_mutex 1166 */ 1167 static void __unregister_request(struct ceph_osd_client *osdc, 1168 struct ceph_osd_request *req) 1169 { 1170 if (RB_EMPTY_NODE(&req->r_node)) { 1171 dout("__unregister_request %p tid %lld not registered\n", 1172 req, req->r_tid); 1173 return; 1174 } 1175 1176 dout("__unregister_request %p tid %lld\n", req, req->r_tid); 1177 rb_erase(&req->r_node, &osdc->requests); 1178 osdc->num_requests--; 1179 1180 if (req->r_osd) { 1181 /* make sure the original request isn't in flight. */ 1182 ceph_msg_revoke(req->r_request); 1183 1184 list_del_init(&req->r_osd_item); 1185 if (list_empty(&req->r_osd->o_requests) && 1186 list_empty(&req->r_osd->o_linger_requests)) { 1187 dout("moving osd to %p lru\n", req->r_osd); 1188 __move_osd_to_lru(osdc, req->r_osd); 1189 } 1190 if (list_empty(&req->r_linger_item)) 1191 req->r_osd = NULL; 1192 } 1193 1194 list_del_init(&req->r_req_lru_item); 1195 ceph_osdc_put_request(req); 1196 1197 if (osdc->num_requests == 0) { 1198 dout(" no requests, canceling timeout\n"); 1199 __cancel_osd_timeout(osdc); 1200 } 1201 } 1202 1203 /* 1204 * Cancel a previously queued request message 1205 */ 1206 static void __cancel_request(struct ceph_osd_request *req) 1207 { 1208 if (req->r_sent && req->r_osd) { 1209 ceph_msg_revoke(req->r_request); 1210 req->r_sent = 0; 1211 } 1212 } 1213 1214 static void __register_linger_request(struct ceph_osd_client *osdc, 1215 struct ceph_osd_request *req) 1216 { 1217 dout("__register_linger_request %p\n", req); 1218 ceph_osdc_get_request(req); 1219 list_add_tail(&req->r_linger_item, &osdc->req_linger); 1220 if (req->r_osd) 1221 list_add_tail(&req->r_linger_osd, 1222 &req->r_osd->o_linger_requests); 1223 } 1224 1225 static void __unregister_linger_request(struct ceph_osd_client *osdc, 1226 struct ceph_osd_request *req) 1227 { 1228 dout("__unregister_linger_request %p\n", req); 1229 list_del_init(&req->r_linger_item); 1230 if (req->r_osd) { 1231 list_del_init(&req->r_linger_osd); 1232 1233 if (list_empty(&req->r_osd->o_requests) && 1234 list_empty(&req->r_osd->o_linger_requests)) { 1235 dout("moving osd to %p lru\n", req->r_osd); 1236 __move_osd_to_lru(osdc, req->r_osd); 1237 } 1238 if (list_empty(&req->r_osd_item)) 1239 req->r_osd = NULL; 1240 } 1241 ceph_osdc_put_request(req); 1242 } 1243 1244 void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc, 1245 struct ceph_osd_request *req) 1246 { 1247 mutex_lock(&osdc->request_mutex); 1248 if (req->r_linger) { 1249 req->r_linger = 0; 1250 __unregister_linger_request(osdc, req); 1251 } 1252 mutex_unlock(&osdc->request_mutex); 1253 } 1254 EXPORT_SYMBOL(ceph_osdc_unregister_linger_request); 1255 1256 void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc, 1257 struct ceph_osd_request *req) 1258 { 1259 if (!req->r_linger) { 1260 dout("set_request_linger %p\n", req); 1261 req->r_linger = 1; 1262 } 1263 } 1264 EXPORT_SYMBOL(ceph_osdc_set_request_linger); 1265 1266 /* 1267 * Returns whether a request should be blocked from being sent 
1268 * based on the current osdmap and osd_client settings. 1269 * 1270 * Caller should hold map_sem for read. 1271 */ 1272 static bool __req_should_be_paused(struct ceph_osd_client *osdc, 1273 struct ceph_osd_request *req) 1274 { 1275 bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD); 1276 bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) || 1277 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL); 1278 return (req->r_flags & CEPH_OSD_FLAG_READ && pauserd) || 1279 (req->r_flags & CEPH_OSD_FLAG_WRITE && pausewr); 1280 } 1281 1282 /* 1283 * Calculate mapping of a request to a PG. Takes tiering into account. 1284 */ 1285 static int __calc_request_pg(struct ceph_osdmap *osdmap, 1286 struct ceph_osd_request *req, 1287 struct ceph_pg *pg_out) 1288 { 1289 bool need_check_tiering; 1290 1291 need_check_tiering = false; 1292 if (req->r_target_oloc.pool == -1) { 1293 req->r_target_oloc = req->r_base_oloc; /* struct */ 1294 need_check_tiering = true; 1295 } 1296 if (req->r_target_oid.name_len == 0) { 1297 ceph_oid_copy(&req->r_target_oid, &req->r_base_oid); 1298 need_check_tiering = true; 1299 } 1300 1301 if (need_check_tiering && 1302 (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { 1303 struct ceph_pg_pool_info *pi; 1304 1305 pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool); 1306 if (pi) { 1307 if ((req->r_flags & CEPH_OSD_FLAG_READ) && 1308 pi->read_tier >= 0) 1309 req->r_target_oloc.pool = pi->read_tier; 1310 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && 1311 pi->write_tier >= 0) 1312 req->r_target_oloc.pool = pi->write_tier; 1313 } 1314 /* !pi is caught in ceph_oloc_oid_to_pg() */ 1315 } 1316 1317 return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc, 1318 &req->r_target_oid, pg_out); 1319 } 1320 1321 /* 1322 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct 1323 * (as needed), and set the request r_osd appropriately. If there is 1324 * no up osd, set r_osd to NULL. Move the request to the appropriate list 1325 * (unsent, homeless) or leave on in-flight lru. 1326 * 1327 * Return 0 if unchanged, 1 if changed, or negative on error. 1328 * 1329 * Caller should hold map_sem for read and request_mutex. 1330 */ 1331 static int __map_request(struct ceph_osd_client *osdc, 1332 struct ceph_osd_request *req, int force_resend) 1333 { 1334 struct ceph_pg pgid; 1335 int acting[CEPH_PG_MAX_SIZE]; 1336 int num, o; 1337 int err; 1338 bool was_paused; 1339 1340 dout("map_request %p tid %lld\n", req, req->r_tid); 1341 1342 err = __calc_request_pg(osdc->osdmap, req, &pgid); 1343 if (err) { 1344 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1345 return err; 1346 } 1347 req->r_pgid = pgid; 1348 1349 num = ceph_calc_pg_acting(osdc->osdmap, pgid, acting, &o); 1350 if (num < 0) 1351 num = 0; 1352 1353 was_paused = req->r_paused; 1354 req->r_paused = __req_should_be_paused(osdc, req); 1355 if (was_paused && !req->r_paused) 1356 force_resend = 1; 1357 1358 if ((!force_resend && 1359 req->r_osd && req->r_osd->o_osd == o && 1360 req->r_sent >= req->r_osd->o_incarnation && 1361 req->r_num_pg_osds == num && 1362 memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) || 1363 (req->r_osd == NULL && o == -1) || 1364 req->r_paused) 1365 return 0; /* no change */ 1366 1367 dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n", 1368 req->r_tid, pgid.pool, pgid.seed, o, 1369 req->r_osd ? 
req->r_osd->o_osd : -1); 1370 1371 /* record full pg acting set */ 1372 memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num); 1373 req->r_num_pg_osds = num; 1374 1375 if (req->r_osd) { 1376 __cancel_request(req); 1377 list_del_init(&req->r_osd_item); 1378 req->r_osd = NULL; 1379 } 1380 1381 req->r_osd = __lookup_osd(osdc, o); 1382 if (!req->r_osd && o >= 0) { 1383 err = -ENOMEM; 1384 req->r_osd = create_osd(osdc, o); 1385 if (!req->r_osd) { 1386 list_move(&req->r_req_lru_item, &osdc->req_notarget); 1387 goto out; 1388 } 1389 1390 dout("map_request osd %p is osd%d\n", req->r_osd, o); 1391 __insert_osd(osdc, req->r_osd); 1392 1393 ceph_con_open(&req->r_osd->o_con, 1394 CEPH_ENTITY_TYPE_OSD, o, 1395 &osdc->osdmap->osd_addr[o]); 1396 } 1397 1398 if (req->r_osd) { 1399 __remove_osd_from_lru(req->r_osd); 1400 list_add_tail(&req->r_osd_item, &req->r_osd->o_requests); 1401 list_move_tail(&req->r_req_lru_item, &osdc->req_unsent); 1402 } else { 1403 list_move_tail(&req->r_req_lru_item, &osdc->req_notarget); 1404 } 1405 err = 1; /* osd or pg changed */ 1406 1407 out: 1408 return err; 1409 } 1410 1411 /* 1412 * caller should hold map_sem (for read) and request_mutex 1413 */ 1414 static void __send_request(struct ceph_osd_client *osdc, 1415 struct ceph_osd_request *req) 1416 { 1417 void *p; 1418 1419 dout("send_request %p tid %llu to osd%d flags %d pg %lld.%x\n", 1420 req, req->r_tid, req->r_osd->o_osd, req->r_flags, 1421 (unsigned long long)req->r_pgid.pool, req->r_pgid.seed); 1422 1423 /* fill in message content that changes each time we send it */ 1424 put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch); 1425 put_unaligned_le32(req->r_flags, req->r_request_flags); 1426 put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool); 1427 p = req->r_request_pgid; 1428 ceph_encode_64(&p, req->r_pgid.pool); 1429 ceph_encode_32(&p, req->r_pgid.seed); 1430 put_unaligned_le64(1, req->r_request_attempts); /* FIXME */ 1431 memcpy(req->r_request_reassert_version, &req->r_reassert_version, 1432 sizeof(req->r_reassert_version)); 1433 1434 req->r_stamp = jiffies; 1435 list_move_tail(&req->r_req_lru_item, &osdc->req_lru); 1436 1437 ceph_msg_get(req->r_request); /* send consumes a ref */ 1438 1439 req->r_sent = req->r_osd->o_incarnation; 1440 1441 ceph_con_send(&req->r_osd->o_con, req->r_request); 1442 } 1443 1444 /* 1445 * Send any requests in the queue (req_unsent). 1446 */ 1447 static void __send_queued(struct ceph_osd_client *osdc) 1448 { 1449 struct ceph_osd_request *req, *tmp; 1450 1451 dout("__send_queued\n"); 1452 list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) 1453 __send_request(osdc, req); 1454 } 1455 1456 /* 1457 * Caller should hold map_sem for read and request_mutex. 
 */
static int __ceph_osdc_start_request(struct ceph_osd_client *osdc,
                                     struct ceph_osd_request *req,
                                     bool nofail)
{
        int rc;

        __register_request(osdc, req);
        req->r_sent = 0;
        req->r_got_reply = 0;
        rc = __map_request(osdc, req, 0);
        if (rc < 0) {
                if (nofail) {
                        dout("osdc_start_request failed map, "
                             " will retry %lld\n", req->r_tid);
                        rc = 0;
                } else {
                        __unregister_request(osdc, req);
                }
                return rc;
        }

        if (req->r_osd == NULL) {
                dout("send_request %p no up osds in pg\n", req);
                ceph_monc_request_next_osdmap(&osdc->client->monc);
        } else {
                __send_queued(osdc);
        }

        return 0;
}

/*
 * Timeout callback, called every N seconds when 1 or more osd
 * requests have been active for more than N seconds. When this
 * happens, we ping all OSDs with requests that have timed out to
 * ensure any communications channel reset is detected. Reset the
 * request timeouts another N seconds in the future as we go.
 * Reschedule the timeout event another N seconds in the future (unless
 * there are no open requests).
 */
static void handle_timeout(struct work_struct *work)
{
        struct ceph_osd_client *osdc =
                container_of(work, struct ceph_osd_client, timeout_work.work);
        struct ceph_osd_request *req;
        struct ceph_osd *osd;
        unsigned long keepalive =
                osdc->client->options->osd_keepalive_timeout * HZ;
        struct list_head slow_osds;

        dout("timeout\n");
        down_read(&osdc->map_sem);

        ceph_monc_request_next_osdmap(&osdc->client->monc);

        mutex_lock(&osdc->request_mutex);

        /*
         * ping osds that are a bit slow. this ensures that if there
         * is a break in the TCP connection we will notice, and reopen
         * a connection with that osd (from the fault callback).
1519 */ 1520 INIT_LIST_HEAD(&slow_osds); 1521 list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) { 1522 if (time_before(jiffies, req->r_stamp + keepalive)) 1523 break; 1524 1525 osd = req->r_osd; 1526 BUG_ON(!osd); 1527 dout(" tid %llu is slow, will send keepalive on osd%d\n", 1528 req->r_tid, osd->o_osd); 1529 list_move_tail(&osd->o_keepalive_item, &slow_osds); 1530 } 1531 while (!list_empty(&slow_osds)) { 1532 osd = list_entry(slow_osds.next, struct ceph_osd, 1533 o_keepalive_item); 1534 list_del_init(&osd->o_keepalive_item); 1535 ceph_con_keepalive(&osd->o_con); 1536 } 1537 1538 __schedule_osd_timeout(osdc); 1539 __send_queued(osdc); 1540 mutex_unlock(&osdc->request_mutex); 1541 up_read(&osdc->map_sem); 1542 } 1543 1544 static void handle_osds_timeout(struct work_struct *work) 1545 { 1546 struct ceph_osd_client *osdc = 1547 container_of(work, struct ceph_osd_client, 1548 osds_timeout_work.work); 1549 unsigned long delay = 1550 osdc->client->options->osd_idle_ttl * HZ >> 2; 1551 1552 dout("osds timeout\n"); 1553 down_read(&osdc->map_sem); 1554 remove_old_osds(osdc); 1555 up_read(&osdc->map_sem); 1556 1557 schedule_delayed_work(&osdc->osds_timeout_work, 1558 round_jiffies_relative(delay)); 1559 } 1560 1561 static int ceph_oloc_decode(void **p, void *end, 1562 struct ceph_object_locator *oloc) 1563 { 1564 u8 struct_v, struct_cv; 1565 u32 len; 1566 void *struct_end; 1567 int ret = 0; 1568 1569 ceph_decode_need(p, end, 1 + 1 + 4, e_inval); 1570 struct_v = ceph_decode_8(p); 1571 struct_cv = ceph_decode_8(p); 1572 if (struct_v < 3) { 1573 pr_warn("got v %d < 3 cv %d of ceph_object_locator\n", 1574 struct_v, struct_cv); 1575 goto e_inval; 1576 } 1577 if (struct_cv > 6) { 1578 pr_warn("got v %d cv %d > 6 of ceph_object_locator\n", 1579 struct_v, struct_cv); 1580 goto e_inval; 1581 } 1582 len = ceph_decode_32(p); 1583 ceph_decode_need(p, end, len, e_inval); 1584 struct_end = *p + len; 1585 1586 oloc->pool = ceph_decode_64(p); 1587 *p += 4; /* skip preferred */ 1588 1589 len = ceph_decode_32(p); 1590 if (len > 0) { 1591 pr_warn("ceph_object_locator::key is set\n"); 1592 goto e_inval; 1593 } 1594 1595 if (struct_v >= 5) { 1596 len = ceph_decode_32(p); 1597 if (len > 0) { 1598 pr_warn("ceph_object_locator::nspace is set\n"); 1599 goto e_inval; 1600 } 1601 } 1602 1603 if (struct_v >= 6) { 1604 s64 hash = ceph_decode_64(p); 1605 if (hash != -1) { 1606 pr_warn("ceph_object_locator::hash is set\n"); 1607 goto e_inval; 1608 } 1609 } 1610 1611 /* skip the rest */ 1612 *p = struct_end; 1613 out: 1614 return ret; 1615 1616 e_inval: 1617 ret = -EINVAL; 1618 goto out; 1619 } 1620 1621 static int ceph_redirect_decode(void **p, void *end, 1622 struct ceph_request_redirect *redir) 1623 { 1624 u8 struct_v, struct_cv; 1625 u32 len; 1626 void *struct_end; 1627 int ret; 1628 1629 ceph_decode_need(p, end, 1 + 1 + 4, e_inval); 1630 struct_v = ceph_decode_8(p); 1631 struct_cv = ceph_decode_8(p); 1632 if (struct_cv > 1) { 1633 pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n", 1634 struct_v, struct_cv); 1635 goto e_inval; 1636 } 1637 len = ceph_decode_32(p); 1638 ceph_decode_need(p, end, len, e_inval); 1639 struct_end = *p + len; 1640 1641 ret = ceph_oloc_decode(p, end, &redir->oloc); 1642 if (ret) 1643 goto out; 1644 1645 len = ceph_decode_32(p); 1646 if (len > 0) { 1647 pr_warn("ceph_request_redirect::object_name is set\n"); 1648 goto e_inval; 1649 } 1650 1651 len = ceph_decode_32(p); 1652 *p += len; /* skip osd_instructions */ 1653 1654 /* skip the rest */ 1655 *p = struct_end; 1656 out: 1657 return ret; 
1658 1659 e_inval: 1660 ret = -EINVAL; 1661 goto out; 1662 } 1663 1664 static void complete_request(struct ceph_osd_request *req) 1665 { 1666 complete_all(&req->r_safe_completion); /* fsync waiter */ 1667 } 1668 1669 /* 1670 * handle osd op reply. either call the callback if it is specified, 1671 * or do the completion to wake up the waiting thread. 1672 */ 1673 static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, 1674 struct ceph_connection *con) 1675 { 1676 void *p, *end; 1677 struct ceph_osd_request *req; 1678 struct ceph_request_redirect redir; 1679 u64 tid; 1680 int object_len; 1681 unsigned int numops; 1682 int payload_len, flags; 1683 s32 result; 1684 s32 retry_attempt; 1685 struct ceph_pg pg; 1686 int err; 1687 u32 reassert_epoch; 1688 u64 reassert_version; 1689 u32 osdmap_epoch; 1690 int already_completed; 1691 u32 bytes; 1692 unsigned int i; 1693 1694 tid = le64_to_cpu(msg->hdr.tid); 1695 dout("handle_reply %p tid %llu\n", msg, tid); 1696 1697 p = msg->front.iov_base; 1698 end = p + msg->front.iov_len; 1699 1700 ceph_decode_need(&p, end, 4, bad); 1701 object_len = ceph_decode_32(&p); 1702 ceph_decode_need(&p, end, object_len, bad); 1703 p += object_len; 1704 1705 err = ceph_decode_pgid(&p, end, &pg); 1706 if (err) 1707 goto bad; 1708 1709 ceph_decode_need(&p, end, 8 + 4 + 4 + 8 + 4, bad); 1710 flags = ceph_decode_64(&p); 1711 result = ceph_decode_32(&p); 1712 reassert_epoch = ceph_decode_32(&p); 1713 reassert_version = ceph_decode_64(&p); 1714 osdmap_epoch = ceph_decode_32(&p); 1715 1716 /* lookup */ 1717 down_read(&osdc->map_sem); 1718 mutex_lock(&osdc->request_mutex); 1719 req = __lookup_request(osdc, tid); 1720 if (req == NULL) { 1721 dout("handle_reply tid %llu dne\n", tid); 1722 goto bad_mutex; 1723 } 1724 ceph_osdc_get_request(req); 1725 1726 dout("handle_reply %p tid %llu req %p result %d\n", msg, tid, 1727 req, result); 1728 1729 ceph_decode_need(&p, end, 4, bad_put); 1730 numops = ceph_decode_32(&p); 1731 if (numops > CEPH_OSD_MAX_OP) 1732 goto bad_put; 1733 if (numops != req->r_num_ops) 1734 goto bad_put; 1735 payload_len = 0; 1736 ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad_put); 1737 for (i = 0; i < numops; i++) { 1738 struct ceph_osd_op *op = p; 1739 int len; 1740 1741 len = le32_to_cpu(op->payload_len); 1742 req->r_reply_op_len[i] = len; 1743 dout(" op %d has %d bytes\n", i, len); 1744 payload_len += len; 1745 p += sizeof(*op); 1746 } 1747 bytes = le32_to_cpu(msg->hdr.data_len); 1748 if (payload_len != bytes) { 1749 pr_warning("sum of op payload lens %d != data_len %d", 1750 payload_len, bytes); 1751 goto bad_put; 1752 } 1753 1754 ceph_decode_need(&p, end, 4 + numops * 4, bad_put); 1755 retry_attempt = ceph_decode_32(&p); 1756 for (i = 0; i < numops; i++) 1757 req->r_reply_op_result[i] = ceph_decode_32(&p); 1758 1759 if (le16_to_cpu(msg->hdr.version) >= 6) { 1760 p += 8 + 4; /* skip replay_version */ 1761 p += 8; /* skip user_version */ 1762 1763 err = ceph_redirect_decode(&p, end, &redir); 1764 if (err) 1765 goto bad_put; 1766 } else { 1767 redir.oloc.pool = -1; 1768 } 1769 1770 if (redir.oloc.pool != -1) { 1771 dout("redirect pool %lld\n", redir.oloc.pool); 1772 1773 __unregister_request(osdc, req); 1774 1775 req->r_target_oloc = redir.oloc; /* struct */ 1776 1777 /* 1778 * Start redirect requests with nofail=true. If 1779 * mapping fails, request will end up on the notarget 1780 * list, waiting for the new osdmap (which can take 1781 * a while), even though the original request mapped 1782 * successfully. 
In the future we might want to follow 1783 * original request's nofail setting here. 1784 */ 1785 err = __ceph_osdc_start_request(osdc, req, true); 1786 BUG_ON(err); 1787 1788 goto out_unlock; 1789 } 1790 1791 already_completed = req->r_got_reply; 1792 if (!req->r_got_reply) { 1793 req->r_result = result; 1794 dout("handle_reply result %d bytes %d\n", req->r_result, 1795 bytes); 1796 if (req->r_result == 0) 1797 req->r_result = bytes; 1798 1799 /* in case this is a write and we need to replay, */ 1800 req->r_reassert_version.epoch = cpu_to_le32(reassert_epoch); 1801 req->r_reassert_version.version = cpu_to_le64(reassert_version); 1802 1803 req->r_got_reply = 1; 1804 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { 1805 dout("handle_reply tid %llu dup ack\n", tid); 1806 goto out_unlock; 1807 } 1808 1809 dout("handle_reply tid %llu flags %d\n", tid, flags); 1810 1811 if (req->r_linger && (flags & CEPH_OSD_FLAG_ONDISK)) 1812 __register_linger_request(osdc, req); 1813 1814 /* either this is a read, or we got the safe response */ 1815 if (result < 0 || 1816 (flags & CEPH_OSD_FLAG_ONDISK) || 1817 ((flags & CEPH_OSD_FLAG_WRITE) == 0)) 1818 __unregister_request(osdc, req); 1819 1820 mutex_unlock(&osdc->request_mutex); 1821 up_read(&osdc->map_sem); 1822 1823 if (!already_completed) { 1824 if (req->r_unsafe_callback && 1825 result >= 0 && !(flags & CEPH_OSD_FLAG_ONDISK)) 1826 req->r_unsafe_callback(req, true); 1827 if (req->r_callback) 1828 req->r_callback(req, msg); 1829 else 1830 complete_all(&req->r_completion); 1831 } 1832 1833 if (flags & CEPH_OSD_FLAG_ONDISK) { 1834 if (req->r_unsafe_callback && already_completed) 1835 req->r_unsafe_callback(req, false); 1836 complete_request(req); 1837 } 1838 1839 out: 1840 dout("req=%p req->r_linger=%d\n", req, req->r_linger); 1841 ceph_osdc_put_request(req); 1842 return; 1843 out_unlock: 1844 mutex_unlock(&osdc->request_mutex); 1845 up_read(&osdc->map_sem); 1846 goto out; 1847 1848 bad_put: 1849 req->r_result = -EIO; 1850 __unregister_request(osdc, req); 1851 if (req->r_callback) 1852 req->r_callback(req, msg); 1853 else 1854 complete_all(&req->r_completion); 1855 complete_request(req); 1856 ceph_osdc_put_request(req); 1857 bad_mutex: 1858 mutex_unlock(&osdc->request_mutex); 1859 up_read(&osdc->map_sem); 1860 bad: 1861 pr_err("corrupt osd_op_reply got %d %d\n", 1862 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len)); 1863 ceph_msg_dump(msg); 1864 } 1865 1866 static void reset_changed_osds(struct ceph_osd_client *osdc) 1867 { 1868 struct rb_node *p, *n; 1869 1870 for (p = rb_first(&osdc->osds); p; p = n) { 1871 struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node); 1872 1873 n = rb_next(p); 1874 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || 1875 memcmp(&osd->o_con.peer_addr, 1876 ceph_osd_addr(osdc->osdmap, 1877 osd->o_osd), 1878 sizeof(struct ceph_entity_addr)) != 0) 1879 __reset_osd(osdc, osd); 1880 } 1881 } 1882 1883 /* 1884 * Requeue requests whose mapping to an OSD has changed. If requests map to 1885 * no osd, request a new map. 1886 * 1887 * Caller should hold map_sem for read. 1888 */ 1889 static void kick_requests(struct ceph_osd_client *osdc, bool force_resend, 1890 bool force_resend_writes) 1891 { 1892 struct ceph_osd_request *req, *nreq; 1893 struct rb_node *p; 1894 int needmap = 0; 1895 int err; 1896 bool force_resend_req; 1897 1898 dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "", 1899 force_resend_writes ? 
" (force resend writes)" : ""); 1900 mutex_lock(&osdc->request_mutex); 1901 for (p = rb_first(&osdc->requests); p; ) { 1902 req = rb_entry(p, struct ceph_osd_request, r_node); 1903 p = rb_next(p); 1904 1905 /* 1906 * For linger requests that have not yet been 1907 * registered, move them to the linger list; they'll 1908 * be sent to the osd in the loop below. Unregister 1909 * the request before re-registering it as a linger 1910 * request to ensure the __map_request() below 1911 * will decide it needs to be sent. 1912 */ 1913 if (req->r_linger && list_empty(&req->r_linger_item)) { 1914 dout("%p tid %llu restart on osd%d\n", 1915 req, req->r_tid, 1916 req->r_osd ? req->r_osd->o_osd : -1); 1917 ceph_osdc_get_request(req); 1918 __unregister_request(osdc, req); 1919 __register_linger_request(osdc, req); 1920 ceph_osdc_put_request(req); 1921 continue; 1922 } 1923 1924 force_resend_req = force_resend || 1925 (force_resend_writes && 1926 req->r_flags & CEPH_OSD_FLAG_WRITE); 1927 err = __map_request(osdc, req, force_resend_req); 1928 if (err < 0) 1929 continue; /* error */ 1930 if (req->r_osd == NULL) { 1931 dout("%p tid %llu maps to no osd\n", req, req->r_tid); 1932 needmap++; /* request a newer map */ 1933 } else if (err > 0) { 1934 if (!req->r_linger) { 1935 dout("%p tid %llu requeued on osd%d\n", req, 1936 req->r_tid, 1937 req->r_osd ? req->r_osd->o_osd : -1); 1938 req->r_flags |= CEPH_OSD_FLAG_RETRY; 1939 } 1940 } 1941 } 1942 1943 list_for_each_entry_safe(req, nreq, &osdc->req_linger, 1944 r_linger_item) { 1945 dout("linger req=%p req->r_osd=%p\n", req, req->r_osd); 1946 1947 err = __map_request(osdc, req, 1948 force_resend || force_resend_writes); 1949 dout("__map_request returned %d\n", err); 1950 if (err == 0) 1951 continue; /* no change and no osd was specified */ 1952 if (err < 0) 1953 continue; /* hrm! */ 1954 if (req->r_osd == NULL) { 1955 dout("tid %llu maps to no valid osd\n", req->r_tid); 1956 needmap++; /* request a newer map */ 1957 continue; 1958 } 1959 1960 dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid, 1961 req->r_osd ? req->r_osd->o_osd : -1); 1962 __register_request(osdc, req); 1963 __unregister_linger_request(osdc, req); 1964 } 1965 reset_changed_osds(osdc); 1966 mutex_unlock(&osdc->request_mutex); 1967 1968 if (needmap) { 1969 dout("%d requests for down osds, need new map\n", needmap); 1970 ceph_monc_request_next_osdmap(&osdc->client->monc); 1971 } 1972 } 1973 1974 1975 /* 1976 * Process updated osd map. 1977 * 1978 * The message contains any number of incremental and full maps, normally 1979 * indicating some sort of topology change in the cluster. Kick requests 1980 * off to different OSDs as needed. 1981 */ 1982 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) 1983 { 1984 void *p, *end, *next; 1985 u32 nr_maps, maplen; 1986 u32 epoch; 1987 struct ceph_osdmap *newmap = NULL, *oldmap; 1988 int err; 1989 struct ceph_fsid fsid; 1990 bool was_full; 1991 1992 dout("handle_map have %u\n", osdc->osdmap ? 
osdc->osdmap->epoch : 0); 1993 p = msg->front.iov_base; 1994 end = p + msg->front.iov_len; 1995 1996 /* verify fsid */ 1997 ceph_decode_need(&p, end, sizeof(fsid), bad); 1998 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 1999 if (ceph_check_fsid(osdc->client, &fsid) < 0) 2000 return; 2001 2002 down_write(&osdc->map_sem); 2003 2004 was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL); 2005 2006 /* incremental maps */ 2007 ceph_decode_32_safe(&p, end, nr_maps, bad); 2008 dout(" %d inc maps\n", nr_maps); 2009 while (nr_maps > 0) { 2010 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 2011 epoch = ceph_decode_32(&p); 2012 maplen = ceph_decode_32(&p); 2013 ceph_decode_need(&p, end, maplen, bad); 2014 next = p + maplen; 2015 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) { 2016 dout("applying incremental map %u len %d\n", 2017 epoch, maplen); 2018 newmap = osdmap_apply_incremental(&p, next, 2019 osdc->osdmap, 2020 &osdc->client->msgr); 2021 if (IS_ERR(newmap)) { 2022 err = PTR_ERR(newmap); 2023 goto bad; 2024 } 2025 BUG_ON(!newmap); 2026 if (newmap != osdc->osdmap) { 2027 ceph_osdmap_destroy(osdc->osdmap); 2028 osdc->osdmap = newmap; 2029 } 2030 was_full = was_full || 2031 ceph_osdmap_flag(osdc->osdmap, 2032 CEPH_OSDMAP_FULL); 2033 kick_requests(osdc, 0, was_full); 2034 } else { 2035 dout("ignoring incremental map %u len %d\n", 2036 epoch, maplen); 2037 } 2038 p = next; 2039 nr_maps--; 2040 } 2041 if (newmap) 2042 goto done; 2043 2044 /* full maps */ 2045 ceph_decode_32_safe(&p, end, nr_maps, bad); 2046 dout(" %d full maps\n", nr_maps); 2047 while (nr_maps) { 2048 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 2049 epoch = ceph_decode_32(&p); 2050 maplen = ceph_decode_32(&p); 2051 ceph_decode_need(&p, end, maplen, bad); 2052 if (nr_maps > 1) { 2053 dout("skipping non-latest full map %u len %d\n", 2054 epoch, maplen); 2055 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) { 2056 dout("skipping full map %u len %d, " 2057 "older than our %u\n", epoch, maplen, 2058 osdc->osdmap->epoch); 2059 } else { 2060 int skipped_map = 0; 2061 2062 dout("taking full map %u len %d\n", epoch, maplen); 2063 newmap = ceph_osdmap_decode(&p, p+maplen); 2064 if (IS_ERR(newmap)) { 2065 err = PTR_ERR(newmap); 2066 goto bad; 2067 } 2068 BUG_ON(!newmap); 2069 oldmap = osdc->osdmap; 2070 osdc->osdmap = newmap; 2071 if (oldmap) { 2072 if (oldmap->epoch + 1 < newmap->epoch) 2073 skipped_map = 1; 2074 ceph_osdmap_destroy(oldmap); 2075 } 2076 was_full = was_full || 2077 ceph_osdmap_flag(osdc->osdmap, 2078 CEPH_OSDMAP_FULL); 2079 kick_requests(osdc, skipped_map, was_full); 2080 } 2081 p += maplen; 2082 nr_maps--; 2083 } 2084 2085 if (!osdc->osdmap) 2086 goto bad; 2087 done: 2088 downgrade_write(&osdc->map_sem); 2089 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch); 2090 2091 /* 2092 * subscribe to subsequent osdmap updates if full to ensure 2093 * we find out when we are no longer full and stop returning 2094 * ENOSPC. 
2095 */ 2096 if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) || 2097 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) || 2098 ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) 2099 ceph_monc_request_next_osdmap(&osdc->client->monc); 2100 2101 mutex_lock(&osdc->request_mutex); 2102 __send_queued(osdc); 2103 mutex_unlock(&osdc->request_mutex); 2104 up_read(&osdc->map_sem); 2105 wake_up_all(&osdc->client->auth_wq); 2106 return; 2107 2108 bad: 2109 pr_err("osdc handle_map corrupt msg\n"); 2110 ceph_msg_dump(msg); 2111 up_write(&osdc->map_sem); 2112 } 2113 2114 /* 2115 * watch/notify callback event infrastructure 2116 * 2117 * These callbacks are used both for watch and notify operations. 2118 */ 2119 static void __release_event(struct kref *kref) 2120 { 2121 struct ceph_osd_event *event = 2122 container_of(kref, struct ceph_osd_event, kref); 2123 2124 dout("__release_event %p\n", event); 2125 kfree(event); 2126 } 2127 2128 static void get_event(struct ceph_osd_event *event) 2129 { 2130 kref_get(&event->kref); 2131 } 2132 2133 void ceph_osdc_put_event(struct ceph_osd_event *event) 2134 { 2135 kref_put(&event->kref, __release_event); 2136 } 2137 EXPORT_SYMBOL(ceph_osdc_put_event); 2138 2139 static void __insert_event(struct ceph_osd_client *osdc, 2140 struct ceph_osd_event *new) 2141 { 2142 struct rb_node **p = &osdc->event_tree.rb_node; 2143 struct rb_node *parent = NULL; 2144 struct ceph_osd_event *event = NULL; 2145 2146 while (*p) { 2147 parent = *p; 2148 event = rb_entry(parent, struct ceph_osd_event, node); 2149 if (new->cookie < event->cookie) 2150 p = &(*p)->rb_left; 2151 else if (new->cookie > event->cookie) 2152 p = &(*p)->rb_right; 2153 else 2154 BUG(); 2155 } 2156 2157 rb_link_node(&new->node, parent, p); 2158 rb_insert_color(&new->node, &osdc->event_tree); 2159 } 2160 2161 static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc, 2162 u64 cookie) 2163 { 2164 struct rb_node **p = &osdc->event_tree.rb_node; 2165 struct rb_node *parent = NULL; 2166 struct ceph_osd_event *event = NULL; 2167 2168 while (*p) { 2169 parent = *p; 2170 event = rb_entry(parent, struct ceph_osd_event, node); 2171 if (cookie < event->cookie) 2172 p = &(*p)->rb_left; 2173 else if (cookie > event->cookie) 2174 p = &(*p)->rb_right; 2175 else 2176 return event; 2177 } 2178 return NULL; 2179 } 2180 2181 static void __remove_event(struct ceph_osd_event *event) 2182 { 2183 struct ceph_osd_client *osdc = event->osdc; 2184 2185 if (!RB_EMPTY_NODE(&event->node)) { 2186 dout("__remove_event removed %p\n", event); 2187 rb_erase(&event->node, &osdc->event_tree); 2188 ceph_osdc_put_event(event); 2189 } else { 2190 dout("__remove_event didn't remove %p\n", event); 2191 } 2192 } 2193 2194 int ceph_osdc_create_event(struct ceph_osd_client *osdc, 2195 void (*event_cb)(u64, u64, u8, void *), 2196 void *data, struct ceph_osd_event **pevent) 2197 { 2198 struct ceph_osd_event *event; 2199 2200 event = kmalloc(sizeof(*event), GFP_NOIO); 2201 if (!event) 2202 return -ENOMEM; 2203 2204 dout("create_event %p\n", event); 2205 event->cb = event_cb; 2206 event->one_shot = 0; 2207 event->data = data; 2208 event->osdc = osdc; 2209 INIT_LIST_HEAD(&event->osd_node); 2210 RB_CLEAR_NODE(&event->node); 2211 kref_init(&event->kref); /* one ref for us */ 2212 kref_get(&event->kref); /* one ref for the caller */ 2213 2214 spin_lock(&osdc->event_lock); 2215 event->cookie = ++osdc->event_count; 2216 __insert_event(osdc, event); 2217 spin_unlock(&osdc->event_lock); 2218 2219 *pevent = event; 2220 return 0; 2221 } 2222 
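/*
 * Illustrative sketch (not part of this file's build): roughly how a
 * libceph consumer such as rbd might pair ceph_osdc_create_event()
 * above with ceph_osdc_cancel_event() and ceph_osdc_flush_notifies()
 * defined later in this file.  struct my_dev, my_watch_cb and the
 * register/unregister helpers are hypothetical; only the event calls
 * themselves come from this file.
 */
#if 0
struct my_dev {				/* hypothetical consumer state */
	struct ceph_osd_event *event;
};

static void my_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct my_dev *dev = data;

	/* runs from osdc->notify_wq via do_event_work() */
	pr_info("watch-notify ver %llu notify_id %llu opcode %u dev %p\n",
		ver, notify_id, opcode, dev);
}

static int my_register_watch(struct ceph_osd_client *osdc, struct my_dev *dev)
{
	int ret;

	ret = ceph_osdc_create_event(osdc, my_watch_cb, dev, &dev->event);
	if (ret)
		return ret;

	/*
	 * dev->event->cookie is what the caller would embed in its
	 * CEPH_OSD_OP_WATCH request so that incoming watch-notify
	 * messages can be matched back to this event by
	 * handle_watch_notify().
	 */
	return 0;
}

static void my_unregister_watch(struct ceph_osd_client *osdc, struct my_dev *dev)
{
	ceph_osdc_cancel_event(dev->event);	/* drops the caller's ref */
	ceph_osdc_flush_notifies(osdc);		/* drain queued callbacks */
}
#endif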
EXPORT_SYMBOL(ceph_osdc_create_event); 2223 2224 void ceph_osdc_cancel_event(struct ceph_osd_event *event) 2225 { 2226 struct ceph_osd_client *osdc = event->osdc; 2227 2228 dout("cancel_event %p\n", event); 2229 spin_lock(&osdc->event_lock); 2230 __remove_event(event); 2231 spin_unlock(&osdc->event_lock); 2232 ceph_osdc_put_event(event); /* caller's */ 2233 } 2234 EXPORT_SYMBOL(ceph_osdc_cancel_event); 2235 2236 2237 static void do_event_work(struct work_struct *work) 2238 { 2239 struct ceph_osd_event_work *event_work = 2240 container_of(work, struct ceph_osd_event_work, work); 2241 struct ceph_osd_event *event = event_work->event; 2242 u64 ver = event_work->ver; 2243 u64 notify_id = event_work->notify_id; 2244 u8 opcode = event_work->opcode; 2245 2246 dout("do_event_work completing %p\n", event); 2247 event->cb(ver, notify_id, opcode, event->data); 2248 dout("do_event_work completed %p\n", event); 2249 ceph_osdc_put_event(event); 2250 kfree(event_work); 2251 } 2252 2253 2254 /* 2255 * Process osd watch notifications 2256 */ 2257 static void handle_watch_notify(struct ceph_osd_client *osdc, 2258 struct ceph_msg *msg) 2259 { 2260 void *p, *end; 2261 u8 proto_ver; 2262 u64 cookie, ver, notify_id; 2263 u8 opcode; 2264 struct ceph_osd_event *event; 2265 struct ceph_osd_event_work *event_work; 2266 2267 p = msg->front.iov_base; 2268 end = p + msg->front.iov_len; 2269 2270 ceph_decode_8_safe(&p, end, proto_ver, bad); 2271 ceph_decode_8_safe(&p, end, opcode, bad); 2272 ceph_decode_64_safe(&p, end, cookie, bad); 2273 ceph_decode_64_safe(&p, end, ver, bad); 2274 ceph_decode_64_safe(&p, end, notify_id, bad); 2275 2276 spin_lock(&osdc->event_lock); 2277 event = __find_event(osdc, cookie); 2278 if (event) { 2279 BUG_ON(event->one_shot); 2280 get_event(event); 2281 } 2282 spin_unlock(&osdc->event_lock); 2283 dout("handle_watch_notify cookie %lld ver %lld event %p\n", 2284 cookie, ver, event); 2285 if (event) { 2286 event_work = kmalloc(sizeof(*event_work), GFP_NOIO); 2287 if (!event_work) { 2288 dout("ERROR: could not allocate event_work\n"); 2289 goto done_err; 2290 } 2291 INIT_WORK(&event_work->work, do_event_work); 2292 event_work->event = event; 2293 event_work->ver = ver; 2294 event_work->notify_id = notify_id; 2295 event_work->opcode = opcode; 2296 if (!queue_work(osdc->notify_wq, &event_work->work)) { 2297 dout("WARNING: failed to queue notify event work\n"); 2298 goto done_err; 2299 } 2300 } 2301 2302 return; 2303 2304 done_err: 2305 ceph_osdc_put_event(event); 2306 return; 2307 2308 bad: 2309 pr_err("osdc handle_watch_notify corrupt msg\n"); 2310 } 2311 2312 /* 2313 * build new request AND message 2314 * 2315 */ 2316 void ceph_osdc_build_request(struct ceph_osd_request *req, u64 off, 2317 struct ceph_snap_context *snapc, u64 snap_id, 2318 struct timespec *mtime) 2319 { 2320 struct ceph_msg *msg = req->r_request; 2321 void *p; 2322 size_t msg_size; 2323 int flags = req->r_flags; 2324 u64 data_len; 2325 unsigned int i; 2326 2327 req->r_snapid = snap_id; 2328 req->r_snapc = ceph_get_snap_context(snapc); 2329 2330 /* encode request */ 2331 msg->hdr.version = cpu_to_le16(4); 2332 2333 p = msg->front.iov_base; 2334 ceph_encode_32(&p, 1); /* client_inc is always 1 */ 2335 req->r_request_osdmap_epoch = p; 2336 p += 4; 2337 req->r_request_flags = p; 2338 p += 4; 2339 if (req->r_flags & CEPH_OSD_FLAG_WRITE) 2340 ceph_encode_timespec(p, mtime); 2341 p += sizeof(struct ceph_timespec); 2342 req->r_request_reassert_version = p; 2343 p += sizeof(struct ceph_eversion); /* will get filled in */ 2344 2345 /* 
oloc */ 2346 ceph_encode_8(&p, 4); 2347 ceph_encode_8(&p, 4); 2348 ceph_encode_32(&p, 8 + 4 + 4); 2349 req->r_request_pool = p; 2350 p += 8; 2351 ceph_encode_32(&p, -1); /* preferred */ 2352 ceph_encode_32(&p, 0); /* key len */ 2353 2354 ceph_encode_8(&p, 1); 2355 req->r_request_pgid = p; 2356 p += 8 + 4; 2357 ceph_encode_32(&p, -1); /* preferred */ 2358 2359 /* oid */ 2360 ceph_encode_32(&p, req->r_base_oid.name_len); 2361 memcpy(p, req->r_base_oid.name, req->r_base_oid.name_len); 2362 dout("oid '%.*s' len %d\n", req->r_base_oid.name_len, 2363 req->r_base_oid.name, req->r_base_oid.name_len); 2364 p += req->r_base_oid.name_len; 2365 2366 /* ops--can imply data */ 2367 ceph_encode_16(&p, (u16)req->r_num_ops); 2368 data_len = 0; 2369 for (i = 0; i < req->r_num_ops; i++) { 2370 data_len += osd_req_encode_op(req, p, i); 2371 p += sizeof(struct ceph_osd_op); 2372 } 2373 2374 /* snaps */ 2375 ceph_encode_64(&p, req->r_snapid); 2376 ceph_encode_64(&p, req->r_snapc ? req->r_snapc->seq : 0); 2377 ceph_encode_32(&p, req->r_snapc ? req->r_snapc->num_snaps : 0); 2378 if (req->r_snapc) { 2379 for (i = 0; i < snapc->num_snaps; i++) { 2380 ceph_encode_64(&p, req->r_snapc->snaps[i]); 2381 } 2382 } 2383 2384 req->r_request_attempts = p; 2385 p += 4; 2386 2387 /* data */ 2388 if (flags & CEPH_OSD_FLAG_WRITE) { 2389 u16 data_off; 2390 2391 /* 2392 * The header "data_off" is a hint to the receiver 2393 * allowing it to align received data into its 2394 * buffers such that there's no need to re-copy 2395 * it before writing it to disk (direct I/O). 2396 */ 2397 data_off = (u16) (off & 0xffff); 2398 req->r_request->hdr.data_off = cpu_to_le16(data_off); 2399 } 2400 req->r_request->hdr.data_len = cpu_to_le32(data_len); 2401 2402 BUG_ON(p > msg->front.iov_base + msg->front.iov_len); 2403 msg_size = p - msg->front.iov_base; 2404 msg->front.iov_len = msg_size; 2405 msg->hdr.front_len = cpu_to_le32(msg_size); 2406 2407 dout("build_request msg_size was %d\n", (int)msg_size); 2408 } 2409 EXPORT_SYMBOL(ceph_osdc_build_request); 2410 2411 /* 2412 * Register request, send initial attempt. 2413 */ 2414 int ceph_osdc_start_request(struct ceph_osd_client *osdc, 2415 struct ceph_osd_request *req, 2416 bool nofail) 2417 { 2418 int rc; 2419 2420 down_read(&osdc->map_sem); 2421 mutex_lock(&osdc->request_mutex); 2422 2423 rc = __ceph_osdc_start_request(osdc, req, nofail); 2424 2425 mutex_unlock(&osdc->request_mutex); 2426 up_read(&osdc->map_sem); 2427 2428 return rc; 2429 } 2430 EXPORT_SYMBOL(ceph_osdc_start_request); 2431 2432 /* 2433 * wait for a request to complete 2434 */ 2435 int ceph_osdc_wait_request(struct ceph_osd_client *osdc, 2436 struct ceph_osd_request *req) 2437 { 2438 int rc; 2439 2440 rc = wait_for_completion_interruptible(&req->r_completion); 2441 if (rc < 0) { 2442 mutex_lock(&osdc->request_mutex); 2443 __cancel_request(req); 2444 __unregister_request(osdc, req); 2445 mutex_unlock(&osdc->request_mutex); 2446 complete_request(req); 2447 dout("wait_request tid %llu canceled/timed out\n", req->r_tid); 2448 return rc; 2449 } 2450 2451 dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result); 2452 return req->r_result; 2453 } 2454 EXPORT_SYMBOL(ceph_osdc_wait_request); 2455 2456 /* 2457 * sync - wait for all in-flight requests to flush. avoid starvation. 
2458 */ 2459 void ceph_osdc_sync(struct ceph_osd_client *osdc) 2460 { 2461 struct ceph_osd_request *req; 2462 u64 last_tid, next_tid = 0; 2463 2464 mutex_lock(&osdc->request_mutex); 2465 last_tid = osdc->last_tid; 2466 while (1) { 2467 req = __lookup_request_ge(osdc, next_tid); 2468 if (!req) 2469 break; 2470 if (req->r_tid > last_tid) 2471 break; 2472 2473 next_tid = req->r_tid + 1; 2474 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0) 2475 continue; 2476 2477 ceph_osdc_get_request(req); 2478 mutex_unlock(&osdc->request_mutex); 2479 dout("sync waiting on tid %llu (last is %llu)\n", 2480 req->r_tid, last_tid); 2481 wait_for_completion(&req->r_safe_completion); 2482 mutex_lock(&osdc->request_mutex); 2483 ceph_osdc_put_request(req); 2484 } 2485 mutex_unlock(&osdc->request_mutex); 2486 dout("sync done (thru tid %llu)\n", last_tid); 2487 } 2488 EXPORT_SYMBOL(ceph_osdc_sync); 2489 2490 /* 2491 * Call all pending notify callbacks - for use after a watch is 2492 * unregistered, to make sure no more callbacks for it will be invoked 2493 */ 2494 extern void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc) 2495 { 2496 flush_workqueue(osdc->notify_wq); 2497 } 2498 EXPORT_SYMBOL(ceph_osdc_flush_notifies); 2499 2500 2501 /* 2502 * init, shutdown 2503 */ 2504 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) 2505 { 2506 int err; 2507 2508 dout("init\n"); 2509 osdc->client = client; 2510 osdc->osdmap = NULL; 2511 init_rwsem(&osdc->map_sem); 2512 init_completion(&osdc->map_waiters); 2513 osdc->last_requested_map = 0; 2514 mutex_init(&osdc->request_mutex); 2515 osdc->last_tid = 0; 2516 osdc->osds = RB_ROOT; 2517 INIT_LIST_HEAD(&osdc->osd_lru); 2518 osdc->requests = RB_ROOT; 2519 INIT_LIST_HEAD(&osdc->req_lru); 2520 INIT_LIST_HEAD(&osdc->req_unsent); 2521 INIT_LIST_HEAD(&osdc->req_notarget); 2522 INIT_LIST_HEAD(&osdc->req_linger); 2523 osdc->num_requests = 0; 2524 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 2525 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 2526 spin_lock_init(&osdc->event_lock); 2527 osdc->event_tree = RB_ROOT; 2528 osdc->event_count = 0; 2529 2530 schedule_delayed_work(&osdc->osds_timeout_work, 2531 round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ)); 2532 2533 err = -ENOMEM; 2534 osdc->req_mempool = mempool_create_kmalloc_pool(10, 2535 sizeof(struct ceph_osd_request)); 2536 if (!osdc->req_mempool) 2537 goto out; 2538 2539 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, 2540 OSD_OP_FRONT_LEN, 10, true, 2541 "osd_op"); 2542 if (err < 0) 2543 goto out_mempool; 2544 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, 2545 OSD_OPREPLY_FRONT_LEN, 10, true, 2546 "osd_op_reply"); 2547 if (err < 0) 2548 goto out_msgpool; 2549 2550 err = -ENOMEM; 2551 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); 2552 if (!osdc->notify_wq) 2553 goto out_msgpool_reply; 2554 2555 return 0; 2556 2557 out_msgpool_reply: 2558 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 2559 out_msgpool: 2560 ceph_msgpool_destroy(&osdc->msgpool_op); 2561 out_mempool: 2562 mempool_destroy(osdc->req_mempool); 2563 out: 2564 return err; 2565 } 2566 2567 void ceph_osdc_stop(struct ceph_osd_client *osdc) 2568 { 2569 flush_workqueue(osdc->notify_wq); 2570 destroy_workqueue(osdc->notify_wq); 2571 cancel_delayed_work_sync(&osdc->timeout_work); 2572 cancel_delayed_work_sync(&osdc->osds_timeout_work); 2573 if (osdc->osdmap) { 2574 ceph_osdmap_destroy(osdc->osdmap); 2575 osdc->osdmap = NULL; 2576 } 2577 
remove_all_osds(osdc); 2578 mempool_destroy(osdc->req_mempool); 2579 ceph_msgpool_destroy(&osdc->msgpool_op); 2580 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 2581 } 2582 2583 /* 2584 * Read some contiguous pages. If we cross a stripe boundary, shorten 2585 * *plen. Return number of bytes read, or error. 2586 */ 2587 int ceph_osdc_readpages(struct ceph_osd_client *osdc, 2588 struct ceph_vino vino, struct ceph_file_layout *layout, 2589 u64 off, u64 *plen, 2590 u32 truncate_seq, u64 truncate_size, 2591 struct page **pages, int num_pages, int page_align) 2592 { 2593 struct ceph_osd_request *req; 2594 int rc = 0; 2595 2596 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, 2597 vino.snap, off, *plen); 2598 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1, 2599 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 2600 NULL, truncate_seq, truncate_size, 2601 false); 2602 if (IS_ERR(req)) 2603 return PTR_ERR(req); 2604 2605 /* it may be a short read due to an object boundary */ 2606 2607 osd_req_op_extent_osd_data_pages(req, 0, 2608 pages, *plen, page_align, false, false); 2609 2610 dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", 2611 off, *plen, *plen, page_align); 2612 2613 ceph_osdc_build_request(req, off, NULL, vino.snap, NULL); 2614 2615 rc = ceph_osdc_start_request(osdc, req, false); 2616 if (!rc) 2617 rc = ceph_osdc_wait_request(osdc, req); 2618 2619 ceph_osdc_put_request(req); 2620 dout("readpages result %d\n", rc); 2621 return rc; 2622 } 2623 EXPORT_SYMBOL(ceph_osdc_readpages); 2624 2625 /* 2626 * do a synchronous write on N pages 2627 */ 2628 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, 2629 struct ceph_file_layout *layout, 2630 struct ceph_snap_context *snapc, 2631 u64 off, u64 len, 2632 u32 truncate_seq, u64 truncate_size, 2633 struct timespec *mtime, 2634 struct page **pages, int num_pages) 2635 { 2636 struct ceph_osd_request *req; 2637 int rc = 0; 2638 int page_align = off & ~PAGE_MASK; 2639 2640 BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */ 2641 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1, 2642 CEPH_OSD_OP_WRITE, 2643 CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, 2644 snapc, truncate_seq, truncate_size, 2645 true); 2646 if (IS_ERR(req)) 2647 return PTR_ERR(req); 2648 2649 /* it may be a short write due to an object boundary */ 2650 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, 2651 false, false); 2652 dout("writepages %llu~%llu (%llu bytes)\n", off, len, len); 2653 2654 ceph_osdc_build_request(req, off, snapc, CEPH_NOSNAP, mtime); 2655 2656 rc = ceph_osdc_start_request(osdc, req, true); 2657 if (!rc) 2658 rc = ceph_osdc_wait_request(osdc, req); 2659 2660 ceph_osdc_put_request(req); 2661 if (rc == 0) 2662 rc = len; 2663 dout("writepages result %d\n", rc); 2664 return rc; 2665 } 2666 EXPORT_SYMBOL(ceph_osdc_writepages); 2667 2668 int ceph_osdc_setup(void) 2669 { 2670 BUG_ON(ceph_osd_request_cache); 2671 ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", 2672 sizeof (struct ceph_osd_request), 2673 __alignof__(struct ceph_osd_request), 2674 0, NULL); 2675 2676 return ceph_osd_request_cache ? 
0 : -ENOMEM; 2677 } 2678 EXPORT_SYMBOL(ceph_osdc_setup); 2679 2680 void ceph_osdc_cleanup(void) 2681 { 2682 BUG_ON(!ceph_osd_request_cache); 2683 kmem_cache_destroy(ceph_osd_request_cache); 2684 ceph_osd_request_cache = NULL; 2685 } 2686 EXPORT_SYMBOL(ceph_osdc_cleanup); 2687 2688 /* 2689 * handle incoming message 2690 */ 2691 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 2692 { 2693 struct ceph_osd *osd = con->private; 2694 struct ceph_osd_client *osdc; 2695 int type = le16_to_cpu(msg->hdr.type); 2696 2697 if (!osd) 2698 goto out; 2699 osdc = osd->o_osdc; 2700 2701 switch (type) { 2702 case CEPH_MSG_OSD_MAP: 2703 ceph_osdc_handle_map(osdc, msg); 2704 break; 2705 case CEPH_MSG_OSD_OPREPLY: 2706 handle_reply(osdc, msg, con); 2707 break; 2708 case CEPH_MSG_WATCH_NOTIFY: 2709 handle_watch_notify(osdc, msg); 2710 break; 2711 2712 default: 2713 pr_err("received unknown message type %d %s\n", type, 2714 ceph_msg_type_name(type)); 2715 } 2716 out: 2717 ceph_msg_put(msg); 2718 } 2719 2720 /* 2721 * lookup and return message for incoming reply. set up reply message 2722 * pages. 2723 */ 2724 static struct ceph_msg *get_reply(struct ceph_connection *con, 2725 struct ceph_msg_header *hdr, 2726 int *skip) 2727 { 2728 struct ceph_osd *osd = con->private; 2729 struct ceph_osd_client *osdc = osd->o_osdc; 2730 struct ceph_msg *m; 2731 struct ceph_osd_request *req; 2732 int front_len = le32_to_cpu(hdr->front_len); 2733 int data_len = le32_to_cpu(hdr->data_len); 2734 u64 tid; 2735 2736 tid = le64_to_cpu(hdr->tid); 2737 mutex_lock(&osdc->request_mutex); 2738 req = __lookup_request(osdc, tid); 2739 if (!req) { 2740 *skip = 1; 2741 m = NULL; 2742 dout("get_reply unknown tid %llu from osd%d\n", tid, 2743 osd->o_osd); 2744 goto out; 2745 } 2746 2747 if (req->r_reply->con) 2748 dout("%s revoking msg %p from old con %p\n", __func__, 2749 req->r_reply, req->r_reply->con); 2750 ceph_msg_revoke_incoming(req->r_reply); 2751 2752 if (front_len > req->r_reply->front_alloc_len) { 2753 pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n", 2754 front_len, req->r_reply->front_alloc_len, 2755 (unsigned int)con->peer_name.type, 2756 le64_to_cpu(con->peer_name.num)); 2757 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS, 2758 false); 2759 if (!m) 2760 goto out; 2761 ceph_msg_put(req->r_reply); 2762 req->r_reply = m; 2763 } 2764 m = ceph_msg_get(req->r_reply); 2765 2766 if (data_len > 0) { 2767 struct ceph_osd_data *osd_data; 2768 2769 /* 2770 * XXX This is assuming there is only one op containing 2771 * XXX page data. Probably OK for reads, but this 2772 * XXX ought to be done more generally. 
2773 */ 2774 osd_data = osd_req_op_extent_osd_data(req, 0); 2775 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { 2776 if (osd_data->pages && 2777 unlikely(osd_data->length < data_len)) { 2778 2779 pr_warning("tid %lld reply has %d bytes " 2780 "we had only %llu bytes ready\n", 2781 tid, data_len, osd_data->length); 2782 *skip = 1; 2783 ceph_msg_put(m); 2784 m = NULL; 2785 goto out; 2786 } 2787 } 2788 } 2789 *skip = 0; 2790 dout("get_reply tid %lld %p\n", tid, m); 2791 2792 out: 2793 mutex_unlock(&osdc->request_mutex); 2794 return m; 2795 2796 } 2797 2798 static struct ceph_msg *alloc_msg(struct ceph_connection *con, 2799 struct ceph_msg_header *hdr, 2800 int *skip) 2801 { 2802 struct ceph_osd *osd = con->private; 2803 int type = le16_to_cpu(hdr->type); 2804 int front = le32_to_cpu(hdr->front_len); 2805 2806 *skip = 0; 2807 switch (type) { 2808 case CEPH_MSG_OSD_MAP: 2809 case CEPH_MSG_WATCH_NOTIFY: 2810 return ceph_msg_new(type, front, GFP_NOFS, false); 2811 case CEPH_MSG_OSD_OPREPLY: 2812 return get_reply(con, hdr, skip); 2813 default: 2814 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type, 2815 osd->o_osd); 2816 *skip = 1; 2817 return NULL; 2818 } 2819 } 2820 2821 /* 2822 * Wrappers to refcount containing ceph_osd struct 2823 */ 2824 static struct ceph_connection *get_osd_con(struct ceph_connection *con) 2825 { 2826 struct ceph_osd *osd = con->private; 2827 if (get_osd(osd)) 2828 return con; 2829 return NULL; 2830 } 2831 2832 static void put_osd_con(struct ceph_connection *con) 2833 { 2834 struct ceph_osd *osd = con->private; 2835 put_osd(osd); 2836 } 2837 2838 /* 2839 * authentication 2840 */ 2841 /* 2842 * Note: returned pointer is the address of a structure that's 2843 * managed separately. Caller must *not* attempt to free it. 2844 */ 2845 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 2846 int *proto, int force_new) 2847 { 2848 struct ceph_osd *o = con->private; 2849 struct ceph_osd_client *osdc = o->o_osdc; 2850 struct ceph_auth_client *ac = osdc->client->monc.auth; 2851 struct ceph_auth_handshake *auth = &o->o_auth; 2852 2853 if (force_new && auth->authorizer) { 2854 ceph_auth_destroy_authorizer(ac, auth->authorizer); 2855 auth->authorizer = NULL; 2856 } 2857 if (!auth->authorizer) { 2858 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 2859 auth); 2860 if (ret) 2861 return ERR_PTR(ret); 2862 } else { 2863 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 2864 auth); 2865 if (ret) 2866 return ERR_PTR(ret); 2867 } 2868 *proto = ac->protocol; 2869 2870 return auth; 2871 } 2872 2873 2874 static int verify_authorizer_reply(struct ceph_connection *con, int len) 2875 { 2876 struct ceph_osd *o = con->private; 2877 struct ceph_osd_client *osdc = o->o_osdc; 2878 struct ceph_auth_client *ac = osdc->client->monc.auth; 2879 2880 return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer, len); 2881 } 2882 2883 static int invalidate_authorizer(struct ceph_connection *con) 2884 { 2885 struct ceph_osd *o = con->private; 2886 struct ceph_osd_client *osdc = o->o_osdc; 2887 struct ceph_auth_client *ac = osdc->client->monc.auth; 2888 2889 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); 2890 return ceph_monc_validate_auth(&osdc->client->monc); 2891 } 2892 2893 static const struct ceph_connection_operations osd_con_ops = { 2894 .get = get_osd_con, 2895 .put = put_osd_con, 2896 .dispatch = dispatch, 2897 .get_authorizer = get_authorizer, 2898 .verify_authorizer_reply = verify_authorizer_reply, 2899 
.invalidate_authorizer = invalidate_authorizer, 2900 .alloc_msg = alloc_msg, 2901 .fault = osd_reset, 2902 }; 2903
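
/*
 * Illustrative sketch (not part of this file's build): how handle_reply()
 * above drives a request's callbacks.  r_callback fires once, when the
 * first reply makes the request complete; r_unsafe_callback is called
 * with unsafe=true for an ack-only reply and again with unsafe=false
 * once the ONDISK (safe) reply arrives.  The names my_ctx, my_complete,
 * my_unsafe and my_arm_callbacks are hypothetical.
 */
#if 0
static struct my_ctx {
	atomic_t unsafe_writes;		/* acked but not yet on disk */
} my_ctx;

static void my_complete(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	/* req->r_result is the error, or the byte count on success */
	pr_info("tid %llu finished with %d\n", req->r_tid, req->r_result);
}

static void my_unsafe(struct ceph_osd_request *req, bool unsafe)
{
	if (unsafe)
		atomic_inc(&my_ctx.unsafe_writes);	/* ack seen */
	else
		atomic_dec(&my_ctx.unsafe_writes);	/* ONDISK seen */
}

static void my_arm_callbacks(struct ceph_osd_request *req)
{
	/* set these before ceph_osdc_start_request() */
	req->r_callback = my_complete;
	req->r_unsafe_callback = my_unsafe;
}
#endif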
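
/*
 * Illustrative sketch (not part of this file's build): a one-page
 * synchronous write through ceph_osdc_writepages() above.  The vino,
 * layout and snapc arguments are assumed to come from the filesystem
 * (fs/ceph) side, and vino.snap must be CEPH_NOSNAP since snapshots
 * are not writeable; my_write_one_page is a hypothetical helper.
 */
#if 0
static int my_write_one_page(struct ceph_osd_client *osdc,
			     struct ceph_vino vino,
			     struct ceph_file_layout *layout,
			     struct ceph_snap_context *snapc,
			     struct page *page, u64 off, u64 len,
			     struct timespec *mtime)
{
	int rc;

	/* returns the number of bytes written (len) on success */
	rc = ceph_osdc_writepages(osdc, vino, layout, snapc, off, len,
				  0, 0,	/* truncate_seq, truncate_size */
				  mtime, &page, 1);
	if (rc < 0)
		return rc;
	return 0;
}
#endif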
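
/*
 * Illustrative sketch (not part of this file's build): a plausible
 * "flush everything" sequence, e.g. at sync or unmount time, built
 * from the helpers above.  ceph_osdc_sync() blocks until every
 * in-flight write has received its ONDISK ack (r_safe_completion);
 * ceph_osdc_flush_notifies() then drains any watch-notify callbacks
 * still queued on osdc->notify_wq.  my_drain_osdc is hypothetical.
 */
#if 0
static void my_drain_osdc(struct ceph_osd_client *osdc)
{
	ceph_osdc_sync(osdc);		/* waits on outstanding writes only */
	ceph_osdc_flush_notifies(osdc);	/* no more event callbacks after this */
}
#endif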
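
/*
 * Illustrative sketch (not part of this file's build): the expected
 * ordering of the setup/teardown entry points above, assuming the
 * module-level vs. per-client split implied by ceph_osdc_setup()'s
 * BUG_ON.  ceph_osdc_setup()/ceph_osdc_cleanup() bracket the lifetime
 * of the ceph_osd_request slab cache; ceph_osdc_init()/ceph_osdc_stop()
 * bracket a single client.  The my_* functions are hypothetical.
 */
#if 0
static int __init my_module_init(void)
{
	/* once per module: creates the ceph_osd_request slab cache */
	return ceph_osdc_setup();
}

static void __exit my_module_exit(void)
{
	/* only after every osd client has been stopped */
	ceph_osdc_cleanup();
}

static int my_client_start(struct ceph_client *client,
			   struct ceph_osd_client *osdc)
{
	/* per-client state: request tree, mempools, notify workqueue */
	return ceph_osdc_init(osdc, client);
}

static void my_client_stop(struct ceph_osd_client *osdc)
{
	/* flushes notify_wq and frees the osdmap and message pools */
	ceph_osdc_stop(osdc);
}
#endif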