// SPDX-License-Identifier: GPL-2.0

#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/striper.h>

#define OSD_OPREPLY_FRONT_LEN	512

static struct kmem_cache	*ceph_osd_request_cache;

static const struct ceph_connection_operations osd_con_ops;

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices".  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */

static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void link_linger(struct ceph_osd *osd,
			struct ceph_osd_linger_request *lreq);
static void unlink_linger(struct ceph_osd *osd,
			  struct ceph_osd_linger_request *lreq);
static void clear_backoffs(struct ceph_osd *osd);

#if 1
static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
{
	bool wrlocked = true;

	if (unlikely(down_read_trylock(sem))) {
		wrlocked = false;
		up_read(sem);
	}

	return wrlocked;
}
static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
{
	WARN_ON(!rwsem_is_locked(&osdc->lock));
}
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
{
	WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
}
static inline void verify_osd_locked(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;

	WARN_ON(!(mutex_is_locked(&osd->lock) &&
		  rwsem_is_locked(&osdc->lock)) &&
		!rwsem_is_wrlocked(&osdc->lock));
}
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq)
{
	WARN_ON(!mutex_is_locked(&lreq->lock));
}
#else
static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
static inline void verify_osd_locked(struct ceph_osd *osd) { }
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { }
#endif

/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */
static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
			u64 *objnum, u64 *objoff, u64 *objlen)
{
	u64 orig_len = *plen;
	u32 xlen;

	/* object extent? */
	ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
				      objoff, &xlen);
	*objlen = xlen;
	if (*objlen < orig_len) {
		*plen = *objlen;
		dout(" skipping last %llu, final file extent %llu~%llu\n",
		     orig_len - *plen, off, *plen);
	}

	dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);
	return 0;
}
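
/*
 * Worked example (illustrative numbers, assuming the default RADOS
 * layout of 4 MB objects with stripe_count == 1): a file extent
 * off=6M, *plen=4M maps to objnum=1, objoff=2M, xlen=2M.  *plen is
 * therefore shortened to 2M and the caller is expected to issue the
 * remainder of the extent against object 2 in a separate request.
 */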

static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
{
	memset(osd_data, 0, sizeof (*osd_data));
	osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
}

static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
			struct page **pages, u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
	osd_data->pages = pages;
	osd_data->length = length;
	osd_data->alignment = alignment;
	osd_data->pages_from_pool = pages_from_pool;
	osd_data->own_pages = own_pages;
}

static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
			struct ceph_pagelist *pagelist)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
	osd_data->pagelist = pagelist;
}

#ifdef CONFIG_BLOCK
static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
				   struct ceph_bio_iter *bio_pos,
				   u32 bio_length)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
	osd_data->bio_pos = *bio_pos;
	osd_data->bio_length = bio_length;
}
#endif /* CONFIG_BLOCK */

static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
				     struct ceph_bvec_iter *bvec_pos,
				     u32 num_bvecs)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_BVECS;
	osd_data->bvec_pos = *bvec_pos;
	osd_data->num_bvecs = num_bvecs;
}

#define osd_req_op_data(oreq, whch, typ, fld)				\
({									\
	struct ceph_osd_request *__oreq = (oreq);			\
	unsigned int __whch = (whch);					\
	BUG_ON(__whch >= __oreq->r_num_ops);				\
	&__oreq->r_ops[__whch].typ.fld;					\
})

static struct ceph_osd_data *
osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
{
	BUG_ON(which >= osd_req->r_num_ops);

	return &osd_req->r_ops[which].raw_data_in;
}

struct ceph_osd_data *
osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
			   unsigned int which)
{
	return osd_req_op_data(osd_req, which, extent, osd_data);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data);

void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages,
			u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_raw_data_in(osd_req, which);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);

void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages,
			u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);

void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);

#ifdef CONFIG_BLOCK
void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
				    unsigned int which,
				    struct ceph_bio_iter *bio_pos,
				    u32 bio_length)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_bio_init(osd_data, bio_pos, bio_length);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
#endif /* CONFIG_BLOCK */

void osd_req_op_extent_osd_data_bvecs(struct ceph_osd_request *osd_req,
				      unsigned int which,
				      struct bio_vec *bvecs, u32 num_bvecs,
				      u32 bytes)
{
	struct ceph_osd_data *osd_data;
	struct ceph_bvec_iter it = {
		.bvecs = bvecs,
		.iter = { .bi_size = bytes },
	};

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvecs);

void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
					 unsigned int which,
					 struct ceph_bvec_iter *bvec_pos)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_bvecs_init(osd_data, bvec_pos, 0);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);

static void osd_req_op_cls_request_info_pagelist(
			struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_info);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}

void osd_req_op_cls_request_data_pagelist(
			struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
	osd_req->r_ops[which].cls.indata_len += pagelist->length;
	osd_req->r_ops[which].indata_len += pagelist->length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);

void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages, u64 length,
			u32 alignment, bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
	osd_req->r_ops[which].cls.indata_len += length;
	osd_req->r_ops[which].indata_len += length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);

void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
				       unsigned int which,
				       struct bio_vec *bvecs, u32 num_bvecs,
				       u32 bytes)
{
	struct ceph_osd_data *osd_data;
	struct ceph_bvec_iter it = {
		.bvecs = bvecs,
		.iter = { .bi_size = bytes },
	};

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
	osd_req->r_ops[which].cls.indata_len += bytes;
	osd_req->r_ops[which].indata_len += bytes;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs);

void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages, u64 length,
			u32 alignment, bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, response_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);
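
/*
 * Usage sketch for a CEPH_OSD_OP_CALL op ("indata_pl" and "reply_pages"
 * stand for caller-provided buffers; the class/method names are
 * illustrative):
 *
 *	osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, "lock", "get_info");
 *	osd_req_op_cls_request_data_pagelist(req, 0, indata_pl);
 *	osd_req_op_cls_response_data_pages(req, 0, reply_pages, PAGE_SIZE,
 *					   0, false, false);
 */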

static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
{
	switch (osd_data->type) {
	case CEPH_OSD_DATA_TYPE_NONE:
		return 0;
	case CEPH_OSD_DATA_TYPE_PAGES:
		return osd_data->length;
	case CEPH_OSD_DATA_TYPE_PAGELIST:
		return (u64)osd_data->pagelist->length;
#ifdef CONFIG_BLOCK
	case CEPH_OSD_DATA_TYPE_BIO:
		return (u64)osd_data->bio_length;
#endif /* CONFIG_BLOCK */
	case CEPH_OSD_DATA_TYPE_BVECS:
		return osd_data->bvec_pos.iter.bi_size;
	default:
		WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
		return 0;
	}
}

static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
{
	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
		int num_pages;

		num_pages = calc_pages_for((u64)osd_data->alignment,
						(u64)osd_data->length);
		ceph_release_page_vector(osd_data->pages, num_pages);
	}
	ceph_osd_data_init(osd_data);
}

static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
			unsigned int which)
{
	struct ceph_osd_req_op *op;

	BUG_ON(which >= osd_req->r_num_ops);
	op = &osd_req->r_ops[which];

	switch (op->op) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
		ceph_osd_data_release(&op->extent.osd_data);
		break;
	case CEPH_OSD_OP_CALL:
		ceph_osd_data_release(&op->cls.request_info);
		ceph_osd_data_release(&op->cls.request_data);
		ceph_osd_data_release(&op->cls.response_data);
		break;
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		ceph_osd_data_release(&op->xattr.osd_data);
		break;
	case CEPH_OSD_OP_STAT:
		ceph_osd_data_release(&op->raw_data_in);
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
		ceph_osd_data_release(&op->notify_ack.request_data);
		break;
	case CEPH_OSD_OP_NOTIFY:
		ceph_osd_data_release(&op->notify.request_data);
		ceph_osd_data_release(&op->notify.response_data);
		break;
	case CEPH_OSD_OP_LIST_WATCHERS:
		ceph_osd_data_release(&op->list_watchers.response_data);
		break;
	default:
		break;
	}
}

/*
 * Assumes @t is zero-initialized.
 */
static void target_init(struct ceph_osd_request_target *t)
{
	ceph_oid_init(&t->base_oid);
	ceph_oloc_init(&t->base_oloc);
	ceph_oid_init(&t->target_oid);
	ceph_oloc_init(&t->target_oloc);

	ceph_osds_init(&t->acting);
	ceph_osds_init(&t->up);
	t->size = -1;
	t->min_size = -1;

	t->osd = CEPH_HOMELESS_OSD;
}

static void target_copy(struct ceph_osd_request_target *dest,
			const struct ceph_osd_request_target *src)
{
	ceph_oid_copy(&dest->base_oid, &src->base_oid);
	ceph_oloc_copy(&dest->base_oloc, &src->base_oloc);
	ceph_oid_copy(&dest->target_oid, &src->target_oid);
	ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);

	dest->pgid = src->pgid; /* struct */
	dest->spgid = src->spgid; /* struct */
	dest->pg_num = src->pg_num;
	dest->pg_num_mask = src->pg_num_mask;
	ceph_osds_copy(&dest->acting, &src->acting);
	ceph_osds_copy(&dest->up, &src->up);
	dest->size = src->size;
	dest->min_size = src->min_size;
	dest->sort_bitwise = src->sort_bitwise;

	dest->flags = src->flags;
	dest->paused = src->paused;

	dest->epoch = src->epoch;
	dest->last_force_resend = src->last_force_resend;

	dest->osd = src->osd;
}

static void target_destroy(struct ceph_osd_request_target *t)
{
	ceph_oid_destroy(&t->base_oid);
	ceph_oloc_destroy(&t->base_oloc);
	ceph_oid_destroy(&t->target_oid);
	ceph_oloc_destroy(&t->target_oloc);
}

/*
 * requests
 */
static void request_release_checks(struct ceph_osd_request *req)
{
	WARN_ON(!RB_EMPTY_NODE(&req->r_node));
	WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
	WARN_ON(!list_empty(&req->r_unsafe_item));
	WARN_ON(req->r_osd);
}

static void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req = container_of(kref,
					    struct ceph_osd_request, r_kref);
	unsigned int which;

	dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
	     req->r_request, req->r_reply);
	request_release_checks(req);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);

	for (which = 0; which < req->r_num_ops; which++)
		osd_req_op_data_release(req, which);

	target_destroy(&req->r_t);
	ceph_put_snap_context(req->r_snapc);

	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
		kmem_cache_free(ceph_osd_request_cache, req);
	else
		kfree(req);
}

void ceph_osdc_get_request(struct ceph_osd_request *req)
{
	dout("%s %p (was %d)\n", __func__, req,
	     kref_read(&req->r_kref));
	kref_get(&req->r_kref);
}
EXPORT_SYMBOL(ceph_osdc_get_request);

void ceph_osdc_put_request(struct ceph_osd_request *req)
{
	if (req) {
		dout("%s %p (was %d)\n", __func__, req,
		     kref_read(&req->r_kref));
		kref_put(&req->r_kref, ceph_osdc_release_request);
	}
}
EXPORT_SYMBOL(ceph_osdc_put_request);

static void request_init(struct ceph_osd_request *req)
{
	/* req only, each op is zeroed in _osd_req_op_init() */
	memset(req, 0, sizeof(*req));

	kref_init(&req->r_kref);
	init_completion(&req->r_completion);
	RB_CLEAR_NODE(&req->r_node);
	RB_CLEAR_NODE(&req->r_mc_node);
	INIT_LIST_HEAD(&req->r_unsafe_item);

	target_init(&req->r_t);
}

/*
 * This is ugly, but it allows us to reuse linger registration and ping
 * requests, keeping the structure of the code around send_linger{_ping}()
 * reasonable.  Setting up a min_nr=2 mempool for each linger request
 * and dealing with copying ops (this blasts req only, watch op remains
 * intact) isn't any better.
 */
static void request_reinit(struct ceph_osd_request *req)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	bool mempool = req->r_mempool;
	unsigned int num_ops = req->r_num_ops;
	u64 snapid = req->r_snapid;
	struct ceph_snap_context *snapc = req->r_snapc;
	bool linger = req->r_linger;
	struct ceph_msg *request_msg = req->r_request;
	struct ceph_msg *reply_msg = req->r_reply;

	dout("%s req %p\n", __func__, req);
	WARN_ON(kref_read(&req->r_kref) != 1);
	request_release_checks(req);

	WARN_ON(kref_read(&request_msg->kref) != 1);
	WARN_ON(kref_read(&reply_msg->kref) != 1);
	target_destroy(&req->r_t);

	request_init(req);
	req->r_osdc = osdc;
	req->r_mempool = mempool;
	req->r_num_ops = num_ops;
	req->r_snapid = snapid;
	req->r_snapc = snapc;
	req->r_linger = linger;
	req->r_request = request_msg;
	req->r_reply = reply_msg;
}

struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
					       struct ceph_snap_context *snapc,
					       unsigned int num_ops,
					       bool use_mempool,
					       gfp_t gfp_flags)
{
	struct ceph_osd_request *req;

	if (use_mempool) {
		BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
		req = mempool_alloc(osdc->req_mempool, gfp_flags);
	} else if (num_ops <= CEPH_OSD_SLAB_OPS) {
		req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
	} else {
		BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
		req = kmalloc(struct_size(req, r_ops, num_ops), gfp_flags);
	}
	if (unlikely(!req))
		return NULL;

	request_init(req);
	req->r_osdc = osdc;
	req->r_mempool = use_mempool;
	req->r_num_ops = num_ops;
	req->r_snapid = CEPH_NOSNAP;
	req->r_snapc = ceph_get_snap_context(snapc);

	dout("%s req %p\n", __func__, req);
	return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);
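
/*
 * Typical call sequence (sketch; error handling elided, "oloc" is
 * caller-provided state):
 *
 *	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOFS);
 *	osd_req_op_init(req, 0, CEPH_OSD_OP_STAT, 0);
 *	ceph_oid_printf(&req->r_base_oid, "%s", "my_object");
 *	ceph_oloc_copy(&req->r_base_oloc, &oloc);
 *	ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
 *
 * ceph_osdc_alloc_messages() (below) sizes r_request and r_reply from
 * the oid, oloc and op count set here, so it has to come last.
 */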

static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
{
	return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
}

int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	struct ceph_msg *msg;
	int msg_size;

	WARN_ON(ceph_oid_empty(&req->r_base_oid));
	WARN_ON(ceph_oloc_empty(&req->r_base_oloc));

	/* create request message */
	msg_size = CEPH_ENCODING_START_BLK_LEN +
			CEPH_PGID_ENCODING_LEN + 1; /* spgid */
	msg_size += 4 + 4 + 4; /* hash, osdmap_epoch, flags */
	msg_size += CEPH_ENCODING_START_BLK_LEN +
			sizeof(struct ceph_osd_reqid); /* reqid */
	msg_size += sizeof(struct ceph_blkin_trace_info); /* trace */
	msg_size += 4 + sizeof(struct ceph_timespec); /* client_inc, mtime */
	msg_size += CEPH_ENCODING_START_BLK_LEN +
			ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
	msg_size += 4 + req->r_base_oid.name_len; /* oid */
	msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
	msg_size += 8; /* snapid */
	msg_size += 8; /* snap_seq */
	msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
	msg_size += 4 + 8; /* retry_attempt, features */

	if (req->r_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true);
	if (!msg)
		return -ENOMEM;

	memset(msg->front.iov_base, 0, msg->front.iov_len);
	req->r_request = msg;

	/* create reply message */
	msg_size = OSD_OPREPLY_FRONT_LEN;
	msg_size += req->r_base_oid.name_len;
	msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);

	if (req->r_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true);
	if (!msg)
		return -ENOMEM;

	req->r_reply = msg;

	return 0;
}
EXPORT_SYMBOL(ceph_osdc_alloc_messages);

static bool osd_req_opcode_valid(u16 opcode)
{
	switch (opcode) {
#define GENERATE_CASE(op, opcode, str)	case CEPH_OSD_OP_##op: return true;
__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
#undef GENERATE_CASE
	default:
		return false;
	}
}

/*
 * This is an osd op init function for opcodes that have no data or
 * other information associated with them.  It also serves as a
 * common init routine for all the other init functions, below.
 */
static struct ceph_osd_req_op *
_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
		 u16 opcode, u32 flags)
{
	struct ceph_osd_req_op *op;

	BUG_ON(which >= osd_req->r_num_ops);
	BUG_ON(!osd_req_opcode_valid(opcode));

	op = &osd_req->r_ops[which];
	memset(op, 0, sizeof (*op));
	op->op = opcode;
	op->flags = flags;

	return op;
}

void osd_req_op_init(struct ceph_osd_request *osd_req,
		     unsigned int which, u16 opcode, u32 flags)
{
	(void)_osd_req_op_init(osd_req, which, opcode, flags);
}
EXPORT_SYMBOL(osd_req_op_init);

void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
			    unsigned int which, u16 opcode,
			    u64 offset, u64 length,
			    u64 truncate_size, u32 truncate_seq)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      opcode, 0);
	size_t payload_len = 0;

	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
	       opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
	       opcode != CEPH_OSD_OP_TRUNCATE);

	op->extent.offset = offset;
	op->extent.length = length;
	op->extent.truncate_size = truncate_size;
	op->extent.truncate_seq = truncate_seq;
	if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
		payload_len += length;

	op->indata_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_extent_init);

void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
			      unsigned int which, u64 length)
{
	struct ceph_osd_req_op *op;
	u64 previous;

	BUG_ON(which >= osd_req->r_num_ops);
	op = &osd_req->r_ops[which];
	previous = op->extent.length;

	if (length == previous)
		return;		/* Nothing to do */
	BUG_ON(length > previous);

	op->extent.length = length;
	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
		op->indata_len -= previous - length;
}
EXPORT_SYMBOL(osd_req_op_extent_update);
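
/*
 * Example (sketch; "pages" is a caller-provided page vector): queue a
 * 64K write and later shrink it to the 4K that was actually dirtied --
 * osd_req_op_extent_update() adjusts indata_len to match:
 *
 *	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_WRITE, 0, 0x10000, 0, 0);
 *	osd_req_op_extent_osd_data_pages(req, 0, pages, 0x10000, 0,
 *					 false, false);
 *	...
 *	osd_req_op_extent_update(req, 0, 0x1000);
 */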

void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
				unsigned int which, u64 offset_inc)
{
	struct ceph_osd_req_op *op, *prev_op;

	BUG_ON(which + 1 >= osd_req->r_num_ops);

	prev_op = &osd_req->r_ops[which];
	op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
	/* dup previous one */
	op->indata_len = prev_op->indata_len;
	op->outdata_len = prev_op->outdata_len;
	op->extent = prev_op->extent;
	/* adjust offset */
	op->extent.offset += offset_inc;
	op->extent.length -= offset_inc;

	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
		op->indata_len -= offset_inc;
}
EXPORT_SYMBOL(osd_req_op_extent_dup_last);

void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
			 u16 opcode, const char *class, const char *method)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      opcode, 0);
	struct ceph_pagelist *pagelist;
	size_t payload_len = 0;
	size_t size;

	BUG_ON(opcode != CEPH_OSD_OP_CALL);

	pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
	BUG_ON(!pagelist);
	ceph_pagelist_init(pagelist);

	op->cls.class_name = class;
	size = strlen(class);
	BUG_ON(size > (size_t) U8_MAX);
	op->cls.class_len = size;
	ceph_pagelist_append(pagelist, class, size);
	payload_len += size;

	op->cls.method_name = method;
	size = strlen(method);
	BUG_ON(size > (size_t) U8_MAX);
	op->cls.method_len = size;
	ceph_pagelist_append(pagelist, method, size);
	payload_len += size;

	osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);

	op->indata_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_cls_init);

int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
			  u16 opcode, const char *name, const void *value,
			  size_t size, u8 cmp_op, u8 cmp_mode)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      opcode, 0);
	struct ceph_pagelist *pagelist;
	size_t payload_len;

	BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);

	pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
	if (!pagelist)
		return -ENOMEM;

	ceph_pagelist_init(pagelist);

	payload_len = strlen(name);
	op->xattr.name_len = payload_len;
	ceph_pagelist_append(pagelist, name, payload_len);

	op->xattr.value_len = size;
	ceph_pagelist_append(pagelist, value, size);
	payload_len += size;

	op->xattr.cmp_op = cmp_op;
	op->xattr.cmp_mode = cmp_mode;

	ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
	op->indata_len = payload_len;
	return 0;
}
EXPORT_SYMBOL(osd_req_op_xattr_init);

/*
 * @watch_opcode: CEPH_OSD_WATCH_OP_*
 */
static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
				  u64 cookie, u8 watch_opcode)
{
	struct ceph_osd_req_op *op;

	op = _osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
	op->watch.cookie = cookie;
	op->watch.op = watch_opcode;
	op->watch.gen = 0;
}

void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
				unsigned int which,
				u64 expected_object_size,
				u64 expected_write_size)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      CEPH_OSD_OP_SETALLOCHINT,
						      0);

	op->alloc_hint.expected_object_size = expected_object_size;
	op->alloc_hint.expected_write_size = expected_write_size;

	/*
	 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
	 * not worth a feature bit.  Set FAILOK per-op flag to make
	 * sure older osds don't trip over an unsupported opcode.
	 */
	op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
}
EXPORT_SYMBOL(osd_req_op_alloc_hint_init);

static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
				   struct ceph_osd_data *osd_data)
{
	u64 length = ceph_osd_data_length(osd_data);

	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
		BUG_ON(length > (u64) SIZE_MAX);
		if (length)
			ceph_msg_data_add_pages(msg, osd_data->pages,
					length, osd_data->alignment);
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
		BUG_ON(!length);
		ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
#ifdef CONFIG_BLOCK
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
		ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
#endif
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
		ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
	} else {
		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
	}
}

static u32 osd_req_encode_op(struct ceph_osd_op *dst,
			     const struct ceph_osd_req_op *src)
{
	if (WARN_ON(!osd_req_opcode_valid(src->op))) {
		pr_err("unrecognized osd opcode %d\n", src->op);

		return 0;
	}

	switch (src->op) {
	case CEPH_OSD_OP_STAT:
		break;
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
	case CEPH_OSD_OP_ZERO:
	case CEPH_OSD_OP_TRUNCATE:
		dst->extent.offset = cpu_to_le64(src->extent.offset);
		dst->extent.length = cpu_to_le64(src->extent.length);
		dst->extent.truncate_size =
			cpu_to_le64(src->extent.truncate_size);
		dst->extent.truncate_seq =
			cpu_to_le32(src->extent.truncate_seq);
		break;
	case CEPH_OSD_OP_CALL:
		dst->cls.class_len = src->cls.class_len;
		dst->cls.method_len = src->cls.method_len;
		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
		break;
	case CEPH_OSD_OP_WATCH:
		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
		dst->watch.ver = cpu_to_le64(0);
		dst->watch.op = src->watch.op;
		dst->watch.gen = cpu_to_le32(src->watch.gen);
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
		break;
	case CEPH_OSD_OP_NOTIFY:
		dst->notify.cookie = cpu_to_le64(src->notify.cookie);
		break;
	case CEPH_OSD_OP_LIST_WATCHERS:
		break;
	case CEPH_OSD_OP_SETALLOCHINT:
		dst->alloc_hint.expected_object_size =
		    cpu_to_le64(src->alloc_hint.expected_object_size);
		dst->alloc_hint.expected_write_size =
		    cpu_to_le64(src->alloc_hint.expected_write_size);
		break;
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
		dst->xattr.cmp_op = src->xattr.cmp_op;
		dst->xattr.cmp_mode = src->xattr.cmp_mode;
		break;
	case CEPH_OSD_OP_CREATE:
	case CEPH_OSD_OP_DELETE:
		break;
	default:
		pr_err("unsupported osd opcode %s\n",
			ceph_osd_op_name(src->op));
		WARN_ON(1);

		return 0;
	}

	dst->op = cpu_to_le16(src->op);
	dst->flags = cpu_to_le32(src->flags);
	dst->payload_len = cpu_to_le32(src->indata_len);

	return src->indata_len;
}

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
 */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
					       struct ceph_file_layout *layout,
					       struct ceph_vino vino,
					       u64 off, u64 *plen,
					       unsigned int which, int num_ops,
					       int opcode, int flags,
					       struct ceph_snap_context *snapc,
					       u32 truncate_seq,
					       u64 truncate_size,
					       bool use_mempool)
{
	struct ceph_osd_request *req;
	u64 objnum = 0;
	u64 objoff = 0;
	u64 objlen = 0;
	int r;

	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);

	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
					GFP_NOFS);
	if (!req) {
		r = -ENOMEM;
		goto fail;
	}

	/* calculate max write size */
	r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
	if (r)
		goto fail;

	if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
		osd_req_op_init(req, which, opcode, 0);
	} else {
		u32 object_size = layout->object_size;
		u32 object_base = off - objoff;
		if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
			if (truncate_size <= object_base) {
				truncate_size = 0;
			} else {
				truncate_size -= object_base;
				if (truncate_size > object_size)
					truncate_size = object_size;
			}
		}
		osd_req_op_extent_init(req, which, opcode, objoff, objlen,
				       truncate_size, truncate_seq);
	}

	req->r_abort_on_full = true;
	req->r_flags = flags;
	req->r_base_oloc.pool = layout->pool_id;
	req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
	ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);

	req->r_snapid = vino.snap;
	if (flags & CEPH_OSD_FLAG_WRITE)
		req->r_data_offset = off;

	r = ceph_osdc_alloc_messages(req, GFP_NOFS);
	if (r)
		goto fail;

	return req;

fail:
	ceph_osdc_put_request(req);
	return ERR_PTR(r);
}
EXPORT_SYMBOL(ceph_osdc_new_request);
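
/*
 * Example (sketch, in the style of a filesystem read path; "ci" and
 * "vino" are hypothetical caller state):
 *
 *	req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len,
 *				    0, 1, CEPH_OSD_OP_READ,
 *				    CEPH_OSD_FLAG_READ, NULL,
 *				    ci->i_truncate_seq, ci->i_truncate_size,
 *				    false);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *
 * On return, len may have been shortened to the object boundary by
 * calc_layout().
 */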

/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */
DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)

static bool osd_homeless(struct ceph_osd *osd)
{
	return osd->o_osd == CEPH_HOMELESS_OSD;
}

static bool osd_registered(struct ceph_osd *osd)
{
	verify_osdc_locked(osd->o_osdc);

	return !RB_EMPTY_NODE(&osd->o_node);
}

/*
 * Assumes @osd is zero-initialized.
 */
static void osd_init(struct ceph_osd *osd)
{
	refcount_set(&osd->o_ref, 1);
	RB_CLEAR_NODE(&osd->o_node);
	osd->o_requests = RB_ROOT;
	osd->o_linger_requests = RB_ROOT;
	osd->o_backoff_mappings = RB_ROOT;
	osd->o_backoffs_by_id = RB_ROOT;
	INIT_LIST_HEAD(&osd->o_osd_lru);
	INIT_LIST_HEAD(&osd->o_keepalive_item);
	osd->o_incarnation = 1;
	mutex_init(&osd->lock);
}

static void osd_cleanup(struct ceph_osd *osd)
{
	WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
	WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
	WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
	WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoff_mappings));
	WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoffs_by_id));
	WARN_ON(!list_empty(&osd->o_osd_lru));
	WARN_ON(!list_empty(&osd->o_keepalive_item));

	if (osd->o_auth.authorizer) {
		WARN_ON(osd_homeless(osd));
		ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
	}
}

/*
 * Track open sessions with osds.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
{
	struct ceph_osd *osd;

	WARN_ON(onum == CEPH_HOMELESS_OSD);

	osd = kzalloc(sizeof(*osd), GFP_NOIO | __GFP_NOFAIL);
	osd_init(osd);
	osd->o_osdc = osdc;
	osd->o_osd = onum;

	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);

	return osd;
}

static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
	if (refcount_inc_not_zero(&osd->o_ref)) {
		dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1,
		     refcount_read(&osd->o_ref));
		return osd;
	} else {
		dout("get_osd %p FAIL\n", osd);
		return NULL;
	}
}

static void put_osd(struct ceph_osd *osd)
{
	dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref),
	     refcount_read(&osd->o_ref) - 1);
	if (refcount_dec_and_test(&osd->o_ref)) {
		osd_cleanup(osd);
		kfree(osd);
	}
}

DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)

static void __move_osd_to_lru(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;

	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
	BUG_ON(!list_empty(&osd->o_osd_lru));

	spin_lock(&osdc->osd_lru_lock);
	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
	spin_unlock(&osdc->osd_lru_lock);

	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
}

static void maybe_move_osd_to_lru(struct ceph_osd *osd)
{
	if (RB_EMPTY_ROOT(&osd->o_requests) &&
	    RB_EMPTY_ROOT(&osd->o_linger_requests))
		__move_osd_to_lru(osd);
}

static void __remove_osd_from_lru(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;

	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

	spin_lock(&osdc->osd_lru_lock);
	if (!list_empty(&osd->o_osd_lru))
		list_del_init(&osd->o_osd_lru);
	spin_unlock(&osdc->osd_lru_lock);
}

/*
 * Close the connection and assign any leftover requests to the
 * homeless session.
 */
static void close_osd(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct rb_node *n;

	verify_osdc_wrlocked(osdc);
	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

	ceph_con_close(&osd->o_con);

	for (n = rb_first(&osd->o_requests); n; ) {
		struct ceph_osd_request *req =
		    rb_entry(n, struct ceph_osd_request, r_node);

		n = rb_next(n); /* unlink_request() */

		dout(" reassigning req %p tid %llu\n", req, req->r_tid);
		unlink_request(osd, req);
		link_request(&osdc->homeless_osd, req);
	}
	for (n = rb_first(&osd->o_linger_requests); n; ) {
		struct ceph_osd_linger_request *lreq =
		    rb_entry(n, struct ceph_osd_linger_request, node);

		n = rb_next(n); /* unlink_linger() */

		dout(" reassigning lreq %p linger_id %llu\n", lreq,
		     lreq->linger_id);
		unlink_linger(osd, lreq);
		link_linger(&osdc->homeless_osd, lreq);
	}
	clear_backoffs(osd);

	__remove_osd_from_lru(osd);
	erase_osd(&osdc->osds, osd);
	put_osd(osd);
}

/*
 * reset osd connect
 */
static int reopen_osd(struct ceph_osd *osd)
{
	struct ceph_entity_addr *peer_addr;

	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

	if (RB_EMPTY_ROOT(&osd->o_requests) &&
	    RB_EMPTY_ROOT(&osd->o_linger_requests)) {
		close_osd(osd);
		return -ENODEV;
	}

	peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
	if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
	    !ceph_con_opened(&osd->o_con)) {
		struct rb_node *n;

		dout("osd addr hasn't changed and connection never opened, "
		     "letting msgr retry\n");
		/* touch each r_stamp for handle_timeout()'s benefit */
		for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
			struct ceph_osd_request *req =
			    rb_entry(n, struct ceph_osd_request, r_node);
			req->r_stamp = jiffies;
		}

		return -EAGAIN;
	}

	ceph_con_close(&osd->o_con);
	ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
	osd->o_incarnation++;

	return 0;
}

static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
					  bool wrlocked)
{
	struct ceph_osd *osd;

	if (wrlocked)
		verify_osdc_wrlocked(osdc);
	else
		verify_osdc_locked(osdc);

	if (o != CEPH_HOMELESS_OSD)
		osd = lookup_osd(&osdc->osds, o);
	else
		osd = &osdc->homeless_osd;
	if (!osd) {
		if (!wrlocked)
			return ERR_PTR(-EAGAIN);

		osd = create_osd(osdc, o);
		insert_osd(&osdc->osds, osd);
		ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
			      &osdc->osdmap->osd_addr[osd->o_osd]);
	}

	dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
	return osd;
}

/*
 * Create request <-> OSD session relation.
 *
 * @req has to be assigned a tid, @osd may be homeless.
 */
static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
{
	verify_osd_locked(osd);
	WARN_ON(!req->r_tid || req->r_osd);
	dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
	     req, req->r_tid);

	if (!osd_homeless(osd))
		__remove_osd_from_lru(osd);
	else
		atomic_inc(&osd->o_osdc->num_homeless);

	get_osd(osd);
	insert_request(&osd->o_requests, req);
	req->r_osd = osd;
}

static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
{
	verify_osd_locked(osd);
	WARN_ON(req->r_osd != osd);
	dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
	     req, req->r_tid);

	req->r_osd = NULL;
	erase_request(&osd->o_requests, req);
	put_osd(osd);

	if (!osd_homeless(osd))
		maybe_move_osd_to_lru(osd);
	else
		atomic_dec(&osd->o_osdc->num_homeless);
}

static bool __pool_full(struct ceph_pg_pool_info *pi)
{
	return pi->flags & CEPH_POOL_FLAG_FULL;
}

static bool have_pool_full(struct ceph_osd_client *osdc)
{
	struct rb_node *n;

	for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
		struct ceph_pg_pool_info *pi =
		    rb_entry(n, struct ceph_pg_pool_info, node);

		if (__pool_full(pi))
			return true;
	}

	return false;
}

static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id)
{
	struct ceph_pg_pool_info *pi;

	pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
	if (!pi)
		return false;

	return __pool_full(pi);
}

/*
 * Returns whether a request should be blocked from being sent
 * based on the current osdmap and osd_client settings.
 */
static bool target_should_be_paused(struct ceph_osd_client *osdc,
				    const struct ceph_osd_request_target *t,
				    struct ceph_pg_pool_info *pi)
{
	bool pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
	bool pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
		       ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
		       __pool_full(pi);

	WARN_ON(pi->id != t->target_oloc.pool);
	return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) ||
	       ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) ||
	       (osdc->osdmap->epoch < osdc->epoch_barrier);
}

enum calc_target_result {
	CALC_TARGET_NO_ACTION = 0,
	CALC_TARGET_NEED_RESEND,
	CALC_TARGET_POOL_DNE,
};

static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
					   struct ceph_osd_request_target *t,
					   struct ceph_connection *con,
					   bool any_change)
{
	struct ceph_pg_pool_info *pi;
	struct ceph_pg pgid, last_pgid;
	struct ceph_osds up, acting;
	bool force_resend = false;
	bool unpaused = false;
	bool legacy_change;
	bool split = false;
	bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE);
	bool recovery_deletes = ceph_osdmap_flag(osdc,
						 CEPH_OSDMAP_RECOVERY_DELETES);
	enum calc_target_result ct_res;
	int ret;

	t->epoch = osdc->osdmap->epoch;
	pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
	if (!pi) {
		t->osd = CEPH_HOMELESS_OSD;
		ct_res = CALC_TARGET_POOL_DNE;
		goto out;
	}

	if (osdc->osdmap->epoch == pi->last_force_request_resend) {
		if (t->last_force_resend < pi->last_force_request_resend) {
			t->last_force_resend = pi->last_force_request_resend;
			force_resend = true;
		} else if (t->last_force_resend == 0) {
			force_resend = true;
		}
	}

	/* apply tiering */
	ceph_oid_copy(&t->target_oid, &t->base_oid);
	ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
	if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
		if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0)
			t->target_oloc.pool = pi->read_tier;
		if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0)
			t->target_oloc.pool = pi->write_tier;

		pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool);
		if (!pi) {
			t->osd = CEPH_HOMELESS_OSD;
			ct_res = CALC_TARGET_POOL_DNE;
			goto out;
		}
	}

	ret = __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc,
					  &pgid);
	if (ret) {
		WARN_ON(ret != -ENOENT);
		t->osd = CEPH_HOMELESS_OSD;
		ct_res = CALC_TARGET_POOL_DNE;
		goto out;
	}
	last_pgid.pool = pgid.pool;
	last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);

	ceph_pg_to_up_acting_osds(osdc->osdmap, pi, &pgid, &up, &acting);
	if (any_change &&
	    ceph_is_new_interval(&t->acting,
				 &acting,
				 &t->up,
				 &up,
				 t->size,
				 pi->size,
				 t->min_size,
				 pi->min_size,
				 t->pg_num,
				 pi->pg_num,
				 t->sort_bitwise,
				 sort_bitwise,
				 t->recovery_deletes,
				 recovery_deletes,
				 &last_pgid))
		force_resend = true;

	if (t->paused && !target_should_be_paused(osdc, t, pi)) {
		t->paused = false;
		unpaused = true;
	}
	legacy_change = ceph_pg_compare(&t->pgid, &pgid) ||
			ceph_osds_changed(&t->acting, &acting, any_change);
	if (t->pg_num)
		split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num);

	if (legacy_change || force_resend || split) {
		t->pgid = pgid; /* struct */
		ceph_pg_to_primary_shard(osdc->osdmap, pi, &pgid, &t->spgid);
		ceph_osds_copy(&t->acting, &acting);
		ceph_osds_copy(&t->up, &up);
		t->size = pi->size;
		t->min_size = pi->min_size;
		t->pg_num = pi->pg_num;
		t->pg_num_mask = pi->pg_num_mask;
		t->sort_bitwise = sort_bitwise;
		t->recovery_deletes = recovery_deletes;

		t->osd = acting.primary;
	}

	if (unpaused || legacy_change || force_resend ||
	    (split && con && CEPH_HAVE_FEATURE(con->peer_features,
					       RESEND_ON_SPLIT)))
		ct_res = CALC_TARGET_NEED_RESEND;
	else
		ct_res = CALC_TARGET_NO_ACTION;

out:
	dout("%s t %p -> ct_res %d osd %d\n", __func__, t, ct_res, t->osd);
	return ct_res;
}
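
/*
 * To summarize, calc_target() re-runs the addressing pipeline:
 * base oid/oloc -> (cache tiering overlay) -> target oid/oloc ->
 * raw PG (object name hash) -> actual PG (stable mod pg_num) ->
 * up/acting sets -> primary OSD, and reports whether the result
 * changed in a way that requires resending the request.
 */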

static struct ceph_spg_mapping *alloc_spg_mapping(void)
{
	struct ceph_spg_mapping *spg;

	spg = kmalloc(sizeof(*spg), GFP_NOIO);
	if (!spg)
		return NULL;

	RB_CLEAR_NODE(&spg->node);
	spg->backoffs = RB_ROOT;
	return spg;
}

static void free_spg_mapping(struct ceph_spg_mapping *spg)
{
	WARN_ON(!RB_EMPTY_NODE(&spg->node));
	WARN_ON(!RB_EMPTY_ROOT(&spg->backoffs));

	kfree(spg);
}

/*
 * rbtree of ceph_spg_mapping for handling map<spg_t, ...>, similar to
 * ceph_pg_mapping.  Used to track OSD backoffs -- a backoff [range] is
 * defined only within a specific spgid; it does not pass anything to
 * children on split, or to another primary.
 */
DEFINE_RB_FUNCS2(spg_mapping, struct ceph_spg_mapping, spgid, ceph_spg_compare,
		 RB_BYPTR, const struct ceph_spg *, node)

static u64 hoid_get_bitwise_key(const struct ceph_hobject_id *hoid)
{
	return hoid->is_max ? 0x100000000ull : hoid->hash_reverse_bits;
}

static void hoid_get_effective_key(const struct ceph_hobject_id *hoid,
				   void **pkey, size_t *pkey_len)
{
	if (hoid->key_len) {
		*pkey = hoid->key;
		*pkey_len = hoid->key_len;
	} else {
		*pkey = hoid->oid;
		*pkey_len = hoid->oid_len;
	}
}

static int compare_names(const void *name1, size_t name1_len,
			 const void *name2, size_t name2_len)
{
	int ret;

	ret = memcmp(name1, name2, min(name1_len, name2_len));
	if (!ret) {
		if (name1_len < name2_len)
			ret = -1;
		else if (name1_len > name2_len)
			ret = 1;
	}
	return ret;
}

static int hoid_compare(const struct ceph_hobject_id *lhs,
			const struct ceph_hobject_id *rhs)
{
	void *effective_key1, *effective_key2;
	size_t effective_key1_len, effective_key2_len;
	int ret;

	if (lhs->is_max < rhs->is_max)
		return -1;
	if (lhs->is_max > rhs->is_max)
		return 1;

	if (lhs->pool < rhs->pool)
		return -1;
	if (lhs->pool > rhs->pool)
		return 1;

	if (hoid_get_bitwise_key(lhs) < hoid_get_bitwise_key(rhs))
		return -1;
	if (hoid_get_bitwise_key(lhs) > hoid_get_bitwise_key(rhs))
		return 1;

	ret = compare_names(lhs->nspace, lhs->nspace_len,
			    rhs->nspace, rhs->nspace_len);
	if (ret)
		return ret;

	hoid_get_effective_key(lhs, &effective_key1, &effective_key1_len);
	hoid_get_effective_key(rhs, &effective_key2, &effective_key2_len);
	ret = compare_names(effective_key1, effective_key1_len,
			    effective_key2, effective_key2_len);
	if (ret)
		return ret;

	ret = compare_names(lhs->oid, lhs->oid_len, rhs->oid, rhs->oid_len);
	if (ret)
		return ret;

	if (lhs->snapid < rhs->snapid)
		return -1;
	if (lhs->snapid > rhs->snapid)
		return 1;

	return 0;
}
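
/*
 * Example: under hoid_compare(), hoids order by (is_max, pool, bitwise
 * key, nspace, effective key, oid, snapid).  Note that compare_names()
 * is memcmp-first, so "foo" < "foobar" < "fox": content differences
 * win before length differences do.
 */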

/*
 * For decoding ->begin and ->end of MOSDBackoff only -- no MIN/MAX
 * compat stuff here.
 *
 * Assumes @hoid is zero-initialized.
 */
static int decode_hoid(void **p, void *end, struct ceph_hobject_id *hoid)
{
	u8 struct_v;
	u32 struct_len;
	int ret;

	ret = ceph_start_decoding(p, end, 4, "hobject_t", &struct_v,
				  &struct_len);
	if (ret)
		return ret;

	if (struct_v < 4) {
		pr_err("got struct_v %d < 4 of hobject_t\n", struct_v);
		goto e_inval;
	}

	hoid->key = ceph_extract_encoded_string(p, end, &hoid->key_len,
						GFP_NOIO);
	if (IS_ERR(hoid->key)) {
		ret = PTR_ERR(hoid->key);
		hoid->key = NULL;
		return ret;
	}

	hoid->oid = ceph_extract_encoded_string(p, end, &hoid->oid_len,
						GFP_NOIO);
	if (IS_ERR(hoid->oid)) {
		ret = PTR_ERR(hoid->oid);
		hoid->oid = NULL;
		return ret;
	}

	ceph_decode_64_safe(p, end, hoid->snapid, e_inval);
	ceph_decode_32_safe(p, end, hoid->hash, e_inval);
	ceph_decode_8_safe(p, end, hoid->is_max, e_inval);

	hoid->nspace = ceph_extract_encoded_string(p, end, &hoid->nspace_len,
						   GFP_NOIO);
	if (IS_ERR(hoid->nspace)) {
		ret = PTR_ERR(hoid->nspace);
		hoid->nspace = NULL;
		return ret;
	}

	ceph_decode_64_safe(p, end, hoid->pool, e_inval);

	ceph_hoid_build_hash_cache(hoid);
	return 0;

e_inval:
	return -EINVAL;
}

static int hoid_encoding_size(const struct ceph_hobject_id *hoid)
{
	return 8 + 4 + 1 + 8 + /* snapid, hash, is_max, pool */
	       4 + hoid->key_len + 4 + hoid->oid_len + 4 + hoid->nspace_len;
}

static void encode_hoid(void **p, void *end, const struct ceph_hobject_id *hoid)
{
	ceph_start_encoding(p, 4, 3, hoid_encoding_size(hoid));
	ceph_encode_string(p, end, hoid->key, hoid->key_len);
	ceph_encode_string(p, end, hoid->oid, hoid->oid_len);
	ceph_encode_64(p, hoid->snapid);
	ceph_encode_32(p, hoid->hash);
	ceph_encode_8(p, hoid->is_max);
	ceph_encode_string(p, end, hoid->nspace, hoid->nspace_len);
	ceph_encode_64(p, hoid->pool);
}

static void free_hoid(struct ceph_hobject_id *hoid)
{
	if (hoid) {
		kfree(hoid->key);
		kfree(hoid->oid);
		kfree(hoid->nspace);
		kfree(hoid);
	}
}

static struct ceph_osd_backoff *alloc_backoff(void)
{
	struct ceph_osd_backoff *backoff;

	backoff = kzalloc(sizeof(*backoff), GFP_NOIO);
	if (!backoff)
		return NULL;

	RB_CLEAR_NODE(&backoff->spg_node);
	RB_CLEAR_NODE(&backoff->id_node);
	return backoff;
}

static void free_backoff(struct ceph_osd_backoff *backoff)
{
	WARN_ON(!RB_EMPTY_NODE(&backoff->spg_node));
	WARN_ON(!RB_EMPTY_NODE(&backoff->id_node));

	free_hoid(backoff->begin);
	free_hoid(backoff->end);
	kfree(backoff);
}

/*
 * Within a specific spgid, backoffs are managed by ->begin hoid.
 */
DEFINE_RB_INSDEL_FUNCS2(backoff, struct ceph_osd_backoff, begin, hoid_compare,
			RB_BYVAL, spg_node);
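
/*
 * A backoff covers the half-open range [begin, end).  Backoffs within
 * an spgid never overlap, so it is enough to descend to the backoff
 * with the greatest ->begin that is <= @hoid and check @hoid against
 * its ->end.
 */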
static struct ceph_osd_backoff *lookup_containing_backoff(struct rb_root *root,
					const struct ceph_hobject_id *hoid)
{
	struct rb_node *n = root->rb_node;

	while (n) {
		struct ceph_osd_backoff *cur =
		    rb_entry(n, struct ceph_osd_backoff, spg_node);
		int cmp;

		cmp = hoid_compare(hoid, cur->begin);
		if (cmp < 0) {
			n = n->rb_left;
		} else if (cmp > 0) {
			if (hoid_compare(hoid, cur->end) < 0)
				return cur;

			n = n->rb_right;
		} else {
			return cur;
		}
	}

	return NULL;
}

/*
 * Each backoff has a unique id within its OSD session.
 */
DEFINE_RB_FUNCS(backoff_by_id, struct ceph_osd_backoff, id, id_node)

static void clear_backoffs(struct ceph_osd *osd)
{
	while (!RB_EMPTY_ROOT(&osd->o_backoff_mappings)) {
		struct ceph_spg_mapping *spg =
		    rb_entry(rb_first(&osd->o_backoff_mappings),
			     struct ceph_spg_mapping, node);

		while (!RB_EMPTY_ROOT(&spg->backoffs)) {
			struct ceph_osd_backoff *backoff =
			    rb_entry(rb_first(&spg->backoffs),
				     struct ceph_osd_backoff, spg_node);

			erase_backoff(&spg->backoffs, backoff);
			erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
			free_backoff(backoff);
		}
		erase_spg_mapping(&osd->o_backoff_mappings, spg);
		free_spg_mapping(spg);
	}
}

/*
 * Set up a temporary, non-owning view into @t.
 */
static void hoid_fill_from_target(struct ceph_hobject_id *hoid,
				  const struct ceph_osd_request_target *t)
{
	hoid->key = NULL;
	hoid->key_len = 0;
	hoid->oid = t->target_oid.name;
	hoid->oid_len = t->target_oid.name_len;
	hoid->snapid = CEPH_NOSNAP;
	hoid->hash = t->pgid.seed;
	hoid->is_max = false;
	if (t->target_oloc.pool_ns) {
		hoid->nspace = t->target_oloc.pool_ns->str;
		hoid->nspace_len = t->target_oloc.pool_ns->len;
	} else {
		hoid->nspace = NULL;
		hoid->nspace_len = 0;
	}
	hoid->pool = t->target_oloc.pool;
	ceph_hoid_build_hash_cache(hoid);
}

static bool should_plug_request(struct ceph_osd_request *req)
{
	struct ceph_osd *osd = req->r_osd;
	struct ceph_spg_mapping *spg;
	struct ceph_osd_backoff *backoff;
	struct ceph_hobject_id hoid;

	spg = lookup_spg_mapping(&osd->o_backoff_mappings, &req->r_t.spgid);
	if (!spg)
		return false;

	hoid_fill_from_target(&hoid, &req->r_t);
	backoff = lookup_containing_backoff(&spg->backoffs, &hoid);
	if (!backoff)
		return false;

	dout("%s req %p tid %llu backoff osd%d spgid %llu.%xs%d id %llu\n",
	     __func__, req, req->r_tid, osd->o_osd, backoff->spgid.pgid.pool,
	     backoff->spgid.pgid.seed, backoff->spgid.shard, backoff->id);
	return true;
}

static void setup_request_data(struct ceph_osd_request *req,
			       struct ceph_msg *msg)
{
	u32 data_len = 0;
	int i;

	if (!list_empty(&msg->data))
		return;

	WARN_ON(msg->data_length);
	for (i = 0; i < req->r_num_ops; i++) {
		struct ceph_osd_req_op *op = &req->r_ops[i];

		switch (op->op) {
		/* request */
		case CEPH_OSD_OP_WRITE:
		case CEPH_OSD_OP_WRITEFULL:
			WARN_ON(op->indata_len != op->extent.length);
			ceph_osdc_msg_data_add(msg, &op->extent.osd_data);
			break;
		case CEPH_OSD_OP_SETXATTR:
		case CEPH_OSD_OP_CMPXATTR:
			WARN_ON(op->indata_len != op->xattr.name_len +
						  op->xattr.value_len);
			ceph_osdc_msg_data_add(msg, &op->xattr.osd_data);
			break;
		case CEPH_OSD_OP_NOTIFY_ACK:
			ceph_osdc_msg_data_add(msg,
					       &op->notify_ack.request_data);
			break;

		/* reply */
		case CEPH_OSD_OP_STAT:
			ceph_osdc_msg_data_add(req->r_reply,
					       &op->raw_data_in);
			break;
		case CEPH_OSD_OP_READ:
			ceph_osdc_msg_data_add(req->r_reply,
					       &op->extent.osd_data);
			break;
		case CEPH_OSD_OP_LIST_WATCHERS:
			ceph_osdc_msg_data_add(req->r_reply,
					       &op->list_watchers.response_data);
			break;

		/* both */
		case CEPH_OSD_OP_CALL:
			WARN_ON(op->indata_len != op->cls.class_len +
						  op->cls.method_len +
						  op->cls.indata_len);
			ceph_osdc_msg_data_add(msg, &op->cls.request_info);
			/* optional, can be NONE */
			ceph_osdc_msg_data_add(msg, &op->cls.request_data);
			/* optional, can be NONE */
			ceph_osdc_msg_data_add(req->r_reply,
					       &op->cls.response_data);
			break;
		case CEPH_OSD_OP_NOTIFY:
			ceph_osdc_msg_data_add(msg,
					       &op->notify.request_data);
			ceph_osdc_msg_data_add(req->r_reply,
					       &op->notify.response_data);
			break;
		}

		data_len += op->indata_len;
	}

	WARN_ON(data_len != msg->data_length);
}

static void encode_pgid(void **p, const struct ceph_pg *pgid)
{
	ceph_encode_8(p, 1);
	ceph_encode_64(p, pgid->pool);
	ceph_encode_32(p, pgid->seed);
	ceph_encode_32(p, -1); /* preferred */
}

static void encode_spgid(void **p, const struct ceph_spg *spgid)
{
	ceph_start_encoding(p, 1, 1, CEPH_PGID_ENCODING_LEN + 1);
	encode_pgid(p, &spgid->pgid);
	ceph_encode_8(p, spgid->shard);
}

static void encode_oloc(void **p, void *end,
			const struct ceph_object_locator *oloc)
{
	ceph_start_encoding(p, 5, 4, ceph_oloc_encoding_size(oloc));
	ceph_encode_64(p, oloc->pool);
	ceph_encode_32(p, -1); /* preferred */
	ceph_encode_32(p, 0);  /* key len */
	if (oloc->pool_ns)
		ceph_encode_string(p, end, oloc->pool_ns->str,
				   oloc->pool_ns->len);
	else
		ceph_encode_32(p, 0);
}
req->r_t.target_oid.name, 1960 req->r_t.target_oid.name_len); 1961 1962 /* ops, can imply data */ 1963 ceph_encode_16(&p, req->r_num_ops); 1964 for (i = 0; i < req->r_num_ops; i++) { 1965 data_len += osd_req_encode_op(p, &req->r_ops[i]); 1966 p += sizeof(struct ceph_osd_op); 1967 } 1968 1969 ceph_encode_64(&p, req->r_snapid); /* snapid */ 1970 if (req->r_snapc) { 1971 ceph_encode_64(&p, req->r_snapc->seq); 1972 ceph_encode_32(&p, req->r_snapc->num_snaps); 1973 for (i = 0; i < req->r_snapc->num_snaps; i++) 1974 ceph_encode_64(&p, req->r_snapc->snaps[i]); 1975 } else { 1976 ceph_encode_64(&p, 0); /* snap_seq */ 1977 ceph_encode_32(&p, 0); /* snaps len */ 1978 } 1979 1980 ceph_encode_32(&p, req->r_attempts); /* retry_attempt */ 1981 BUG_ON(p > end - 8); /* space for features */ 1982 1983 msg->hdr.version = cpu_to_le16(8); /* MOSDOp v8 */ 1984 /* front_len is finalized in encode_request_finish() */ 1985 msg->front.iov_len = p - msg->front.iov_base; 1986 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1987 msg->hdr.data_len = cpu_to_le32(data_len); 1988 /* 1989 * The header "data_off" is a hint to the receiver allowing it 1990 * to align received data into its buffers such that there's no 1991 * need to re-copy it before writing it to disk (direct I/O). 1992 */ 1993 msg->hdr.data_off = cpu_to_le16(req->r_data_offset); 1994 1995 dout("%s req %p msg %p oid %s oid_len %d\n", __func__, req, msg, 1996 req->r_t.target_oid.name, req->r_t.target_oid.name_len); 1997 } 1998 1999 static void encode_request_finish(struct ceph_msg *msg) 2000 { 2001 void *p = msg->front.iov_base; 2002 void *const partial_end = p + msg->front.iov_len; 2003 void *const end = p + msg->front_alloc_len; 2004 2005 if (CEPH_HAVE_FEATURE(msg->con->peer_features, RESEND_ON_SPLIT)) { 2006 /* luminous OSD -- encode features and be done */ 2007 p = partial_end; 2008 ceph_encode_64(&p, msg->con->peer_features); 2009 } else { 2010 struct { 2011 char spgid[CEPH_ENCODING_START_BLK_LEN + 2012 CEPH_PGID_ENCODING_LEN + 1]; 2013 __le32 hash; 2014 __le32 epoch; 2015 __le32 flags; 2016 char reqid[CEPH_ENCODING_START_BLK_LEN + 2017 sizeof(struct ceph_osd_reqid)]; 2018 char trace[sizeof(struct ceph_blkin_trace_info)]; 2019 __le32 client_inc; 2020 struct ceph_timespec mtime; 2021 } __packed head; 2022 struct ceph_pg pgid; 2023 void *oloc, *oid, *tail; 2024 int oloc_len, oid_len, tail_len; 2025 int len; 2026 2027 /* 2028 * Pre-luminous OSD -- reencode v8 into v4 using @head 2029 * as a temporary buffer. Encode the raw PG; the rest 2030 * is just a matter of moving oloc, oid and tail blobs 2031 * around. 
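 *
 * For orientation, the v4 front produced below looks like this
 * (a sketch derived from the moves that follow, not a normative
 * wire spec):
 *
 *	__le32 client_inc
 *	__le32 osdmap_epoch
 *	__le32 flags
 *	ceph_timespec mtime
 *	ceph_eversion reassert_version	(zeroed)
 *	oloc blob			(moved as-is)
 *	raw pgid			(re-encoded from the hash)
 *	oid blob			(moved as-is)
 *	tail				(ops, snapid, snapc, retry_attempt)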
2032 */ 2033 memcpy(&head, p, sizeof(head)); 2034 p += sizeof(head); 2035 2036 oloc = p; 2037 p += CEPH_ENCODING_START_BLK_LEN; 2038 pgid.pool = ceph_decode_64(&p); 2039 p += 4 + 4; /* preferred, key len */ 2040 len = ceph_decode_32(&p); 2041 p += len; /* nspace */ 2042 oloc_len = p - oloc; 2043 2044 oid = p; 2045 len = ceph_decode_32(&p); 2046 p += len; 2047 oid_len = p - oid; 2048 2049 tail = p; 2050 tail_len = partial_end - p; 2051 2052 p = msg->front.iov_base; 2053 ceph_encode_copy(&p, &head.client_inc, sizeof(head.client_inc)); 2054 ceph_encode_copy(&p, &head.epoch, sizeof(head.epoch)); 2055 ceph_encode_copy(&p, &head.flags, sizeof(head.flags)); 2056 ceph_encode_copy(&p, &head.mtime, sizeof(head.mtime)); 2057 2058 /* reassert_version */ 2059 memset(p, 0, sizeof(struct ceph_eversion)); 2060 p += sizeof(struct ceph_eversion); 2061 2062 BUG_ON(p >= oloc); 2063 memmove(p, oloc, oloc_len); 2064 p += oloc_len; 2065 2066 pgid.seed = le32_to_cpu(head.hash); 2067 encode_pgid(&p, &pgid); /* raw pg */ 2068 2069 BUG_ON(p >= oid); 2070 memmove(p, oid, oid_len); 2071 p += oid_len; 2072 2073 /* tail -- ops, snapid, snapc, retry_attempt */ 2074 BUG_ON(p >= tail); 2075 memmove(p, tail, tail_len); 2076 p += tail_len; 2077 2078 msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */ 2079 } 2080 2081 BUG_ON(p > end); 2082 msg->front.iov_len = p - msg->front.iov_base; 2083 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2084 2085 dout("%s msg %p tid %llu %u+%u+%u v%d\n", __func__, msg, 2086 le64_to_cpu(msg->hdr.tid), le32_to_cpu(msg->hdr.front_len), 2087 le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len), 2088 le16_to_cpu(msg->hdr.version)); 2089 } 2090 2091 /* 2092 * @req has to be assigned a tid and registered. 2093 */ 2094 static void send_request(struct ceph_osd_request *req) 2095 { 2096 struct ceph_osd *osd = req->r_osd; 2097 2098 verify_osd_locked(osd); 2099 WARN_ON(osd->o_osd != req->r_t.osd); 2100 2101 /* backoff? */ 2102 if (should_plug_request(req)) 2103 return; 2104 2105 /* 2106 * We may have a previously queued request message hanging 2107 * around. Cancel it to avoid corrupting the msgr. 
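 * (The same ceph_msg is re-encoded and re-sent below, so a copy
 * that is still sitting in the messenger's out queue from a
 * previous attempt has to be revoked first.)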
2108 */ 2109 if (req->r_sent) 2110 ceph_msg_revoke(req->r_request); 2111 2112 req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR; 2113 if (req->r_attempts) 2114 req->r_flags |= CEPH_OSD_FLAG_RETRY; 2115 else 2116 WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY); 2117 2118 encode_request_partial(req, req->r_request); 2119 2120 dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d e%u flags 0x%x attempt %d\n", 2121 __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed, 2122 req->r_t.spgid.pgid.pool, req->r_t.spgid.pgid.seed, 2123 req->r_t.spgid.shard, osd->o_osd, req->r_t.epoch, req->r_flags, 2124 req->r_attempts); 2125 2126 req->r_t.paused = false; 2127 req->r_stamp = jiffies; 2128 req->r_attempts++; 2129 2130 req->r_sent = osd->o_incarnation; 2131 req->r_request->hdr.tid = cpu_to_le64(req->r_tid); 2132 ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request)); 2133 } 2134 2135 static void maybe_request_map(struct ceph_osd_client *osdc) 2136 { 2137 bool continuous = false; 2138 2139 verify_osdc_locked(osdc); 2140 WARN_ON(!osdc->osdmap->epoch); 2141 2142 if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 2143 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD) || 2144 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) { 2145 dout("%s osdc %p continuous\n", __func__, osdc); 2146 continuous = true; 2147 } else { 2148 dout("%s osdc %p onetime\n", __func__, osdc); 2149 } 2150 2151 if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP, 2152 osdc->osdmap->epoch + 1, continuous)) 2153 ceph_monc_renew_subs(&osdc->client->monc); 2154 } 2155 2156 static void complete_request(struct ceph_osd_request *req, int err); 2157 static void send_map_check(struct ceph_osd_request *req); 2158 2159 static void __submit_request(struct ceph_osd_request *req, bool wrlocked) 2160 { 2161 struct ceph_osd_client *osdc = req->r_osdc; 2162 struct ceph_osd *osd; 2163 enum calc_target_result ct_res; 2164 bool need_send = false; 2165 bool promoted = false; 2166 bool need_abort = false; 2167 2168 WARN_ON(req->r_tid); 2169 dout("%s req %p wrlocked %d\n", __func__, req, wrlocked); 2170 2171 again: 2172 ct_res = calc_target(osdc, &req->r_t, NULL, false); 2173 if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked) 2174 goto promote; 2175 2176 osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked); 2177 if (IS_ERR(osd)) { 2178 WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked); 2179 goto promote; 2180 } 2181 2182 if (osdc->osdmap->epoch < osdc->epoch_barrier) { 2183 dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch, 2184 osdc->epoch_barrier); 2185 req->r_t.paused = true; 2186 maybe_request_map(osdc); 2187 } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && 2188 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) { 2189 dout("req %p pausewr\n", req); 2190 req->r_t.paused = true; 2191 maybe_request_map(osdc); 2192 } else if ((req->r_flags & CEPH_OSD_FLAG_READ) && 2193 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) { 2194 dout("req %p pauserd\n", req); 2195 req->r_t.paused = true; 2196 maybe_request_map(osdc); 2197 } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && 2198 !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY | 2199 CEPH_OSD_FLAG_FULL_FORCE)) && 2200 (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 2201 pool_full(osdc, req->r_t.base_oloc.pool))) { 2202 dout("req %p full/pool_full\n", req); 2203 pr_warn_ratelimited("FULL or reached pool quota\n"); 2204 req->r_t.paused = true; 2205 maybe_request_map(osdc); 2206 if (req->r_abort_on_full) 2207 need_abort = true; 2208 } else if (!osd_homeless(osd)) { 2209 need_send = true; 2210 } else { 2211 
maybe_request_map(osdc); 2212 } 2213 2214 mutex_lock(&osd->lock); 2215 /* 2216 * Assign the tid atomically with send_request() to protect 2217 * multiple writes to the same object from racing with each 2218 * other, resulting in out of order ops on the OSDs. 2219 */ 2220 req->r_tid = atomic64_inc_return(&osdc->last_tid); 2221 link_request(osd, req); 2222 if (need_send) 2223 send_request(req); 2224 else if (need_abort) 2225 complete_request(req, -ENOSPC); 2226 mutex_unlock(&osd->lock); 2227 2228 if (ct_res == CALC_TARGET_POOL_DNE) 2229 send_map_check(req); 2230 2231 if (promoted) 2232 downgrade_write(&osdc->lock); 2233 return; 2234 2235 promote: 2236 up_read(&osdc->lock); 2237 down_write(&osdc->lock); 2238 wrlocked = true; 2239 promoted = true; 2240 goto again; 2241 } 2242 2243 static void account_request(struct ceph_osd_request *req) 2244 { 2245 WARN_ON(req->r_flags & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK)); 2246 WARN_ON(!(req->r_flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE))); 2247 2248 req->r_flags |= CEPH_OSD_FLAG_ONDISK; 2249 atomic_inc(&req->r_osdc->num_requests); 2250 2251 req->r_start_stamp = jiffies; 2252 } 2253 2254 static void submit_request(struct ceph_osd_request *req, bool wrlocked) 2255 { 2256 ceph_osdc_get_request(req); 2257 account_request(req); 2258 __submit_request(req, wrlocked); 2259 } 2260 2261 static void finish_request(struct ceph_osd_request *req) 2262 { 2263 struct ceph_osd_client *osdc = req->r_osdc; 2264 2265 WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid)); 2266 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 2267 2268 if (req->r_osd) 2269 unlink_request(req->r_osd, req); 2270 atomic_dec(&osdc->num_requests); 2271 2272 /* 2273 * If an OSD has failed or returned and a request has been sent 2274 * twice, it's possible to get a reply and end up here while the 2275 * request message is queued for delivery. We will ignore the 2276 * reply, so not a big deal, but better to try and catch it. 2277 */ 2278 ceph_msg_revoke(req->r_request); 2279 ceph_msg_revoke_incoming(req->r_reply); 2280 } 2281 2282 static void __complete_request(struct ceph_osd_request *req) 2283 { 2284 if (req->r_callback) { 2285 dout("%s req %p tid %llu cb %pf result %d\n", __func__, req, 2286 req->r_tid, req->r_callback, req->r_result); 2287 req->r_callback(req); 2288 } 2289 } 2290 2291 /* 2292 * This is open-coded in handle_reply(). 
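 *
 * A minimal sketch of a synchronous caller, to show how the pieces
 * fit together (illustrative only -- not code from this file;
 * locking and error handling elided):
 *
 *	submit_request(req, false);
 *	wait_for_completion(&req->r_completion);
 *	ret = req->r_result;	(set here or in handle_reply())
 *	ceph_osdc_put_request(req);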
2293 */ 2294 static void complete_request(struct ceph_osd_request *req, int err) 2295 { 2296 dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err); 2297 2298 req->r_result = err; 2299 finish_request(req); 2300 __complete_request(req); 2301 complete_all(&req->r_completion); 2302 ceph_osdc_put_request(req); 2303 } 2304 2305 static void cancel_map_check(struct ceph_osd_request *req) 2306 { 2307 struct ceph_osd_client *osdc = req->r_osdc; 2308 struct ceph_osd_request *lookup_req; 2309 2310 verify_osdc_wrlocked(osdc); 2311 2312 lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid); 2313 if (!lookup_req) 2314 return; 2315 2316 WARN_ON(lookup_req != req); 2317 erase_request_mc(&osdc->map_checks, req); 2318 ceph_osdc_put_request(req); 2319 } 2320 2321 static void cancel_request(struct ceph_osd_request *req) 2322 { 2323 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 2324 2325 cancel_map_check(req); 2326 finish_request(req); 2327 complete_all(&req->r_completion); 2328 ceph_osdc_put_request(req); 2329 } 2330 2331 static void abort_request(struct ceph_osd_request *req, int err) 2332 { 2333 dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err); 2334 2335 cancel_map_check(req); 2336 complete_request(req, err); 2337 } 2338 2339 static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb) 2340 { 2341 if (likely(eb > osdc->epoch_barrier)) { 2342 dout("updating epoch_barrier from %u to %u\n", 2343 osdc->epoch_barrier, eb); 2344 osdc->epoch_barrier = eb; 2345 /* Request map if we're not to the barrier yet */ 2346 if (eb > osdc->osdmap->epoch) 2347 maybe_request_map(osdc); 2348 } 2349 } 2350 2351 void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb) 2352 { 2353 down_read(&osdc->lock); 2354 if (unlikely(eb > osdc->epoch_barrier)) { 2355 up_read(&osdc->lock); 2356 down_write(&osdc->lock); 2357 update_epoch_barrier(osdc, eb); 2358 up_write(&osdc->lock); 2359 } else { 2360 up_read(&osdc->lock); 2361 } 2362 } 2363 EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier); 2364 2365 /* 2366 * Drop all pending requests that are stalled waiting on a full condition to 2367 * clear, and complete them with ENOSPC as the return code. Set the 2368 * osdc->epoch_barrier to the latest map epoch that we've seen if any were 2369 * cancelled. 2370 */ 2371 static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc) 2372 { 2373 struct rb_node *n; 2374 bool victims = false; 2375 2376 dout("enter abort_on_full\n"); 2377 2378 if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc)) 2379 goto out; 2380 2381 /* Scan list and see if there is anything to abort */ 2382 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { 2383 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); 2384 struct rb_node *m; 2385 2386 m = rb_first(&osd->o_requests); 2387 while (m) { 2388 struct ceph_osd_request *req = rb_entry(m, 2389 struct ceph_osd_request, r_node); 2390 m = rb_next(m); 2391 2392 if (req->r_abort_on_full) { 2393 victims = true; 2394 break; 2395 } 2396 } 2397 if (victims) 2398 break; 2399 } 2400 2401 if (!victims) 2402 goto out; 2403 2404 /* 2405 * Update the barrier to current epoch if it's behind that point, 2406 * since we know we have some calls to be aborted in the tree. 
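 * Raising the barrier before aborting also ensures that no new
 * request is sent until the local map has caught up to the epoch
 * on which the full condition was observed (see the epoch_barrier
 * check in __submit_request()).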
2407 */ 2408 update_epoch_barrier(osdc, osdc->osdmap->epoch); 2409 2410 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { 2411 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); 2412 struct rb_node *m; 2413 2414 m = rb_first(&osd->o_requests); 2415 while (m) { 2416 struct ceph_osd_request *req = rb_entry(m, 2417 struct ceph_osd_request, r_node); 2418 m = rb_next(m); 2419 2420 if (req->r_abort_on_full && 2421 (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 2422 pool_full(osdc, req->r_t.target_oloc.pool))) 2423 abort_request(req, -ENOSPC); 2424 } 2425 } 2426 out: 2427 dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier); 2428 } 2429 2430 static void check_pool_dne(struct ceph_osd_request *req) 2431 { 2432 struct ceph_osd_client *osdc = req->r_osdc; 2433 struct ceph_osdmap *map = osdc->osdmap; 2434 2435 verify_osdc_wrlocked(osdc); 2436 WARN_ON(!map->epoch); 2437 2438 if (req->r_attempts) { 2439 /* 2440 * We sent a request earlier, which means that 2441 * previously the pool existed, and now it does not 2442 * (i.e., it was deleted). 2443 */ 2444 req->r_map_dne_bound = map->epoch; 2445 dout("%s req %p tid %llu pool disappeared\n", __func__, req, 2446 req->r_tid); 2447 } else { 2448 dout("%s req %p tid %llu map_dne_bound %u have %u\n", __func__, 2449 req, req->r_tid, req->r_map_dne_bound, map->epoch); 2450 } 2451 2452 if (req->r_map_dne_bound) { 2453 if (map->epoch >= req->r_map_dne_bound) { 2454 /* we had a new enough map */ 2455 pr_info_ratelimited("tid %llu pool does not exist\n", 2456 req->r_tid); 2457 complete_request(req, -ENOENT); 2458 } 2459 } else { 2460 send_map_check(req); 2461 } 2462 } 2463 2464 static void map_check_cb(struct ceph_mon_generic_request *greq) 2465 { 2466 struct ceph_osd_client *osdc = &greq->monc->client->osdc; 2467 struct ceph_osd_request *req; 2468 u64 tid = greq->private_data; 2469 2470 WARN_ON(greq->result || !greq->u.newest); 2471 2472 down_write(&osdc->lock); 2473 req = lookup_request_mc(&osdc->map_checks, tid); 2474 if (!req) { 2475 dout("%s tid %llu dne\n", __func__, tid); 2476 goto out_unlock; 2477 } 2478 2479 dout("%s req %p tid %llu map_dne_bound %u newest %llu\n", __func__, 2480 req, req->r_tid, req->r_map_dne_bound, greq->u.newest); 2481 if (!req->r_map_dne_bound) 2482 req->r_map_dne_bound = greq->u.newest; 2483 erase_request_mc(&osdc->map_checks, req); 2484 check_pool_dne(req); 2485 2486 ceph_osdc_put_request(req); 2487 out_unlock: 2488 up_write(&osdc->lock); 2489 } 2490 2491 static void send_map_check(struct ceph_osd_request *req) 2492 { 2493 struct ceph_osd_client *osdc = req->r_osdc; 2494 struct ceph_osd_request *lookup_req; 2495 int ret; 2496 2497 verify_osdc_wrlocked(osdc); 2498 2499 lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid); 2500 if (lookup_req) { 2501 WARN_ON(lookup_req != req); 2502 return; 2503 } 2504 2505 ceph_osdc_get_request(req); 2506 insert_request_mc(&osdc->map_checks, req); 2507 ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap", 2508 map_check_cb, req->r_tid); 2509 WARN_ON(ret); 2510 } 2511 2512 /* 2513 * lingering requests, watch/notify v2 infrastructure 2514 */ 2515 static void linger_release(struct kref *kref) 2516 { 2517 struct ceph_osd_linger_request *lreq = 2518 container_of(kref, struct ceph_osd_linger_request, kref); 2519 2520 dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq, 2521 lreq->reg_req, lreq->ping_req); 2522 WARN_ON(!RB_EMPTY_NODE(&lreq->node)); 2523 WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node)); 2524 WARN_ON(!RB_EMPTY_NODE(&lreq->mc_node)); 2525 
WARN_ON(!list_empty(&lreq->scan_item)); 2526 WARN_ON(!list_empty(&lreq->pending_lworks)); 2527 WARN_ON(lreq->osd); 2528 2529 if (lreq->reg_req) 2530 ceph_osdc_put_request(lreq->reg_req); 2531 if (lreq->ping_req) 2532 ceph_osdc_put_request(lreq->ping_req); 2533 target_destroy(&lreq->t); 2534 kfree(lreq); 2535 } 2536 2537 static void linger_put(struct ceph_osd_linger_request *lreq) 2538 { 2539 if (lreq) 2540 kref_put(&lreq->kref, linger_release); 2541 } 2542 2543 static struct ceph_osd_linger_request * 2544 linger_get(struct ceph_osd_linger_request *lreq) 2545 { 2546 kref_get(&lreq->kref); 2547 return lreq; 2548 } 2549 2550 static struct ceph_osd_linger_request * 2551 linger_alloc(struct ceph_osd_client *osdc) 2552 { 2553 struct ceph_osd_linger_request *lreq; 2554 2555 lreq = kzalloc(sizeof(*lreq), GFP_NOIO); 2556 if (!lreq) 2557 return NULL; 2558 2559 kref_init(&lreq->kref); 2560 mutex_init(&lreq->lock); 2561 RB_CLEAR_NODE(&lreq->node); 2562 RB_CLEAR_NODE(&lreq->osdc_node); 2563 RB_CLEAR_NODE(&lreq->mc_node); 2564 INIT_LIST_HEAD(&lreq->scan_item); 2565 INIT_LIST_HEAD(&lreq->pending_lworks); 2566 init_completion(&lreq->reg_commit_wait); 2567 init_completion(&lreq->notify_finish_wait); 2568 2569 lreq->osdc = osdc; 2570 target_init(&lreq->t); 2571 2572 dout("%s lreq %p\n", __func__, lreq); 2573 return lreq; 2574 } 2575 2576 DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node) 2577 DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node) 2578 DEFINE_RB_FUNCS(linger_mc, struct ceph_osd_linger_request, linger_id, mc_node) 2579 2580 /* 2581 * Create linger request <-> OSD session relation. 2582 * 2583 * @lreq has to be registered, @osd may be homeless. 2584 */ 2585 static void link_linger(struct ceph_osd *osd, 2586 struct ceph_osd_linger_request *lreq) 2587 { 2588 verify_osd_locked(osd); 2589 WARN_ON(!lreq->linger_id || lreq->osd); 2590 dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd, 2591 osd->o_osd, lreq, lreq->linger_id); 2592 2593 if (!osd_homeless(osd)) 2594 __remove_osd_from_lru(osd); 2595 else 2596 atomic_inc(&osd->o_osdc->num_homeless); 2597 2598 get_osd(osd); 2599 insert_linger(&osd->o_linger_requests, lreq); 2600 lreq->osd = osd; 2601 } 2602 2603 static void unlink_linger(struct ceph_osd *osd, 2604 struct ceph_osd_linger_request *lreq) 2605 { 2606 verify_osd_locked(osd); 2607 WARN_ON(lreq->osd != osd); 2608 dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd, 2609 osd->o_osd, lreq, lreq->linger_id); 2610 2611 lreq->osd = NULL; 2612 erase_linger(&osd->o_linger_requests, lreq); 2613 put_osd(osd); 2614 2615 if (!osd_homeless(osd)) 2616 maybe_move_osd_to_lru(osd); 2617 else 2618 atomic_dec(&osd->o_osdc->num_homeless); 2619 } 2620 2621 static bool __linger_registered(struct ceph_osd_linger_request *lreq) 2622 { 2623 verify_osdc_locked(lreq->osdc); 2624 2625 return !RB_EMPTY_NODE(&lreq->osdc_node); 2626 } 2627 2628 static bool linger_registered(struct ceph_osd_linger_request *lreq) 2629 { 2630 struct ceph_osd_client *osdc = lreq->osdc; 2631 bool registered; 2632 2633 down_read(&osdc->lock); 2634 registered = __linger_registered(lreq); 2635 up_read(&osdc->lock); 2636 2637 return registered; 2638 } 2639 2640 static void linger_register(struct ceph_osd_linger_request *lreq) 2641 { 2642 struct ceph_osd_client *osdc = lreq->osdc; 2643 2644 verify_osdc_wrlocked(osdc); 2645 WARN_ON(lreq->linger_id); 2646 2647 linger_get(lreq); 2648 lreq->linger_id = ++osdc->last_linger_id; 2649 insert_linger_osdc(&osdc->linger_requests, 
lreq); 2650 } 2651 2652 static void linger_unregister(struct ceph_osd_linger_request *lreq) 2653 { 2654 struct ceph_osd_client *osdc = lreq->osdc; 2655 2656 verify_osdc_wrlocked(osdc); 2657 2658 erase_linger_osdc(&osdc->linger_requests, lreq); 2659 linger_put(lreq); 2660 } 2661 2662 static void cancel_linger_request(struct ceph_osd_request *req) 2663 { 2664 struct ceph_osd_linger_request *lreq = req->r_priv; 2665 2666 WARN_ON(!req->r_linger); 2667 cancel_request(req); 2668 linger_put(lreq); 2669 } 2670 2671 struct linger_work { 2672 struct work_struct work; 2673 struct ceph_osd_linger_request *lreq; 2674 struct list_head pending_item; 2675 unsigned long queued_stamp; 2676 2677 union { 2678 struct { 2679 u64 notify_id; 2680 u64 notifier_id; 2681 void *payload; /* points into @msg front */ 2682 size_t payload_len; 2683 2684 struct ceph_msg *msg; /* for ceph_msg_put() */ 2685 } notify; 2686 struct { 2687 int err; 2688 } error; 2689 }; 2690 }; 2691 2692 static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq, 2693 work_func_t workfn) 2694 { 2695 struct linger_work *lwork; 2696 2697 lwork = kzalloc(sizeof(*lwork), GFP_NOIO); 2698 if (!lwork) 2699 return NULL; 2700 2701 INIT_WORK(&lwork->work, workfn); 2702 INIT_LIST_HEAD(&lwork->pending_item); 2703 lwork->lreq = linger_get(lreq); 2704 2705 return lwork; 2706 } 2707 2708 static void lwork_free(struct linger_work *lwork) 2709 { 2710 struct ceph_osd_linger_request *lreq = lwork->lreq; 2711 2712 mutex_lock(&lreq->lock); 2713 list_del(&lwork->pending_item); 2714 mutex_unlock(&lreq->lock); 2715 2716 linger_put(lreq); 2717 kfree(lwork); 2718 } 2719 2720 static void lwork_queue(struct linger_work *lwork) 2721 { 2722 struct ceph_osd_linger_request *lreq = lwork->lreq; 2723 struct ceph_osd_client *osdc = lreq->osdc; 2724 2725 verify_lreq_locked(lreq); 2726 WARN_ON(!list_empty(&lwork->pending_item)); 2727 2728 lwork->queued_stamp = jiffies; 2729 list_add_tail(&lwork->pending_item, &lreq->pending_lworks); 2730 queue_work(osdc->notify_wq, &lwork->work); 2731 } 2732 2733 static void do_watch_notify(struct work_struct *w) 2734 { 2735 struct linger_work *lwork = container_of(w, struct linger_work, work); 2736 struct ceph_osd_linger_request *lreq = lwork->lreq; 2737 2738 if (!linger_registered(lreq)) { 2739 dout("%s lreq %p not registered\n", __func__, lreq); 2740 goto out; 2741 } 2742 2743 WARN_ON(!lreq->is_watch); 2744 dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n", 2745 __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id, 2746 lwork->notify.payload_len); 2747 lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id, 2748 lwork->notify.notifier_id, lwork->notify.payload, 2749 lwork->notify.payload_len); 2750 2751 out: 2752 ceph_msg_put(lwork->notify.msg); 2753 lwork_free(lwork); 2754 } 2755 2756 static void do_watch_error(struct work_struct *w) 2757 { 2758 struct linger_work *lwork = container_of(w, struct linger_work, work); 2759 struct ceph_osd_linger_request *lreq = lwork->lreq; 2760 2761 if (!linger_registered(lreq)) { 2762 dout("%s lreq %p not registered\n", __func__, lreq); 2763 goto out; 2764 } 2765 2766 dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err); 2767 lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err); 2768 2769 out: 2770 lwork_free(lwork); 2771 } 2772 2773 static void queue_watch_error(struct ceph_osd_linger_request *lreq) 2774 { 2775 struct linger_work *lwork; 2776 2777 lwork = lwork_alloc(lreq, do_watch_error); 2778 if (!lwork) { 2779 pr_err("failed to allocate 
error-lwork\n"); 2780 return; 2781 } 2782 2783 lwork->error.err = lreq->last_error; 2784 lwork_queue(lwork); 2785 } 2786 2787 static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq, 2788 int result) 2789 { 2790 if (!completion_done(&lreq->reg_commit_wait)) { 2791 lreq->reg_commit_error = (result <= 0 ? result : 0); 2792 complete_all(&lreq->reg_commit_wait); 2793 } 2794 } 2795 2796 static void linger_commit_cb(struct ceph_osd_request *req) 2797 { 2798 struct ceph_osd_linger_request *lreq = req->r_priv; 2799 2800 mutex_lock(&lreq->lock); 2801 dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq, 2802 lreq->linger_id, req->r_result); 2803 linger_reg_commit_complete(lreq, req->r_result); 2804 lreq->committed = true; 2805 2806 if (!lreq->is_watch) { 2807 struct ceph_osd_data *osd_data = 2808 osd_req_op_data(req, 0, notify, response_data); 2809 void *p = page_address(osd_data->pages[0]); 2810 2811 WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY || 2812 osd_data->type != CEPH_OSD_DATA_TYPE_PAGES); 2813 2814 /* make note of the notify_id */ 2815 if (req->r_ops[0].outdata_len >= sizeof(u64)) { 2816 lreq->notify_id = ceph_decode_64(&p); 2817 dout("lreq %p notify_id %llu\n", lreq, 2818 lreq->notify_id); 2819 } else { 2820 dout("lreq %p no notify_id\n", lreq); 2821 } 2822 } 2823 2824 mutex_unlock(&lreq->lock); 2825 linger_put(lreq); 2826 } 2827 2828 static int normalize_watch_error(int err) 2829 { 2830 /* 2831 * Translate ENOENT -> ENOTCONN so that a delete->disconnection 2832 * notification and a failure to reconnect because we raced with 2833 * the delete appear the same to the user. 2834 */ 2835 if (err == -ENOENT) 2836 err = -ENOTCONN; 2837 2838 return err; 2839 } 2840 2841 static void linger_reconnect_cb(struct ceph_osd_request *req) 2842 { 2843 struct ceph_osd_linger_request *lreq = req->r_priv; 2844 2845 mutex_lock(&lreq->lock); 2846 dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__, 2847 lreq, lreq->linger_id, req->r_result, lreq->last_error); 2848 if (req->r_result < 0) { 2849 if (!lreq->last_error) { 2850 lreq->last_error = normalize_watch_error(req->r_result); 2851 queue_watch_error(lreq); 2852 } 2853 } 2854 2855 mutex_unlock(&lreq->lock); 2856 linger_put(lreq); 2857 } 2858 2859 static void send_linger(struct ceph_osd_linger_request *lreq) 2860 { 2861 struct ceph_osd_request *req = lreq->reg_req; 2862 struct ceph_osd_req_op *op = &req->r_ops[0]; 2863 2864 verify_osdc_wrlocked(req->r_osdc); 2865 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id); 2866 2867 if (req->r_osd) 2868 cancel_linger_request(req); 2869 2870 request_reinit(req); 2871 ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); 2872 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); 2873 req->r_flags = lreq->t.flags; 2874 req->r_mtime = lreq->mtime; 2875 2876 mutex_lock(&lreq->lock); 2877 if (lreq->is_watch && lreq->committed) { 2878 WARN_ON(op->op != CEPH_OSD_OP_WATCH || 2879 op->watch.cookie != lreq->linger_id); 2880 op->watch.op = CEPH_OSD_WATCH_OP_RECONNECT; 2881 op->watch.gen = ++lreq->register_gen; 2882 dout("lreq %p reconnect register_gen %u\n", lreq, 2883 op->watch.gen); 2884 req->r_callback = linger_reconnect_cb; 2885 } else { 2886 if (!lreq->is_watch) 2887 lreq->notify_id = 0; 2888 else 2889 WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH); 2890 dout("lreq %p register\n", lreq); 2891 req->r_callback = linger_commit_cb; 2892 } 2893 mutex_unlock(&lreq->lock); 2894 2895 req->r_priv = linger_get(lreq); 2896 req->r_linger = true; 2897 2898 
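	/* osdc->lock is held for write here, hence wrlocked == true */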
submit_request(req, true); 2899 } 2900 2901 static void linger_ping_cb(struct ceph_osd_request *req) 2902 { 2903 struct ceph_osd_linger_request *lreq = req->r_priv; 2904 2905 mutex_lock(&lreq->lock); 2906 dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n", 2907 __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent, 2908 lreq->last_error); 2909 if (lreq->register_gen == req->r_ops[0].watch.gen) { 2910 if (!req->r_result) { 2911 lreq->watch_valid_thru = lreq->ping_sent; 2912 } else if (!lreq->last_error) { 2913 lreq->last_error = normalize_watch_error(req->r_result); 2914 queue_watch_error(lreq); 2915 } 2916 } else { 2917 dout("lreq %p register_gen %u ignoring old pong %u\n", lreq, 2918 lreq->register_gen, req->r_ops[0].watch.gen); 2919 } 2920 2921 mutex_unlock(&lreq->lock); 2922 linger_put(lreq); 2923 } 2924 2925 static void send_linger_ping(struct ceph_osd_linger_request *lreq) 2926 { 2927 struct ceph_osd_client *osdc = lreq->osdc; 2928 struct ceph_osd_request *req = lreq->ping_req; 2929 struct ceph_osd_req_op *op = &req->r_ops[0]; 2930 2931 if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) { 2932 dout("%s PAUSERD\n", __func__); 2933 return; 2934 } 2935 2936 lreq->ping_sent = jiffies; 2937 dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n", 2938 __func__, lreq, lreq->linger_id, lreq->ping_sent, 2939 lreq->register_gen); 2940 2941 if (req->r_osd) 2942 cancel_linger_request(req); 2943 2944 request_reinit(req); 2945 target_copy(&req->r_t, &lreq->t); 2946 2947 WARN_ON(op->op != CEPH_OSD_OP_WATCH || 2948 op->watch.cookie != lreq->linger_id || 2949 op->watch.op != CEPH_OSD_WATCH_OP_PING); 2950 op->watch.gen = lreq->register_gen; 2951 req->r_callback = linger_ping_cb; 2952 req->r_priv = linger_get(lreq); 2953 req->r_linger = true; 2954 2955 ceph_osdc_get_request(req); 2956 account_request(req); 2957 req->r_tid = atomic64_inc_return(&osdc->last_tid); 2958 link_request(lreq->osd, req); 2959 send_request(req); 2960 } 2961 2962 static void linger_submit(struct ceph_osd_linger_request *lreq) 2963 { 2964 struct ceph_osd_client *osdc = lreq->osdc; 2965 struct ceph_osd *osd; 2966 2967 calc_target(osdc, &lreq->t, NULL, false); 2968 osd = lookup_create_osd(osdc, lreq->t.osd, true); 2969 link_linger(osd, lreq); 2970 2971 send_linger(lreq); 2972 } 2973 2974 static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq) 2975 { 2976 struct ceph_osd_client *osdc = lreq->osdc; 2977 struct ceph_osd_linger_request *lookup_lreq; 2978 2979 verify_osdc_wrlocked(osdc); 2980 2981 lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks, 2982 lreq->linger_id); 2983 if (!lookup_lreq) 2984 return; 2985 2986 WARN_ON(lookup_lreq != lreq); 2987 erase_linger_mc(&osdc->linger_map_checks, lreq); 2988 linger_put(lreq); 2989 } 2990 2991 /* 2992 * @lreq has to be both registered and linked. 
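 *
 * For orientation, the linger lifecycle as driven by the watch/notify
 * entry points (a sketch; the entry points themselves live outside
 * this excerpt):
 *
 *	linger_alloc() -> linger_register() -> linger_submit()
 *	  -> send_linger()		(WATCH or NOTIFY op)
 *	  -> linger_commit_cb()		(acked by the OSD)
 *	  -> send_linger_ping()		(periodic, from handle_timeout(),
 *					 watches only)
 *	linger_cancel() -> __linger_cancel() -> linger_unregister()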
2993 */ 2994 static void __linger_cancel(struct ceph_osd_linger_request *lreq) 2995 { 2996 if (lreq->is_watch && lreq->ping_req->r_osd) 2997 cancel_linger_request(lreq->ping_req); 2998 if (lreq->reg_req->r_osd) 2999 cancel_linger_request(lreq->reg_req); 3000 cancel_linger_map_check(lreq); 3001 unlink_linger(lreq->osd, lreq); 3002 linger_unregister(lreq); 3003 } 3004 3005 static void linger_cancel(struct ceph_osd_linger_request *lreq) 3006 { 3007 struct ceph_osd_client *osdc = lreq->osdc; 3008 3009 down_write(&osdc->lock); 3010 if (__linger_registered(lreq)) 3011 __linger_cancel(lreq); 3012 up_write(&osdc->lock); 3013 } 3014 3015 static void send_linger_map_check(struct ceph_osd_linger_request *lreq); 3016 3017 static void check_linger_pool_dne(struct ceph_osd_linger_request *lreq) 3018 { 3019 struct ceph_osd_client *osdc = lreq->osdc; 3020 struct ceph_osdmap *map = osdc->osdmap; 3021 3022 verify_osdc_wrlocked(osdc); 3023 WARN_ON(!map->epoch); 3024 3025 if (lreq->register_gen) { 3026 lreq->map_dne_bound = map->epoch; 3027 dout("%s lreq %p linger_id %llu pool disappeared\n", __func__, 3028 lreq, lreq->linger_id); 3029 } else { 3030 dout("%s lreq %p linger_id %llu map_dne_bound %u have %u\n", 3031 __func__, lreq, lreq->linger_id, lreq->map_dne_bound, 3032 map->epoch); 3033 } 3034 3035 if (lreq->map_dne_bound) { 3036 if (map->epoch >= lreq->map_dne_bound) { 3037 /* we had a new enough map */ 3038 pr_info("linger_id %llu pool does not exist\n", 3039 lreq->linger_id); 3040 linger_reg_commit_complete(lreq, -ENOENT); 3041 __linger_cancel(lreq); 3042 } 3043 } else { 3044 send_linger_map_check(lreq); 3045 } 3046 } 3047 3048 static void linger_map_check_cb(struct ceph_mon_generic_request *greq) 3049 { 3050 struct ceph_osd_client *osdc = &greq->monc->client->osdc; 3051 struct ceph_osd_linger_request *lreq; 3052 u64 linger_id = greq->private_data; 3053 3054 WARN_ON(greq->result || !greq->u.newest); 3055 3056 down_write(&osdc->lock); 3057 lreq = lookup_linger_mc(&osdc->linger_map_checks, linger_id); 3058 if (!lreq) { 3059 dout("%s linger_id %llu dne\n", __func__, linger_id); 3060 goto out_unlock; 3061 } 3062 3063 dout("%s lreq %p linger_id %llu map_dne_bound %u newest %llu\n", 3064 __func__, lreq, lreq->linger_id, lreq->map_dne_bound, 3065 greq->u.newest); 3066 if (!lreq->map_dne_bound) 3067 lreq->map_dne_bound = greq->u.newest; 3068 erase_linger_mc(&osdc->linger_map_checks, lreq); 3069 check_linger_pool_dne(lreq); 3070 3071 linger_put(lreq); 3072 out_unlock: 3073 up_write(&osdc->lock); 3074 } 3075 3076 static void send_linger_map_check(struct ceph_osd_linger_request *lreq) 3077 { 3078 struct ceph_osd_client *osdc = lreq->osdc; 3079 struct ceph_osd_linger_request *lookup_lreq; 3080 int ret; 3081 3082 verify_osdc_wrlocked(osdc); 3083 3084 lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks, 3085 lreq->linger_id); 3086 if (lookup_lreq) { 3087 WARN_ON(lookup_lreq != lreq); 3088 return; 3089 } 3090 3091 linger_get(lreq); 3092 insert_linger_mc(&osdc->linger_map_checks, lreq); 3093 ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap", 3094 linger_map_check_cb, lreq->linger_id); 3095 WARN_ON(ret); 3096 } 3097 3098 static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq) 3099 { 3100 int ret; 3101 3102 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id); 3103 ret = wait_for_completion_interruptible(&lreq->reg_commit_wait); 3104 return ret ?: lreq->reg_commit_error; 3105 } 3106 3107 static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq) 3108 { 
3109 int ret; 3110 3111 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id); 3112 ret = wait_for_completion_interruptible(&lreq->notify_finish_wait); 3113 return ret ?: lreq->notify_finish_error; 3114 } 3115 3116 /* 3117 * Timeout callback, called every N seconds. When one or more OSD 3118 * requests have been active for more than N seconds, we send a keepalive 3119 * (tag + timestamp) to the OSD to ensure any communications channel 3120 * reset is detected. 3121 */ 3122 static void handle_timeout(struct work_struct *work) 3123 { 3124 struct ceph_osd_client *osdc = 3125 container_of(work, struct ceph_osd_client, timeout_work.work); 3126 struct ceph_options *opts = osdc->client->options; 3127 unsigned long cutoff = jiffies - opts->osd_keepalive_timeout; 3128 unsigned long expiry_cutoff = jiffies - opts->osd_request_timeout; 3129 LIST_HEAD(slow_osds); 3130 struct rb_node *n, *p; 3131 3132 dout("%s osdc %p\n", __func__, osdc); 3133 down_write(&osdc->lock); 3134 3135 /* 3136 * Ping osds that are a bit slow; this ensures that if there 3137 * is a break in the TCP connection we will notice, and reopen 3138 * a connection with that osd (from the fault callback). 3139 */ 3140 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { 3141 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); 3142 bool found = false; 3143 3144 for (p = rb_first(&osd->o_requests); p; ) { 3145 struct ceph_osd_request *req = 3146 rb_entry(p, struct ceph_osd_request, r_node); 3147 3148 p = rb_next(p); /* abort_request() */ 3149 3150 if (time_before(req->r_stamp, cutoff)) { 3151 dout(" req %p tid %llu on osd%d is laggy\n", 3152 req, req->r_tid, osd->o_osd); 3153 found = true; 3154 } 3155 if (opts->osd_request_timeout && 3156 time_before(req->r_start_stamp, expiry_cutoff)) { 3157 pr_err_ratelimited("tid %llu on osd%d timeout\n", 3158 req->r_tid, osd->o_osd); 3159 abort_request(req, -ETIMEDOUT); 3160 } 3161 } 3162 for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) { 3163 struct ceph_osd_linger_request *lreq = 3164 rb_entry(p, struct ceph_osd_linger_request, node); 3165 3166 dout(" lreq %p linger_id %llu is served by osd%d\n", 3167 lreq, lreq->linger_id, osd->o_osd); 3168 found = true; 3169 3170 mutex_lock(&lreq->lock); 3171 if (lreq->is_watch && lreq->committed && !lreq->last_error) 3172 send_linger_ping(lreq); 3173 mutex_unlock(&lreq->lock); 3174 } 3175 3176 if (found) 3177 list_move_tail(&osd->o_keepalive_item, &slow_osds); 3178 } 3179 3180 if (opts->osd_request_timeout) { 3181 for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) { 3182 struct ceph_osd_request *req = 3183 rb_entry(p, struct ceph_osd_request, r_node); 3184 3185 p = rb_next(p); /* abort_request() */ 3186 3187 if (time_before(req->r_start_stamp, expiry_cutoff)) { 3188 pr_err_ratelimited("tid %llu on osd%d timeout\n", 3189 req->r_tid, osdc->homeless_osd.o_osd); 3190 abort_request(req, -ETIMEDOUT); 3191 } 3192 } 3193 } 3194 3195 if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds)) 3196 maybe_request_map(osdc); 3197 3198 while (!list_empty(&slow_osds)) { 3199 struct ceph_osd *osd = list_first_entry(&slow_osds, 3200 struct ceph_osd, 3201 o_keepalive_item); 3202 list_del_init(&osd->o_keepalive_item); 3203 ceph_con_keepalive(&osd->o_con); 3204 } 3205 3206 up_write(&osdc->lock); 3207 schedule_delayed_work(&osdc->timeout_work, 3208 osdc->client->options->osd_keepalive_timeout); 3209 } 3210 3211 static void handle_osds_timeout(struct work_struct *work) 3212 { 3213 struct ceph_osd_client *osdc = 3214 container_of(work,
struct ceph_osd_client, 3215 osds_timeout_work.work); 3216 unsigned long delay = osdc->client->options->osd_idle_ttl / 4; 3217 struct ceph_osd *osd, *nosd; 3218 3219 dout("%s osdc %p\n", __func__, osdc); 3220 down_write(&osdc->lock); 3221 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) { 3222 if (time_before(jiffies, osd->lru_ttl)) 3223 break; 3224 3225 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests)); 3226 WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests)); 3227 close_osd(osd); 3228 } 3229 3230 up_write(&osdc->lock); 3231 schedule_delayed_work(&osdc->osds_timeout_work, 3232 round_jiffies_relative(delay)); 3233 } 3234 3235 static int ceph_oloc_decode(void **p, void *end, 3236 struct ceph_object_locator *oloc) 3237 { 3238 u8 struct_v, struct_cv; 3239 u32 len; 3240 void *struct_end; 3241 int ret = 0; 3242 3243 ceph_decode_need(p, end, 1 + 1 + 4, e_inval); 3244 struct_v = ceph_decode_8(p); 3245 struct_cv = ceph_decode_8(p); 3246 if (struct_v < 3) { 3247 pr_warn("got v %d < 3 cv %d of ceph_object_locator\n", 3248 struct_v, struct_cv); 3249 goto e_inval; 3250 } 3251 if (struct_cv > 6) { 3252 pr_warn("got v %d cv %d > 6 of ceph_object_locator\n", 3253 struct_v, struct_cv); 3254 goto e_inval; 3255 } 3256 len = ceph_decode_32(p); 3257 ceph_decode_need(p, end, len, e_inval); 3258 struct_end = *p + len; 3259 3260 oloc->pool = ceph_decode_64(p); 3261 *p += 4; /* skip preferred */ 3262 3263 len = ceph_decode_32(p); 3264 if (len > 0) { 3265 pr_warn("ceph_object_locator::key is set\n"); 3266 goto e_inval; 3267 } 3268 3269 if (struct_v >= 5) { 3270 bool changed = false; 3271 3272 len = ceph_decode_32(p); 3273 if (len > 0) { 3274 ceph_decode_need(p, end, len, e_inval); 3275 if (!oloc->pool_ns || 3276 ceph_compare_string(oloc->pool_ns, *p, len)) 3277 changed = true; 3278 *p += len; 3279 } else { 3280 if (oloc->pool_ns) 3281 changed = true; 3282 } 3283 if (changed) { 3284 /* redirect changes namespace */ 3285 pr_warn("ceph_object_locator::nspace is changed\n"); 3286 goto e_inval; 3287 } 3288 } 3289 3290 if (struct_v >= 6) { 3291 s64 hash = ceph_decode_64(p); 3292 if (hash != -1) { 3293 pr_warn("ceph_object_locator::hash is set\n"); 3294 goto e_inval; 3295 } 3296 } 3297 3298 /* skip the rest */ 3299 *p = struct_end; 3300 out: 3301 return ret; 3302 3303 e_inval: 3304 ret = -EINVAL; 3305 goto out; 3306 } 3307 3308 static int ceph_redirect_decode(void **p, void *end, 3309 struct ceph_request_redirect *redir) 3310 { 3311 u8 struct_v, struct_cv; 3312 u32 len; 3313 void *struct_end; 3314 int ret; 3315 3316 ceph_decode_need(p, end, 1 + 1 + 4, e_inval); 3317 struct_v = ceph_decode_8(p); 3318 struct_cv = ceph_decode_8(p); 3319 if (struct_cv > 1) { 3320 pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n", 3321 struct_v, struct_cv); 3322 goto e_inval; 3323 } 3324 len = ceph_decode_32(p); 3325 ceph_decode_need(p, end, len, e_inval); 3326 struct_end = *p + len; 3327 3328 ret = ceph_oloc_decode(p, end, &redir->oloc); 3329 if (ret) 3330 goto out; 3331 3332 len = ceph_decode_32(p); 3333 if (len > 0) { 3334 pr_warn("ceph_request_redirect::object_name is set\n"); 3335 goto e_inval; 3336 } 3337 3338 len = ceph_decode_32(p); 3339 *p += len; /* skip osd_instructions */ 3340 3341 /* skip the rest */ 3342 *p = struct_end; 3343 out: 3344 return ret; 3345 3346 e_inval: 3347 ret = -EINVAL; 3348 goto out; 3349 } 3350 3351 struct MOSDOpReply { 3352 struct ceph_pg pgid; 3353 u64 flags; 3354 int result; 3355 u32 epoch; 3356 int num_ops; 3357 u32 outdata_len[CEPH_OSD_MAX_OPS]; 3358 s32 rval[CEPH_OSD_MAX_OPS]; 3359 int 
retry_attempt; 3360 struct ceph_eversion replay_version; 3361 u64 user_version; 3362 struct ceph_request_redirect redirect; 3363 }; 3364 3365 static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m) 3366 { 3367 void *p = msg->front.iov_base; 3368 void *const end = p + msg->front.iov_len; 3369 u16 version = le16_to_cpu(msg->hdr.version); 3370 struct ceph_eversion bad_replay_version; 3371 u8 decode_redir; 3372 u32 len; 3373 int ret; 3374 int i; 3375 3376 ceph_decode_32_safe(&p, end, len, e_inval); 3377 ceph_decode_need(&p, end, len, e_inval); 3378 p += len; /* skip oid */ 3379 3380 ret = ceph_decode_pgid(&p, end, &m->pgid); 3381 if (ret) 3382 return ret; 3383 3384 ceph_decode_64_safe(&p, end, m->flags, e_inval); 3385 ceph_decode_32_safe(&p, end, m->result, e_inval); 3386 ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval); 3387 memcpy(&bad_replay_version, p, sizeof(bad_replay_version)); 3388 p += sizeof(bad_replay_version); 3389 ceph_decode_32_safe(&p, end, m->epoch, e_inval); 3390 3391 ceph_decode_32_safe(&p, end, m->num_ops, e_inval); 3392 if (m->num_ops > ARRAY_SIZE(m->outdata_len)) 3393 goto e_inval; 3394 3395 ceph_decode_need(&p, end, m->num_ops * sizeof(struct ceph_osd_op), 3396 e_inval); 3397 for (i = 0; i < m->num_ops; i++) { 3398 struct ceph_osd_op *op = p; 3399 3400 m->outdata_len[i] = le32_to_cpu(op->payload_len); 3401 p += sizeof(*op); 3402 } 3403 3404 ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval); 3405 for (i = 0; i < m->num_ops; i++) 3406 ceph_decode_32_safe(&p, end, m->rval[i], e_inval); 3407 3408 if (version >= 5) { 3409 ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval); 3410 memcpy(&m->replay_version, p, sizeof(m->replay_version)); 3411 p += sizeof(m->replay_version); 3412 ceph_decode_64_safe(&p, end, m->user_version, e_inval); 3413 } else { 3414 m->replay_version = bad_replay_version; /* struct */ 3415 m->user_version = le64_to_cpu(m->replay_version.version); 3416 } 3417 3418 if (version >= 6) { 3419 if (version >= 7) 3420 ceph_decode_8_safe(&p, end, decode_redir, e_inval); 3421 else 3422 decode_redir = 1; 3423 } else { 3424 decode_redir = 0; 3425 } 3426 3427 if (decode_redir) { 3428 ret = ceph_redirect_decode(&p, end, &m->redirect); 3429 if (ret) 3430 return ret; 3431 } else { 3432 ceph_oloc_init(&m->redirect.oloc); 3433 } 3434 3435 return 0; 3436 3437 e_inval: 3438 return -EINVAL; 3439 } 3440 3441 /* 3442 * Handle MOSDOpReply. Set ->r_result and call the callback if it is 3443 * specified. 
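 *
 * For reference, the front layout consumed by decode_MOSDOpReply()
 * above (a sketch of the v4+ encoding with the optional v5-v7
 * extensions, derived from the decoder itself):
 *
 *	oid			(length-prefixed, skipped)
 *	pgid
 *	__le64 flags
 *	__le32 result
 *	ceph_eversion bad_replay_version
 *	__le32 epoch
 *	__le32 num_ops, then num_ops * ceph_osd_op
 *	__le32 retry_attempt
 *	num_ops * __le32 rval
 *	v5+: replay_version, __le64 user_version
 *	v6+: redirect		(v7+ gated by a decode_redir byte)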
3444 */ 3445 static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) 3446 { 3447 struct ceph_osd_client *osdc = osd->o_osdc; 3448 struct ceph_osd_request *req; 3449 struct MOSDOpReply m; 3450 u64 tid = le64_to_cpu(msg->hdr.tid); 3451 u32 data_len = 0; 3452 int ret; 3453 int i; 3454 3455 dout("%s msg %p tid %llu\n", __func__, msg, tid); 3456 3457 down_read(&osdc->lock); 3458 if (!osd_registered(osd)) { 3459 dout("%s osd%d unknown\n", __func__, osd->o_osd); 3460 goto out_unlock_osdc; 3461 } 3462 WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num)); 3463 3464 mutex_lock(&osd->lock); 3465 req = lookup_request(&osd->o_requests, tid); 3466 if (!req) { 3467 dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid); 3468 goto out_unlock_session; 3469 } 3470 3471 m.redirect.oloc.pool_ns = req->r_t.target_oloc.pool_ns; 3472 ret = decode_MOSDOpReply(msg, &m); 3473 m.redirect.oloc.pool_ns = NULL; 3474 if (ret) { 3475 pr_err("failed to decode MOSDOpReply for tid %llu: %d\n", 3476 req->r_tid, ret); 3477 ceph_msg_dump(msg); 3478 goto fail_request; 3479 } 3480 dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n", 3481 __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed, 3482 m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch), 3483 le64_to_cpu(m.replay_version.version), m.user_version); 3484 3485 if (m.retry_attempt >= 0) { 3486 if (m.retry_attempt != req->r_attempts - 1) { 3487 dout("req %p tid %llu retry_attempt %d != %d, ignoring\n", 3488 req, req->r_tid, m.retry_attempt, 3489 req->r_attempts - 1); 3490 goto out_unlock_session; 3491 } 3492 } else { 3493 WARN_ON(1); /* MOSDOpReply v4 is assumed */ 3494 } 3495 3496 if (!ceph_oloc_empty(&m.redirect.oloc)) { 3497 dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid, 3498 m.redirect.oloc.pool); 3499 unlink_request(osd, req); 3500 mutex_unlock(&osd->lock); 3501 3502 /* 3503 * Not ceph_oloc_copy() - changing pool_ns is not 3504 * supported. 3505 */ 3506 req->r_t.target_oloc.pool = m.redirect.oloc.pool; 3507 req->r_flags |= CEPH_OSD_FLAG_REDIRECTED; 3508 req->r_tid = 0; 3509 __submit_request(req, false); 3510 goto out_unlock_osdc; 3511 } 3512 3513 if (m.num_ops != req->r_num_ops) { 3514 pr_err("num_ops %d != %d for tid %llu\n", m.num_ops, 3515 req->r_num_ops, req->r_tid); 3516 goto fail_request; 3517 } 3518 for (i = 0; i < req->r_num_ops; i++) { 3519 dout(" req %p tid %llu op %d rval %d len %u\n", req, 3520 req->r_tid, i, m.rval[i], m.outdata_len[i]); 3521 req->r_ops[i].rval = m.rval[i]; 3522 req->r_ops[i].outdata_len = m.outdata_len[i]; 3523 data_len += m.outdata_len[i]; 3524 } 3525 if (data_len != le32_to_cpu(msg->hdr.data_len)) { 3526 pr_err("sum of lens %u != %u for tid %llu\n", data_len, 3527 le32_to_cpu(msg->hdr.data_len), req->r_tid); 3528 goto fail_request; 3529 } 3530 dout("%s req %p tid %llu result %d data_len %u\n", __func__, 3531 req, req->r_tid, m.result, data_len); 3532 3533 /* 3534 * Since we only ever request ONDISK, we should only ever get 3535 * one (type of) reply back. 
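 * (Historically, requests without ONDISK could see a separate ACK
 * reply once the write was merely stable in memory; this client
 * always sets ONDISK in account_request(), so a single reply is
 * expected.)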
3536 */ 3537 WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK)); 3538 req->r_result = m.result ?: data_len; 3539 finish_request(req); 3540 mutex_unlock(&osd->lock); 3541 up_read(&osdc->lock); 3542 3543 __complete_request(req); 3544 complete_all(&req->r_completion); 3545 ceph_osdc_put_request(req); 3546 return; 3547 3548 fail_request: 3549 complete_request(req, -EIO); 3550 out_unlock_session: 3551 mutex_unlock(&osd->lock); 3552 out_unlock_osdc: 3553 up_read(&osdc->lock); 3554 } 3555 3556 static void set_pool_was_full(struct ceph_osd_client *osdc) 3557 { 3558 struct rb_node *n; 3559 3560 for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) { 3561 struct ceph_pg_pool_info *pi = 3562 rb_entry(n, struct ceph_pg_pool_info, node); 3563 3564 pi->was_full = __pool_full(pi); 3565 } 3566 } 3567 3568 static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id) 3569 { 3570 struct ceph_pg_pool_info *pi; 3571 3572 pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id); 3573 if (!pi) 3574 return false; 3575 3576 return pi->was_full && !__pool_full(pi); 3577 } 3578 3579 static enum calc_target_result 3580 recalc_linger_target(struct ceph_osd_linger_request *lreq) 3581 { 3582 struct ceph_osd_client *osdc = lreq->osdc; 3583 enum calc_target_result ct_res; 3584 3585 ct_res = calc_target(osdc, &lreq->t, NULL, true); 3586 if (ct_res == CALC_TARGET_NEED_RESEND) { 3587 struct ceph_osd *osd; 3588 3589 osd = lookup_create_osd(osdc, lreq->t.osd, true); 3590 if (osd != lreq->osd) { 3591 unlink_linger(lreq->osd, lreq); 3592 link_linger(osd, lreq); 3593 } 3594 } 3595 3596 return ct_res; 3597 } 3598 3599 /* 3600 * Requeue requests whose mapping to an OSD has changed. 3601 */ 3602 static void scan_requests(struct ceph_osd *osd, 3603 bool force_resend, 3604 bool cleared_full, 3605 bool check_pool_cleared_full, 3606 struct rb_root *need_resend, 3607 struct list_head *need_resend_linger) 3608 { 3609 struct ceph_osd_client *osdc = osd->o_osdc; 3610 struct rb_node *n; 3611 bool force_resend_writes; 3612 3613 for (n = rb_first(&osd->o_linger_requests); n; ) { 3614 struct ceph_osd_linger_request *lreq = 3615 rb_entry(n, struct ceph_osd_linger_request, node); 3616 enum calc_target_result ct_res; 3617 3618 n = rb_next(n); /* recalc_linger_target() */ 3619 3620 dout("%s lreq %p linger_id %llu\n", __func__, lreq, 3621 lreq->linger_id); 3622 ct_res = recalc_linger_target(lreq); 3623 switch (ct_res) { 3624 case CALC_TARGET_NO_ACTION: 3625 force_resend_writes = cleared_full || 3626 (check_pool_cleared_full && 3627 pool_cleared_full(osdc, lreq->t.base_oloc.pool)); 3628 if (!force_resend && !force_resend_writes) 3629 break; 3630 3631 /* fall through */ 3632 case CALC_TARGET_NEED_RESEND: 3633 cancel_linger_map_check(lreq); 3634 /* 3635 * scan_requests() for the previous epoch(s) 3636 * may have already added it to the list, since 3637 * it's not unlinked here. 
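 * (The list is consumed by kick_requests(), which re-targets each
 * entry and empties scan_item via list_del_init().)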
3638 */ 3639 if (list_empty(&lreq->scan_item)) 3640 list_add_tail(&lreq->scan_item, need_resend_linger); 3641 break; 3642 case CALC_TARGET_POOL_DNE: 3643 list_del_init(&lreq->scan_item); 3644 check_linger_pool_dne(lreq); 3645 break; 3646 } 3647 } 3648 3649 for (n = rb_first(&osd->o_requests); n; ) { 3650 struct ceph_osd_request *req = 3651 rb_entry(n, struct ceph_osd_request, r_node); 3652 enum calc_target_result ct_res; 3653 3654 n = rb_next(n); /* unlink_request(), check_pool_dne() */ 3655 3656 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 3657 ct_res = calc_target(osdc, &req->r_t, &req->r_osd->o_con, 3658 false); 3659 switch (ct_res) { 3660 case CALC_TARGET_NO_ACTION: 3661 force_resend_writes = cleared_full || 3662 (check_pool_cleared_full && 3663 pool_cleared_full(osdc, req->r_t.base_oloc.pool)); 3664 if (!force_resend && 3665 (!(req->r_flags & CEPH_OSD_FLAG_WRITE) || 3666 !force_resend_writes)) 3667 break; 3668 3669 /* fall through */ 3670 case CALC_TARGET_NEED_RESEND: 3671 cancel_map_check(req); 3672 unlink_request(osd, req); 3673 insert_request(need_resend, req); 3674 break; 3675 case CALC_TARGET_POOL_DNE: 3676 check_pool_dne(req); 3677 break; 3678 } 3679 } 3680 } 3681 3682 static int handle_one_map(struct ceph_osd_client *osdc, 3683 void *p, void *end, bool incremental, 3684 struct rb_root *need_resend, 3685 struct list_head *need_resend_linger) 3686 { 3687 struct ceph_osdmap *newmap; 3688 struct rb_node *n; 3689 bool skipped_map = false; 3690 bool was_full; 3691 3692 was_full = ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL); 3693 set_pool_was_full(osdc); 3694 3695 if (incremental) 3696 newmap = osdmap_apply_incremental(&p, end, osdc->osdmap); 3697 else 3698 newmap = ceph_osdmap_decode(&p, end); 3699 if (IS_ERR(newmap)) 3700 return PTR_ERR(newmap); 3701 3702 if (newmap != osdc->osdmap) { 3703 /* 3704 * Preserve ->was_full before destroying the old map. 3705 * For pools that weren't in the old map, ->was_full 3706 * should be false. 
3707 */ 3708 for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) { 3709 struct ceph_pg_pool_info *pi = 3710 rb_entry(n, struct ceph_pg_pool_info, node); 3711 struct ceph_pg_pool_info *old_pi; 3712 3713 old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id); 3714 if (old_pi) 3715 pi->was_full = old_pi->was_full; 3716 else 3717 WARN_ON(pi->was_full); 3718 } 3719 3720 if (osdc->osdmap->epoch && 3721 osdc->osdmap->epoch + 1 < newmap->epoch) { 3722 WARN_ON(incremental); 3723 skipped_map = true; 3724 } 3725 3726 ceph_osdmap_destroy(osdc->osdmap); 3727 osdc->osdmap = newmap; 3728 } 3729 3730 was_full &= !ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL); 3731 scan_requests(&osdc->homeless_osd, skipped_map, was_full, true, 3732 need_resend, need_resend_linger); 3733 3734 for (n = rb_first(&osdc->osds); n; ) { 3735 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); 3736 3737 n = rb_next(n); /* close_osd() */ 3738 3739 scan_requests(osd, skipped_map, was_full, true, need_resend, 3740 need_resend_linger); 3741 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || 3742 memcmp(&osd->o_con.peer_addr, 3743 ceph_osd_addr(osdc->osdmap, osd->o_osd), 3744 sizeof(struct ceph_entity_addr))) 3745 close_osd(osd); 3746 } 3747 3748 return 0; 3749 } 3750 3751 static void kick_requests(struct ceph_osd_client *osdc, 3752 struct rb_root *need_resend, 3753 struct list_head *need_resend_linger) 3754 { 3755 struct ceph_osd_linger_request *lreq, *nlreq; 3756 enum calc_target_result ct_res; 3757 struct rb_node *n; 3758 3759 /* make sure need_resend targets reflect latest map */ 3760 for (n = rb_first(need_resend); n; ) { 3761 struct ceph_osd_request *req = 3762 rb_entry(n, struct ceph_osd_request, r_node); 3763 3764 n = rb_next(n); 3765 3766 if (req->r_t.epoch < osdc->osdmap->epoch) { 3767 ct_res = calc_target(osdc, &req->r_t, NULL, false); 3768 if (ct_res == CALC_TARGET_POOL_DNE) { 3769 erase_request(need_resend, req); 3770 check_pool_dne(req); 3771 } 3772 } 3773 } 3774 3775 for (n = rb_first(need_resend); n; ) { 3776 struct ceph_osd_request *req = 3777 rb_entry(n, struct ceph_osd_request, r_node); 3778 struct ceph_osd *osd; 3779 3780 n = rb_next(n); 3781 erase_request(need_resend, req); /* before link_request() */ 3782 3783 osd = lookup_create_osd(osdc, req->r_t.osd, true); 3784 link_request(osd, req); 3785 if (!req->r_linger) { 3786 if (!osd_homeless(osd) && !req->r_t.paused) 3787 send_request(req); 3788 } else { 3789 cancel_linger_request(req); 3790 } 3791 } 3792 3793 list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) { 3794 if (!osd_homeless(lreq->osd)) 3795 send_linger(lreq); 3796 3797 list_del_init(&lreq->scan_item); 3798 } 3799 } 3800 3801 /* 3802 * Process updated osd map. 3803 * 3804 * The message contains any number of incremental and full maps, normally 3805 * indicating some sort of topology change in the cluster. Kick requests 3806 * off to different OSDs as needed. 
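 *
 * For reference, the front layout handled below (sketch):
 *
 *	ceph_fsid fsid
 *	__le32 nr_inc_maps
 *	nr_inc_maps  * { __le32 epoch; __le32 len; u8 data[len]; }
 *	__le32 nr_full_maps
 *	nr_full_maps * { __le32 epoch; __le32 len; u8 data[len]; }
 *
 * Contiguous incrementals (have + 1) are applied in order; full maps
 * are skipped except the newest, which is taken only if it is newer
 * than what we already have.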
3807 */ 3808 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) 3809 { 3810 void *p = msg->front.iov_base; 3811 void *const end = p + msg->front.iov_len; 3812 u32 nr_maps, maplen; 3813 u32 epoch; 3814 struct ceph_fsid fsid; 3815 struct rb_root need_resend = RB_ROOT; 3816 LIST_HEAD(need_resend_linger); 3817 bool handled_incremental = false; 3818 bool was_pauserd, was_pausewr; 3819 bool pauserd, pausewr; 3820 int err; 3821 3822 dout("%s have %u\n", __func__, osdc->osdmap->epoch); 3823 down_write(&osdc->lock); 3824 3825 /* verify fsid */ 3826 ceph_decode_need(&p, end, sizeof(fsid), bad); 3827 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 3828 if (ceph_check_fsid(osdc->client, &fsid) < 0) 3829 goto bad; 3830 3831 was_pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD); 3832 was_pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) || 3833 ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 3834 have_pool_full(osdc); 3835 3836 /* incremental maps */ 3837 ceph_decode_32_safe(&p, end, nr_maps, bad); 3838 dout(" %d inc maps\n", nr_maps); 3839 while (nr_maps > 0) { 3840 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 3841 epoch = ceph_decode_32(&p); 3842 maplen = ceph_decode_32(&p); 3843 ceph_decode_need(&p, end, maplen, bad); 3844 if (osdc->osdmap->epoch && 3845 osdc->osdmap->epoch + 1 == epoch) { 3846 dout("applying incremental map %u len %d\n", 3847 epoch, maplen); 3848 err = handle_one_map(osdc, p, p + maplen, true, 3849 &need_resend, &need_resend_linger); 3850 if (err) 3851 goto bad; 3852 handled_incremental = true; 3853 } else { 3854 dout("ignoring incremental map %u len %d\n", 3855 epoch, maplen); 3856 } 3857 p += maplen; 3858 nr_maps--; 3859 } 3860 if (handled_incremental) 3861 goto done; 3862 3863 /* full maps */ 3864 ceph_decode_32_safe(&p, end, nr_maps, bad); 3865 dout(" %d full maps\n", nr_maps); 3866 while (nr_maps) { 3867 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 3868 epoch = ceph_decode_32(&p); 3869 maplen = ceph_decode_32(&p); 3870 ceph_decode_need(&p, end, maplen, bad); 3871 if (nr_maps > 1) { 3872 dout("skipping non-latest full map %u len %d\n", 3873 epoch, maplen); 3874 } else if (osdc->osdmap->epoch >= epoch) { 3875 dout("skipping full map %u len %d, " 3876 "older than our %u\n", epoch, maplen, 3877 osdc->osdmap->epoch); 3878 } else { 3879 dout("taking full map %u len %d\n", epoch, maplen); 3880 err = handle_one_map(osdc, p, p + maplen, false, 3881 &need_resend, &need_resend_linger); 3882 if (err) 3883 goto bad; 3884 } 3885 p += maplen; 3886 nr_maps--; 3887 } 3888 3889 done: 3890 /* 3891 * subscribe to subsequent osdmap updates if full to ensure 3892 * we find out when we are no longer full and stop returning 3893 * ENOSPC. 3894 */ 3895 pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD); 3896 pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) || 3897 ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 3898 have_pool_full(osdc); 3899 if (was_pauserd || was_pausewr || pauserd || pausewr || 3900 osdc->osdmap->epoch < osdc->epoch_barrier) 3901 maybe_request_map(osdc); 3902 3903 kick_requests(osdc, &need_resend, &need_resend_linger); 3904 3905 ceph_osdc_abort_on_full(osdc); 3906 ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP, 3907 osdc->osdmap->epoch); 3908 up_write(&osdc->lock); 3909 wake_up_all(&osdc->client->auth_wq); 3910 return; 3911 3912 bad: 3913 pr_err("osdc handle_map corrupt msg\n"); 3914 ceph_msg_dump(msg); 3915 up_write(&osdc->lock); 3916 } 3917 3918 /* 3919 * Resubmit requests pending on the given osd. 
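 * Called on connection faults (see osd_fault()); any backoffs
 * installed by the old session are stale by then and are dropped
 * first via clear_backoffs().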
3920 */ 3921 static void kick_osd_requests(struct ceph_osd *osd) 3922 { 3923 struct rb_node *n; 3924 3925 clear_backoffs(osd); 3926 3927 for (n = rb_first(&osd->o_requests); n; ) { 3928 struct ceph_osd_request *req = 3929 rb_entry(n, struct ceph_osd_request, r_node); 3930 3931 n = rb_next(n); /* cancel_linger_request() */ 3932 3933 if (!req->r_linger) { 3934 if (!req->r_t.paused) 3935 send_request(req); 3936 } else { 3937 cancel_linger_request(req); 3938 } 3939 } 3940 for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) { 3941 struct ceph_osd_linger_request *lreq = 3942 rb_entry(n, struct ceph_osd_linger_request, node); 3943 3944 send_linger(lreq); 3945 } 3946 } 3947 3948 /* 3949 * If the osd connection drops, we need to resubmit all requests. 3950 */ 3951 static void osd_fault(struct ceph_connection *con) 3952 { 3953 struct ceph_osd *osd = con->private; 3954 struct ceph_osd_client *osdc = osd->o_osdc; 3955 3956 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd); 3957 3958 down_write(&osdc->lock); 3959 if (!osd_registered(osd)) { 3960 dout("%s osd%d unknown\n", __func__, osd->o_osd); 3961 goto out_unlock; 3962 } 3963 3964 if (!reopen_osd(osd)) 3965 kick_osd_requests(osd); 3966 maybe_request_map(osdc); 3967 3968 out_unlock: 3969 up_write(&osdc->lock); 3970 } 3971 3972 struct MOSDBackoff { 3973 struct ceph_spg spgid; 3974 u32 map_epoch; 3975 u8 op; 3976 u64 id; 3977 struct ceph_hobject_id *begin; 3978 struct ceph_hobject_id *end; 3979 }; 3980 3981 static int decode_MOSDBackoff(const struct ceph_msg *msg, struct MOSDBackoff *m) 3982 { 3983 void *p = msg->front.iov_base; 3984 void *const end = p + msg->front.iov_len; 3985 u8 struct_v; 3986 u32 struct_len; 3987 int ret; 3988 3989 ret = ceph_start_decoding(&p, end, 1, "spg_t", &struct_v, &struct_len); 3990 if (ret) 3991 return ret; 3992 3993 ret = ceph_decode_pgid(&p, end, &m->spgid.pgid); 3994 if (ret) 3995 return ret; 3996 3997 ceph_decode_8_safe(&p, end, m->spgid.shard, e_inval); 3998 ceph_decode_32_safe(&p, end, m->map_epoch, e_inval); 3999 ceph_decode_8_safe(&p, end, m->op, e_inval); 4000 ceph_decode_64_safe(&p, end, m->id, e_inval); 4001 4002 m->begin = kzalloc(sizeof(*m->begin), GFP_NOIO); 4003 if (!m->begin) 4004 return -ENOMEM; 4005 4006 ret = decode_hoid(&p, end, m->begin); 4007 if (ret) { 4008 free_hoid(m->begin); 4009 return ret; 4010 } 4011 4012 m->end = kzalloc(sizeof(*m->end), GFP_NOIO); 4013 if (!m->end) { 4014 free_hoid(m->begin); 4015 return -ENOMEM; 4016 } 4017 4018 ret = decode_hoid(&p, end, m->end); 4019 if (ret) { 4020 free_hoid(m->begin); 4021 free_hoid(m->end); 4022 return ret; 4023 } 4024 4025 return 0; 4026 4027 e_inval: 4028 return -EINVAL; 4029 } 4030 4031 static struct ceph_msg *create_backoff_message( 4032 const struct ceph_osd_backoff *backoff, 4033 u32 map_epoch) 4034 { 4035 struct ceph_msg *msg; 4036 void *p, *end; 4037 int msg_size; 4038 4039 msg_size = CEPH_ENCODING_START_BLK_LEN + 4040 CEPH_PGID_ENCODING_LEN + 1; /* spgid */ 4041 msg_size += 4 + 1 + 8; /* map_epoch, op, id */ 4042 msg_size += CEPH_ENCODING_START_BLK_LEN + 4043 hoid_encoding_size(backoff->begin); 4044 msg_size += CEPH_ENCODING_START_BLK_LEN + 4045 hoid_encoding_size(backoff->end); 4046 4047 msg = ceph_msg_new(CEPH_MSG_OSD_BACKOFF, msg_size, GFP_NOIO, true); 4048 if (!msg) 4049 return NULL; 4050 4051 p = msg->front.iov_base; 4052 end = p + msg->front_alloc_len; 4053 4054 encode_spgid(&p, &backoff->spgid); 4055 ceph_encode_32(&p, map_epoch); 4056 ceph_encode_8(&p, CEPH_OSD_BACKOFF_OP_ACK_BLOCK); 4057 ceph_encode_64(&p, backoff->id); 
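	/*
	 * The two hoid ranges complete the MOSDBackoff v1 payload;
	 * the layout mirrors what decode_MOSDBackoff() parses in the
	 * opposite direction.
	 */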
4058 encode_hoid(&p, end, backoff->begin); 4059 encode_hoid(&p, end, backoff->end); 4060 BUG_ON(p != end); 4061 4062 msg->front.iov_len = p - msg->front.iov_base; 4063 msg->hdr.version = cpu_to_le16(1); /* MOSDBackoff v1 */ 4064 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 4065 4066 return msg; 4067 } 4068 4069 static void handle_backoff_block(struct ceph_osd *osd, struct MOSDBackoff *m) 4070 { 4071 struct ceph_spg_mapping *spg; 4072 struct ceph_osd_backoff *backoff; 4073 struct ceph_msg *msg; 4074 4075 dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd, 4076 m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id); 4077 4078 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &m->spgid); 4079 if (!spg) { 4080 spg = alloc_spg_mapping(); 4081 if (!spg) { 4082 pr_err("%s failed to allocate spg\n", __func__); 4083 return; 4084 } 4085 spg->spgid = m->spgid; /* struct */ 4086 insert_spg_mapping(&osd->o_backoff_mappings, spg); 4087 } 4088 4089 backoff = alloc_backoff(); 4090 if (!backoff) { 4091 pr_err("%s failed to allocate backoff\n", __func__); 4092 return; 4093 } 4094 backoff->spgid = m->spgid; /* struct */ 4095 backoff->id = m->id; 4096 backoff->begin = m->begin; 4097 m->begin = NULL; /* backoff now owns this */ 4098 backoff->end = m->end; 4099 m->end = NULL; /* ditto */ 4100 4101 insert_backoff(&spg->backoffs, backoff); 4102 insert_backoff_by_id(&osd->o_backoffs_by_id, backoff); 4103 4104 /* 4105 * Ack with original backoff's epoch so that the OSD can 4106 * discard this if there was a PG split. 4107 */ 4108 msg = create_backoff_message(backoff, m->map_epoch); 4109 if (!msg) { 4110 pr_err("%s failed to allocate msg\n", __func__); 4111 return; 4112 } 4113 ceph_con_send(&osd->o_con, msg); 4114 } 4115 4116 static bool target_contained_by(const struct ceph_osd_request_target *t, 4117 const struct ceph_hobject_id *begin, 4118 const struct ceph_hobject_id *end) 4119 { 4120 struct ceph_hobject_id hoid; 4121 int cmp; 4122 4123 hoid_fill_from_target(&hoid, t); 4124 cmp = hoid_compare(&hoid, begin); 4125 return !cmp || (cmp > 0 && hoid_compare(&hoid, end) < 0); 4126 } 4127 4128 static void handle_backoff_unblock(struct ceph_osd *osd, 4129 const struct MOSDBackoff *m) 4130 { 4131 struct ceph_spg_mapping *spg; 4132 struct ceph_osd_backoff *backoff; 4133 struct rb_node *n; 4134 4135 dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd, 4136 m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id); 4137 4138 backoff = lookup_backoff_by_id(&osd->o_backoffs_by_id, m->id); 4139 if (!backoff) { 4140 pr_err("%s osd%d spgid %llu.%xs%d id %llu backoff dne\n", 4141 __func__, osd->o_osd, m->spgid.pgid.pool, 4142 m->spgid.pgid.seed, m->spgid.shard, m->id); 4143 return; 4144 } 4145 4146 if (hoid_compare(backoff->begin, m->begin) && 4147 hoid_compare(backoff->end, m->end)) { 4148 pr_err("%s osd%d spgid %llu.%xs%d id %llu bad range?\n", 4149 __func__, osd->o_osd, m->spgid.pgid.pool, 4150 m->spgid.pgid.seed, m->spgid.shard, m->id); 4151 /* unblock it anyway... 
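 * the OSD is authoritative here, and keeping a backoff it no longer
 * tracks could leave matching requests blocked forever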
*/ 4152 } 4153 4154 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &backoff->spgid); 4155 BUG_ON(!spg); 4156 4157 erase_backoff(&spg->backoffs, backoff); 4158 erase_backoff_by_id(&osd->o_backoffs_by_id, backoff); 4159 free_backoff(backoff); 4160 4161 if (RB_EMPTY_ROOT(&spg->backoffs)) { 4162 erase_spg_mapping(&osd->o_backoff_mappings, spg); 4163 free_spg_mapping(spg); 4164 } 4165 4166 for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) { 4167 struct ceph_osd_request *req = 4168 rb_entry(n, struct ceph_osd_request, r_node); 4169 4170 if (!ceph_spg_compare(&req->r_t.spgid, &m->spgid)) { 4171 /* 4172 * Match against @m, not @backoff -- the PG may 4173 * have split on the OSD. 4174 */ 4175 if (target_contained_by(&req->r_t, m->begin, m->end)) { 4176 /* 4177 * If no other installed backoff applies, 4178 * resend. 4179 */ 4180 send_request(req); 4181 } 4182 } 4183 } 4184 } 4185 4186 static void handle_backoff(struct ceph_osd *osd, struct ceph_msg *msg) 4187 { 4188 struct ceph_osd_client *osdc = osd->o_osdc; 4189 struct MOSDBackoff m; 4190 int ret; 4191 4192 down_read(&osdc->lock); 4193 if (!osd_registered(osd)) { 4194 dout("%s osd%d unknown\n", __func__, osd->o_osd); 4195 up_read(&osdc->lock); 4196 return; 4197 } 4198 WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num)); 4199 4200 mutex_lock(&osd->lock); 4201 ret = decode_MOSDBackoff(msg, &m); 4202 if (ret) { 4203 pr_err("failed to decode MOSDBackoff: %d\n", ret); 4204 ceph_msg_dump(msg); 4205 goto out_unlock; 4206 } 4207 4208 switch (m.op) { 4209 case CEPH_OSD_BACKOFF_OP_BLOCK: 4210 handle_backoff_block(osd, &m); 4211 break; 4212 case CEPH_OSD_BACKOFF_OP_UNBLOCK: 4213 handle_backoff_unblock(osd, &m); 4214 break; 4215 default: 4216 pr_err("%s osd%d unknown op %d\n", __func__, osd->o_osd, m.op); 4217 } 4218 4219 free_hoid(m.begin); 4220 free_hoid(m.end); 4221 4222 out_unlock: 4223 mutex_unlock(&osd->lock); 4224 up_read(&osdc->lock); 4225 } 4226 4227 /* 4228 * Process osd watch notifications 4229 */ 4230 static void handle_watch_notify(struct ceph_osd_client *osdc, 4231 struct ceph_msg *msg) 4232 { 4233 void *p = msg->front.iov_base; 4234 void *const end = p + msg->front.iov_len; 4235 struct ceph_osd_linger_request *lreq; 4236 struct linger_work *lwork; 4237 u8 proto_ver, opcode; 4238 u64 cookie, notify_id; 4239 u64 notifier_id = 0; 4240 s32 return_code = 0; 4241 void *payload = NULL; 4242 u32 payload_len = 0; 4243 4244 ceph_decode_8_safe(&p, end, proto_ver, bad); 4245 ceph_decode_8_safe(&p, end, opcode, bad); 4246 ceph_decode_64_safe(&p, end, cookie, bad); 4247 p += 8; /* skip ver */ 4248 ceph_decode_64_safe(&p, end, notify_id, bad); 4249 4250 if (proto_ver >= 1) { 4251 ceph_decode_32_safe(&p, end, payload_len, bad); 4252 ceph_decode_need(&p, end, payload_len, bad); 4253 payload = p; 4254 p += payload_len; 4255 } 4256 4257 if (le16_to_cpu(msg->hdr.version) >= 2) 4258 ceph_decode_32_safe(&p, end, return_code, bad); 4259 4260 if (le16_to_cpu(msg->hdr.version) >= 3) 4261 ceph_decode_64_safe(&p, end, notifier_id, bad); 4262 4263 down_read(&osdc->lock); 4264 lreq = lookup_linger_osdc(&osdc->linger_requests, cookie); 4265 if (!lreq) { 4266 dout("%s opcode %d cookie %llu dne\n", __func__, opcode, 4267 cookie); 4268 goto out_unlock_osdc; 4269 } 4270 4271 mutex_lock(&lreq->lock); 4272 dout("%s opcode %d cookie %llu lreq %p is_watch %d\n", __func__, 4273 opcode, cookie, lreq, lreq->is_watch); 4274 if (opcode == CEPH_WATCH_EVENT_DISCONNECT) { 4275 if (!lreq->last_error) { 4276 lreq->last_error = -ENOTCONN; 4277 queue_watch_error(lreq); 4278 } 4279 } 
else if (!lreq->is_watch) { 4280 /* CEPH_WATCH_EVENT_NOTIFY_COMPLETE */ 4281 if (lreq->notify_id && lreq->notify_id != notify_id) { 4282 dout("lreq %p notify_id %llu != %llu, ignoring\n", lreq, 4283 lreq->notify_id, notify_id); 4284 } else if (!completion_done(&lreq->notify_finish_wait)) { 4285 struct ceph_msg_data *data = 4286 list_first_entry_or_null(&msg->data, 4287 struct ceph_msg_data, 4288 links); 4289 4290 if (data) { 4291 if (lreq->preply_pages) { 4292 WARN_ON(data->type != 4293 CEPH_MSG_DATA_PAGES); 4294 *lreq->preply_pages = data->pages; 4295 *lreq->preply_len = data->length; 4296 } else { 4297 ceph_release_page_vector(data->pages, 4298 calc_pages_for(0, data->length)); 4299 } 4300 } 4301 lreq->notify_finish_error = return_code; 4302 complete_all(&lreq->notify_finish_wait); 4303 } 4304 } else { 4305 /* CEPH_WATCH_EVENT_NOTIFY */ 4306 lwork = lwork_alloc(lreq, do_watch_notify); 4307 if (!lwork) { 4308 pr_err("failed to allocate notify-lwork\n"); 4309 goto out_unlock_lreq; 4310 } 4311 4312 lwork->notify.notify_id = notify_id; 4313 lwork->notify.notifier_id = notifier_id; 4314 lwork->notify.payload = payload; 4315 lwork->notify.payload_len = payload_len; 4316 lwork->notify.msg = ceph_msg_get(msg); 4317 lwork_queue(lwork); 4318 } 4319 4320 out_unlock_lreq: 4321 mutex_unlock(&lreq->lock); 4322 out_unlock_osdc: 4323 up_read(&osdc->lock); 4324 return; 4325 4326 bad: 4327 pr_err("osdc handle_watch_notify corrupt msg\n"); 4328 } 4329 4330 /* 4331 * Register request, send initial attempt. 4332 */ 4333 int ceph_osdc_start_request(struct ceph_osd_client *osdc, 4334 struct ceph_osd_request *req, 4335 bool nofail) 4336 { 4337 down_read(&osdc->lock); 4338 submit_request(req, false); 4339 up_read(&osdc->lock); 4340 4341 return 0; 4342 } 4343 EXPORT_SYMBOL(ceph_osdc_start_request); 4344 4345 /* 4346 * Unregister a registered request. The request is not completed: 4347 * ->r_result isn't set and __complete_request() isn't called. 4348 */ 4349 void ceph_osdc_cancel_request(struct ceph_osd_request *req) 4350 { 4351 struct ceph_osd_client *osdc = req->r_osdc; 4352 4353 down_write(&osdc->lock); 4354 if (req->r_osd) 4355 cancel_request(req); 4356 up_write(&osdc->lock); 4357 } 4358 EXPORT_SYMBOL(ceph_osdc_cancel_request); 4359 4360 /* 4361 * @timeout: in jiffies, 0 means "wait forever" 4362 */ 4363 static int wait_request_timeout(struct ceph_osd_request *req, 4364 unsigned long timeout) 4365 { 4366 long left; 4367 4368 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 4369 left = wait_for_completion_killable_timeout(&req->r_completion, 4370 ceph_timeout_jiffies(timeout)); 4371 if (left <= 0) { 4372 left = left ?: -ETIMEDOUT; 4373 ceph_osdc_cancel_request(req); 4374 } else { 4375 left = req->r_result; /* completed */ 4376 } 4377 4378 return left; 4379 } 4380 4381 /* 4382 * wait for a request to complete 4383 */ 4384 int ceph_osdc_wait_request(struct ceph_osd_client *osdc, 4385 struct ceph_osd_request *req) 4386 { 4387 return wait_request_timeout(req, 0); 4388 } 4389 EXPORT_SYMBOL(ceph_osdc_wait_request); 4390 4391 /* 4392 * sync - wait for all in-flight requests to flush. avoid starvation. 
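 *
 * Starvation is avoided by sampling last_tid once up front: only
 * writes with a tid at or below that snapshot are waited on, so
 * requests submitted while we sleep cannot extend the wait.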
4393 */ 4394 void ceph_osdc_sync(struct ceph_osd_client *osdc) 4395 { 4396 struct rb_node *n, *p; 4397 u64 last_tid = atomic64_read(&osdc->last_tid); 4398 4399 again: 4400 down_read(&osdc->lock); 4401 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { 4402 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); 4403 4404 mutex_lock(&osd->lock); 4405 for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) { 4406 struct ceph_osd_request *req = 4407 rb_entry(p, struct ceph_osd_request, r_node); 4408 4409 if (req->r_tid > last_tid) 4410 break; 4411 4412 if (!(req->r_flags & CEPH_OSD_FLAG_WRITE)) 4413 continue; 4414 4415 ceph_osdc_get_request(req); 4416 mutex_unlock(&osd->lock); 4417 up_read(&osdc->lock); 4418 dout("%s waiting on req %p tid %llu last_tid %llu\n", 4419 __func__, req, req->r_tid, last_tid); 4420 wait_for_completion(&req->r_completion); 4421 ceph_osdc_put_request(req); 4422 goto again; 4423 } 4424 4425 mutex_unlock(&osd->lock); 4426 } 4427 4428 up_read(&osdc->lock); 4429 dout("%s done last_tid %llu\n", __func__, last_tid); 4430 } 4431 EXPORT_SYMBOL(ceph_osdc_sync); 4432 4433 static struct ceph_osd_request * 4434 alloc_linger_request(struct ceph_osd_linger_request *lreq) 4435 { 4436 struct ceph_osd_request *req; 4437 4438 req = ceph_osdc_alloc_request(lreq->osdc, NULL, 1, false, GFP_NOIO); 4439 if (!req) 4440 return NULL; 4441 4442 ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); 4443 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); 4444 4445 if (ceph_osdc_alloc_messages(req, GFP_NOIO)) { 4446 ceph_osdc_put_request(req); 4447 return NULL; 4448 } 4449 4450 return req; 4451 } 4452 4453 /* 4454 * Returns a handle, caller owns a ref. 4455 */ 4456 struct ceph_osd_linger_request * 4457 ceph_osdc_watch(struct ceph_osd_client *osdc, 4458 struct ceph_object_id *oid, 4459 struct ceph_object_locator *oloc, 4460 rados_watchcb2_t wcb, 4461 rados_watcherrcb_t errcb, 4462 void *data) 4463 { 4464 struct ceph_osd_linger_request *lreq; 4465 int ret; 4466 4467 lreq = linger_alloc(osdc); 4468 if (!lreq) 4469 return ERR_PTR(-ENOMEM); 4470 4471 lreq->is_watch = true; 4472 lreq->wcb = wcb; 4473 lreq->errcb = errcb; 4474 lreq->data = data; 4475 lreq->watch_valid_thru = jiffies; 4476 4477 ceph_oid_copy(&lreq->t.base_oid, oid); 4478 ceph_oloc_copy(&lreq->t.base_oloc, oloc); 4479 lreq->t.flags = CEPH_OSD_FLAG_WRITE; 4480 ktime_get_real_ts(&lreq->mtime); 4481 4482 lreq->reg_req = alloc_linger_request(lreq); 4483 if (!lreq->reg_req) { 4484 ret = -ENOMEM; 4485 goto err_put_lreq; 4486 } 4487 4488 lreq->ping_req = alloc_linger_request(lreq); 4489 if (!lreq->ping_req) { 4490 ret = -ENOMEM; 4491 goto err_put_lreq; 4492 } 4493 4494 down_write(&osdc->lock); 4495 linger_register(lreq); /* before osd_req_op_* */ 4496 osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id, 4497 CEPH_OSD_WATCH_OP_WATCH); 4498 osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id, 4499 CEPH_OSD_WATCH_OP_PING); 4500 linger_submit(lreq); 4501 up_write(&osdc->lock); 4502 4503 ret = linger_reg_commit_wait(lreq); 4504 if (ret) { 4505 linger_cancel(lreq); 4506 goto err_put_lreq; 4507 } 4508 4509 return lreq; 4510 4511 err_put_lreq: 4512 linger_put(lreq); 4513 return ERR_PTR(ret); 4514 } 4515 EXPORT_SYMBOL(ceph_osdc_watch); 4516 4517 /* 4518 * Releases a ref. 4519 * 4520 * Times out after mount_timeout to preserve rbd unmap behaviour 4521 * introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap 4522 * with mount_timeout"). 
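 *
 * Typical pairing with ceph_osdc_watch(), in sketch form -- my_wcb,
 * my_errcb and priv are caller-supplied, error handling is elided:
 *
 *	lreq = ceph_osdc_watch(osdc, &oid, &oloc, my_wcb, my_errcb, priv);
 *	if (IS_ERR(lreq))
 *		return PTR_ERR(lreq);
 *	...
 *	ret = ceph_osdc_unwatch(osdc, lreq);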
4523 */ 4524 int ceph_osdc_unwatch(struct ceph_osd_client *osdc, 4525 struct ceph_osd_linger_request *lreq) 4526 { 4527 struct ceph_options *opts = osdc->client->options; 4528 struct ceph_osd_request *req; 4529 int ret; 4530 4531 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); 4532 if (!req) 4533 return -ENOMEM; 4534 4535 ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); 4536 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); 4537 req->r_flags = CEPH_OSD_FLAG_WRITE; 4538 ktime_get_real_ts(&req->r_mtime); 4539 osd_req_op_watch_init(req, 0, lreq->linger_id, 4540 CEPH_OSD_WATCH_OP_UNWATCH); 4541 4542 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 4543 if (ret) 4544 goto out_put_req; 4545 4546 ceph_osdc_start_request(osdc, req, false); 4547 linger_cancel(lreq); 4548 linger_put(lreq); 4549 ret = wait_request_timeout(req, opts->mount_timeout); 4550 4551 out_put_req: 4552 ceph_osdc_put_request(req); 4553 return ret; 4554 } 4555 EXPORT_SYMBOL(ceph_osdc_unwatch); 4556 4557 static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which, 4558 u64 notify_id, u64 cookie, void *payload, 4559 size_t payload_len) 4560 { 4561 struct ceph_osd_req_op *op; 4562 struct ceph_pagelist *pl; 4563 int ret; 4564 4565 op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0); 4566 4567 pl = kmalloc(sizeof(*pl), GFP_NOIO); 4568 if (!pl) 4569 return -ENOMEM; 4570 4571 ceph_pagelist_init(pl); 4572 ret = ceph_pagelist_encode_64(pl, notify_id); 4573 ret |= ceph_pagelist_encode_64(pl, cookie); 4574 if (payload) { 4575 ret |= ceph_pagelist_encode_32(pl, payload_len); 4576 ret |= ceph_pagelist_append(pl, payload, payload_len); 4577 } else { 4578 ret |= ceph_pagelist_encode_32(pl, 0); 4579 } 4580 if (ret) { 4581 ceph_pagelist_release(pl); 4582 return -ENOMEM; 4583 } 4584 4585 ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl); 4586 op->indata_len = pl->length; 4587 return 0; 4588 } 4589 4590 int ceph_osdc_notify_ack(struct ceph_osd_client *osdc, 4591 struct ceph_object_id *oid, 4592 struct ceph_object_locator *oloc, 4593 u64 notify_id, 4594 u64 cookie, 4595 void *payload, 4596 size_t payload_len) 4597 { 4598 struct ceph_osd_request *req; 4599 int ret; 4600 4601 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); 4602 if (!req) 4603 return -ENOMEM; 4604 4605 ceph_oid_copy(&req->r_base_oid, oid); 4606 ceph_oloc_copy(&req->r_base_oloc, oloc); 4607 req->r_flags = CEPH_OSD_FLAG_READ; 4608 4609 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 4610 if (ret) 4611 goto out_put_req; 4612 4613 ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload, 4614 payload_len); 4615 if (ret) 4616 goto out_put_req; 4617 4618 ceph_osdc_start_request(osdc, req, false); 4619 ret = ceph_osdc_wait_request(osdc, req); 4620 4621 out_put_req: 4622 ceph_osdc_put_request(req); 4623 return ret; 4624 } 4625 EXPORT_SYMBOL(ceph_osdc_notify_ack); 4626 4627 static int osd_req_op_notify_init(struct ceph_osd_request *req, int which, 4628 u64 cookie, u32 prot_ver, u32 timeout, 4629 void *payload, size_t payload_len) 4630 { 4631 struct ceph_osd_req_op *op; 4632 struct ceph_pagelist *pl; 4633 int ret; 4634 4635 op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0); 4636 op->notify.cookie = cookie; 4637 4638 pl = kmalloc(sizeof(*pl), GFP_NOIO); 4639 if (!pl) 4640 return -ENOMEM; 4641 4642 ceph_pagelist_init(pl); 4643 ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */ 4644 ret |= ceph_pagelist_encode_32(pl, timeout); 4645 ret |= ceph_pagelist_encode_32(pl, payload_len); 4646 ret |= 
ceph_pagelist_append(pl, payload, payload_len); 4647 if (ret) { 4648 ceph_pagelist_release(pl); 4649 return -ENOMEM; 4650 } 4651 4652 ceph_osd_data_pagelist_init(&op->notify.request_data, pl); 4653 op->indata_len = pl->length; 4654 return 0; 4655 } 4656 4657 /* 4658 * @timeout: in seconds 4659 * 4660 * @preply_{pages,len} are initialized both on success and error. 4661 * The caller is responsible for: 4662 * 4663 * ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)) 4664 */ 4665 int ceph_osdc_notify(struct ceph_osd_client *osdc, 4666 struct ceph_object_id *oid, 4667 struct ceph_object_locator *oloc, 4668 void *payload, 4669 size_t payload_len, 4670 u32 timeout, 4671 struct page ***preply_pages, 4672 size_t *preply_len) 4673 { 4674 struct ceph_osd_linger_request *lreq; 4675 struct page **pages; 4676 int ret; 4677 4678 WARN_ON(!timeout); 4679 if (preply_pages) { 4680 *preply_pages = NULL; 4681 *preply_len = 0; 4682 } 4683 4684 lreq = linger_alloc(osdc); 4685 if (!lreq) 4686 return -ENOMEM; 4687 4688 lreq->preply_pages = preply_pages; 4689 lreq->preply_len = preply_len; 4690 4691 ceph_oid_copy(&lreq->t.base_oid, oid); 4692 ceph_oloc_copy(&lreq->t.base_oloc, oloc); 4693 lreq->t.flags = CEPH_OSD_FLAG_READ; 4694 4695 lreq->reg_req = alloc_linger_request(lreq); 4696 if (!lreq->reg_req) { 4697 ret = -ENOMEM; 4698 goto out_put_lreq; 4699 } 4700 4701 /* for notify_id */ 4702 pages = ceph_alloc_page_vector(1, GFP_NOIO); 4703 if (IS_ERR(pages)) { 4704 ret = PTR_ERR(pages); 4705 goto out_put_lreq; 4706 } 4707 4708 down_write(&osdc->lock); 4709 linger_register(lreq); /* before osd_req_op_* */ 4710 ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1, 4711 timeout, payload, payload_len); 4712 if (ret) { 4713 linger_unregister(lreq); 4714 up_write(&osdc->lock); 4715 ceph_release_page_vector(pages, 1); 4716 goto out_put_lreq; 4717 } 4718 ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify, 4719 response_data), 4720 pages, PAGE_SIZE, 0, false, true); 4721 linger_submit(lreq); 4722 up_write(&osdc->lock); 4723 4724 ret = linger_reg_commit_wait(lreq); 4725 if (!ret) 4726 ret = linger_notify_finish_wait(lreq); 4727 else 4728 dout("lreq %p failed to initiate notify %d\n", lreq, ret); 4729 4730 linger_cancel(lreq); 4731 out_put_lreq: 4732 linger_put(lreq); 4733 return ret; 4734 } 4735 EXPORT_SYMBOL(ceph_osdc_notify); 4736 4737 /* 4738 * Return the number of milliseconds since the watch was last 4739 * confirmed, or an error. If there is an error, the watch is no 4740 * longer valid, and should be destroyed with ceph_osdc_unwatch(). 
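 *
 * A periodic liveness check might look like this (sketch; how to
 * re-establish the watch is up to the caller):
 *
 *	ret = ceph_osdc_watch_check(osdc, lreq);
 *	if (ret < 0) {
 *		ceph_osdc_unwatch(osdc, lreq);
 *		lreq = ceph_osdc_watch(osdc, &oid, &oloc, wcb, errcb, priv);
 *	}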
4741 */ 4742 int ceph_osdc_watch_check(struct ceph_osd_client *osdc, 4743 struct ceph_osd_linger_request *lreq) 4744 { 4745 unsigned long stamp, age; 4746 int ret; 4747 4748 down_read(&osdc->lock); 4749 mutex_lock(&lreq->lock); 4750 stamp = lreq->watch_valid_thru; 4751 if (!list_empty(&lreq->pending_lworks)) { 4752 struct linger_work *lwork = 4753 list_first_entry(&lreq->pending_lworks, 4754 struct linger_work, 4755 pending_item); 4756 4757 if (time_before(lwork->queued_stamp, stamp)) 4758 stamp = lwork->queued_stamp; 4759 } 4760 age = jiffies - stamp; 4761 dout("%s lreq %p linger_id %llu age %lu last_error %d\n", __func__, 4762 lreq, lreq->linger_id, age, lreq->last_error); 4763 /* we are truncating to msecs, so return a safe upper bound */ 4764 ret = lreq->last_error ?: 1 + jiffies_to_msecs(age); 4765 4766 mutex_unlock(&lreq->lock); 4767 up_read(&osdc->lock); 4768 return ret; 4769 } 4770 4771 static int decode_watcher(void **p, void *end, struct ceph_watch_item *item) 4772 { 4773 u8 struct_v; 4774 u32 struct_len; 4775 int ret; 4776 4777 ret = ceph_start_decoding(p, end, 2, "watch_item_t", 4778 &struct_v, &struct_len); 4779 if (ret) 4780 return ret; 4781 4782 ceph_decode_copy(p, &item->name, sizeof(item->name)); 4783 item->cookie = ceph_decode_64(p); 4784 *p += 4; /* skip timeout_seconds */ 4785 if (struct_v >= 2) { 4786 ceph_decode_copy(p, &item->addr, sizeof(item->addr)); 4787 ceph_decode_addr(&item->addr); 4788 } 4789 4790 dout("%s %s%llu cookie %llu addr %s\n", __func__, 4791 ENTITY_NAME(item->name), item->cookie, 4792 ceph_pr_addr(&item->addr.in_addr)); 4793 return 0; 4794 } 4795 4796 static int decode_watchers(void **p, void *end, 4797 struct ceph_watch_item **watchers, 4798 u32 *num_watchers) 4799 { 4800 u8 struct_v; 4801 u32 struct_len; 4802 int i; 4803 int ret; 4804 4805 ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t", 4806 &struct_v, &struct_len); 4807 if (ret) 4808 return ret; 4809 4810 *num_watchers = ceph_decode_32(p); 4811 *watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO); 4812 if (!*watchers) 4813 return -ENOMEM; 4814 4815 for (i = 0; i < *num_watchers; i++) { 4816 ret = decode_watcher(p, end, *watchers + i); 4817 if (ret) { 4818 kfree(*watchers); 4819 return ret; 4820 } 4821 } 4822 4823 return 0; 4824 } 4825 4826 /* 4827 * On success, the caller is responsible for: 4828 * 4829 * kfree(watchers); 4830 */ 4831 int ceph_osdc_list_watchers(struct ceph_osd_client *osdc, 4832 struct ceph_object_id *oid, 4833 struct ceph_object_locator *oloc, 4834 struct ceph_watch_item **watchers, 4835 u32 *num_watchers) 4836 { 4837 struct ceph_osd_request *req; 4838 struct page **pages; 4839 int ret; 4840 4841 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); 4842 if (!req) 4843 return -ENOMEM; 4844 4845 ceph_oid_copy(&req->r_base_oid, oid); 4846 ceph_oloc_copy(&req->r_base_oloc, oloc); 4847 req->r_flags = CEPH_OSD_FLAG_READ; 4848 4849 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 4850 if (ret) 4851 goto out_put_req; 4852 4853 pages = ceph_alloc_page_vector(1, GFP_NOIO); 4854 if (IS_ERR(pages)) { 4855 ret = PTR_ERR(pages); 4856 goto out_put_req; 4857 } 4858 4859 osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0); 4860 ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers, 4861 response_data), 4862 pages, PAGE_SIZE, 0, false, true); 4863 4864 ceph_osdc_start_request(osdc, req, false); 4865 ret = ceph_osdc_wait_request(osdc, req); 4866 if (ret >= 0) { 4867 void *p = page_address(pages[0]); 4868 void *const end = p + 
req->r_ops[0].outdata_len; 4869 4870 ret = decode_watchers(&p, end, watchers, num_watchers); 4871 } 4872 4873 out_put_req: 4874 ceph_osdc_put_request(req); 4875 return ret; 4876 } 4877 EXPORT_SYMBOL(ceph_osdc_list_watchers); 4878 4879 /* 4880 * Call all pending notify callbacks - for use after a watch is 4881 * unregistered, to make sure no more callbacks for it will be invoked 4882 */ 4883 void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc) 4884 { 4885 dout("%s osdc %p\n", __func__, osdc); 4886 flush_workqueue(osdc->notify_wq); 4887 } 4888 EXPORT_SYMBOL(ceph_osdc_flush_notifies); 4889 4890 void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc) 4891 { 4892 down_read(&osdc->lock); 4893 maybe_request_map(osdc); 4894 up_read(&osdc->lock); 4895 } 4896 EXPORT_SYMBOL(ceph_osdc_maybe_request_map); 4897 4898 /* 4899 * Execute an OSD class method on an object. 4900 * 4901 * @flags: CEPH_OSD_FLAG_* 4902 * @resp_len: in/out param for reply length 4903 */ 4904 int ceph_osdc_call(struct ceph_osd_client *osdc, 4905 struct ceph_object_id *oid, 4906 struct ceph_object_locator *oloc, 4907 const char *class, const char *method, 4908 unsigned int flags, 4909 struct page *req_page, size_t req_len, 4910 struct page *resp_page, size_t *resp_len) 4911 { 4912 struct ceph_osd_request *req; 4913 int ret; 4914 4915 if (req_len > PAGE_SIZE || (resp_page && *resp_len > PAGE_SIZE)) 4916 return -E2BIG; 4917 4918 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); 4919 if (!req) 4920 return -ENOMEM; 4921 4922 ceph_oid_copy(&req->r_base_oid, oid); 4923 ceph_oloc_copy(&req->r_base_oloc, oloc); 4924 req->r_flags = flags; 4925 4926 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 4927 if (ret) 4928 goto out_put_req; 4929 4930 osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method); 4931 if (req_page) 4932 osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len, 4933 0, false, false); 4934 if (resp_page) 4935 osd_req_op_cls_response_data_pages(req, 0, &resp_page, 4936 *resp_len, 0, false, false); 4937 4938 ceph_osdc_start_request(osdc, req, false); 4939 ret = ceph_osdc_wait_request(osdc, req); 4940 if (ret >= 0) { 4941 ret = req->r_ops[0].rval; 4942 if (resp_page) 4943 *resp_len = req->r_ops[0].outdata_len; 4944 } 4945 4946 out_put_req: 4947 ceph_osdc_put_request(req); 4948 return ret; 4949 } 4950 EXPORT_SYMBOL(ceph_osdc_call); 4951 4952 /* 4953 * init, shutdown 4954 */ 4955 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) 4956 { 4957 int err; 4958 4959 dout("init\n"); 4960 osdc->client = client; 4961 init_rwsem(&osdc->lock); 4962 osdc->osds = RB_ROOT; 4963 INIT_LIST_HEAD(&osdc->osd_lru); 4964 spin_lock_init(&osdc->osd_lru_lock); 4965 osd_init(&osdc->homeless_osd); 4966 osdc->homeless_osd.o_osdc = osdc; 4967 osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD; 4968 osdc->last_linger_id = CEPH_LINGER_ID_START; 4969 osdc->linger_requests = RB_ROOT; 4970 osdc->map_checks = RB_ROOT; 4971 osdc->linger_map_checks = RB_ROOT; 4972 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 4973 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 4974 4975 err = -ENOMEM; 4976 osdc->osdmap = ceph_osdmap_alloc(); 4977 if (!osdc->osdmap) 4978 goto out; 4979 4980 osdc->req_mempool = mempool_create_slab_pool(10, 4981 ceph_osd_request_cache); 4982 if (!osdc->req_mempool) 4983 goto out_map; 4984 4985 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, 4986 PAGE_SIZE, 10, true, "osd_op"); 4987 if (err < 0) 4988 goto out_mempool; 4989 err = 
ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, 4990 PAGE_SIZE, 10, true, "osd_op_reply"); 4991 if (err < 0) 4992 goto out_msgpool; 4993 4994 err = -ENOMEM; 4995 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); 4996 if (!osdc->notify_wq) 4997 goto out_msgpool_reply; 4998 4999 schedule_delayed_work(&osdc->timeout_work, 5000 osdc->client->options->osd_keepalive_timeout); 5001 schedule_delayed_work(&osdc->osds_timeout_work, 5002 round_jiffies_relative(osdc->client->options->osd_idle_ttl)); 5003 5004 return 0; 5005 5006 out_msgpool_reply: 5007 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 5008 out_msgpool: 5009 ceph_msgpool_destroy(&osdc->msgpool_op); 5010 out_mempool: 5011 mempool_destroy(osdc->req_mempool); 5012 out_map: 5013 ceph_osdmap_destroy(osdc->osdmap); 5014 out: 5015 return err; 5016 } 5017 5018 void ceph_osdc_stop(struct ceph_osd_client *osdc) 5019 { 5020 flush_workqueue(osdc->notify_wq); 5021 destroy_workqueue(osdc->notify_wq); 5022 cancel_delayed_work_sync(&osdc->timeout_work); 5023 cancel_delayed_work_sync(&osdc->osds_timeout_work); 5024 5025 down_write(&osdc->lock); 5026 while (!RB_EMPTY_ROOT(&osdc->osds)) { 5027 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), 5028 struct ceph_osd, o_node); 5029 close_osd(osd); 5030 } 5031 up_write(&osdc->lock); 5032 WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1); 5033 osd_cleanup(&osdc->homeless_osd); 5034 5035 WARN_ON(!list_empty(&osdc->osd_lru)); 5036 WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests)); 5037 WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks)); 5038 WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks)); 5039 WARN_ON(atomic_read(&osdc->num_requests)); 5040 WARN_ON(atomic_read(&osdc->num_homeless)); 5041 5042 ceph_osdmap_destroy(osdc->osdmap); 5043 mempool_destroy(osdc->req_mempool); 5044 ceph_msgpool_destroy(&osdc->msgpool_op); 5045 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 5046 } 5047 5048 /* 5049 * Read some contiguous pages. If we cross a stripe boundary, shorten 5050 * *plen. Return number of bytes read, or error. 
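 *
 * A short read is not an error: when off~*plen crosses an object
 * boundary, *plen is trimmed and the caller is expected to re-issue
 * the request for the remainder.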
5051 */ 5052 int ceph_osdc_readpages(struct ceph_osd_client *osdc, 5053 struct ceph_vino vino, struct ceph_file_layout *layout, 5054 u64 off, u64 *plen, 5055 u32 truncate_seq, u64 truncate_size, 5056 struct page **pages, int num_pages, int page_align) 5057 { 5058 struct ceph_osd_request *req; 5059 int rc = 0; 5060 5061 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, 5062 vino.snap, off, *plen); 5063 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1, 5064 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 5065 NULL, truncate_seq, truncate_size, 5066 false); 5067 if (IS_ERR(req)) 5068 return PTR_ERR(req); 5069 5070 /* it may be a short read due to an object boundary */ 5071 osd_req_op_extent_osd_data_pages(req, 0, 5072 pages, *plen, page_align, false, false); 5073 5074 dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", 5075 off, *plen, *plen, page_align); 5076 5077 rc = ceph_osdc_start_request(osdc, req, false); 5078 if (!rc) 5079 rc = ceph_osdc_wait_request(osdc, req); 5080 5081 ceph_osdc_put_request(req); 5082 dout("readpages result %d\n", rc); 5083 return rc; 5084 } 5085 EXPORT_SYMBOL(ceph_osdc_readpages); 5086 5087 /* 5088 * do a synchronous write on N pages 5089 */ 5090 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, 5091 struct ceph_file_layout *layout, 5092 struct ceph_snap_context *snapc, 5093 u64 off, u64 len, 5094 u32 truncate_seq, u64 truncate_size, 5095 struct timespec *mtime, 5096 struct page **pages, int num_pages) 5097 { 5098 struct ceph_osd_request *req; 5099 int rc = 0; 5100 int page_align = off & ~PAGE_MASK; 5101 5102 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1, 5103 CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, 5104 snapc, truncate_seq, truncate_size, 5105 true); 5106 if (IS_ERR(req)) 5107 return PTR_ERR(req); 5108 5109 /* it may be a short write due to an object boundary */ 5110 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, 5111 false, false); 5112 dout("writepages %llu~%llu (%llu bytes)\n", off, len, len); 5113 5114 req->r_mtime = *mtime; 5115 rc = ceph_osdc_start_request(osdc, req, true); 5116 if (!rc) 5117 rc = ceph_osdc_wait_request(osdc, req); 5118 5119 ceph_osdc_put_request(req); 5120 if (rc == 0) 5121 rc = len; 5122 dout("writepages result %d\n", rc); 5123 return rc; 5124 } 5125 EXPORT_SYMBOL(ceph_osdc_writepages); 5126 5127 int __init ceph_osdc_setup(void) 5128 { 5129 size_t size = sizeof(struct ceph_osd_request) + 5130 CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op); 5131 5132 BUG_ON(ceph_osd_request_cache); 5133 ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size, 5134 0, 0, NULL); 5135 5136 return ceph_osd_request_cache ? 
0 : -ENOMEM; 5137 } 5138 5139 void ceph_osdc_cleanup(void) 5140 { 5141 BUG_ON(!ceph_osd_request_cache); 5142 kmem_cache_destroy(ceph_osd_request_cache); 5143 ceph_osd_request_cache = NULL; 5144 } 5145 5146 /* 5147 * handle incoming message 5148 */ 5149 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 5150 { 5151 struct ceph_osd *osd = con->private; 5152 struct ceph_osd_client *osdc = osd->o_osdc; 5153 int type = le16_to_cpu(msg->hdr.type); 5154 5155 switch (type) { 5156 case CEPH_MSG_OSD_MAP: 5157 ceph_osdc_handle_map(osdc, msg); 5158 break; 5159 case CEPH_MSG_OSD_OPREPLY: 5160 handle_reply(osd, msg); 5161 break; 5162 case CEPH_MSG_OSD_BACKOFF: 5163 handle_backoff(osd, msg); 5164 break; 5165 case CEPH_MSG_WATCH_NOTIFY: 5166 handle_watch_notify(osdc, msg); 5167 break; 5168 5169 default: 5170 pr_err("received unknown message type %d %s\n", type, 5171 ceph_msg_type_name(type)); 5172 } 5173 5174 ceph_msg_put(msg); 5175 } 5176 5177 /* 5178 * Lookup and return message for incoming reply. Don't try to do 5179 * anything about a larger than preallocated data portion of the 5180 * message at the moment - for now, just skip the message. 5181 */ 5182 static struct ceph_msg *get_reply(struct ceph_connection *con, 5183 struct ceph_msg_header *hdr, 5184 int *skip) 5185 { 5186 struct ceph_osd *osd = con->private; 5187 struct ceph_osd_client *osdc = osd->o_osdc; 5188 struct ceph_msg *m = NULL; 5189 struct ceph_osd_request *req; 5190 int front_len = le32_to_cpu(hdr->front_len); 5191 int data_len = le32_to_cpu(hdr->data_len); 5192 u64 tid = le64_to_cpu(hdr->tid); 5193 5194 down_read(&osdc->lock); 5195 if (!osd_registered(osd)) { 5196 dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd); 5197 *skip = 1; 5198 goto out_unlock_osdc; 5199 } 5200 WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num)); 5201 5202 mutex_lock(&osd->lock); 5203 req = lookup_request(&osd->o_requests, tid); 5204 if (!req) { 5205 dout("%s osd%d tid %llu unknown, skipping\n", __func__, 5206 osd->o_osd, tid); 5207 *skip = 1; 5208 goto out_unlock_session; 5209 } 5210 5211 ceph_msg_revoke_incoming(req->r_reply); 5212 5213 if (front_len > req->r_reply->front_alloc_len) { 5214 pr_warn("%s osd%d tid %llu front %d > preallocated %d\n", 5215 __func__, osd->o_osd, req->r_tid, front_len, 5216 req->r_reply->front_alloc_len); 5217 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS, 5218 false); 5219 if (!m) 5220 goto out_unlock_session; 5221 ceph_msg_put(req->r_reply); 5222 req->r_reply = m; 5223 } 5224 5225 if (data_len > req->r_reply->data_length) { 5226 pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n", 5227 __func__, osd->o_osd, req->r_tid, data_len, 5228 req->r_reply->data_length); 5229 m = NULL; 5230 *skip = 1; 5231 goto out_unlock_session; 5232 } 5233 5234 m = ceph_msg_get(req->r_reply); 5235 dout("get_reply tid %lld %p\n", tid, m); 5236 5237 out_unlock_session: 5238 mutex_unlock(&osd->lock); 5239 out_unlock_osdc: 5240 up_read(&osdc->lock); 5241 return m; 5242 } 5243 5244 /* 5245 * TODO: switch to a msg-owned pagelist 5246 */ 5247 static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr) 5248 { 5249 struct ceph_msg *m; 5250 int type = le16_to_cpu(hdr->type); 5251 u32 front_len = le32_to_cpu(hdr->front_len); 5252 u32 data_len = le32_to_cpu(hdr->data_len); 5253 5254 m = ceph_msg_new(type, front_len, GFP_NOIO, false); 5255 if (!m) 5256 return NULL; 5257 5258 if (data_len) { 5259 struct page **pages; 5260 struct ceph_osd_data osd_data; 5261 5262 pages = 
ceph_alloc_page_vector(calc_pages_for(0, data_len), 5263 GFP_NOIO); 5264 if (IS_ERR(pages)) { 5265 ceph_msg_put(m); 5266 return NULL; 5267 } 5268 5269 ceph_osd_data_pages_init(&osd_data, pages, data_len, 0, false, 5270 false); 5271 ceph_osdc_msg_data_add(m, &osd_data); 5272 } 5273 5274 return m; 5275 } 5276 5277 static struct ceph_msg *alloc_msg(struct ceph_connection *con, 5278 struct ceph_msg_header *hdr, 5279 int *skip) 5280 { 5281 struct ceph_osd *osd = con->private; 5282 int type = le16_to_cpu(hdr->type); 5283 5284 *skip = 0; 5285 switch (type) { 5286 case CEPH_MSG_OSD_MAP: 5287 case CEPH_MSG_OSD_BACKOFF: 5288 case CEPH_MSG_WATCH_NOTIFY: 5289 return alloc_msg_with_page_vector(hdr); 5290 case CEPH_MSG_OSD_OPREPLY: 5291 return get_reply(con, hdr, skip); 5292 default: 5293 pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__, 5294 osd->o_osd, type); 5295 *skip = 1; 5296 return NULL; 5297 } 5298 } 5299 5300 /* 5301 * Wrappers to refcount containing ceph_osd struct 5302 */ 5303 static struct ceph_connection *get_osd_con(struct ceph_connection *con) 5304 { 5305 struct ceph_osd *osd = con->private; 5306 if (get_osd(osd)) 5307 return con; 5308 return NULL; 5309 } 5310 5311 static void put_osd_con(struct ceph_connection *con) 5312 { 5313 struct ceph_osd *osd = con->private; 5314 put_osd(osd); 5315 } 5316 5317 /* 5318 * authentication 5319 */ 5320 /* 5321 * Note: returned pointer is the address of a structure that's 5322 * managed separately. Caller must *not* attempt to free it. 5323 */ 5324 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 5325 int *proto, int force_new) 5326 { 5327 struct ceph_osd *o = con->private; 5328 struct ceph_osd_client *osdc = o->o_osdc; 5329 struct ceph_auth_client *ac = osdc->client->monc.auth; 5330 struct ceph_auth_handshake *auth = &o->o_auth; 5331 5332 if (force_new && auth->authorizer) { 5333 ceph_auth_destroy_authorizer(auth->authorizer); 5334 auth->authorizer = NULL; 5335 } 5336 if (!auth->authorizer) { 5337 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 5338 auth); 5339 if (ret) 5340 return ERR_PTR(ret); 5341 } else { 5342 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 5343 auth); 5344 if (ret) 5345 return ERR_PTR(ret); 5346 } 5347 *proto = ac->protocol; 5348 5349 return auth; 5350 } 5351 5352 5353 static int verify_authorizer_reply(struct ceph_connection *con) 5354 { 5355 struct ceph_osd *o = con->private; 5356 struct ceph_osd_client *osdc = o->o_osdc; 5357 struct ceph_auth_client *ac = osdc->client->monc.auth; 5358 5359 return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer); 5360 } 5361 5362 static int invalidate_authorizer(struct ceph_connection *con) 5363 { 5364 struct ceph_osd *o = con->private; 5365 struct ceph_osd_client *osdc = o->o_osdc; 5366 struct ceph_auth_client *ac = osdc->client->monc.auth; 5367 5368 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); 5369 return ceph_monc_validate_auth(&osdc->client->monc); 5370 } 5371 5372 static void osd_reencode_message(struct ceph_msg *msg) 5373 { 5374 int type = le16_to_cpu(msg->hdr.type); 5375 5376 if (type == CEPH_MSG_OSD_OP) 5377 encode_request_finish(msg); 5378 } 5379 5380 static int osd_sign_message(struct ceph_msg *msg) 5381 { 5382 struct ceph_osd *o = msg->con->private; 5383 struct ceph_auth_handshake *auth = &o->o_auth; 5384 5385 return ceph_auth_sign_message(auth, msg); 5386 } 5387 5388 static int osd_check_message_signature(struct ceph_msg *msg) 5389 { 5390 struct ceph_osd *o = msg->con->private; 5391 
struct ceph_auth_handshake *auth = &o->o_auth; 5392 5393 return ceph_auth_check_message_signature(auth, msg); 5394 } 5395 5396 static const struct ceph_connection_operations osd_con_ops = { 5397 .get = get_osd_con, 5398 .put = put_osd_con, 5399 .dispatch = dispatch, 5400 .get_authorizer = get_authorizer, 5401 .verify_authorizer_reply = verify_authorizer_reply, 5402 .invalidate_authorizer = invalidate_authorizer, 5403 .alloc_msg = alloc_msg, 5404 .reencode_message = osd_reencode_message, 5405 .sign_message = osd_sign_message, 5406 .check_message_signature = osd_check_message_signature, 5407 .fault = osd_fault, 5408 }; 5409
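
/*
 * For reference, the synchronous request pattern used by the helpers
 * above (ceph_osdc_call(), ceph_osdc_list_watchers(), etc.), in
 * sketch form -- error handling elided:
 *
 *	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
 *	ceph_oid_copy(&req->r_base_oid, oid);
 *	ceph_oloc_copy(&req->r_base_oloc, oloc);
 *	req->r_flags = CEPH_OSD_FLAG_READ;
 *	ceph_osdc_alloc_messages(req, GFP_NOIO);
 *	<set up r_ops[0]>
 *	ceph_osdc_start_request(osdc, req, false);
 *	ret = ceph_osdc_wait_request(osdc, req);
 *	ceph_osdc_put_request(req);
 */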