// SPDX-License-Identifier: GPL-2.0

#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/striper.h>

/*
 * Base front-buffer size for OSD op reply messages.  The reply allocation
 * in __ceph_osdc_alloc_messages() adds the object name length and one
 * struct ceph_osd_op per op on top of this value.
 */
#define OSD_OPREPLY_FRONT_LEN 512

/*
 * Slab cache used for requests with at most CEPH_OSD_SLAB_OPS ops that are
 * not taken from the mempool (see ceph_osdc_alloc_request()).
 */
static struct kmem_cache *ceph_osd_request_cache;

/* Messenger callbacks for OSD connections; presumably defined further down. */
static const struct ceph_connection_operations osd_con_ops;

/*
 * Implements client access to the distributed object storage cluster.
 *
 * All data objects are stored within a cluster of OSDs, or "object
 * storage devices."  (Note that Ceph OSDs have _nothing_ to do with the
 * T10 OSD extensions to SCSI.)  Ceph OSDs are simply remote daemons that
 * serve and coordinate consistent, safe access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit requests
 * to different OSDs when the cluster topology or data layout changes, and
 * retry the affected requests when the communications channel with an
 * OSD is reset.
47 */ 48 49 static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req); 50 static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req); 51 static void link_linger(struct ceph_osd *osd, 52 struct ceph_osd_linger_request *lreq); 53 static void unlink_linger(struct ceph_osd *osd, 54 struct ceph_osd_linger_request *lreq); 55 static void clear_backoffs(struct ceph_osd *osd); 56 57 #if 1 58 static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem) 59 { 60 bool wrlocked = true; 61 62 if (unlikely(down_read_trylock(sem))) { 63 wrlocked = false; 64 up_read(sem); 65 } 66 67 return wrlocked; 68 } 69 static inline void verify_osdc_locked(struct ceph_osd_client *osdc) 70 { 71 WARN_ON(!rwsem_is_locked(&osdc->lock)); 72 } 73 static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) 74 { 75 WARN_ON(!rwsem_is_wrlocked(&osdc->lock)); 76 } 77 static inline void verify_osd_locked(struct ceph_osd *osd) 78 { 79 struct ceph_osd_client *osdc = osd->o_osdc; 80 81 WARN_ON(!(mutex_is_locked(&osd->lock) && 82 rwsem_is_locked(&osdc->lock)) && 83 !rwsem_is_wrlocked(&osdc->lock)); 84 } 85 static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) 86 { 87 WARN_ON(!mutex_is_locked(&lreq->lock)); 88 } 89 #else 90 static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { } 91 static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { } 92 static inline void verify_osd_locked(struct ceph_osd *osd) { } 93 static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { } 94 #endif 95 96 /* 97 * calculate the mapping of a file extent onto an object, and fill out the 98 * request accordingly. shorten extent as necessary if it crosses an 99 * object boundary. 100 * 101 * fill osd op in request message. 
102 */ 103 static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen, 104 u64 *objnum, u64 *objoff, u64 *objlen) 105 { 106 u64 orig_len = *plen; 107 u32 xlen; 108 109 /* object extent? */ 110 ceph_calc_file_object_mapping(layout, off, orig_len, objnum, 111 objoff, &xlen); 112 *objlen = xlen; 113 if (*objlen < orig_len) { 114 *plen = *objlen; 115 dout(" skipping last %llu, final file extent %llu~%llu\n", 116 orig_len - *plen, off, *plen); 117 } 118 119 dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen); 120 return 0; 121 } 122 123 static void ceph_osd_data_init(struct ceph_osd_data *osd_data) 124 { 125 memset(osd_data, 0, sizeof (*osd_data)); 126 osd_data->type = CEPH_OSD_DATA_TYPE_NONE; 127 } 128 129 /* 130 * Consumes @pages if @own_pages is true. 131 */ 132 static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data, 133 struct page **pages, u64 length, u32 alignment, 134 bool pages_from_pool, bool own_pages) 135 { 136 osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; 137 osd_data->pages = pages; 138 osd_data->length = length; 139 osd_data->alignment = alignment; 140 osd_data->pages_from_pool = pages_from_pool; 141 osd_data->own_pages = own_pages; 142 } 143 144 /* 145 * Consumes a ref on @pagelist. 
 */
static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
			struct ceph_pagelist *pagelist)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
	osd_data->pagelist = pagelist;
}

#ifdef CONFIG_BLOCK
/* Describe @bio_length bytes of bio data; *@bio_pos is copied by value. */
static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
				   struct ceph_bio_iter *bio_pos,
				   u32 bio_length)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
	osd_data->bio_pos = *bio_pos;
	osd_data->bio_length = bio_length;
}
#endif /* CONFIG_BLOCK */

/*
 * Describe bio_vec data; *@bvec_pos is copied by value.  The byte length
 * is taken from bvec_pos->iter.bi_size (see ceph_osd_data_length()).
 */
static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
				     struct ceph_bvec_iter *bvec_pos,
				     u32 num_bvecs)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_BVECS;
	osd_data->bvec_pos = *bvec_pos;
	osd_data->num_bvecs = num_bvecs;
}

/* Return op @which's raw_data_in slot; BUGs if @which is out of range. */
static struct ceph_osd_data *
osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
{
	BUG_ON(which >= osd_req->r_num_ops);

	return &osd_req->r_ops[which].raw_data_in;
}

/*
 * Return op @which's extent data descriptor.  osd_req_op_data() is a macro
 * defined outside this file; presumably it also range-checks @which.
 */
struct ceph_osd_data *
osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
			unsigned int which)
{
	return osd_req_op_data(osd_req, which, extent, osd_data);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data);

/*
 * Attach a page vector as op @which's raw_data_in (reply data, e.g. for
 * CEPH_OSD_OP_STAT).  Consumes @pages if @own_pages is true.
 */
void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages,
			u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_raw_data_in(osd_req, which);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);

/* Attach a page vector as op @which's extent data. */
void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages,
			u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);

/* Attach @pagelist (reference consumed) as op @which's extent data. */
void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);

#ifdef CONFIG_BLOCK
/* Attach @bio_length bytes of bio data as op @which's extent data. */
void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
				    unsigned int which,
				    struct ceph_bio_iter *bio_pos,
				    u32 bio_length)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_bio_init(osd_data, bio_pos, bio_length);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
#endif /* CONFIG_BLOCK */

/*
 * Attach @num_bvecs bio_vecs covering @bytes bytes as op @which's extent
 * data.  A temporary iterator is built here and copied into the op.
 */
void osd_req_op_extent_osd_data_bvecs(struct ceph_osd_request *osd_req,
				      unsigned int which,
				      struct bio_vec *bvecs, u32 num_bvecs,
				      u32 bytes)
{
	struct ceph_osd_data *osd_data;
	struct ceph_bvec_iter it = {
		.bvecs = bvecs,
		.iter = { .bi_size = bytes },
	};

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvecs);

/*
 * Attach an existing bvec iterator position as op @which's extent data.
 * num_bvecs is recorded as 0 on this path.
 */
void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
					 unsigned int which,
					 struct ceph_bvec_iter *bvec_pos)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_bvecs_init(osd_data, bvec_pos, 0);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);

/*
 * Attach the class/method name pagelist of a CALL op.  Unlike the
 * request_data setters below, this does not touch indata_len;
 * osd_req_op_cls_init() sets that itself.
 */
static void osd_req_op_cls_request_info_pagelist(
			struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_info);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}

/*
 * Attach @pagelist as a CALL op's input data.  Its length is added to both
 * cls.indata_len and the op's total indata_len.
 */
void osd_req_op_cls_request_data_pagelist(
			struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
	osd_req->r_ops[which].cls.indata_len += pagelist->length;
	osd_req->r_ops[which].indata_len += pagelist->length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);

/* Page-vector variant of osd_req_op_cls_request_data_pagelist(). */
void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages, u64 length,
			u32 alignment, bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
	osd_req->r_ops[which].cls.indata_len += length;
	osd_req->r_ops[which].indata_len += length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);

/* bio_vec variant of osd_req_op_cls_request_data_pagelist(). */
void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
				       unsigned int which,
				       struct bio_vec *bvecs, u32 num_bvecs,
				       u32 bytes)
{
	struct ceph_osd_data *osd_data;
	struct ceph_bvec_iter it = {
		.bvecs = bvecs,
		.iter = { .bi_size = bytes },
	};

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
	osd_req->r_ops[which].cls.indata_len += bytes;
	osd_req->r_ops[which].indata_len += bytes;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs);

/* Attach the buffer that receives a CALL op's reply data. */
void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages, u64 length,
			u32 alignment, bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, response_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);

/*
 * Byte length described by @osd_data.  NONE is 0; an unrecognized type
 * triggers a WARN and also yields 0.
 */
static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
{
	switch (osd_data->type) {
	case CEPH_OSD_DATA_TYPE_NONE:
		return 0;
	case CEPH_OSD_DATA_TYPE_PAGES:
		return osd_data->length;
	case CEPH_OSD_DATA_TYPE_PAGELIST:
		return (u64)osd_data->pagelist->length;
#ifdef CONFIG_BLOCK
	case CEPH_OSD_DATA_TYPE_BIO:
		return (u64)osd_data->bio_length;
#endif /* CONFIG_BLOCK */
	case CEPH_OSD_DATA_TYPE_BVECS:
		return osd_data->bvec_pos.iter.bi_size;
	default:
		WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
		return 0;
	}
}

/*
 * Drop whatever @osd_data owns and reset it to NONE.  Owned page vectors
 * (own_pages) are freed and pagelists lose their reference.  Non-owned
 * pages, bio and bvec data are left untouched.
 */
static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
{
	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
		int num_pages;

		num_pages = calc_pages_for((u64)osd_data->alignment,
						(u64)osd_data->length);
		ceph_release_page_vector(osd_data->pages, num_pages);
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
		ceph_pagelist_release(osd_data->pagelist);
	}
	ceph_osd_data_init(osd_data);
}

/*
 * Release every data descriptor used by op @which, chosen by opcode.  The
 * set of slots per opcode must stay in sync with get_num_data_items()
 * below.  Opcodes without data fall through to the default case.
 */
static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
			unsigned int which)
{
	struct ceph_osd_req_op *op;

	BUG_ON(which >= osd_req->r_num_ops);
	op = &osd_req->r_ops[which];

	switch (op->op) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
		ceph_osd_data_release(&op->extent.osd_data);
		break;
	case CEPH_OSD_OP_CALL:
		ceph_osd_data_release(&op->cls.request_info);
		ceph_osd_data_release(&op->cls.request_data);
		ceph_osd_data_release(&op->cls.response_data);
		break;
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		ceph_osd_data_release(&op->xattr.osd_data);
		break;
	case CEPH_OSD_OP_STAT:
		ceph_osd_data_release(&op->raw_data_in);
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
		ceph_osd_data_release(&op->notify_ack.request_data);
		break;
	case CEPH_OSD_OP_NOTIFY:
		ceph_osd_data_release(&op->notify.request_data);
		ceph_osd_data_release(&op->notify.response_data);
		break;
	case CEPH_OSD_OP_LIST_WATCHERS:
		ceph_osd_data_release(&op->list_watchers.response_data);
		break;
	case CEPH_OSD_OP_COPY_FROM2:
		ceph_osd_data_release(&op->copy_from.osd_data);
		break;
	default:
		break;
	}
}

/*
 * Assumes @t is zero-initialized.
 *
 * Initializes the oid/oloc members and the acting/up sets, sets size and
 * min_size to -1 and leaves the target pointing at CEPH_HOMELESS_OSD.
 */
static void target_init(struct ceph_osd_request_target *t)
{
	ceph_oid_init(&t->base_oid);
	ceph_oloc_init(&t->base_oloc);
	ceph_oid_init(&t->target_oid);
	ceph_oloc_init(&t->target_oloc);

	ceph_osds_init(&t->acting);
	ceph_osds_init(&t->up);
	t->size = -1;
	t->min_size = -1;

	t->osd = CEPH_HOMELESS_OSD;
}

/*
 * Copy @src into @dest.  The oid/oloc members and the acting/up sets go
 * through their *_copy() helpers; every other field is plainly assigned.
 */
static void target_copy(struct ceph_osd_request_target *dest,
			const struct ceph_osd_request_target *src)
{
	ceph_oid_copy(&dest->base_oid, &src->base_oid);
	ceph_oloc_copy(&dest->base_oloc, &src->base_oloc);
	ceph_oid_copy(&dest->target_oid, &src->target_oid);
	ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);

	dest->pgid = src->pgid; /* struct */
	dest->spgid = src->spgid; /* struct */
	dest->pg_num = src->pg_num;
	dest->pg_num_mask = src->pg_num_mask;
	ceph_osds_copy(&dest->acting, &src->acting);
	ceph_osds_copy(&dest->up, &src->up);
	dest->size = src->size;
	dest->min_size = src->min_size;
	dest->sort_bitwise = src->sort_bitwise;
	dest->recovery_deletes = src->recovery_deletes;

	dest->flags = src->flags;
	dest->used_replica = src->used_replica;
	dest->paused = src->paused;

	dest->epoch = src->epoch;
	dest->last_force_resend = src->last_force_resend;

	dest->osd = src->osd;
}

/* Free the oid/oloc members that target_init()/target_copy() set up. */
static void target_destroy(struct ceph_osd_request_target *t)
{
	ceph_oid_destroy(&t->base_oid);
	ceph_oloc_destroy(&t->base_oloc);
	ceph_oid_destroy(&t->target_oid);
	ceph_oloc_destroy(&t->target_oloc);
}

/*
 * requests
 */
/* WARN if @req is still linked into any tree/list or still has an OSD. */
static void request_release_checks(struct ceph_osd_request *req)
{
	WARN_ON(!RB_EMPTY_NODE(&req->r_node));
	WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
	WARN_ON(!list_empty(&req->r_private_item));
	WARN_ON(req->r_osd);
}

/*
 * kref release callback.  It drops both messages, the op data, the target
 * and the snap context, then frees @req with the allocator that matches
 * the path taken in ceph_osdc_alloc_request().
 */
static void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req = container_of(kref,
					    struct ceph_osd_request, r_kref);
	unsigned int which;

	dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
	     req->r_request, req->r_reply);
	request_release_checks(req);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);

	for (which = 0; which < req->r_num_ops; which++)
		osd_req_op_data_release(req, which);

	target_destroy(&req->r_t);
	ceph_put_snap_context(req->r_snapc);

	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
		kmem_cache_free(ceph_osd_request_cache, req);
	else
		kfree(req);
}

/* Take an additional reference on @req. */
void ceph_osdc_get_request(struct ceph_osd_request *req)
{
	dout("%s %p (was %d)\n", __func__, req,
	     kref_read(&req->r_kref));
	kref_get(&req->r_kref);
}
EXPORT_SYMBOL(ceph_osdc_get_request);

/* Drop a reference on @req (NULL is allowed and ignored). */
void ceph_osdc_put_request(struct ceph_osd_request *req)
{
	if (req) {
		dout("%s %p (was %d)\n", __func__, req,
		     kref_read(&req->r_kref));
		kref_put(&req->r_kref, ceph_osdc_release_request);
	}
}
EXPORT_SYMBOL(ceph_osdc_put_request);

/*
 * Zero the request header and set up its refcount (1 via kref_init),
 * completion, unlinked tree/list nodes and target.
 */
static void request_init(struct ceph_osd_request *req)
{
	/* req only, each op is zeroed in osd_req_op_init() */
	memset(req, 0, sizeof(*req));

	kref_init(&req->r_kref);
	init_completion(&req->r_completion);
	RB_CLEAR_NODE(&req->r_node);
	RB_CLEAR_NODE(&req->r_mc_node);
	INIT_LIST_HEAD(&req->r_private_item);

	target_init(&req->r_t);
}

/*
 * Allocate a request with room for @num_ops ops.  The memory comes from
 * the mempool (@use_mempool; at most CEPH_OSD_SLAB_OPS ops), from the
 * request slab cache (at most CEPH_OSD_SLAB_OPS ops) or from kmalloc (at
 * most CEPH_OSD_MAX_OPS ops).  Takes a reference on @snapc.  Returns NULL
 * if allocation fails.
 */
struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
					       struct ceph_snap_context *snapc,
					       unsigned int num_ops,
					       bool use_mempool,
					       gfp_t gfp_flags)
{
	struct ceph_osd_request *req;

	if (use_mempool) {
		BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
		req = mempool_alloc(osdc->req_mempool, gfp_flags);
	} else if (num_ops <= CEPH_OSD_SLAB_OPS) {
		req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
	} else {
		BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
		req = kmalloc(struct_size(req, r_ops, num_ops), gfp_flags);
	}
	if (unlikely(!req))
		return NULL;

	request_init(req);
	req->r_osdc = osdc;
	req->r_mempool = use_mempool;
	req->r_num_ops = num_ops;
	req->r_snapid = CEPH_NOSNAP;
	req->r_snapc = ceph_get_snap_context(snapc);

	dout("%s req %p\n", __func__, req);
	return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);

/* Encoded size of @oloc: 8 + 4 + 4 + 4 fixed bytes plus the namespace. */
static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
{
	return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
}

/*
 * Allocate the request and reply messages for @req, with front buffers
 * sized from its oid, oloc, op count and snap context.  The request front
 * is zeroed.  Returns 0 or -ENOMEM.  If only the reply allocation fails,
 * r_request stays attached and is freed when @req is released.
 */
static int __ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp,
				      int num_request_data_items,
				      int num_reply_data_items)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	struct ceph_msg *msg;
	int msg_size;

	WARN_ON(req->r_request || req->r_reply);
	WARN_ON(ceph_oid_empty(&req->r_base_oid));
	WARN_ON(ceph_oloc_empty(&req->r_base_oloc));

	/* create request message */
	msg_size = CEPH_ENCODING_START_BLK_LEN +
			CEPH_PGID_ENCODING_LEN + 1; /* spgid */
	msg_size += 4 + 4 + 4; /* hash, osdmap_epoch, flags */
	msg_size += CEPH_ENCODING_START_BLK_LEN +
			sizeof(struct ceph_osd_reqid); /* reqid */
	msg_size += sizeof(struct ceph_blkin_trace_info); /* trace */
	msg_size += 4 + sizeof(struct ceph_timespec); /* client_inc, mtime */
	msg_size += CEPH_ENCODING_START_BLK_LEN +
			ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
	msg_size += 4 + req->r_base_oid.name_len; /* oid */
	msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
	msg_size += 8; /* snapid */
	msg_size += 8; /* snap_seq */
	msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
	msg_size += 4 + 8; /* retry_attempt, features */

	if (req->r_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op, msg_size,
				       num_request_data_items);
	else
		msg = ceph_msg_new2(CEPH_MSG_OSD_OP, msg_size,
				    num_request_data_items, gfp, true);
	if (!msg)
		return -ENOMEM;

	memset(msg->front.iov_base, 0, msg->front.iov_len);
	req->r_request = msg;

	/* create reply message */
	msg_size = OSD_OPREPLY_FRONT_LEN;
	msg_size += req->r_base_oid.name_len;
	msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);

	if (req->r_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, msg_size,
				       num_reply_data_items);
	else
		msg = ceph_msg_new2(CEPH_MSG_OSD_OPREPLY, msg_size,
				    num_reply_data_items, gfp, true);
	if (!msg)
		return -ENOMEM;

	req->r_reply = msg;

	return 0;
}

/* True if @opcode is one of the ops listed in __CEPH_FORALL_OSD_OPS. */
static bool osd_req_opcode_valid(u16 opcode)
{
	switch (opcode) {
#define GENERATE_CASE(op, opcode, str) case CEPH_OSD_OP_##op: return true;
__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
#undef GENERATE_CASE
	default:
		return false;
	}
}

/*
 * Count the message data items (not bytes) that @req's ops attach to the
 * request and reply messages.  Must agree with the data slots released by
 * osd_req_op_data_release().  A valid opcode with no data contributes
 * nothing; an invalid one triggers a WARN.
 */
static void get_num_data_items(struct ceph_osd_request *req,
			       int *num_request_data_items,
			       int *num_reply_data_items)
{
	struct ceph_osd_req_op *op;

	*num_request_data_items = 0;
	*num_reply_data_items = 0;

	for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
		switch (op->op) {
		/* request */
		case CEPH_OSD_OP_WRITE:
		case CEPH_OSD_OP_WRITEFULL:
		case CEPH_OSD_OP_SETXATTR:
		case CEPH_OSD_OP_CMPXATTR:
		case CEPH_OSD_OP_NOTIFY_ACK:
		case CEPH_OSD_OP_COPY_FROM2:
			*num_request_data_items += 1;
			break;

		/* reply */
		case CEPH_OSD_OP_STAT:
		case CEPH_OSD_OP_READ:
		case CEPH_OSD_OP_LIST_WATCHERS:
			*num_reply_data_items += 1;
			break;

		/* both */
		case CEPH_OSD_OP_NOTIFY:
			*num_request_data_items += 1;
			*num_reply_data_items += 1;
			break;
		case CEPH_OSD_OP_CALL:
			/* request_info + request_data out, response_data in */
			*num_request_data_items += 2;
			*num_reply_data_items += 1;
			break;

		default:
			WARN_ON(!osd_req_opcode_valid(op->op));
			break;
		}
	}
}

/*
 * oid, oloc and OSD op opcode(s) must be filled in before this function
 * is called.
 *
 * Sizes the data-item arrays from the ops (get_num_data_items()) and then
 * allocates both messages.  Returns 0 or -ENOMEM.
 */
int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
{
	int num_request_data_items, num_reply_data_items;

	get_num_data_items(req, &num_request_data_items, &num_reply_data_items);
	return __ceph_osdc_alloc_messages(req, gfp, num_request_data_items,
					  num_reply_data_items);
}
EXPORT_SYMBOL(ceph_osdc_alloc_messages);

/*
 * This is an osd op init function for opcodes that have no data or
 * other information associated with them.  It also serves as a
 * common init routine for all the other init functions, below.
 *
 * Zeroes op @which and sets its opcode and flags.  BUGs on an out-of-range
 * @which or an unknown @opcode.
 */
struct ceph_osd_req_op *
osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
		 u16 opcode, u32 flags)
{
	struct ceph_osd_req_op *op;

	BUG_ON(which >= osd_req->r_num_ops);
	BUG_ON(!osd_req_opcode_valid(opcode));

	op = &osd_req->r_ops[which];
	memset(op, 0, sizeof (*op));
	op->op = opcode;
	op->flags = flags;

	return op;
}
EXPORT_SYMBOL(osd_req_op_init);

/*
 * Initialize an extent op (READ, WRITE, WRITEFULL, ZERO or TRUNCATE).
 * Only WRITE and WRITEFULL send @length bytes of payload, so only they
 * get a non-zero indata_len.
 */
void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
				unsigned int which, u16 opcode,
				u64 offset, u64 length,
				u64 truncate_size, u32 truncate_seq)
{
	struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
						     opcode, 0);
	size_t payload_len = 0;

	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
	       opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
	       opcode != CEPH_OSD_OP_TRUNCATE);

	op->extent.offset = offset;
	op->extent.length = length;
	op->extent.truncate_size = truncate_size;
	op->extent.truncate_seq = truncate_seq;
	if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
		payload_len += length;

	op->indata_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_extent_init);

/*
 * Shrink extent op @which to @length bytes (growing BUGs).  For writes,
 * indata_len shrinks by the same amount.
 */
void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
				unsigned int which, u64 length)
{
	struct ceph_osd_req_op *op;
	u64 previous;

	BUG_ON(which >= osd_req->r_num_ops);
	op = &osd_req->r_ops[which];
	previous = op->extent.length;

	if (length == previous)
		return;		/* Nothing to do */
	BUG_ON(length > previous);

	op->extent.length = length;
	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
		op->indata_len -= previous - length;
}
EXPORT_SYMBOL(osd_req_op_extent_update);

/*
 * Initialize op @which + 1 as a copy of extent op @which, moved forward by
 * @offset_inc bytes and shortened by the same amount.
 * NOTE(review): nothing checks offset_inc <= extent.length; a larger value
 * would wrap the u64 length.
 */
void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
				unsigned int which, u64 offset_inc)
{
	struct ceph_osd_req_op *op, *prev_op;

	BUG_ON(which + 1 >= osd_req->r_num_ops);

	prev_op = &osd_req->r_ops[which];
	op = osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
	/* dup previous one */
	op->indata_len = prev_op->indata_len;
	op->outdata_len = prev_op->outdata_len;
	op->extent = prev_op->extent;
	/* adjust offset */
	op->extent.offset += offset_inc;
	op->extent.length -= offset_inc;

	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
		op->indata_len -= offset_inc;
}
EXPORT_SYMBOL(osd_req_op_extent_dup_last);

/*
 * Initialize a CALL op for @class.@method.  Both names are appended to a
 * new pagelist (each must be at most U8_MAX bytes or it BUGs), and their
 * combined length becomes indata_len.  Returns 0, -ENOMEM, or the error
 * from ceph_pagelist_append(); the pagelist is freed on failure.
 */
int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
			const char *class, const char *method)
{
	struct ceph_osd_req_op *op;
	struct ceph_pagelist *pagelist;
	size_t payload_len = 0;
	size_t size;
	int ret;

	op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);

	pagelist = ceph_pagelist_alloc(GFP_NOFS);
	if (!pagelist)
		return -ENOMEM;

	op->cls.class_name = class;
	size = strlen(class);
	BUG_ON(size > (size_t) U8_MAX);
	op->cls.class_len = size;
	ret = ceph_pagelist_append(pagelist, class, size);
	if (ret)
		goto err_pagelist_free;
	payload_len += size;

	op->cls.method_name = method;
	size = strlen(method);
	BUG_ON(size > (size_t) U8_MAX);
	op->cls.method_len = size;
	ret = ceph_pagelist_append(pagelist, method, size);
	if (ret)
		goto err_pagelist_free;
	payload_len += size;

	osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
	op->indata_len = payload_len;
	return 0;

err_pagelist_free:
	ceph_pagelist_release(pagelist);
	return ret;
}
EXPORT_SYMBOL(osd_req_op_cls_init);

/*
 * Initialize a SETXATTR or CMPXATTR op.  The payload is @name (without its
 * NUL) followed by @size bytes of @value.  Returns 0, -ENOMEM, or the
 * error from ceph_pagelist_append(); the pagelist is freed on failure.
 */
int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
			  u16 opcode, const char *name, const void *value,
			  size_t size, u8 cmp_op, u8 cmp_mode)
{
	struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
						     opcode, 0);
	struct ceph_pagelist *pagelist;
	size_t payload_len;
	int ret;

	BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);

	pagelist = ceph_pagelist_alloc(GFP_NOFS);
	if (!pagelist)
		return -ENOMEM;

	payload_len = strlen(name);
	op->xattr.name_len = payload_len;
	ret = ceph_pagelist_append(pagelist, name, payload_len);
	if (ret)
		goto err_pagelist_free;

	op->xattr.value_len = size;
	ret = ceph_pagelist_append(pagelist, value, size);
	if (ret)
		goto err_pagelist_free;
	payload_len += size;

	op->xattr.cmp_op = cmp_op;
	op->xattr.cmp_mode = cmp_mode;

	ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
	op->indata_len = payload_len;
	return 0;

err_pagelist_free:
	ceph_pagelist_release(pagelist);
	return ret;
}
EXPORT_SYMBOL(osd_req_op_xattr_init);

/*
 * @watch_opcode: CEPH_OSD_WATCH_OP_*
 *
 * Initialize a WATCH op; it carries no data payload.
 */
static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
				  u8 watch_opcode, u64 cookie, u32 gen)
{
	struct ceph_osd_req_op *op;

	op = osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
	op->watch.cookie = cookie;
	op->watch.op = watch_opcode;
	op->watch.gen = gen;
}

/*
 * prot_ver, timeout and notify payload (may be empty) should already be
 * encoded in @request_pl.
 *
 * The reference on @request_pl is consumed, and its length becomes the
 * op's indata_len.
 */
static void osd_req_op_notify_init(struct ceph_osd_request *req, int which,
				   u64 cookie, struct ceph_pagelist *request_pl)
{
	struct ceph_osd_req_op *op;

	op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
	op->notify.cookie = cookie;

	ceph_osd_data_pagelist_init(&op->notify.request_data, request_pl);
	op->indata_len = request_pl->length;
}

/*
 * @flags: CEPH_OSD_OP_ALLOC_HINT_FLAG_*
 *
 * Initialize a SETALLOCHINT op, always marked FAILOK (see below).
 */
void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
				unsigned int which,
				u64 expected_object_size,
				u64 expected_write_size,
				u32 flags)
{
	struct ceph_osd_req_op *op;

	op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_SETALLOCHINT, 0);
	op->alloc_hint.expected_object_size = expected_object_size;
	op->alloc_hint.expected_write_size = expected_write_size;
	op->alloc_hint.flags = flags;

	/*
	 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
	 * not worth a feature bit.  Set FAILOK per-op flag to make
	 * sure older osds don't trip over an unsupported opcode.
	 */
	op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
}
EXPORT_SYMBOL(osd_req_op_alloc_hint_init);

/*
 * Attach @osd_data to @msg as a message data item.  Zero-length page
 * vectors are skipped, a pagelist must be non-empty (BUG), and NONE adds
 * nothing.  Any other unknown type BUGs.
 */
static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
				struct ceph_osd_data *osd_data)
{
	u64 length = ceph_osd_data_length(osd_data);

	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
		BUG_ON(length > (u64) SIZE_MAX);
		if (length)
			ceph_msg_data_add_pages(msg, osd_data->pages,
					length, osd_data->alignment, false);
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
		BUG_ON(!length);
		ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
#ifdef CONFIG_BLOCK
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
		ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
#endif
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
		ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
	} else {
		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
	}
}

/*
 * Encode @src into the little-endian wire struct @dst.  Returns the op's
 * request payload length (indata_len).  For an unsupported opcode it logs
 * an error, WARNs, returns 0 and does not write to @dst at all.
 */
static u32 osd_req_encode_op(struct ceph_osd_op *dst,
			     const struct ceph_osd_req_op *src)
{
	switch (src->op) {
	case CEPH_OSD_OP_STAT:
		break;
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
	case CEPH_OSD_OP_ZERO:
	case CEPH_OSD_OP_TRUNCATE:
		dst->extent.offset = cpu_to_le64(src->extent.offset);
		dst->extent.length = cpu_to_le64(src->extent.length);
		dst->extent.truncate_size =
			cpu_to_le64(src->extent.truncate_size);
		dst->extent.truncate_seq =
			cpu_to_le32(src->extent.truncate_seq);
		break;
	case CEPH_OSD_OP_CALL:
		dst->cls.class_len = src->cls.class_len;
		dst->cls.method_len = src->cls.method_len;
		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
		break;
	case CEPH_OSD_OP_WATCH:
		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
		dst->watch.ver = cpu_to_le64(0);
		dst->watch.op = src->watch.op;
		dst->watch.gen = cpu_to_le32(src->watch.gen);
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
		break;
	case CEPH_OSD_OP_NOTIFY:
		dst->notify.cookie = cpu_to_le64(src->notify.cookie);
		break;
	case CEPH_OSD_OP_LIST_WATCHERS:
		break;
	case CEPH_OSD_OP_SETALLOCHINT:
		dst->alloc_hint.expected_object_size =
		    cpu_to_le64(src->alloc_hint.expected_object_size);
		dst->alloc_hint.expected_write_size =
		    cpu_to_le64(src->alloc_hint.expected_write_size);
		dst->alloc_hint.flags = cpu_to_le32(src->alloc_hint.flags);
		break;
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
		dst->xattr.cmp_op = src->xattr.cmp_op;
		dst->xattr.cmp_mode = src->xattr.cmp_mode;
		break;
	case CEPH_OSD_OP_CREATE:
	case CEPH_OSD_OP_DELETE:
		break;
	case CEPH_OSD_OP_COPY_FROM2:
		dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid);
		dst->copy_from.src_version =
			cpu_to_le64(src->copy_from.src_version);
		dst->copy_from.flags = src->copy_from.flags;
		dst->copy_from.src_fadvise_flags =
			cpu_to_le32(src->copy_from.src_fadvise_flags);
		break;
	default:
		pr_err("unsupported osd opcode %s\n",
			ceph_osd_op_name(src->op));
		WARN_ON(1);

		return 0;
	}

	dst->op = cpu_to_le16(src->op);
	dst->flags = cpu_to_le32(src->flags);
	dst->payload_len = cpu_to_le32(src->indata_len);

	return src->indata_len;
}

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
1042 */ 1043 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, 1044 struct ceph_file_layout *layout, 1045 struct ceph_vino vino, 1046 u64 off, u64 *plen, 1047 unsigned int which, int num_ops, 1048 int opcode, int flags, 1049 struct ceph_snap_context *snapc, 1050 u32 truncate_seq, 1051 u64 truncate_size, 1052 bool use_mempool) 1053 { 1054 struct ceph_osd_request *req; 1055 u64 objnum = 0; 1056 u64 objoff = 0; 1057 u64 objlen = 0; 1058 int r; 1059 1060 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && 1061 opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE && 1062 opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE); 1063 1064 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool, 1065 GFP_NOFS); 1066 if (!req) { 1067 r = -ENOMEM; 1068 goto fail; 1069 } 1070 1071 /* calculate max write size */ 1072 r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen); 1073 if (r) 1074 goto fail; 1075 1076 if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) { 1077 osd_req_op_init(req, which, opcode, 0); 1078 } else { 1079 u32 object_size = layout->object_size; 1080 u32 object_base = off - objoff; 1081 if (!(truncate_seq == 1 && truncate_size == -1ULL)) { 1082 if (truncate_size <= object_base) { 1083 truncate_size = 0; 1084 } else { 1085 truncate_size -= object_base; 1086 if (truncate_size > object_size) 1087 truncate_size = object_size; 1088 } 1089 } 1090 osd_req_op_extent_init(req, which, opcode, objoff, objlen, 1091 truncate_size, truncate_seq); 1092 } 1093 1094 req->r_base_oloc.pool = layout->pool_id; 1095 req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns); 1096 ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum); 1097 req->r_flags = flags | osdc->client->options->read_from_replica; 1098 1099 req->r_snapid = vino.snap; 1100 if (flags & CEPH_OSD_FLAG_WRITE) 1101 req->r_data_offset = off; 1102 1103 if (num_ops > 1) 1104 /* 1105 * This is a special case for 
ceph_writepages_start(), but it 1106 * also covers ceph_uninline_data(). If more multi-op request 1107 * use cases emerge, we will need a separate helper. 1108 */ 1109 r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_ops, 0); 1110 else 1111 r = ceph_osdc_alloc_messages(req, GFP_NOFS); 1112 if (r) 1113 goto fail; 1114 1115 return req; 1116 1117 fail: 1118 ceph_osdc_put_request(req); 1119 return ERR_PTR(r); 1120 } 1121 EXPORT_SYMBOL(ceph_osdc_new_request); 1122 1123 /* 1124 * We keep osd requests in an rbtree, sorted by ->r_tid. 1125 */ 1126 DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node) 1127 DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node) 1128 1129 /* 1130 * Call @fn on each OSD request as long as @fn returns 0. 1131 */ 1132 static void for_each_request(struct ceph_osd_client *osdc, 1133 int (*fn)(struct ceph_osd_request *req, void *arg), 1134 void *arg) 1135 { 1136 struct rb_node *n, *p; 1137 1138 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { 1139 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); 1140 1141 for (p = rb_first(&osd->o_requests); p; ) { 1142 struct ceph_osd_request *req = 1143 rb_entry(p, struct ceph_osd_request, r_node); 1144 1145 p = rb_next(p); 1146 if (fn(req, arg)) 1147 return; 1148 } 1149 } 1150 1151 for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) { 1152 struct ceph_osd_request *req = 1153 rb_entry(p, struct ceph_osd_request, r_node); 1154 1155 p = rb_next(p); 1156 if (fn(req, arg)) 1157 return; 1158 } 1159 } 1160 1161 static bool osd_homeless(struct ceph_osd *osd) 1162 { 1163 return osd->o_osd == CEPH_HOMELESS_OSD; 1164 } 1165 1166 static bool osd_registered(struct ceph_osd *osd) 1167 { 1168 verify_osdc_locked(osd->o_osdc); 1169 1170 return !RB_EMPTY_NODE(&osd->o_node); 1171 } 1172 1173 /* 1174 * Assumes @osd is zero-initialized. 
1175 */ 1176 static void osd_init(struct ceph_osd *osd) 1177 { 1178 refcount_set(&osd->o_ref, 1); 1179 RB_CLEAR_NODE(&osd->o_node); 1180 spin_lock_init(&osd->o_requests_lock); 1181 osd->o_requests = RB_ROOT; 1182 osd->o_linger_requests = RB_ROOT; 1183 osd->o_backoff_mappings = RB_ROOT; 1184 osd->o_backoffs_by_id = RB_ROOT; 1185 INIT_LIST_HEAD(&osd->o_osd_lru); 1186 INIT_LIST_HEAD(&osd->o_keepalive_item); 1187 osd->o_incarnation = 1; 1188 mutex_init(&osd->lock); 1189 } 1190 1191 static void osd_cleanup(struct ceph_osd *osd) 1192 { 1193 WARN_ON(!RB_EMPTY_NODE(&osd->o_node)); 1194 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests)); 1195 WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests)); 1196 WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoff_mappings)); 1197 WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoffs_by_id)); 1198 WARN_ON(!list_empty(&osd->o_osd_lru)); 1199 WARN_ON(!list_empty(&osd->o_keepalive_item)); 1200 1201 if (osd->o_auth.authorizer) { 1202 WARN_ON(osd_homeless(osd)); 1203 ceph_auth_destroy_authorizer(osd->o_auth.authorizer); 1204 } 1205 } 1206 1207 /* 1208 * Track open sessions with osds. 
*/
/*
 * Allocate and initialize the session for osd @onum (never the homeless
 * one).  __GFP_NOFAIL means kzalloc() does not return NULL here, so there
 * is no error path.  The session starts with the single reference set by
 * osd_init(); the connection is initialized but not opened.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
{
	struct ceph_osd *osd;

	WARN_ON(onum == CEPH_HOMELESS_OSD);

	osd = kzalloc(sizeof(*osd), GFP_NOIO | __GFP_NOFAIL);
	osd_init(osd);
	osd->o_osdc = osdc;
	osd->o_osd = onum;

	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);

	return osd;
}

/*
 * Take a reference on @osd unless its count has already dropped to zero.
 * Returns @osd on success, NULL otherwise.
 */
static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
	if (refcount_inc_not_zero(&osd->o_ref)) {
		dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1,
		     refcount_read(&osd->o_ref));
		return osd;
	} else {
		dout("get_osd %p FAIL\n", osd);
		return NULL;
	}
}

/* Drop a reference; the last one runs osd_cleanup() and frees @osd. */
static void put_osd(struct ceph_osd *osd)
{
	dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref),
	     refcount_read(&osd->o_ref) - 1);
	if (refcount_dec_and_test(&osd->o_ref)) {
		osd_cleanup(osd);
		kfree(osd);
	}
}

/* osdc->osds: registered sessions keyed by osd number */
DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)

/*
 * Append @osd to the idle LRU and stamp its expiry time (now plus the
 * osd_idle_ttl option).  @osd must not already be on the LRU.
 */
static void __move_osd_to_lru(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;

	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
	BUG_ON(!list_empty(&osd->o_osd_lru));

	spin_lock(&osdc->osd_lru_lock);
	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
	spin_unlock(&osdc->osd_lru_lock);

	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
}

/* Put @osd on the idle LRU once it has no regular or linger requests. */
static void maybe_move_osd_to_lru(struct ceph_osd *osd)
{
	if (RB_EMPTY_ROOT(&osd->o_requests) &&
	    RB_EMPTY_ROOT(&osd->o_linger_requests))
		__move_osd_to_lru(osd);
}

/* Take @osd off the idle LRU if it is on it; a no-op otherwise. */
static void __remove_osd_from_lru(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;

	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

	spin_lock(&osdc->osd_lru_lock);
	if (!list_empty(&osd->o_osd_lru))
		list_del_init(&osd->o_osd_lru);
	spin_unlock(&osdc->osd_lru_lock);
}

/*
 * Close the connection and assign any leftover requests to the
 * homeless session.
 *
 * Requires osdc->lock held for write.  Backoffs are discarded, @osd is
 * removed from the LRU and from osdc->osds, and the reference taken when
 * it was created/inserted (see lookup_create_osd()) is dropped.
 */
static void close_osd(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct rb_node *n;

	verify_osdc_wrlocked(osdc);
	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

	ceph_con_close(&osd->o_con);

	for (n = rb_first(&osd->o_requests); n; ) {
		struct ceph_osd_request *req =
		    rb_entry(n, struct ceph_osd_request, r_node);

		n = rb_next(n); /* unlink_request() */

		dout(" reassigning req %p tid %llu\n", req, req->r_tid);
		unlink_request(osd, req);
		link_request(&osdc->homeless_osd, req);
	}
	for (n = rb_first(&osd->o_linger_requests); n; ) {
		struct ceph_osd_linger_request *lreq =
		    rb_entry(n, struct ceph_osd_linger_request, node);

		n = rb_next(n); /* unlink_linger() */

		dout(" reassigning lreq %p linger_id %llu\n", lreq,
		     lreq->linger_id);
		unlink_linger(osd, lreq);
		link_linger(&osdc->homeless_osd, lreq);
	}
	clear_backoffs(osd);

	__remove_osd_from_lru(osd);
	erase_osd(&osdc->osds, osd);
	put_osd(osd);
}

/*
 * reset osd connect
 *
 * Returns:
 *   -ENODEV  @osd had no requests and has been closed (it may be freed),
 *   -EAGAIN  the address is unchanged and the connection was never
 *            opened, so the messenger is left to keep retrying,
 *   0        the connection was closed and reopened to the current
 *            osdmap address and o_incarnation was bumped.
 */
static int reopen_osd(struct ceph_osd *osd)
{
	struct ceph_entity_addr *peer_addr;

	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

	if (RB_EMPTY_ROOT(&osd->o_requests) &&
	    RB_EMPTY_ROOT(&osd->o_linger_requests)) {
		close_osd(osd);
		return -ENODEV;
	}

	peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
	if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
			!ceph_con_opened(&osd->o_con)) {
		struct rb_node *n;

		dout("osd addr hasn't changed and connection never opened, "
		     "letting msgr retry\n");
		/* touch each r_stamp for handle_timeout()'s benefit */
		for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
			struct ceph_osd_request *req =
			    rb_entry(n, struct ceph_osd_request, r_node);
			req->r_stamp = jiffies;
		}

		return -EAGAIN;
	}

	ceph_con_close(&osd->o_con);
	ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
	osd->o_incarnation++;

	return 0;
}

/*
 * Return the session for osd @o, or the homeless session for
 * CEPH_HOMELESS_OSD.  A missing session is created, inserted into
 * osdc->osds and connected only when @wrlocked (osdc->lock held for
 * write); with just the read lock, ERR_PTR(-EAGAIN) is returned instead.
 */
static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
					  bool wrlocked)
{
	struct ceph_osd *osd;

	if (wrlocked)
		verify_osdc_wrlocked(osdc);
	else
		verify_osdc_locked(osdc);

	if (o != CEPH_HOMELESS_OSD)
		osd = lookup_osd(&osdc->osds, o);
	else
		osd = &osdc->homeless_osd;
	if (!osd) {
		if (!wrlocked)
			return ERR_PTR(-EAGAIN);

		osd = create_osd(osdc, o);
		insert_osd(&osdc->osds, osd);
		ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
			      &osdc->osdmap->osd_addr[osd->o_osd]);
	}

	dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
	return osd;
}

/*
 * Create request <-> OSD session relation.
 *
 * @req has to be assigned a tid, @osd may be homeless.
1396 */ 1397 static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req) 1398 { 1399 verify_osd_locked(osd); 1400 WARN_ON(!req->r_tid || req->r_osd); 1401 dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd, 1402 req, req->r_tid); 1403 1404 if (!osd_homeless(osd)) 1405 __remove_osd_from_lru(osd); 1406 else 1407 atomic_inc(&osd->o_osdc->num_homeless); 1408 1409 get_osd(osd); 1410 spin_lock(&osd->o_requests_lock); 1411 insert_request(&osd->o_requests, req); 1412 spin_unlock(&osd->o_requests_lock); 1413 req->r_osd = osd; 1414 } 1415 1416 static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req) 1417 { 1418 verify_osd_locked(osd); 1419 WARN_ON(req->r_osd != osd); 1420 dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd, 1421 req, req->r_tid); 1422 1423 req->r_osd = NULL; 1424 spin_lock(&osd->o_requests_lock); 1425 erase_request(&osd->o_requests, req); 1426 spin_unlock(&osd->o_requests_lock); 1427 put_osd(osd); 1428 1429 if (!osd_homeless(osd)) 1430 maybe_move_osd_to_lru(osd); 1431 else 1432 atomic_dec(&osd->o_osdc->num_homeless); 1433 } 1434 1435 static bool __pool_full(struct ceph_pg_pool_info *pi) 1436 { 1437 return pi->flags & CEPH_POOL_FLAG_FULL; 1438 } 1439 1440 static bool have_pool_full(struct ceph_osd_client *osdc) 1441 { 1442 struct rb_node *n; 1443 1444 for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) { 1445 struct ceph_pg_pool_info *pi = 1446 rb_entry(n, struct ceph_pg_pool_info, node); 1447 1448 if (__pool_full(pi)) 1449 return true; 1450 } 1451 1452 return false; 1453 } 1454 1455 static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id) 1456 { 1457 struct ceph_pg_pool_info *pi; 1458 1459 pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id); 1460 if (!pi) 1461 return false; 1462 1463 return __pool_full(pi); 1464 } 1465 1466 /* 1467 * Returns whether a request should be blocked from being sent 1468 * based on the current osdmap and osd_client settings. 
1469 */ 1470 static bool target_should_be_paused(struct ceph_osd_client *osdc, 1471 const struct ceph_osd_request_target *t, 1472 struct ceph_pg_pool_info *pi) 1473 { 1474 bool pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD); 1475 bool pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) || 1476 ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 1477 __pool_full(pi); 1478 1479 WARN_ON(pi->id != t->target_oloc.pool); 1480 return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) || 1481 ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) || 1482 (osdc->osdmap->epoch < osdc->epoch_barrier); 1483 } 1484 1485 static int pick_random_replica(const struct ceph_osds *acting) 1486 { 1487 int i = get_random_u32_below(acting->size); 1488 1489 dout("%s picked osd%d, primary osd%d\n", __func__, 1490 acting->osds[i], acting->primary); 1491 return i; 1492 } 1493 1494 /* 1495 * Picks the closest replica based on client's location given by 1496 * crush_location option. Prefers the primary if the locality is 1497 * the same. 
*/
static int pick_closest_replica(struct ceph_osd_client *osdc,
				const struct ceph_osds *acting)
{
	struct ceph_options *opt = osdc->client->options;
	int best_i, best_locality;
	int i = 0, locality;

	/*
	 * Index 0 always seeds best_i/best_locality, so neither is read
	 * uninitialized.  A non-negative locality beats a negative one
	 * (presumably "no match" -- confirm against
	 * ceph_get_crush_locality()); among non-negative ones the smaller
	 * wins, and ties keep the earlier index.
	 */
	do {
		locality = ceph_get_crush_locality(osdc->osdmap,
						   acting->osds[i],
						   &opt->crush_locs);
		if (i == 0 ||
		    (locality >= 0 && best_locality < 0) ||
		    (locality >= 0 && best_locality >= 0 &&
		     locality < best_locality)) {
			best_i = i;
			best_locality = locality;
		}
	} while (++i < acting->size);

	dout("%s picked osd%d with locality %d, primary osd%d\n", __func__,
	     acting->osds[best_i], best_locality, acting->primary);
	return best_i;
}

enum calc_target_result {
	CALC_TARGET_NO_ACTION = 0,
	CALC_TARGET_NEED_RESEND,
	CALC_TARGET_POOL_DNE,
};

/*
 * Recompute where @t should be sent under the current osdmap: apply
 * cache tiering, map the object to a pg and its up/acting sets, and
 * choose the target OSD (the primary, or a replica for balanced or
 * localized reads on replicated pools).
 *
 * Returns CALC_TARGET_POOL_DNE (with t->osd set to the homeless OSD) if
 * the base or tier pool is missing, CALC_TARGET_NEED_RESEND if the target
 * was unpaused, force-resent, split or otherwise changed, and
 * CALC_TARGET_NO_ACTION otherwise.  @any_change also counts a new interval
 * or any acting-set change as a reason to resend.
 */
static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
					   struct ceph_osd_request_target *t,
					   bool any_change)
{
	struct ceph_pg_pool_info *pi;
	struct ceph_pg pgid, last_pgid;
	struct ceph_osds up, acting;
	bool is_read = t->flags & CEPH_OSD_FLAG_READ;
	bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
	bool force_resend = false;
	bool unpaused = false;
	bool legacy_change = false;
	bool split = false;
	bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE);
	bool recovery_deletes = ceph_osdmap_flag(osdc,
						 CEPH_OSDMAP_RECOVERY_DELETES);
	enum calc_target_result ct_res;

	t->epoch = osdc->osdmap->epoch;
	pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
	if (!pi) {
		t->osd = CEPH_HOMELESS_OSD;
		ct_res = CALC_TARGET_POOL_DNE;
		goto out;
	}

	/* the pool requested a forced resend in exactly this epoch */
	if (osdc->osdmap->epoch == pi->last_force_request_resend) {
		if (t->last_force_resend < pi->last_force_request_resend) {
			t->last_force_resend = pi->last_force_request_resend;
			force_resend = true;
		} else if (t->last_force_resend == 0) {
			force_resend = true;
		}
	}

	/* apply tiering */
	ceph_oid_copy(&t->target_oid, &t->base_oid);
	ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
	if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
		if (is_read && pi->read_tier >= 0)
			t->target_oloc.pool = pi->read_tier;
		if (is_write && pi->write_tier >= 0)
			t->target_oloc.pool = pi->write_tier;

		pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool);
		if (!pi) {
			t->osd = CEPH_HOMELESS_OSD;
			ct_res = CALC_TARGET_POOL_DNE;
			goto out;
		}
	}

	__ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc, &pgid);
	/* pg this object mapped to under the previously recorded pg_num */
	last_pgid.pool = pgid.pool;
	last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);

	ceph_pg_to_up_acting_osds(osdc->osdmap, pi, &pgid, &up, &acting);
	if (any_change &&
	    ceph_is_new_interval(&t->acting,
				 &acting,
				 &t->up,
				 &up,
				 t->size,
				 pi->size,
				 t->min_size,
				 pi->min_size,
				 t->pg_num,
				 pi->pg_num,
				 t->sort_bitwise,
				 sort_bitwise,
				 t->recovery_deletes,
				 recovery_deletes,
				 &last_pgid))
		force_resend = true;

	if (t->paused && !target_should_be_paused(osdc, t, pi)) {
		t->paused = false;
		unpaused = true;
	}
	legacy_change = ceph_pg_compare(&t->pgid, &pgid) ||
			ceph_osds_changed(&t->acting, &acting,
					  t->used_replica || any_change);
	if (t->pg_num)
		split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num);

	if (legacy_change || force_resend || split) {
		/* record the new mapping in @t */
		t->pgid = pgid; /* struct */
		ceph_pg_to_primary_shard(osdc->osdmap, pi, &pgid, &t->spgid);
		ceph_osds_copy(&t->acting, &acting);
		ceph_osds_copy(&t->up, &up);
		t->size = pi->size;
		t->min_size = pi->min_size;
		t->pg_num = pi->pg_num;
		t->pg_num_mask = pi->pg_num_mask;
		t->sort_bitwise = sort_bitwise;
		t->recovery_deletes = recovery_deletes;

		/* replica reads: only non-write ops on replicated pools */
		if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS |
				 CEPH_OSD_FLAG_LOCALIZE_READS)) &&
		    !is_write && pi->type == CEPH_POOL_TYPE_REP &&
		    acting.size > 1) {
			int pos;

			WARN_ON(!is_read || acting.osds[0] != acting.primary);
			if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) {
				pos = pick_random_replica(&acting);
			} else {
				pos = pick_closest_replica(osdc, &acting);
			}
			t->osd = acting.osds[pos];
			t->used_replica = pos > 0;
		} else {
			t->osd = acting.primary;
			t->used_replica = false;
		}
	}

	if (unpaused || legacy_change || force_resend || split)
		ct_res = CALC_TARGET_NEED_RESEND;
	else
		ct_res = CALC_TARGET_NO_ACTION;

out:
	dout("%s t %p -> %d%d%d%d ct_res %d osd%d\n", __func__, t, unpaused,
	     legacy_change, force_resend, split, ct_res, t->osd);
	return ct_res;
}

/* Allocate an empty, unlinked spg mapping; NULL on allocation failure. */
static struct ceph_spg_mapping *alloc_spg_mapping(void)
{
	struct ceph_spg_mapping *spg;

	spg = kmalloc(sizeof(*spg), GFP_NOIO);
	if (!spg)
		return NULL;

	RB_CLEAR_NODE(&spg->node);
	spg->backoffs = RB_ROOT;
	return spg;
}

/* Free @spg; it must already be unlinked and hold no backoffs. */
static void free_spg_mapping(struct ceph_spg_mapping *spg)
{
	WARN_ON(!RB_EMPTY_NODE(&spg->node));
	WARN_ON(!RB_EMPTY_ROOT(&spg->backoffs));

	kfree(spg);
}

/*
 * rbtree of ceph_spg_mapping for handling map<spg_t, ...>, similar to
 * ceph_pg_mapping.  Used to track OSD backoffs -- a backoff [range] is
 * defined only within a specific spgid; it does not pass anything to
 * children on split, or to another primary.
 */
DEFINE_RB_FUNCS2(spg_mapping, struct ceph_spg_mapping, spgid, ceph_spg_compare,
		 RB_BYPTR, const struct ceph_spg *, node)

/*
 * Sort key for the hash component: the bit-reversed hash, or 2^32 for a
 * MAX hoid so that it sorts after every real 32-bit value.
 */
static u64 hoid_get_bitwise_key(const struct ceph_hobject_id *hoid)
{
	return hoid->is_max ? 0x100000000ull : hoid->hash_reverse_bits;
}

/* The locator key if one is set, otherwise the object name. */
static void hoid_get_effective_key(const struct ceph_hobject_id *hoid,
				   void **pkey, size_t *pkey_len)
{
	if (hoid->key_len) {
		*pkey = hoid->key;
		*pkey_len = hoid->key_len;
	} else {
		*pkey = hoid->oid;
		*pkey_len = hoid->oid_len;
	}
}

/*
 * Byte-wise comparison of two length-delimited names; when one is a
 * prefix of the other, the shorter one sorts first.
 */
static int compare_names(const void *name1, size_t name1_len,
			 const void *name2, size_t name2_len)
{
	int ret;

	ret = memcmp(name1, name2, min(name1_len, name2_len));
	if (!ret) {
		if (name1_len < name2_len)
			ret = -1;
		else if (name1_len > name2_len)
			ret = 1;
	}
	return ret;
}

/*
 * Total order on hoids, compared field by field: is_max, pool, bitwise
 * hash key, namespace, effective key, object name, snapid.  Returns
 * <0, 0 or >0.
 */
static int hoid_compare(const struct ceph_hobject_id *lhs,
			const struct ceph_hobject_id *rhs)
{
	void *effective_key1, *effective_key2;
	size_t effective_key1_len, effective_key2_len;
	int ret;

	if (lhs->is_max < rhs->is_max)
		return -1;
	if (lhs->is_max > rhs->is_max)
		return 1;

	if (lhs->pool < rhs->pool)
		return -1;
	if (lhs->pool > rhs->pool)
		return 1;

	if (hoid_get_bitwise_key(lhs) < hoid_get_bitwise_key(rhs))
		return -1;
	if (hoid_get_bitwise_key(lhs) > hoid_get_bitwise_key(rhs))
		return 1;

	ret = compare_names(lhs->nspace, lhs->nspace_len,
			    rhs->nspace, rhs->nspace_len);
	if (ret)
		return ret;

	hoid_get_effective_key(lhs, &effective_key1, &effective_key1_len);
	hoid_get_effective_key(rhs, &effective_key2, &effective_key2_len);
	ret = compare_names(effective_key1, effective_key1_len,
			    effective_key2, effective_key2_len);
	if (ret)
		return ret;

	ret = compare_names(lhs->oid, lhs->oid_len, rhs->oid, rhs->oid_len);
	if (ret)
		return ret;

	if (lhs->snapid < rhs->snapid)
		return -1;
	if (lhs->snapid > rhs->snapid)
		return 1;

	return 0;
}

/*
 * For decoding ->begin and ->end of
MOSDBackoff only -- no MIN/MAX
 * compat stuff here.
 *
 * Assumes @hoid is zero-initialized.
 *
 * Requires hobject_t struct_v >= 4.  Returns 0 on success, -EINVAL on a
 * short or too-old encoding, or the string-extraction error.  On failure,
 * strings already extracted stay attached to @hoid (failed ones are reset
 * to NULL).  Presumably the caller releases them with free_hoid() --
 * confirm at the call site.
 */
static int decode_hoid(void **p, void *end, struct ceph_hobject_id *hoid)
{
	u8 struct_v;
	u32 struct_len;
	int ret;

	ret = ceph_start_decoding(p, end, 4, "hobject_t", &struct_v,
				  &struct_len);
	if (ret)
		return ret;

	if (struct_v < 4) {
		pr_err("got struct_v %d < 4 of hobject_t\n", struct_v);
		goto e_inval;
	}

	hoid->key = ceph_extract_encoded_string(p, end, &hoid->key_len,
						GFP_NOIO);
	if (IS_ERR(hoid->key)) {
		ret = PTR_ERR(hoid->key);
		hoid->key = NULL;
		return ret;
	}

	hoid->oid = ceph_extract_encoded_string(p, end, &hoid->oid_len,
						GFP_NOIO);
	if (IS_ERR(hoid->oid)) {
		ret = PTR_ERR(hoid->oid);
		hoid->oid = NULL;
		return ret;
	}

	ceph_decode_64_safe(p, end, hoid->snapid, e_inval);
	ceph_decode_32_safe(p, end, hoid->hash, e_inval);
	ceph_decode_8_safe(p, end, hoid->is_max, e_inval);

	hoid->nspace = ceph_extract_encoded_string(p, end, &hoid->nspace_len,
						   GFP_NOIO);
	if (IS_ERR(hoid->nspace)) {
		ret = PTR_ERR(hoid->nspace);
		hoid->nspace = NULL;
		return ret;
	}

	ceph_decode_64_safe(p, end, hoid->pool, e_inval);

	ceph_hoid_build_hash_cache(hoid);
	return 0;

e_inval:
	return -EINVAL;
}

/*
 * Payload size of encode_hoid(), excluding the start-encoding header:
 * the fixed-width fields plus three length-prefixed (u32) strings.
 */
static int hoid_encoding_size(const struct ceph_hobject_id *hoid)
{
	return 8 + 4 + 1 + 8 + /* snapid, hash, is_max, pool */
	       4 + hoid->key_len + 4 + hoid->oid_len + 4 + hoid->nspace_len;
}

/*
 * Encode @hoid as hobject_t v4 (compat 3).  The field order mirrors
 * decode_hoid().
 */
static void encode_hoid(void **p, void *end, const struct ceph_hobject_id *hoid)
{
	ceph_start_encoding(p, 4, 3, hoid_encoding_size(hoid));
	ceph_encode_string(p, end, hoid->key, hoid->key_len);
	ceph_encode_string(p, end, hoid->oid, hoid->oid_len);
	ceph_encode_64(p, hoid->snapid);
	ceph_encode_32(p, hoid->hash);
	ceph_encode_8(p, hoid->is_max);
	ceph_encode_string(p, end, hoid->nspace, hoid->nspace_len);
	ceph_encode_64(p, hoid->pool);
}

/*
 * Free a heap-allocated hoid together with its owned strings.  NULL is
 * allowed.
 */
static void free_hoid(struct ceph_hobject_id *hoid)
{
	if (hoid) {
		kfree(hoid->key);
		kfree(hoid->oid);
		kfree(hoid->nspace);
		kfree(hoid);
	}
}

/*
 * Allocate a zeroed backoff whose tree nodes are marked unlinked.
 * Returns NULL on allocation failure.
 */
static struct ceph_osd_backoff *alloc_backoff(void)
{
	struct ceph_osd_backoff *backoff;

	backoff = kzalloc(sizeof(*backoff), GFP_NOIO);
	if (!backoff)
		return NULL;

	RB_CLEAR_NODE(&backoff->spg_node);
	RB_CLEAR_NODE(&backoff->id_node);
	return backoff;
}

/*
 * Free @backoff and its begin/end hoids.  It must already be unlinked
 * from both trees.
 */
static void free_backoff(struct ceph_osd_backoff *backoff)
{
	WARN_ON(!RB_EMPTY_NODE(&backoff->spg_node));
	WARN_ON(!RB_EMPTY_NODE(&backoff->id_node));

	free_hoid(backoff->begin);
	free_hoid(backoff->end);
	kfree(backoff);
}

/*
 * Within a specific spgid, backoffs are managed by ->begin hoid.
 */
DEFINE_RB_INSDEL_FUNCS2(backoff, struct ceph_osd_backoff, begin, hoid_compare,
			RB_BYVAL, spg_node);

/*
 * Find a backoff in @root whose range contains @hoid: begin <= hoid < end,
 * or hoid == begin exactly.  Returns NULL if there is none.
 */
static struct ceph_osd_backoff *lookup_containing_backoff(struct rb_root *root,
					    const struct ceph_hobject_id *hoid)
{
	struct rb_node *n = root->rb_node;

	while (n) {
		struct ceph_osd_backoff *cur =
		    rb_entry(n, struct ceph_osd_backoff, spg_node);
		int cmp;

		cmp = hoid_compare(hoid, cur->begin);
		if (cmp < 0) {
			n = n->rb_left;
		} else if (cmp > 0) {
			if (hoid_compare(hoid, cur->end) < 0)
				return cur;

			n = n->rb_right;
		} else {
			return cur;
		}
	}

	return NULL;
}

/*
 * Each backoff has a unique id within its OSD session.
*/
DEFINE_RB_FUNCS(backoff_by_id, struct ceph_osd_backoff, id, id_node)

/*
 * Drop every backoff tracked on @osd.  Each backoff is erased from both
 * its spg tree and the by-id tree before it is freed, then each now-empty
 * spg mapping is erased and freed.
 */
static void clear_backoffs(struct ceph_osd *osd)
{
	while (!RB_EMPTY_ROOT(&osd->o_backoff_mappings)) {
		struct ceph_spg_mapping *spg =
		    rb_entry(rb_first(&osd->o_backoff_mappings),
			     struct ceph_spg_mapping, node);

		while (!RB_EMPTY_ROOT(&spg->backoffs)) {
			struct ceph_osd_backoff *backoff =
			    rb_entry(rb_first(&spg->backoffs),
				     struct ceph_osd_backoff, spg_node);

			erase_backoff(&spg->backoffs, backoff);
			erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
			free_backoff(backoff);
		}
		erase_spg_mapping(&osd->o_backoff_mappings, spg);
		free_spg_mapping(spg);
	}
}

/*
 * Set up a temporary, non-owning view into @t.
 *
 * The name and namespace pointers borrow @t's storage, so the resulting
 * hoid must never be passed to free_hoid().
 */
static void hoid_fill_from_target(struct ceph_hobject_id *hoid,
				  const struct ceph_osd_request_target *t)
{
	hoid->key = NULL;
	hoid->key_len = 0;
	hoid->oid = t->target_oid.name;
	hoid->oid_len = t->target_oid.name_len;
	hoid->snapid = CEPH_NOSNAP;
	hoid->hash = t->pgid.seed;
	hoid->is_max = false;
	if (t->target_oloc.pool_ns) {
		hoid->nspace = t->target_oloc.pool_ns->str;
		hoid->nspace_len = t->target_oloc.pool_ns->len;
	} else {
		hoid->nspace = NULL;
		hoid->nspace_len = 0;
	}
	hoid->pool = t->target_oloc.pool;
	ceph_hoid_build_hash_cache(hoid);
}

/*
 * True if @req's target object falls inside a backoff range registered
 * on its OSD session for @req's spgid.
 */
static bool should_plug_request(struct ceph_osd_request *req)
{
	struct ceph_osd *osd = req->r_osd;
	struct ceph_spg_mapping *spg;
	struct ceph_osd_backoff *backoff;
	struct ceph_hobject_id hoid;

	spg = lookup_spg_mapping(&osd->o_backoff_mappings, &req->r_t.spgid);
	if (!spg)
		return false;

	hoid_fill_from_target(&hoid, &req->r_t);
	backoff = lookup_containing_backoff(&spg->backoffs, &hoid);
	if (!backoff)
		return false;

	dout("%s req %p tid %llu backoff osd%d spgid %llu.%xs%d id %llu\n",
	     __func__, req, req->r_tid, osd->o_osd, backoff->spgid.pgid.pool,
	     backoff->spgid.pgid.seed, backoff->spgid.shard, backoff->id);
	return true;
}

/*
 * Keep get_num_data_items() in sync with this function.
 *
 * Attach each op's data buffers to the request and/or reply message.
 * This does nothing if either message already has data items, so the
 * buffers are attached at most once (presumably this matters for
 * resends).
 */
static void setup_request_data(struct ceph_osd_request *req)
{
	struct ceph_msg *request_msg = req->r_request;
	struct ceph_msg *reply_msg = req->r_reply;
	struct ceph_osd_req_op *op;

	if (req->r_request->num_data_items || req->r_reply->num_data_items)
		return;

	WARN_ON(request_msg->data_length || reply_msg->data_length);
	for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
		switch (op->op) {
		/* request */
		case CEPH_OSD_OP_WRITE:
		case CEPH_OSD_OP_WRITEFULL:
			WARN_ON(op->indata_len != op->extent.length);
			ceph_osdc_msg_data_add(request_msg,
					       &op->extent.osd_data);
			break;
		case CEPH_OSD_OP_SETXATTR:
		case CEPH_OSD_OP_CMPXATTR:
			WARN_ON(op->indata_len != op->xattr.name_len +
						  op->xattr.value_len);
			ceph_osdc_msg_data_add(request_msg,
					       &op->xattr.osd_data);
			break;
		case CEPH_OSD_OP_NOTIFY_ACK:
			ceph_osdc_msg_data_add(request_msg,
					       &op->notify_ack.request_data);
			break;
		case CEPH_OSD_OP_COPY_FROM2:
			ceph_osdc_msg_data_add(request_msg,
					       &op->copy_from.osd_data);
			break;

		/* reply */
		case CEPH_OSD_OP_STAT:
			ceph_osdc_msg_data_add(reply_msg,
					       &op->raw_data_in);
			break;
		case CEPH_OSD_OP_READ:
			ceph_osdc_msg_data_add(reply_msg,
					       &op->extent.osd_data);
			break;
		case CEPH_OSD_OP_LIST_WATCHERS:
			ceph_osdc_msg_data_add(reply_msg,
					       &op->list_watchers.response_data);
			break;

		/* both */
		case CEPH_OSD_OP_CALL:
			WARN_ON(op->indata_len != op->cls.class_len +
						  op->cls.method_len +
						  op->cls.indata_len);
			ceph_osdc_msg_data_add(request_msg,
					       &op->cls.request_info);
			/* optional, can be NONE */
			ceph_osdc_msg_data_add(request_msg,
					       &op->cls.request_data);
			/* optional, can be NONE */
			ceph_osdc_msg_data_add(reply_msg,
					       &op->cls.response_data);
			break;
		case CEPH_OSD_OP_NOTIFY:
			ceph_osdc_msg_data_add(request_msg,
					       &op->notify.request_data);
			ceph_osdc_msg_data_add(reply_msg,
					       &op->notify.response_data);
			break;
		}
	}
}

/* Raw pg: version byte 1, pool, seed, then a -1 "preferred" placeholder. */
static void encode_pgid(void **p, const struct ceph_pg *pgid)
{
	ceph_encode_8(p, 1);
	ceph_encode_64(p, pgid->pool);
	ceph_encode_32(p, pgid->seed);
	ceph_encode_32(p, -1); /* preferred */
}

/* spg_t v1: an encoded pg followed by the one-byte shard id. */
static void encode_spgid(void **p, const struct ceph_spg *spgid)
{
	ceph_start_encoding(p, 1, 1, CEPH_PGID_ENCODING_LEN + 1);
	encode_pgid(p, &spgid->pgid);
	ceph_encode_8(p, spgid->shard);
}

/*
 * Object locator v5 (compat 4): pool, -1 "preferred", an empty key, and
 * the namespace (empty string if none).
 */
static void encode_oloc(void **p, void *end,
			const struct ceph_object_locator *oloc)
{
	ceph_start_encoding(p, 5, 4, ceph_oloc_encoding_size(oloc));
	ceph_encode_64(p, oloc->pool);
	ceph_encode_32(p, -1); /* preferred */
	ceph_encode_32(p, 0);  /* key len */
	if (oloc->pool_ns)
		ceph_encode_string(p, end, oloc->pool_ns->str,
				   oloc->pool_ns->len);
	else
		ceph_encode_32(p, 0);
}

/*
 * Encode @req into the front of @msg in the MOSDOp v8 layout and attach
 * its data buffers.  Eight bytes are left free at the end of the front
 * buffer for the peer features that encode_request_finish() appends once
 * the connection's features are known.
 */
static void encode_request_partial(struct ceph_osd_request *req,
				   struct ceph_msg *msg)
{
	void *p = msg->front.iov_base;
	void *const end = p + msg->front_alloc_len;
	u32 data_len = 0;
	int i;

	if (req->r_flags & CEPH_OSD_FLAG_WRITE) {
		/* snapshots aren't writeable */
		WARN_ON(req->r_snapid != CEPH_NOSNAP);
	} else {
		WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec ||
			req->r_data_offset || req->r_snapc);
	}

	setup_request_data(req);

	encode_spgid(&p, &req->r_t.spgid); /* actual spg */
	ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
	ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
	ceph_encode_32(&p, req->r_flags);

	/* reqid */
	ceph_start_encoding(&p, 2, 2, sizeof(struct ceph_osd_reqid));
	memset(p, 0, sizeof(struct ceph_osd_reqid));
	p += sizeof(struct ceph_osd_reqid);

	/* trace */
	memset(p, 0, sizeof(struct ceph_blkin_trace_info));
	p += sizeof(struct ceph_blkin_trace_info);

	ceph_encode_32(&p, 0); /* client_inc, always 0 */
	ceph_encode_timespec64(p, &req->r_mtime);
	p += sizeof(struct ceph_timespec);

	encode_oloc(&p, end, &req->r_t.target_oloc);
	ceph_encode_string(&p, end, req->r_t.target_oid.name,
			   req->r_t.target_oid.name_len);

	/* ops, can imply data */
	ceph_encode_16(&p, req->r_num_ops);
	for (i = 0; i < req->r_num_ops; i++) {
		data_len += osd_req_encode_op(p, &req->r_ops[i]);
		p += sizeof(struct ceph_osd_op);
	}

	ceph_encode_64(&p, req->r_snapid); /* snapid */
	if (req->r_snapc) {
		ceph_encode_64(&p, req->r_snapc->seq);
		ceph_encode_32(&p, req->r_snapc->num_snaps);
		for (i = 0; i < req->r_snapc->num_snaps; i++)
			ceph_encode_64(&p, req->r_snapc->snaps[i]);
	} else {
		ceph_encode_64(&p, 0); /* snap_seq */
		ceph_encode_32(&p, 0); /* snaps len */
	}

	ceph_encode_32(&p, req->r_attempts); /* retry_attempt */
	BUG_ON(p > end - 8); /* space for features */

	msg->hdr.version = cpu_to_le16(8); /* MOSDOp v8 */
	/* front_len is finalized in encode_request_finish() */
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
	msg->hdr.data_len = cpu_to_le32(data_len);
	/*
	 * The header "data_off" is a hint to the receiver allowing it
	 * to align received data into its buffers such that there's no
	 * need to re-copy it before writing it to disk (direct I/O).
	 */
	msg->hdr.data_off = cpu_to_le16(req->r_data_offset);

	dout("%s req %p msg %p oid %s oid_len %d\n", __func__, req, msg,
	     req->r_t.target_oid.name, req->r_t.target_oid.name_len);
}

/*
 * Finish the front of a message built by encode_request_partial(), now
 * that msg->con's peer features are known.  Peers with RESEND_ON_SPLIT
 * just get the features appended.  Older peers get the front rewritten
 * in place into the MOSDOp v4 layout.  Each memmove() goes to a strictly
 * lower address (the BUG_ON()s check this), so the in-place rewrite never
 * overwrites data it has not yet copied.
 */
static void encode_request_finish(struct ceph_msg *msg)
{
	void *p = msg->front.iov_base;
	void *const partial_end = p + msg->front.iov_len;
	void *const end = p + msg->front_alloc_len;

	if (CEPH_HAVE_FEATURE(msg->con->peer_features, RESEND_ON_SPLIT)) {
		/* luminous OSD -- encode features and be done */
		p = partial_end;
		ceph_encode_64(&p, msg->con->peer_features);
	} else {
		/* byte-for-byte image of the fixed v8 prefix */
		struct {
			char spgid[CEPH_ENCODING_START_BLK_LEN +
				   CEPH_PGID_ENCODING_LEN + 1];
			__le32 hash;
			__le32 epoch;
			__le32 flags;
			char reqid[CEPH_ENCODING_START_BLK_LEN +
				   sizeof(struct ceph_osd_reqid)];
			char trace[sizeof(struct ceph_blkin_trace_info)];
			__le32 client_inc;
			struct ceph_timespec mtime;
		} __packed head;
		struct ceph_pg pgid;
		void *oloc, *oid, *tail;
		int oloc_len, oid_len, tail_len;
		int len;

		/*
		 * Pre-luminous OSD -- reencode v8 into v4 using @head
		 * as a temporary buffer.  Encode the raw PG; the rest
		 * is just a matter of moving oloc, oid and tail blobs
		 * around.
		 */
		memcpy(&head, p, sizeof(head));
		p += sizeof(head);

		/* measure the oloc blob (layout as in encode_oloc()) */
		oloc = p;
		p += CEPH_ENCODING_START_BLK_LEN;
		pgid.pool = ceph_decode_64(&p);
		p += 4 + 4; /* preferred, key len */
		len = ceph_decode_32(&p);
		p += len;   /* nspace */
		oloc_len = p - oloc;

		/* measure the length-prefixed oid */
		oid = p;
		len = ceph_decode_32(&p);
		p += len;
		oid_len = p - oid;

		tail = p;
		tail_len = partial_end - p;

		/* rewrite from the start of the front in v4 order */
		p = msg->front.iov_base;
		ceph_encode_copy(&p, &head.client_inc, sizeof(head.client_inc));
		ceph_encode_copy(&p, &head.epoch, sizeof(head.epoch));
		ceph_encode_copy(&p, &head.flags, sizeof(head.flags));
		ceph_encode_copy(&p, &head.mtime, sizeof(head.mtime));

		/* reassert_version */
		memset(p, 0, sizeof(struct ceph_eversion));
		p += sizeof(struct ceph_eversion);

		BUG_ON(p >= oloc);
		memmove(p, oloc, oloc_len);
		p += oloc_len;

		pgid.seed = le32_to_cpu(head.hash);
		encode_pgid(&p, &pgid); /* raw pg */

		BUG_ON(p >= oid);
		memmove(p, oid, oid_len);
		p += oid_len;

		/* tail -- ops, snapid, snapc, retry_attempt */
		BUG_ON(p >= tail);
		memmove(p, tail, tail_len);
		p += tail_len;

		msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
	}

	BUG_ON(p > end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

	dout("%s msg %p tid %llu %u+%u+%u v%d\n", __func__, msg,
	     le64_to_cpu(msg->hdr.tid), le32_to_cpu(msg->hdr.front_len),
	     le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len),
	     le16_to_cpu(msg->hdr.version));
}

/*
 * @req has to be assigned a tid and registered.
 */
static void send_request(struct ceph_osd_request *req)
{
	struct ceph_osd *osd = req->r_osd;

	verify_osd_locked(osd);
	WARN_ON(osd->o_osd != req->r_t.osd);

	/* backoff?
*/ 2265 if (should_plug_request(req)) 2266 return; 2267 2268 /* 2269 * We may have a previously queued request message hanging 2270 * around. Cancel it to avoid corrupting the msgr. 2271 */ 2272 if (req->r_sent) 2273 ceph_msg_revoke(req->r_request); 2274 2275 req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR; 2276 if (req->r_attempts) 2277 req->r_flags |= CEPH_OSD_FLAG_RETRY; 2278 else 2279 WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY); 2280 2281 encode_request_partial(req, req->r_request); 2282 2283 dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d e%u flags 0x%x attempt %d\n", 2284 __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed, 2285 req->r_t.spgid.pgid.pool, req->r_t.spgid.pgid.seed, 2286 req->r_t.spgid.shard, osd->o_osd, req->r_t.epoch, req->r_flags, 2287 req->r_attempts); 2288 2289 req->r_t.paused = false; 2290 req->r_stamp = jiffies; 2291 req->r_attempts++; 2292 2293 req->r_sent = osd->o_incarnation; 2294 req->r_request->hdr.tid = cpu_to_le64(req->r_tid); 2295 ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request)); 2296 } 2297 2298 static void maybe_request_map(struct ceph_osd_client *osdc) 2299 { 2300 bool continuous = false; 2301 2302 verify_osdc_locked(osdc); 2303 WARN_ON(!osdc->osdmap->epoch); 2304 2305 if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 2306 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD) || 2307 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) { 2308 dout("%s osdc %p continuous\n", __func__, osdc); 2309 continuous = true; 2310 } else { 2311 dout("%s osdc %p onetime\n", __func__, osdc); 2312 } 2313 2314 if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP, 2315 osdc->osdmap->epoch + 1, continuous)) 2316 ceph_monc_renew_subs(&osdc->client->monc); 2317 } 2318 2319 static void complete_request(struct ceph_osd_request *req, int err); 2320 static void send_map_check(struct ceph_osd_request *req); 2321 2322 static void __submit_request(struct ceph_osd_request *req, bool wrlocked) 2323 { 2324 struct ceph_osd_client *osdc = 
req->r_osdc; 2325 struct ceph_osd *osd; 2326 enum calc_target_result ct_res; 2327 int err = 0; 2328 bool need_send = false; 2329 bool promoted = false; 2330 2331 WARN_ON(req->r_tid); 2332 dout("%s req %p wrlocked %d\n", __func__, req, wrlocked); 2333 2334 again: 2335 ct_res = calc_target(osdc, &req->r_t, false); 2336 if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked) 2337 goto promote; 2338 2339 osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked); 2340 if (IS_ERR(osd)) { 2341 WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked); 2342 goto promote; 2343 } 2344 2345 if (osdc->abort_err) { 2346 dout("req %p abort_err %d\n", req, osdc->abort_err); 2347 err = osdc->abort_err; 2348 } else if (osdc->osdmap->epoch < osdc->epoch_barrier) { 2349 dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch, 2350 osdc->epoch_barrier); 2351 req->r_t.paused = true; 2352 maybe_request_map(osdc); 2353 } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && 2354 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) { 2355 dout("req %p pausewr\n", req); 2356 req->r_t.paused = true; 2357 maybe_request_map(osdc); 2358 } else if ((req->r_flags & CEPH_OSD_FLAG_READ) && 2359 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) { 2360 dout("req %p pauserd\n", req); 2361 req->r_t.paused = true; 2362 maybe_request_map(osdc); 2363 } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && 2364 !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY | 2365 CEPH_OSD_FLAG_FULL_FORCE)) && 2366 (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 2367 pool_full(osdc, req->r_t.base_oloc.pool))) { 2368 dout("req %p full/pool_full\n", req); 2369 if (ceph_test_opt(osdc->client, ABORT_ON_FULL)) { 2370 err = -ENOSPC; 2371 } else { 2372 if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL)) 2373 pr_warn_ratelimited("cluster is full (osdmap FULL)\n"); 2374 else 2375 pr_warn_ratelimited("pool %lld is full or reached quota\n", 2376 req->r_t.base_oloc.pool); 2377 req->r_t.paused = true; 2378 maybe_request_map(osdc); 2379 } 2380 } else if (!osd_homeless(osd)) { 2381 need_send 
= true; 2382 } else { 2383 maybe_request_map(osdc); 2384 } 2385 2386 mutex_lock(&osd->lock); 2387 /* 2388 * Assign the tid atomically with send_request() to protect 2389 * multiple writes to the same object from racing with each 2390 * other, resulting in out of order ops on the OSDs. 2391 */ 2392 req->r_tid = atomic64_inc_return(&osdc->last_tid); 2393 link_request(osd, req); 2394 if (need_send) 2395 send_request(req); 2396 else if (err) 2397 complete_request(req, err); 2398 mutex_unlock(&osd->lock); 2399 2400 if (!err && ct_res == CALC_TARGET_POOL_DNE) 2401 send_map_check(req); 2402 2403 if (promoted) 2404 downgrade_write(&osdc->lock); 2405 return; 2406 2407 promote: 2408 up_read(&osdc->lock); 2409 down_write(&osdc->lock); 2410 wrlocked = true; 2411 promoted = true; 2412 goto again; 2413 } 2414 2415 static void account_request(struct ceph_osd_request *req) 2416 { 2417 WARN_ON(req->r_flags & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK)); 2418 WARN_ON(!(req->r_flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE))); 2419 2420 req->r_flags |= CEPH_OSD_FLAG_ONDISK; 2421 atomic_inc(&req->r_osdc->num_requests); 2422 2423 req->r_start_stamp = jiffies; 2424 req->r_start_latency = ktime_get(); 2425 } 2426 2427 static void submit_request(struct ceph_osd_request *req, bool wrlocked) 2428 { 2429 ceph_osdc_get_request(req); 2430 account_request(req); 2431 __submit_request(req, wrlocked); 2432 } 2433 2434 static void finish_request(struct ceph_osd_request *req) 2435 { 2436 struct ceph_osd_client *osdc = req->r_osdc; 2437 2438 WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid)); 2439 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 2440 2441 req->r_end_latency = ktime_get(); 2442 2443 if (req->r_osd) 2444 unlink_request(req->r_osd, req); 2445 atomic_dec(&osdc->num_requests); 2446 2447 /* 2448 * If an OSD has failed or returned and a request has been sent 2449 * twice, it's possible to get a reply and end up here while the 2450 * request message is queued for delivery. 
We will ignore the 2451 * reply, so not a big deal, but better to try and catch it. 2452 */ 2453 ceph_msg_revoke(req->r_request); 2454 ceph_msg_revoke_incoming(req->r_reply); 2455 } 2456 2457 static void __complete_request(struct ceph_osd_request *req) 2458 { 2459 dout("%s req %p tid %llu cb %ps result %d\n", __func__, req, 2460 req->r_tid, req->r_callback, req->r_result); 2461 2462 if (req->r_callback) 2463 req->r_callback(req); 2464 complete_all(&req->r_completion); 2465 ceph_osdc_put_request(req); 2466 } 2467 2468 static void complete_request_workfn(struct work_struct *work) 2469 { 2470 struct ceph_osd_request *req = 2471 container_of(work, struct ceph_osd_request, r_complete_work); 2472 2473 __complete_request(req); 2474 } 2475 2476 /* 2477 * This is open-coded in handle_reply(). 2478 */ 2479 static void complete_request(struct ceph_osd_request *req, int err) 2480 { 2481 dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err); 2482 2483 req->r_result = err; 2484 finish_request(req); 2485 2486 INIT_WORK(&req->r_complete_work, complete_request_workfn); 2487 queue_work(req->r_osdc->completion_wq, &req->r_complete_work); 2488 } 2489 2490 static void cancel_map_check(struct ceph_osd_request *req) 2491 { 2492 struct ceph_osd_client *osdc = req->r_osdc; 2493 struct ceph_osd_request *lookup_req; 2494 2495 verify_osdc_wrlocked(osdc); 2496 2497 lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid); 2498 if (!lookup_req) 2499 return; 2500 2501 WARN_ON(lookup_req != req); 2502 erase_request_mc(&osdc->map_checks, req); 2503 ceph_osdc_put_request(req); 2504 } 2505 2506 static void cancel_request(struct ceph_osd_request *req) 2507 { 2508 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 2509 2510 cancel_map_check(req); 2511 finish_request(req); 2512 complete_all(&req->r_completion); 2513 ceph_osdc_put_request(req); 2514 } 2515 2516 static void abort_request(struct ceph_osd_request *req, int err) 2517 { 2518 dout("%s req %p tid %llu err %d\n", 
__func__, req, req->r_tid, err); 2519 2520 cancel_map_check(req); 2521 complete_request(req, err); 2522 } 2523 2524 static int abort_fn(struct ceph_osd_request *req, void *arg) 2525 { 2526 int err = *(int *)arg; 2527 2528 abort_request(req, err); 2529 return 0; /* continue iteration */ 2530 } 2531 2532 /* 2533 * Abort all in-flight requests with @err and arrange for all future 2534 * requests to be failed immediately. 2535 */ 2536 void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err) 2537 { 2538 dout("%s osdc %p err %d\n", __func__, osdc, err); 2539 down_write(&osdc->lock); 2540 for_each_request(osdc, abort_fn, &err); 2541 osdc->abort_err = err; 2542 up_write(&osdc->lock); 2543 } 2544 EXPORT_SYMBOL(ceph_osdc_abort_requests); 2545 2546 void ceph_osdc_clear_abort_err(struct ceph_osd_client *osdc) 2547 { 2548 down_write(&osdc->lock); 2549 osdc->abort_err = 0; 2550 up_write(&osdc->lock); 2551 } 2552 EXPORT_SYMBOL(ceph_osdc_clear_abort_err); 2553 2554 static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb) 2555 { 2556 if (likely(eb > osdc->epoch_barrier)) { 2557 dout("updating epoch_barrier from %u to %u\n", 2558 osdc->epoch_barrier, eb); 2559 osdc->epoch_barrier = eb; 2560 /* Request map if we're not to the barrier yet */ 2561 if (eb > osdc->osdmap->epoch) 2562 maybe_request_map(osdc); 2563 } 2564 } 2565 2566 void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb) 2567 { 2568 down_read(&osdc->lock); 2569 if (unlikely(eb > osdc->epoch_barrier)) { 2570 up_read(&osdc->lock); 2571 down_write(&osdc->lock); 2572 update_epoch_barrier(osdc, eb); 2573 up_write(&osdc->lock); 2574 } else { 2575 up_read(&osdc->lock); 2576 } 2577 } 2578 EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier); 2579 2580 /* 2581 * We can end up releasing caps as a result of abort_request(). 
2582 * In that case, we probably want to ensure that the cap release message 2583 * has an updated epoch barrier in it, so set the epoch barrier prior to 2584 * aborting the first request. 2585 */ 2586 static int abort_on_full_fn(struct ceph_osd_request *req, void *arg) 2587 { 2588 struct ceph_osd_client *osdc = req->r_osdc; 2589 bool *victims = arg; 2590 2591 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && 2592 (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 2593 pool_full(osdc, req->r_t.base_oloc.pool))) { 2594 if (!*victims) { 2595 update_epoch_barrier(osdc, osdc->osdmap->epoch); 2596 *victims = true; 2597 } 2598 abort_request(req, -ENOSPC); 2599 } 2600 2601 return 0; /* continue iteration */ 2602 } 2603 2604 /* 2605 * Drop all pending requests that are stalled waiting on a full condition to 2606 * clear, and complete them with ENOSPC as the return code. Set the 2607 * osdc->epoch_barrier to the latest map epoch that we've seen if any were 2608 * cancelled. 2609 */ 2610 static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc) 2611 { 2612 bool victims = false; 2613 2614 if (ceph_test_opt(osdc->client, ABORT_ON_FULL) && 2615 (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || have_pool_full(osdc))) 2616 for_each_request(osdc, abort_on_full_fn, &victims); 2617 } 2618 2619 static void check_pool_dne(struct ceph_osd_request *req) 2620 { 2621 struct ceph_osd_client *osdc = req->r_osdc; 2622 struct ceph_osdmap *map = osdc->osdmap; 2623 2624 verify_osdc_wrlocked(osdc); 2625 WARN_ON(!map->epoch); 2626 2627 if (req->r_attempts) { 2628 /* 2629 * We sent a request earlier, which means that 2630 * previously the pool existed, and now it does not 2631 * (i.e., it was deleted). 
2632 */ 2633 req->r_map_dne_bound = map->epoch; 2634 dout("%s req %p tid %llu pool disappeared\n", __func__, req, 2635 req->r_tid); 2636 } else { 2637 dout("%s req %p tid %llu map_dne_bound %u have %u\n", __func__, 2638 req, req->r_tid, req->r_map_dne_bound, map->epoch); 2639 } 2640 2641 if (req->r_map_dne_bound) { 2642 if (map->epoch >= req->r_map_dne_bound) { 2643 /* we had a new enough map */ 2644 pr_info_ratelimited("tid %llu pool does not exist\n", 2645 req->r_tid); 2646 complete_request(req, -ENOENT); 2647 } 2648 } else { 2649 send_map_check(req); 2650 } 2651 } 2652 2653 static void map_check_cb(struct ceph_mon_generic_request *greq) 2654 { 2655 struct ceph_osd_client *osdc = &greq->monc->client->osdc; 2656 struct ceph_osd_request *req; 2657 u64 tid = greq->private_data; 2658 2659 WARN_ON(greq->result || !greq->u.newest); 2660 2661 down_write(&osdc->lock); 2662 req = lookup_request_mc(&osdc->map_checks, tid); 2663 if (!req) { 2664 dout("%s tid %llu dne\n", __func__, tid); 2665 goto out_unlock; 2666 } 2667 2668 dout("%s req %p tid %llu map_dne_bound %u newest %llu\n", __func__, 2669 req, req->r_tid, req->r_map_dne_bound, greq->u.newest); 2670 if (!req->r_map_dne_bound) 2671 req->r_map_dne_bound = greq->u.newest; 2672 erase_request_mc(&osdc->map_checks, req); 2673 check_pool_dne(req); 2674 2675 ceph_osdc_put_request(req); 2676 out_unlock: 2677 up_write(&osdc->lock); 2678 } 2679 2680 static void send_map_check(struct ceph_osd_request *req) 2681 { 2682 struct ceph_osd_client *osdc = req->r_osdc; 2683 struct ceph_osd_request *lookup_req; 2684 int ret; 2685 2686 verify_osdc_wrlocked(osdc); 2687 2688 lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid); 2689 if (lookup_req) { 2690 WARN_ON(lookup_req != req); 2691 return; 2692 } 2693 2694 ceph_osdc_get_request(req); 2695 insert_request_mc(&osdc->map_checks, req); 2696 ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap", 2697 map_check_cb, req->r_tid); 2698 WARN_ON(ret); 2699 } 2700 2701 /* 2702 * 
 lingering requests, watch/notify v2 infrastructure
 */

/*
 * kref release callback for a linger request, invoked when the last
 * reference is dropped through linger_put().  By this point @lreq is
 * expected to be unlinked from every rbtree and list and detached from its
 * OSD session; the WARN_ONs flag any leftover linkage.
 */
static void linger_release(struct kref *kref)
{
	struct ceph_osd_linger_request *lreq =
	    container_of(kref, struct ceph_osd_linger_request, kref);

	dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq,
	     lreq->reg_req, lreq->ping_req);
	WARN_ON(!RB_EMPTY_NODE(&lreq->node));
	WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
	WARN_ON(!RB_EMPTY_NODE(&lreq->mc_node));
	WARN_ON(!list_empty(&lreq->scan_item));
	WARN_ON(!list_empty(&lreq->pending_lworks));
	WARN_ON(lreq->osd);

	/* notify payload pagelist and the one-page notify_id reply buffer */
	if (lreq->request_pl)
		ceph_pagelist_release(lreq->request_pl);
	if (lreq->notify_id_pages)
		ceph_release_page_vector(lreq->notify_id_pages, 1);

	/*
	 * Drop lreq's own references to its registration and ping requests.
	 * NOTE(review): either may be NULL here; presumably
	 * ceph_osdc_put_request() tolerates NULL -- confirm.
	 */
	ceph_osdc_put_request(lreq->reg_req);
	ceph_osdc_put_request(lreq->ping_req);
	target_destroy(&lreq->t);
	kfree(lreq);
}

/* Drop a reference on @lreq (NULL is ignored); the last put frees it. */
static void linger_put(struct ceph_osd_linger_request *lreq)
{
	if (lreq)
		kref_put(&lreq->kref, linger_release);
}

/* Take an extra reference on @lreq and return it for convenient chaining. */
static struct ceph_osd_linger_request *
linger_get(struct ceph_osd_linger_request *lreq)
{
	kref_get(&lreq->kref);
	return lreq;
}

/*
 * Allocate a zeroed linger request holding a single reference, with all
 * rbtree nodes cleared, lists empty and both completions initialized.
 * Returns NULL if the allocation fails.
 */
static struct ceph_osd_linger_request *
linger_alloc(struct ceph_osd_client *osdc)
{
	struct ceph_osd_linger_request *lreq;

	lreq = kzalloc(sizeof(*lreq), GFP_NOIO);
	if (!lreq)
		return NULL;

	kref_init(&lreq->kref);
	mutex_init(&lreq->lock);
	RB_CLEAR_NODE(&lreq->node);
	RB_CLEAR_NODE(&lreq->osdc_node);
	RB_CLEAR_NODE(&lreq->mc_node);
	INIT_LIST_HEAD(&lreq->scan_item);
	INIT_LIST_HEAD(&lreq->pending_lworks);
	init_completion(&lreq->reg_commit_wait);
	init_completion(&lreq->notify_finish_wait);

	lreq->osdc = osdc;
	target_init(&lreq->t);

	dout("%s lreq %p\n", __func__, lreq);
	return lreq;
}

/*
 * rbtree helpers keyed by linger_id:
 *   linger       - per-OSD-session tree (osd->o_linger_requests, via ->node)
 *   linger_osdc  - client-wide registration tree (osdc->linger_requests,
 *                  via ->osdc_node)
 *   linger_mc    - pending map checks (osdc->linger_map_checks, via
 *                  ->mc_node)
 * NOTE(review): the INSDEL variant presumably generates only insert/erase
 * helpers, while DEFINE_RB_FUNCS also generates lookup_*() -- confirm.
 */
DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request,
		       linger_id, node)
DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node)
DEFINE_RB_FUNCS(linger_mc, struct ceph_osd_linger_request, linger_id, mc_node)

/*
 * Create linger request <-> OSD session relation.
 *
 * @lreq has to be registered, @osd may be homeless.
 *
 * Takes a reference on @osd that is dropped in unlink_linger().
 */
static void link_linger(struct ceph_osd *osd,
			struct ceph_osd_linger_request *lreq)
{
	verify_osd_locked(osd);
	WARN_ON(!lreq->linger_id || lreq->osd);
	dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
	     osd->o_osd, lreq, lreq->linger_id);

	/* a real session gains a user and leaves the LRU; homeless is counted */
	if (!osd_homeless(osd))
		__remove_osd_from_lru(osd);
	else
		atomic_inc(&osd->o_osdc->num_homeless);

	get_osd(osd);
	insert_linger(&osd->o_linger_requests, lreq);
	lreq->osd = osd;
}

/*
 * Undo link_linger(): detach @lreq from @osd and drop the session
 * reference taken there.
 * NOTE(review): @osd is still dereferenced after put_osd(); presumably the
 * caller (or the osdc->osds tree) keeps another reference -- confirm.
 */
static void unlink_linger(struct ceph_osd *osd,
			  struct ceph_osd_linger_request *lreq)
{
	verify_osd_locked(osd);
	WARN_ON(lreq->osd != osd);
	dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
	     osd->o_osd, lreq, lreq->linger_id);

	lreq->osd = NULL;
	erase_linger(&osd->o_linger_requests, lreq);
	put_osd(osd);

	if (!osd_homeless(osd))
		maybe_move_osd_to_lru(osd);
	else
		atomic_dec(&osd->o_osdc->num_homeless);
}

/*
 * True if @lreq is in osdc->linger_requests.  Caller must hold osdc->lock
 * (read or write).
 */
static bool __linger_registered(struct ceph_osd_linger_request *lreq)
{
	verify_osdc_locked(lreq->osdc);

	return !RB_EMPTY_NODE(&lreq->osdc_node);
}

/* Like __linger_registered(), but takes osdc->lock for read itself. */
static bool linger_registered(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;
	bool registered;

	down_read(&osdc->lock);
	registered = __linger_registered(lreq);
	up_read(&osdc->lock);

	return registered;
}

/*
 * Assign the next linger_id and insert @lreq into osdc->linger_requests.
 * The tree holds a reference that linger_unregister() drops.  Requires
 * osdc->lock held for write.
 */
static void linger_register(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;

	verify_osdc_wrlocked(osdc);
	WARN_ON(lreq->linger_id);

	linger_get(lreq);
	lreq->linger_id = ++osdc->last_linger_id;
	insert_linger_osdc(&osdc->linger_requests, lreq);
}

/* Inverse of linger_register().  Requires osdc->lock held for write. */
static void linger_unregister(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;

	verify_osdc_wrlocked(osdc);

	erase_linger_osdc(&osdc->linger_requests, lreq);
	linger_put(lreq);
}

/*
 * Cancel an OSD request owned by a linger request (its reg_req or
 * ping_req).  cancel_request() does not run r_callback, so the lreq
 * reference stored in req->r_priv -- which the callback would otherwise
 * drop -- is dropped here.
 */
static void cancel_linger_request(struct ceph_osd_request *req)
{
	struct ceph_osd_linger_request *lreq = req->r_priv;

	WARN_ON(!req->r_linger);
	cancel_request(req);
	linger_put(lreq);
}

/*
 * Deferred work item for delivering a watch notification or a watch error
 * to the user callbacks of @lreq on osdc->notify_wq.
 */
struct linger_work {
	struct work_struct work;
	struct ceph_osd_linger_request *lreq;	/* referenced, see lwork_alloc() */
	struct list_head pending_item;		/* on lreq->pending_lworks */
	unsigned long queued_stamp;		/* jiffies, set by lwork_queue() */

	union {
		/* used by do_watch_notify() */
		struct {
			u64 notify_id;
			u64 notifier_id;
			void *payload; /* points into @msg front */
			size_t payload_len;

			struct ceph_msg *msg; /* for ceph_msg_put() */
		} notify;
		/* used by do_watch_error() */
		struct {
			int err;
		} error;
	};
};

/*
 * Allocate a zeroed work item for @lreq that will run @workfn.  Takes a
 * reference on @lreq that lwork_free() drops.  Returns NULL if the
 * allocation fails.
 */
static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq,
				       work_func_t workfn)
{
	struct linger_work *lwork;

	lwork = kzalloc(sizeof(*lwork), GFP_NOIO);
	if (!lwork)
		return NULL;

	INIT_WORK(&lwork->work, workfn);
	INIT_LIST_HEAD(&lwork->pending_item);
	lwork->lreq = linger_get(lreq);

	return lwork;
}

/*
 * Remove @lwork from lreq->pending_lworks (under lreq->lock), drop its
 * lreq reference and free it.
 */
static void lwork_free(struct linger_work *lwork)
{
	struct ceph_osd_linger_request *lreq = lwork->lreq;

	mutex_lock(&lreq->lock);
	list_del(&lwork->pending_item);
	mutex_unlock(&lreq->lock);

	linger_put(lreq);
	kfree(lwork);
}

/*
 * Record the queueing time, add @lwork to lreq->pending_lworks and queue it
 * on osdc->notify_wq.  Caller must hold lreq->lock.
 */
static void lwork_queue(struct linger_work *lwork)
{
	struct ceph_osd_linger_request *lreq = lwork->lreq;
	struct ceph_osd_client *osdc = lreq->osdc;

	verify_lreq_locked(lreq);
	WARN_ON(!list_empty(&lwork->pending_item));

	lwork->queued_stamp = jiffies;
	list_add_tail(&lwork->pending_item, &lreq->pending_lworks);
	queue_work(osdc->notify_wq, &lwork->work);
}

/*
 * Work function: hand a watch notification to lreq->wcb if @lreq is still
 * registered.  The callback runs with neither osdc->lock nor lreq->lock
 * held.  The message reference and the work item are always released.
 */
static void do_watch_notify(struct work_struct *w)
{
	struct linger_work *lwork = container_of(w, struct linger_work, work);
	struct ceph_osd_linger_request *lreq = lwork->lreq;

	if (!linger_registered(lreq)) {
		dout("%s lreq %p not registered\n", __func__, lreq);
		goto out;
	}

	WARN_ON(!lreq->is_watch);
	dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n",
	     __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id,
	     lwork->notify.payload_len);
	lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id,
		  lwork->notify.notifier_id, lwork->notify.payload,
		  lwork->notify.payload_len);

out:
	ceph_msg_put(lwork->notify.msg);
	lwork_free(lwork);
}

/*
 * Work function: report a watch error to lreq->errcb if @lreq is still
 * registered, then free the work item.
 */
static void do_watch_error(struct work_struct *w)
{
	struct linger_work *lwork = container_of(w, struct linger_work, work);
	struct ceph_osd_linger_request *lreq = lwork->lreq;

	if (!linger_registered(lreq)) {
		dout("%s lreq %p not registered\n", __func__, lreq);
		goto out;
	}

	dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err);
	lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err);

out:
	lwork_free(lwork);
}

/*
 * Queue delivery of lreq->last_error to the error callback.  Caller must
 * hold lreq->lock (checked by lwork_queue()).  If the work item cannot be
 * allocated the error is only logged and not delivered.
 */
static void queue_watch_error(struct ceph_osd_linger_request *lreq)
{
	struct linger_work *lwork;

	lwork = lwork_alloc(lreq, do_watch_error);
	if (!lwork) {
		pr_err("failed to allocate error-lwork\n");
		return;
	}

	lwork->error.err = lreq->last_error;
	lwork_queue(lwork);
}

/*
 * Complete reg_commit_wait at most once, recording @result.  Positive
 * results are stored as 0, so reg_commit_error is 0 or a negative errno.
 */
static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq,
				       int result)
{
	if (!completion_done(&lreq->reg_commit_wait)) {
		lreq->reg_commit_error = (result <= 0 ? result : 0);
		complete_all(&lreq->reg_commit_wait);
	}
}

/*
 * Completion callback for the initial WATCH or NOTIFY registration request
 * sent by send_linger().  Replies for a request that has since been
 * replaced as lreq->reg_req are ignored.  For a notify, the notify_id is
 * decoded from the first page of the response data.  Always drops the lreq
 * reference held in req->r_priv.
 */
static void linger_commit_cb(struct ceph_osd_request *req)
{
	struct ceph_osd_linger_request *lreq = req->r_priv;

	mutex_lock(&lreq->lock);
	if (req != lreq->reg_req) {
		dout("%s lreq %p linger_id %llu unknown req (%p != %p)\n",
		     __func__, lreq, lreq->linger_id, req, lreq->reg_req);
		goto out;
	}

	dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq,
	     lreq->linger_id, req->r_result);
	linger_reg_commit_complete(lreq, req->r_result);
	lreq->committed = true;

	if (!lreq->is_watch) {
		struct ceph_osd_data *osd_data =
		    osd_req_op_data(req, 0, notify, response_data);
		void *p = page_address(osd_data->pages[0]);

		WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY ||
			osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);

		/* make note of the notify_id */
		/* decode only if the reply carried at least 8 bytes */
		if (req->r_ops[0].outdata_len >= sizeof(u64)) {
			lreq->notify_id = ceph_decode_64(&p);
			dout("lreq %p notify_id %llu\n", lreq,
			     lreq->notify_id);
		} else {
			dout("lreq %p no notify_id\n", lreq);
		}
	}

out:
	mutex_unlock(&lreq->lock);
	linger_put(lreq);
}

static int normalize_watch_error(int err)
{
	/*
	 * Translate ENOENT -> ENOTCONN so that a delete->disconnection
	 * notification and a failure to reconnect because we raced with
	 * the delete appear the same to the user.
	 */
	if (err == -ENOENT)
		err = -ENOTCONN;

	return err;
}

/*
 * Completion callback for a WATCH_OP_RECONNECT request.  On failure, the
 * first error is latched in lreq->last_error and an error notification is
 * queued; while last_error is set, further errors are not re-reported.
 * Always drops the lreq reference held in req->r_priv.
 */
static void linger_reconnect_cb(struct ceph_osd_request *req)
{
	struct ceph_osd_linger_request *lreq = req->r_priv;

	mutex_lock(&lreq->lock);
	if (req != lreq->reg_req) {
		dout("%s lreq %p linger_id %llu unknown req (%p != %p)\n",
		     __func__, lreq, lreq->linger_id, req, lreq->reg_req);
		goto out;
	}

	dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__,
	     lreq, lreq->linger_id, req->r_result, lreq->last_error);
	if (req->r_result < 0) {
		if (!lreq->last_error) {
			lreq->last_error = normalize_watch_error(req->r_result);
			queue_watch_error(lreq);
		}
	}

out:
	mutex_unlock(&lreq->lock);
	linger_put(lreq);
}

/*
 * (Re)send the registration request for @lreq.  Requires osdc->lock held
 * for write.
 *
 * Any previous reg_req is cancelled (if still attached to a session) and
 * released.  The new request is:
 *   - RECONNECT for a watch that has already committed (bumps register_gen),
 *   - otherwise the initial WATCH or NOTIFY.
 * Allocation failures are fatal (BUG_ON).
 */
static void send_linger(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;
	struct ceph_osd_request *req;
	int ret;

	verify_osdc_wrlocked(osdc);
	mutex_lock(&lreq->lock);
	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);

	if (lreq->reg_req) {
		if (lreq->reg_req->r_osd)
			cancel_linger_request(lreq->reg_req);
		ceph_osdc_put_request(lreq->reg_req);
	}

	req = ceph_osdc_alloc_request(osdc, NULL, 1, true, GFP_NOIO);
	BUG_ON(!req);

	target_copy(&req->r_t, &lreq->t);
	req->r_mtime = lreq->mtime;

	if (lreq->is_watch && lreq->committed) {
		osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_RECONNECT,
				      lreq->linger_id, ++lreq->register_gen);
		dout("lreq %p reconnect register_gen %u\n", lreq,
		     req->r_ops[0].watch.gen);
		req->r_callback = linger_reconnect_cb;
	} else {
		if (lreq->is_watch) {
			osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_WATCH,
					      lreq->linger_id, 0);
		} else {
			/* cleared until linger_commit_cb() decodes a new one */
			lreq->notify_id = 0;

			/*
			 * The new request gets its own pagelist reference;
			 * lreq keeps the one released in linger_release().
			 */
			refcount_inc(&lreq->request_pl->refcnt);
			osd_req_op_notify_init(req, 0, lreq->linger_id,
					       lreq->request_pl);
			ceph_osd_data_pages_init(
			    osd_req_op_data(req, 0, notify, response_data),
			    lreq->notify_id_pages, PAGE_SIZE, 0, false, false);
		}
		dout("lreq %p register\n", lreq);
		req->r_callback = linger_commit_cb;
	}

	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
	BUG_ON(ret);

	/* r_priv reference is dropped by the callback or cancel_linger_request() */
	req->r_priv = linger_get(lreq);
	req->r_linger = true;
	lreq->reg_req = req;
	mutex_unlock(&lreq->lock);

	/* osdc->lock is already held for write */
	submit_request(req, true);
}

/*
 * Completion callback for a WATCH_OP_PING request.  Only the current
 * ping_req is considered, and only replies for the current register_gen
 * (pings sent before a reconnect are ignored).  A successful pong advances
 * watch_valid_thru to the time the ping was sent; the first failure is
 * latched in last_error and reported through queue_watch_error().
 */
static void linger_ping_cb(struct ceph_osd_request *req)
{
	struct ceph_osd_linger_request *lreq = req->r_priv;

	mutex_lock(&lreq->lock);
	if (req != lreq->ping_req) {
		dout("%s lreq %p linger_id %llu unknown req (%p != %p)\n",
		     __func__, lreq, lreq->linger_id, req, lreq->ping_req);
		goto out;
	}

	dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n",
	     __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent,
	     lreq->last_error);
	if (lreq->register_gen == req->r_ops[0].watch.gen) {
		if (!req->r_result) {
			lreq->watch_valid_thru = lreq->ping_sent;
		} else if (!lreq->last_error) {
			lreq->last_error = normalize_watch_error(req->r_result);
			queue_watch_error(lreq);
		}
	} else {
		dout("lreq %p register_gen %u ignoring old pong %u\n", lreq,
		     lreq->register_gen, req->r_ops[0].watch.gen);
	}

out:
	mutex_unlock(&lreq->lock);
	linger_put(lreq);
}

/*
 * Send a PING for the watch described by @lreq, unless the osdmap has
 * PAUSERD set.  Unlike send_linger(), this bypasses submit_request(): the
 * request reuses lreq's target and is linked directly to lreq->osd, then
 * handed to send_request(), which checks verify_osd_locked() -- so the
 * caller presumably holds the corresponding OSD/osdc locks.
 * NOTE(review): lreq->ping_req is replaced here without taking lreq->lock,
 * while linger_ping_cb() reads it under lreq->lock -- confirm the caller's
 * locking makes this safe.
 */
static void send_linger_ping(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;
	struct ceph_osd_request *req;
	int ret;

	if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
		dout("%s PAUSERD\n", __func__);
		return;
	}

	lreq->ping_sent = jiffies;
	dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n",
	     __func__, lreq, lreq->linger_id, lreq->ping_sent,
	     lreq->register_gen);

	if (lreq->ping_req) {
		if (lreq->ping_req->r_osd)
			cancel_linger_request(lreq->ping_req);
		ceph_osdc_put_request(lreq->ping_req);
	}

	req = ceph_osdc_alloc_request(osdc, NULL, 1, true, GFP_NOIO);
	BUG_ON(!req);

	target_copy(&req->r_t, &lreq->t);
	osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_PING, lreq->linger_id,
			      lreq->register_gen);
	req->r_callback = linger_ping_cb;

	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
	BUG_ON(ret);

	req->r_priv = linger_get(lreq);
	req->r_linger = true;
	lreq->ping_req = req;

	/* open-coded submit_request(): no target recalculation for pings */
	ceph_osdc_get_request(req);
	account_request(req);
	req->r_tid = atomic64_inc_return(&osdc->last_tid);
	link_request(lreq->osd, req);
	send_request(req);
}

/*
 * Register @lreq, attach it to the OSD session its target currently maps
 * to (possibly the homeless session), and send the registration request,
 * all under osdc->lock held for write.  The calc_target() result is not
 * checked here.
 */
static void linger_submit(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;
	struct ceph_osd *osd;

	down_write(&osdc->lock);
	linger_register(lreq);

	calc_target(osdc, &lreq->t, false);
	osd = lookup_create_osd(osdc, lreq->t.osd, true);
	link_linger(osd, lreq);

	send_linger(lreq);
	up_write(&osdc->lock);
}

/*
 * If a map check is pending for @lreq, remove it from
 * osdc->linger_map_checks and drop the reference that tree held.
 * Requires osdc->lock held for write.
 */
static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;
	struct ceph_osd_linger_request *lookup_lreq;

	verify_osdc_wrlocked(osdc);

	lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
				       lreq->linger_id);
	if (!lookup_lreq)
		return;

	WARN_ON(lookup_lreq != lreq);
	erase_linger_mc(&osdc->linger_map_checks, lreq);
	linger_put(lreq);
}

/*
 * @lreq has to be both registered and linked.
3232 */ 3233 static void __linger_cancel(struct ceph_osd_linger_request *lreq) 3234 { 3235 if (lreq->ping_req && lreq->ping_req->r_osd) 3236 cancel_linger_request(lreq->ping_req); 3237 if (lreq->reg_req && lreq->reg_req->r_osd) 3238 cancel_linger_request(lreq->reg_req); 3239 cancel_linger_map_check(lreq); 3240 unlink_linger(lreq->osd, lreq); 3241 linger_unregister(lreq); 3242 } 3243 3244 static void linger_cancel(struct ceph_osd_linger_request *lreq) 3245 { 3246 struct ceph_osd_client *osdc = lreq->osdc; 3247 3248 down_write(&osdc->lock); 3249 if (__linger_registered(lreq)) 3250 __linger_cancel(lreq); 3251 up_write(&osdc->lock); 3252 } 3253 3254 static void send_linger_map_check(struct ceph_osd_linger_request *lreq); 3255 3256 static void check_linger_pool_dne(struct ceph_osd_linger_request *lreq) 3257 { 3258 struct ceph_osd_client *osdc = lreq->osdc; 3259 struct ceph_osdmap *map = osdc->osdmap; 3260 3261 verify_osdc_wrlocked(osdc); 3262 WARN_ON(!map->epoch); 3263 3264 if (lreq->register_gen) { 3265 lreq->map_dne_bound = map->epoch; 3266 dout("%s lreq %p linger_id %llu pool disappeared\n", __func__, 3267 lreq, lreq->linger_id); 3268 } else { 3269 dout("%s lreq %p linger_id %llu map_dne_bound %u have %u\n", 3270 __func__, lreq, lreq->linger_id, lreq->map_dne_bound, 3271 map->epoch); 3272 } 3273 3274 if (lreq->map_dne_bound) { 3275 if (map->epoch >= lreq->map_dne_bound) { 3276 /* we had a new enough map */ 3277 pr_info("linger_id %llu pool does not exist\n", 3278 lreq->linger_id); 3279 linger_reg_commit_complete(lreq, -ENOENT); 3280 __linger_cancel(lreq); 3281 } 3282 } else { 3283 send_linger_map_check(lreq); 3284 } 3285 } 3286 3287 static void linger_map_check_cb(struct ceph_mon_generic_request *greq) 3288 { 3289 struct ceph_osd_client *osdc = &greq->monc->client->osdc; 3290 struct ceph_osd_linger_request *lreq; 3291 u64 linger_id = greq->private_data; 3292 3293 WARN_ON(greq->result || !greq->u.newest); 3294 3295 down_write(&osdc->lock); 3296 lreq = 
lookup_linger_mc(&osdc->linger_map_checks, linger_id); 3297 if (!lreq) { 3298 dout("%s linger_id %llu dne\n", __func__, linger_id); 3299 goto out_unlock; 3300 } 3301 3302 dout("%s lreq %p linger_id %llu map_dne_bound %u newest %llu\n", 3303 __func__, lreq, lreq->linger_id, lreq->map_dne_bound, 3304 greq->u.newest); 3305 if (!lreq->map_dne_bound) 3306 lreq->map_dne_bound = greq->u.newest; 3307 erase_linger_mc(&osdc->linger_map_checks, lreq); 3308 check_linger_pool_dne(lreq); 3309 3310 linger_put(lreq); 3311 out_unlock: 3312 up_write(&osdc->lock); 3313 } 3314 3315 static void send_linger_map_check(struct ceph_osd_linger_request *lreq) 3316 { 3317 struct ceph_osd_client *osdc = lreq->osdc; 3318 struct ceph_osd_linger_request *lookup_lreq; 3319 int ret; 3320 3321 verify_osdc_wrlocked(osdc); 3322 3323 lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks, 3324 lreq->linger_id); 3325 if (lookup_lreq) { 3326 WARN_ON(lookup_lreq != lreq); 3327 return; 3328 } 3329 3330 linger_get(lreq); 3331 insert_linger_mc(&osdc->linger_map_checks, lreq); 3332 ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap", 3333 linger_map_check_cb, lreq->linger_id); 3334 WARN_ON(ret); 3335 } 3336 3337 static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq) 3338 { 3339 int ret; 3340 3341 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id); 3342 ret = wait_for_completion_killable(&lreq->reg_commit_wait); 3343 return ret ?: lreq->reg_commit_error; 3344 } 3345 3346 static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq, 3347 unsigned long timeout) 3348 { 3349 long left; 3350 3351 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id); 3352 left = wait_for_completion_killable_timeout(&lreq->notify_finish_wait, 3353 ceph_timeout_jiffies(timeout)); 3354 if (left <= 0) 3355 left = left ?: -ETIMEDOUT; 3356 else 3357 left = lreq->notify_finish_error; /* completed */ 3358 3359 return left; 3360 } 3361 3362 /* 3363 * Timeout 
callback, called every N seconds. When 1 or more OSD 3364 * requests has been active for more than N seconds, we send a keepalive 3365 * (tag + timestamp) to its OSD to ensure any communications channel 3366 * reset is detected. 3367 */ 3368 static void handle_timeout(struct work_struct *work) 3369 { 3370 struct ceph_osd_client *osdc = 3371 container_of(work, struct ceph_osd_client, timeout_work.work); 3372 struct ceph_options *opts = osdc->client->options; 3373 unsigned long cutoff = jiffies - opts->osd_keepalive_timeout; 3374 unsigned long expiry_cutoff = jiffies - opts->osd_request_timeout; 3375 LIST_HEAD(slow_osds); 3376 struct rb_node *n, *p; 3377 3378 dout("%s osdc %p\n", __func__, osdc); 3379 down_write(&osdc->lock); 3380 3381 /* 3382 * ping osds that are a bit slow. this ensures that if there 3383 * is a break in the TCP connection we will notice, and reopen 3384 * a connection with that osd (from the fault callback). 3385 */ 3386 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { 3387 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); 3388 bool found = false; 3389 3390 for (p = rb_first(&osd->o_requests); p; ) { 3391 struct ceph_osd_request *req = 3392 rb_entry(p, struct ceph_osd_request, r_node); 3393 3394 p = rb_next(p); /* abort_request() */ 3395 3396 if (time_before(req->r_stamp, cutoff)) { 3397 dout(" req %p tid %llu on osd%d is laggy\n", 3398 req, req->r_tid, osd->o_osd); 3399 found = true; 3400 } 3401 if (opts->osd_request_timeout && 3402 time_before(req->r_start_stamp, expiry_cutoff)) { 3403 pr_err_ratelimited("tid %llu on osd%d timeout\n", 3404 req->r_tid, osd->o_osd); 3405 abort_request(req, -ETIMEDOUT); 3406 } 3407 } 3408 for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) { 3409 struct ceph_osd_linger_request *lreq = 3410 rb_entry(p, struct ceph_osd_linger_request, node); 3411 3412 dout(" lreq %p linger_id %llu is served by osd%d\n", 3413 lreq, lreq->linger_id, osd->o_osd); 3414 found = true; 3415 3416 
mutex_lock(&lreq->lock); 3417 if (lreq->is_watch && lreq->committed && !lreq->last_error) 3418 send_linger_ping(lreq); 3419 mutex_unlock(&lreq->lock); 3420 } 3421 3422 if (found) 3423 list_move_tail(&osd->o_keepalive_item, &slow_osds); 3424 } 3425 3426 if (opts->osd_request_timeout) { 3427 for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) { 3428 struct ceph_osd_request *req = 3429 rb_entry(p, struct ceph_osd_request, r_node); 3430 3431 p = rb_next(p); /* abort_request() */ 3432 3433 if (time_before(req->r_start_stamp, expiry_cutoff)) { 3434 pr_err_ratelimited("tid %llu on osd%d timeout\n", 3435 req->r_tid, osdc->homeless_osd.o_osd); 3436 abort_request(req, -ETIMEDOUT); 3437 } 3438 } 3439 } 3440 3441 if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds)) 3442 maybe_request_map(osdc); 3443 3444 while (!list_empty(&slow_osds)) { 3445 struct ceph_osd *osd = list_first_entry(&slow_osds, 3446 struct ceph_osd, 3447 o_keepalive_item); 3448 list_del_init(&osd->o_keepalive_item); 3449 ceph_con_keepalive(&osd->o_con); 3450 } 3451 3452 up_write(&osdc->lock); 3453 schedule_delayed_work(&osdc->timeout_work, 3454 osdc->client->options->osd_keepalive_timeout); 3455 } 3456 3457 static void handle_osds_timeout(struct work_struct *work) 3458 { 3459 struct ceph_osd_client *osdc = 3460 container_of(work, struct ceph_osd_client, 3461 osds_timeout_work.work); 3462 unsigned long delay = osdc->client->options->osd_idle_ttl / 4; 3463 struct ceph_osd *osd, *nosd; 3464 3465 dout("%s osdc %p\n", __func__, osdc); 3466 down_write(&osdc->lock); 3467 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) { 3468 if (time_before(jiffies, osd->lru_ttl)) 3469 break; 3470 3471 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests)); 3472 WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests)); 3473 close_osd(osd); 3474 } 3475 3476 up_write(&osdc->lock); 3477 schedule_delayed_work(&osdc->osds_timeout_work, 3478 round_jiffies_relative(delay)); 3479 } 3480 3481 static int ceph_oloc_decode(void **p, 
void *end, 3482 struct ceph_object_locator *oloc) 3483 { 3484 u8 struct_v, struct_cv; 3485 u32 len; 3486 void *struct_end; 3487 int ret = 0; 3488 3489 ceph_decode_need(p, end, 1 + 1 + 4, e_inval); 3490 struct_v = ceph_decode_8(p); 3491 struct_cv = ceph_decode_8(p); 3492 if (struct_v < 3) { 3493 pr_warn("got v %d < 3 cv %d of ceph_object_locator\n", 3494 struct_v, struct_cv); 3495 goto e_inval; 3496 } 3497 if (struct_cv > 6) { 3498 pr_warn("got v %d cv %d > 6 of ceph_object_locator\n", 3499 struct_v, struct_cv); 3500 goto e_inval; 3501 } 3502 len = ceph_decode_32(p); 3503 ceph_decode_need(p, end, len, e_inval); 3504 struct_end = *p + len; 3505 3506 oloc->pool = ceph_decode_64(p); 3507 *p += 4; /* skip preferred */ 3508 3509 len = ceph_decode_32(p); 3510 if (len > 0) { 3511 pr_warn("ceph_object_locator::key is set\n"); 3512 goto e_inval; 3513 } 3514 3515 if (struct_v >= 5) { 3516 bool changed = false; 3517 3518 len = ceph_decode_32(p); 3519 if (len > 0) { 3520 ceph_decode_need(p, end, len, e_inval); 3521 if (!oloc->pool_ns || 3522 ceph_compare_string(oloc->pool_ns, *p, len)) 3523 changed = true; 3524 *p += len; 3525 } else { 3526 if (oloc->pool_ns) 3527 changed = true; 3528 } 3529 if (changed) { 3530 /* redirect changes namespace */ 3531 pr_warn("ceph_object_locator::nspace is changed\n"); 3532 goto e_inval; 3533 } 3534 } 3535 3536 if (struct_v >= 6) { 3537 s64 hash = ceph_decode_64(p); 3538 if (hash != -1) { 3539 pr_warn("ceph_object_locator::hash is set\n"); 3540 goto e_inval; 3541 } 3542 } 3543 3544 /* skip the rest */ 3545 *p = struct_end; 3546 out: 3547 return ret; 3548 3549 e_inval: 3550 ret = -EINVAL; 3551 goto out; 3552 } 3553 3554 static int ceph_redirect_decode(void **p, void *end, 3555 struct ceph_request_redirect *redir) 3556 { 3557 u8 struct_v, struct_cv; 3558 u32 len; 3559 void *struct_end; 3560 int ret; 3561 3562 ceph_decode_need(p, end, 1 + 1 + 4, e_inval); 3563 struct_v = ceph_decode_8(p); 3564 struct_cv = ceph_decode_8(p); 3565 if (struct_cv > 1) { 
3566 pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n", 3567 struct_v, struct_cv); 3568 goto e_inval; 3569 } 3570 len = ceph_decode_32(p); 3571 ceph_decode_need(p, end, len, e_inval); 3572 struct_end = *p + len; 3573 3574 ret = ceph_oloc_decode(p, end, &redir->oloc); 3575 if (ret) 3576 goto out; 3577 3578 len = ceph_decode_32(p); 3579 if (len > 0) { 3580 pr_warn("ceph_request_redirect::object_name is set\n"); 3581 goto e_inval; 3582 } 3583 3584 /* skip the rest */ 3585 *p = struct_end; 3586 out: 3587 return ret; 3588 3589 e_inval: 3590 ret = -EINVAL; 3591 goto out; 3592 } 3593 3594 struct MOSDOpReply { 3595 struct ceph_pg pgid; 3596 u64 flags; 3597 int result; 3598 u32 epoch; 3599 int num_ops; 3600 u32 outdata_len[CEPH_OSD_MAX_OPS]; 3601 s32 rval[CEPH_OSD_MAX_OPS]; 3602 int retry_attempt; 3603 struct ceph_eversion replay_version; 3604 u64 user_version; 3605 struct ceph_request_redirect redirect; 3606 }; 3607 3608 static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m) 3609 { 3610 void *p = msg->front.iov_base; 3611 void *const end = p + msg->front.iov_len; 3612 u16 version = le16_to_cpu(msg->hdr.version); 3613 struct ceph_eversion bad_replay_version; 3614 u8 decode_redir; 3615 u32 len; 3616 int ret; 3617 int i; 3618 3619 ceph_decode_32_safe(&p, end, len, e_inval); 3620 ceph_decode_need(&p, end, len, e_inval); 3621 p += len; /* skip oid */ 3622 3623 ret = ceph_decode_pgid(&p, end, &m->pgid); 3624 if (ret) 3625 return ret; 3626 3627 ceph_decode_64_safe(&p, end, m->flags, e_inval); 3628 ceph_decode_32_safe(&p, end, m->result, e_inval); 3629 ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval); 3630 memcpy(&bad_replay_version, p, sizeof(bad_replay_version)); 3631 p += sizeof(bad_replay_version); 3632 ceph_decode_32_safe(&p, end, m->epoch, e_inval); 3633 3634 ceph_decode_32_safe(&p, end, m->num_ops, e_inval); 3635 if (m->num_ops > ARRAY_SIZE(m->outdata_len)) 3636 goto e_inval; 3637 3638 ceph_decode_need(&p, end, m->num_ops * 
sizeof(struct ceph_osd_op), 3639 e_inval); 3640 for (i = 0; i < m->num_ops; i++) { 3641 struct ceph_osd_op *op = p; 3642 3643 m->outdata_len[i] = le32_to_cpu(op->payload_len); 3644 p += sizeof(*op); 3645 } 3646 3647 ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval); 3648 for (i = 0; i < m->num_ops; i++) 3649 ceph_decode_32_safe(&p, end, m->rval[i], e_inval); 3650 3651 if (version >= 5) { 3652 ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval); 3653 memcpy(&m->replay_version, p, sizeof(m->replay_version)); 3654 p += sizeof(m->replay_version); 3655 ceph_decode_64_safe(&p, end, m->user_version, e_inval); 3656 } else { 3657 m->replay_version = bad_replay_version; /* struct */ 3658 m->user_version = le64_to_cpu(m->replay_version.version); 3659 } 3660 3661 if (version >= 6) { 3662 if (version >= 7) 3663 ceph_decode_8_safe(&p, end, decode_redir, e_inval); 3664 else 3665 decode_redir = 1; 3666 } else { 3667 decode_redir = 0; 3668 } 3669 3670 if (decode_redir) { 3671 ret = ceph_redirect_decode(&p, end, &m->redirect); 3672 if (ret) 3673 return ret; 3674 } else { 3675 ceph_oloc_init(&m->redirect.oloc); 3676 } 3677 3678 return 0; 3679 3680 e_inval: 3681 return -EINVAL; 3682 } 3683 3684 /* 3685 * Handle MOSDOpReply. Set ->r_result and call the callback if it is 3686 * specified. 
3687 */ 3688 static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) 3689 { 3690 struct ceph_osd_client *osdc = osd->o_osdc; 3691 struct ceph_osd_request *req; 3692 struct MOSDOpReply m; 3693 u64 tid = le64_to_cpu(msg->hdr.tid); 3694 u32 data_len = 0; 3695 int ret; 3696 int i; 3697 3698 dout("%s msg %p tid %llu\n", __func__, msg, tid); 3699 3700 down_read(&osdc->lock); 3701 if (!osd_registered(osd)) { 3702 dout("%s osd%d unknown\n", __func__, osd->o_osd); 3703 goto out_unlock_osdc; 3704 } 3705 WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num)); 3706 3707 mutex_lock(&osd->lock); 3708 req = lookup_request(&osd->o_requests, tid); 3709 if (!req) { 3710 dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid); 3711 goto out_unlock_session; 3712 } 3713 3714 m.redirect.oloc.pool_ns = req->r_t.target_oloc.pool_ns; 3715 ret = decode_MOSDOpReply(msg, &m); 3716 m.redirect.oloc.pool_ns = NULL; 3717 if (ret) { 3718 pr_err("failed to decode MOSDOpReply for tid %llu: %d\n", 3719 req->r_tid, ret); 3720 ceph_msg_dump(msg); 3721 goto fail_request; 3722 } 3723 dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n", 3724 __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed, 3725 m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch), 3726 le64_to_cpu(m.replay_version.version), m.user_version); 3727 3728 if (m.retry_attempt >= 0) { 3729 if (m.retry_attempt != req->r_attempts - 1) { 3730 dout("req %p tid %llu retry_attempt %d != %d, ignoring\n", 3731 req, req->r_tid, m.retry_attempt, 3732 req->r_attempts - 1); 3733 goto out_unlock_session; 3734 } 3735 } else { 3736 WARN_ON(1); /* MOSDOpReply v4 is assumed */ 3737 } 3738 3739 if (!ceph_oloc_empty(&m.redirect.oloc)) { 3740 dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid, 3741 m.redirect.oloc.pool); 3742 unlink_request(osd, req); 3743 mutex_unlock(&osd->lock); 3744 3745 /* 3746 * Not ceph_oloc_copy() - changing pool_ns is not 3747 * supported. 
3748 */ 3749 req->r_t.target_oloc.pool = m.redirect.oloc.pool; 3750 req->r_flags |= CEPH_OSD_FLAG_REDIRECTED | 3751 CEPH_OSD_FLAG_IGNORE_OVERLAY | 3752 CEPH_OSD_FLAG_IGNORE_CACHE; 3753 req->r_tid = 0; 3754 __submit_request(req, false); 3755 goto out_unlock_osdc; 3756 } 3757 3758 if (m.result == -EAGAIN) { 3759 dout("req %p tid %llu EAGAIN\n", req, req->r_tid); 3760 unlink_request(osd, req); 3761 mutex_unlock(&osd->lock); 3762 3763 /* 3764 * The object is missing on the replica or not (yet) 3765 * readable. Clear pgid to force a resend to the primary 3766 * via legacy_change. 3767 */ 3768 req->r_t.pgid.pool = 0; 3769 req->r_t.pgid.seed = 0; 3770 WARN_ON(!req->r_t.used_replica); 3771 req->r_flags &= ~(CEPH_OSD_FLAG_BALANCE_READS | 3772 CEPH_OSD_FLAG_LOCALIZE_READS); 3773 req->r_tid = 0; 3774 __submit_request(req, false); 3775 goto out_unlock_osdc; 3776 } 3777 3778 if (m.num_ops != req->r_num_ops) { 3779 pr_err("num_ops %d != %d for tid %llu\n", m.num_ops, 3780 req->r_num_ops, req->r_tid); 3781 goto fail_request; 3782 } 3783 for (i = 0; i < req->r_num_ops; i++) { 3784 dout(" req %p tid %llu op %d rval %d len %u\n", req, 3785 req->r_tid, i, m.rval[i], m.outdata_len[i]); 3786 req->r_ops[i].rval = m.rval[i]; 3787 req->r_ops[i].outdata_len = m.outdata_len[i]; 3788 data_len += m.outdata_len[i]; 3789 } 3790 if (data_len != le32_to_cpu(msg->hdr.data_len)) { 3791 pr_err("sum of lens %u != %u for tid %llu\n", data_len, 3792 le32_to_cpu(msg->hdr.data_len), req->r_tid); 3793 goto fail_request; 3794 } 3795 dout("%s req %p tid %llu result %d data_len %u\n", __func__, 3796 req, req->r_tid, m.result, data_len); 3797 3798 /* 3799 * Since we only ever request ONDISK, we should only ever get 3800 * one (type of) reply back. 
3801 */ 3802 WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK)); 3803 req->r_result = m.result ?: data_len; 3804 finish_request(req); 3805 mutex_unlock(&osd->lock); 3806 up_read(&osdc->lock); 3807 3808 __complete_request(req); 3809 return; 3810 3811 fail_request: 3812 complete_request(req, -EIO); 3813 out_unlock_session: 3814 mutex_unlock(&osd->lock); 3815 out_unlock_osdc: 3816 up_read(&osdc->lock); 3817 } 3818 3819 static void set_pool_was_full(struct ceph_osd_client *osdc) 3820 { 3821 struct rb_node *n; 3822 3823 for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) { 3824 struct ceph_pg_pool_info *pi = 3825 rb_entry(n, struct ceph_pg_pool_info, node); 3826 3827 pi->was_full = __pool_full(pi); 3828 } 3829 } 3830 3831 static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id) 3832 { 3833 struct ceph_pg_pool_info *pi; 3834 3835 pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id); 3836 if (!pi) 3837 return false; 3838 3839 return pi->was_full && !__pool_full(pi); 3840 } 3841 3842 static enum calc_target_result 3843 recalc_linger_target(struct ceph_osd_linger_request *lreq) 3844 { 3845 struct ceph_osd_client *osdc = lreq->osdc; 3846 enum calc_target_result ct_res; 3847 3848 ct_res = calc_target(osdc, &lreq->t, true); 3849 if (ct_res == CALC_TARGET_NEED_RESEND) { 3850 struct ceph_osd *osd; 3851 3852 osd = lookup_create_osd(osdc, lreq->t.osd, true); 3853 if (osd != lreq->osd) { 3854 unlink_linger(lreq->osd, lreq); 3855 link_linger(osd, lreq); 3856 } 3857 } 3858 3859 return ct_res; 3860 } 3861 3862 /* 3863 * Requeue requests whose mapping to an OSD has changed. 
3864 */ 3865 static void scan_requests(struct ceph_osd *osd, 3866 bool force_resend, 3867 bool cleared_full, 3868 bool check_pool_cleared_full, 3869 struct rb_root *need_resend, 3870 struct list_head *need_resend_linger) 3871 { 3872 struct ceph_osd_client *osdc = osd->o_osdc; 3873 struct rb_node *n; 3874 bool force_resend_writes; 3875 3876 for (n = rb_first(&osd->o_linger_requests); n; ) { 3877 struct ceph_osd_linger_request *lreq = 3878 rb_entry(n, struct ceph_osd_linger_request, node); 3879 enum calc_target_result ct_res; 3880 3881 n = rb_next(n); /* recalc_linger_target() */ 3882 3883 dout("%s lreq %p linger_id %llu\n", __func__, lreq, 3884 lreq->linger_id); 3885 ct_res = recalc_linger_target(lreq); 3886 switch (ct_res) { 3887 case CALC_TARGET_NO_ACTION: 3888 force_resend_writes = cleared_full || 3889 (check_pool_cleared_full && 3890 pool_cleared_full(osdc, lreq->t.base_oloc.pool)); 3891 if (!force_resend && !force_resend_writes) 3892 break; 3893 3894 fallthrough; 3895 case CALC_TARGET_NEED_RESEND: 3896 cancel_linger_map_check(lreq); 3897 /* 3898 * scan_requests() for the previous epoch(s) 3899 * may have already added it to the list, since 3900 * it's not unlinked here. 
3901 */ 3902 if (list_empty(&lreq->scan_item)) 3903 list_add_tail(&lreq->scan_item, need_resend_linger); 3904 break; 3905 case CALC_TARGET_POOL_DNE: 3906 list_del_init(&lreq->scan_item); 3907 check_linger_pool_dne(lreq); 3908 break; 3909 } 3910 } 3911 3912 for (n = rb_first(&osd->o_requests); n; ) { 3913 struct ceph_osd_request *req = 3914 rb_entry(n, struct ceph_osd_request, r_node); 3915 enum calc_target_result ct_res; 3916 3917 n = rb_next(n); /* unlink_request(), check_pool_dne() */ 3918 3919 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 3920 ct_res = calc_target(osdc, &req->r_t, false); 3921 switch (ct_res) { 3922 case CALC_TARGET_NO_ACTION: 3923 force_resend_writes = cleared_full || 3924 (check_pool_cleared_full && 3925 pool_cleared_full(osdc, req->r_t.base_oloc.pool)); 3926 if (!force_resend && 3927 (!(req->r_flags & CEPH_OSD_FLAG_WRITE) || 3928 !force_resend_writes)) 3929 break; 3930 3931 fallthrough; 3932 case CALC_TARGET_NEED_RESEND: 3933 cancel_map_check(req); 3934 unlink_request(osd, req); 3935 insert_request(need_resend, req); 3936 break; 3937 case CALC_TARGET_POOL_DNE: 3938 check_pool_dne(req); 3939 break; 3940 } 3941 } 3942 } 3943 3944 static int handle_one_map(struct ceph_osd_client *osdc, 3945 void *p, void *end, bool incremental, 3946 struct rb_root *need_resend, 3947 struct list_head *need_resend_linger) 3948 { 3949 struct ceph_osdmap *newmap; 3950 struct rb_node *n; 3951 bool skipped_map = false; 3952 bool was_full; 3953 3954 was_full = ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL); 3955 set_pool_was_full(osdc); 3956 3957 if (incremental) 3958 newmap = osdmap_apply_incremental(&p, end, 3959 ceph_msgr2(osdc->client), 3960 osdc->osdmap); 3961 else 3962 newmap = ceph_osdmap_decode(&p, end, ceph_msgr2(osdc->client)); 3963 if (IS_ERR(newmap)) 3964 return PTR_ERR(newmap); 3965 3966 if (newmap != osdc->osdmap) { 3967 /* 3968 * Preserve ->was_full before destroying the old map. 
3969 * For pools that weren't in the old map, ->was_full 3970 * should be false. 3971 */ 3972 for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) { 3973 struct ceph_pg_pool_info *pi = 3974 rb_entry(n, struct ceph_pg_pool_info, node); 3975 struct ceph_pg_pool_info *old_pi; 3976 3977 old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id); 3978 if (old_pi) 3979 pi->was_full = old_pi->was_full; 3980 else 3981 WARN_ON(pi->was_full); 3982 } 3983 3984 if (osdc->osdmap->epoch && 3985 osdc->osdmap->epoch + 1 < newmap->epoch) { 3986 WARN_ON(incremental); 3987 skipped_map = true; 3988 } 3989 3990 ceph_osdmap_destroy(osdc->osdmap); 3991 osdc->osdmap = newmap; 3992 } 3993 3994 was_full &= !ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL); 3995 scan_requests(&osdc->homeless_osd, skipped_map, was_full, true, 3996 need_resend, need_resend_linger); 3997 3998 for (n = rb_first(&osdc->osds); n; ) { 3999 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); 4000 4001 n = rb_next(n); /* close_osd() */ 4002 4003 scan_requests(osd, skipped_map, was_full, true, need_resend, 4004 need_resend_linger); 4005 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || 4006 memcmp(&osd->o_con.peer_addr, 4007 ceph_osd_addr(osdc->osdmap, osd->o_osd), 4008 sizeof(struct ceph_entity_addr))) 4009 close_osd(osd); 4010 } 4011 4012 return 0; 4013 } 4014 4015 static void kick_requests(struct ceph_osd_client *osdc, 4016 struct rb_root *need_resend, 4017 struct list_head *need_resend_linger) 4018 { 4019 struct ceph_osd_linger_request *lreq, *nlreq; 4020 enum calc_target_result ct_res; 4021 struct rb_node *n; 4022 4023 /* make sure need_resend targets reflect latest map */ 4024 for (n = rb_first(need_resend); n; ) { 4025 struct ceph_osd_request *req = 4026 rb_entry(n, struct ceph_osd_request, r_node); 4027 4028 n = rb_next(n); 4029 4030 if (req->r_t.epoch < osdc->osdmap->epoch) { 4031 ct_res = calc_target(osdc, &req->r_t, false); 4032 if (ct_res == CALC_TARGET_POOL_DNE) { 4033 erase_request(need_resend, req); 4034 
check_pool_dne(req); 4035 } 4036 } 4037 } 4038 4039 for (n = rb_first(need_resend); n; ) { 4040 struct ceph_osd_request *req = 4041 rb_entry(n, struct ceph_osd_request, r_node); 4042 struct ceph_osd *osd; 4043 4044 n = rb_next(n); 4045 erase_request(need_resend, req); /* before link_request() */ 4046 4047 osd = lookup_create_osd(osdc, req->r_t.osd, true); 4048 link_request(osd, req); 4049 if (!req->r_linger) { 4050 if (!osd_homeless(osd) && !req->r_t.paused) 4051 send_request(req); 4052 } else { 4053 cancel_linger_request(req); 4054 } 4055 } 4056 4057 list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) { 4058 if (!osd_homeless(lreq->osd)) 4059 send_linger(lreq); 4060 4061 list_del_init(&lreq->scan_item); 4062 } 4063 } 4064 4065 /* 4066 * Process updated osd map. 4067 * 4068 * The message contains any number of incremental and full maps, normally 4069 * indicating some sort of topology change in the cluster. Kick requests 4070 * off to different OSDs as needed. 4071 */ 4072 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) 4073 { 4074 void *p = msg->front.iov_base; 4075 void *const end = p + msg->front.iov_len; 4076 u32 nr_maps, maplen; 4077 u32 epoch; 4078 struct ceph_fsid fsid; 4079 struct rb_root need_resend = RB_ROOT; 4080 LIST_HEAD(need_resend_linger); 4081 bool handled_incremental = false; 4082 bool was_pauserd, was_pausewr; 4083 bool pauserd, pausewr; 4084 int err; 4085 4086 dout("%s have %u\n", __func__, osdc->osdmap->epoch); 4087 down_write(&osdc->lock); 4088 4089 /* verify fsid */ 4090 ceph_decode_need(&p, end, sizeof(fsid), bad); 4091 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 4092 if (ceph_check_fsid(osdc->client, &fsid) < 0) 4093 goto bad; 4094 4095 was_pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD); 4096 was_pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) || 4097 ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 4098 have_pool_full(osdc); 4099 4100 /* incremental maps */ 4101 ceph_decode_32_safe(&p, 
end, nr_maps, bad); 4102 dout(" %d inc maps\n", nr_maps); 4103 while (nr_maps > 0) { 4104 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 4105 epoch = ceph_decode_32(&p); 4106 maplen = ceph_decode_32(&p); 4107 ceph_decode_need(&p, end, maplen, bad); 4108 if (osdc->osdmap->epoch && 4109 osdc->osdmap->epoch + 1 == epoch) { 4110 dout("applying incremental map %u len %d\n", 4111 epoch, maplen); 4112 err = handle_one_map(osdc, p, p + maplen, true, 4113 &need_resend, &need_resend_linger); 4114 if (err) 4115 goto bad; 4116 handled_incremental = true; 4117 } else { 4118 dout("ignoring incremental map %u len %d\n", 4119 epoch, maplen); 4120 } 4121 p += maplen; 4122 nr_maps--; 4123 } 4124 if (handled_incremental) 4125 goto done; 4126 4127 /* full maps */ 4128 ceph_decode_32_safe(&p, end, nr_maps, bad); 4129 dout(" %d full maps\n", nr_maps); 4130 while (nr_maps) { 4131 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 4132 epoch = ceph_decode_32(&p); 4133 maplen = ceph_decode_32(&p); 4134 ceph_decode_need(&p, end, maplen, bad); 4135 if (nr_maps > 1) { 4136 dout("skipping non-latest full map %u len %d\n", 4137 epoch, maplen); 4138 } else if (osdc->osdmap->epoch >= epoch) { 4139 dout("skipping full map %u len %d, " 4140 "older than our %u\n", epoch, maplen, 4141 osdc->osdmap->epoch); 4142 } else { 4143 dout("taking full map %u len %d\n", epoch, maplen); 4144 err = handle_one_map(osdc, p, p + maplen, false, 4145 &need_resend, &need_resend_linger); 4146 if (err) 4147 goto bad; 4148 } 4149 p += maplen; 4150 nr_maps--; 4151 } 4152 4153 done: 4154 /* 4155 * subscribe to subsequent osdmap updates if full to ensure 4156 * we find out when we are no longer full and stop returning 4157 * ENOSPC. 
4158 */ 4159 pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD); 4160 pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) || 4161 ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 4162 have_pool_full(osdc); 4163 if (was_pauserd || was_pausewr || pauserd || pausewr || 4164 osdc->osdmap->epoch < osdc->epoch_barrier) 4165 maybe_request_map(osdc); 4166 4167 kick_requests(osdc, &need_resend, &need_resend_linger); 4168 4169 ceph_osdc_abort_on_full(osdc); 4170 ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP, 4171 osdc->osdmap->epoch); 4172 up_write(&osdc->lock); 4173 wake_up_all(&osdc->client->auth_wq); 4174 return; 4175 4176 bad: 4177 pr_err("osdc handle_map corrupt msg\n"); 4178 ceph_msg_dump(msg); 4179 up_write(&osdc->lock); 4180 } 4181 4182 /* 4183 * Resubmit requests pending on the given osd. 4184 */ 4185 static void kick_osd_requests(struct ceph_osd *osd) 4186 { 4187 struct rb_node *n; 4188 4189 clear_backoffs(osd); 4190 4191 for (n = rb_first(&osd->o_requests); n; ) { 4192 struct ceph_osd_request *req = 4193 rb_entry(n, struct ceph_osd_request, r_node); 4194 4195 n = rb_next(n); /* cancel_linger_request() */ 4196 4197 if (!req->r_linger) { 4198 if (!req->r_t.paused) 4199 send_request(req); 4200 } else { 4201 cancel_linger_request(req); 4202 } 4203 } 4204 for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) { 4205 struct ceph_osd_linger_request *lreq = 4206 rb_entry(n, struct ceph_osd_linger_request, node); 4207 4208 send_linger(lreq); 4209 } 4210 } 4211 4212 /* 4213 * If the osd connection drops, we need to resubmit all requests. 
4214 */ 4215 static void osd_fault(struct ceph_connection *con) 4216 { 4217 struct ceph_osd *osd = con->private; 4218 struct ceph_osd_client *osdc = osd->o_osdc; 4219 4220 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd); 4221 4222 down_write(&osdc->lock); 4223 if (!osd_registered(osd)) { 4224 dout("%s osd%d unknown\n", __func__, osd->o_osd); 4225 goto out_unlock; 4226 } 4227 4228 if (!reopen_osd(osd)) 4229 kick_osd_requests(osd); 4230 maybe_request_map(osdc); 4231 4232 out_unlock: 4233 up_write(&osdc->lock); 4234 } 4235 4236 struct MOSDBackoff { 4237 struct ceph_spg spgid; 4238 u32 map_epoch; 4239 u8 op; 4240 u64 id; 4241 struct ceph_hobject_id *begin; 4242 struct ceph_hobject_id *end; 4243 }; 4244 4245 static int decode_MOSDBackoff(const struct ceph_msg *msg, struct MOSDBackoff *m) 4246 { 4247 void *p = msg->front.iov_base; 4248 void *const end = p + msg->front.iov_len; 4249 u8 struct_v; 4250 u32 struct_len; 4251 int ret; 4252 4253 ret = ceph_start_decoding(&p, end, 1, "spg_t", &struct_v, &struct_len); 4254 if (ret) 4255 return ret; 4256 4257 ret = ceph_decode_pgid(&p, end, &m->spgid.pgid); 4258 if (ret) 4259 return ret; 4260 4261 ceph_decode_8_safe(&p, end, m->spgid.shard, e_inval); 4262 ceph_decode_32_safe(&p, end, m->map_epoch, e_inval); 4263 ceph_decode_8_safe(&p, end, m->op, e_inval); 4264 ceph_decode_64_safe(&p, end, m->id, e_inval); 4265 4266 m->begin = kzalloc(sizeof(*m->begin), GFP_NOIO); 4267 if (!m->begin) 4268 return -ENOMEM; 4269 4270 ret = decode_hoid(&p, end, m->begin); 4271 if (ret) { 4272 free_hoid(m->begin); 4273 return ret; 4274 } 4275 4276 m->end = kzalloc(sizeof(*m->end), GFP_NOIO); 4277 if (!m->end) { 4278 free_hoid(m->begin); 4279 return -ENOMEM; 4280 } 4281 4282 ret = decode_hoid(&p, end, m->end); 4283 if (ret) { 4284 free_hoid(m->begin); 4285 free_hoid(m->end); 4286 return ret; 4287 } 4288 4289 return 0; 4290 4291 e_inval: 4292 return -EINVAL; 4293 } 4294 4295 static struct ceph_msg *create_backoff_message( 4296 const struct 
ceph_osd_backoff *backoff, 4297 u32 map_epoch) 4298 { 4299 struct ceph_msg *msg; 4300 void *p, *end; 4301 int msg_size; 4302 4303 msg_size = CEPH_ENCODING_START_BLK_LEN + 4304 CEPH_PGID_ENCODING_LEN + 1; /* spgid */ 4305 msg_size += 4 + 1 + 8; /* map_epoch, op, id */ 4306 msg_size += CEPH_ENCODING_START_BLK_LEN + 4307 hoid_encoding_size(backoff->begin); 4308 msg_size += CEPH_ENCODING_START_BLK_LEN + 4309 hoid_encoding_size(backoff->end); 4310 4311 msg = ceph_msg_new(CEPH_MSG_OSD_BACKOFF, msg_size, GFP_NOIO, true); 4312 if (!msg) 4313 return NULL; 4314 4315 p = msg->front.iov_base; 4316 end = p + msg->front_alloc_len; 4317 4318 encode_spgid(&p, &backoff->spgid); 4319 ceph_encode_32(&p, map_epoch); 4320 ceph_encode_8(&p, CEPH_OSD_BACKOFF_OP_ACK_BLOCK); 4321 ceph_encode_64(&p, backoff->id); 4322 encode_hoid(&p, end, backoff->begin); 4323 encode_hoid(&p, end, backoff->end); 4324 BUG_ON(p != end); 4325 4326 msg->front.iov_len = p - msg->front.iov_base; 4327 msg->hdr.version = cpu_to_le16(1); /* MOSDBackoff v1 */ 4328 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 4329 4330 return msg; 4331 } 4332 4333 static void handle_backoff_block(struct ceph_osd *osd, struct MOSDBackoff *m) 4334 { 4335 struct ceph_spg_mapping *spg; 4336 struct ceph_osd_backoff *backoff; 4337 struct ceph_msg *msg; 4338 4339 dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd, 4340 m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id); 4341 4342 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &m->spgid); 4343 if (!spg) { 4344 spg = alloc_spg_mapping(); 4345 if (!spg) { 4346 pr_err("%s failed to allocate spg\n", __func__); 4347 return; 4348 } 4349 spg->spgid = m->spgid; /* struct */ 4350 insert_spg_mapping(&osd->o_backoff_mappings, spg); 4351 } 4352 4353 backoff = alloc_backoff(); 4354 if (!backoff) { 4355 pr_err("%s failed to allocate backoff\n", __func__); 4356 return; 4357 } 4358 backoff->spgid = m->spgid; /* struct */ 4359 backoff->id = m->id; 4360 backoff->begin = 
m->begin; 4361 m->begin = NULL; /* backoff now owns this */ 4362 backoff->end = m->end; 4363 m->end = NULL; /* ditto */ 4364 4365 insert_backoff(&spg->backoffs, backoff); 4366 insert_backoff_by_id(&osd->o_backoffs_by_id, backoff); 4367 4368 /* 4369 * Ack with original backoff's epoch so that the OSD can 4370 * discard this if there was a PG split. 4371 */ 4372 msg = create_backoff_message(backoff, m->map_epoch); 4373 if (!msg) { 4374 pr_err("%s failed to allocate msg\n", __func__); 4375 return; 4376 } 4377 ceph_con_send(&osd->o_con, msg); 4378 } 4379 4380 static bool target_contained_by(const struct ceph_osd_request_target *t, 4381 const struct ceph_hobject_id *begin, 4382 const struct ceph_hobject_id *end) 4383 { 4384 struct ceph_hobject_id hoid; 4385 int cmp; 4386 4387 hoid_fill_from_target(&hoid, t); 4388 cmp = hoid_compare(&hoid, begin); 4389 return !cmp || (cmp > 0 && hoid_compare(&hoid, end) < 0); 4390 } 4391 4392 static void handle_backoff_unblock(struct ceph_osd *osd, 4393 const struct MOSDBackoff *m) 4394 { 4395 struct ceph_spg_mapping *spg; 4396 struct ceph_osd_backoff *backoff; 4397 struct rb_node *n; 4398 4399 dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd, 4400 m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id); 4401 4402 backoff = lookup_backoff_by_id(&osd->o_backoffs_by_id, m->id); 4403 if (!backoff) { 4404 pr_err("%s osd%d spgid %llu.%xs%d id %llu backoff dne\n", 4405 __func__, osd->o_osd, m->spgid.pgid.pool, 4406 m->spgid.pgid.seed, m->spgid.shard, m->id); 4407 return; 4408 } 4409 4410 if (hoid_compare(backoff->begin, m->begin) && 4411 hoid_compare(backoff->end, m->end)) { 4412 pr_err("%s osd%d spgid %llu.%xs%d id %llu bad range?\n", 4413 __func__, osd->o_osd, m->spgid.pgid.pool, 4414 m->spgid.pgid.seed, m->spgid.shard, m->id); 4415 /* unblock it anyway... 
*/ 4416 } 4417 4418 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &backoff->spgid); 4419 BUG_ON(!spg); 4420 4421 erase_backoff(&spg->backoffs, backoff); 4422 erase_backoff_by_id(&osd->o_backoffs_by_id, backoff); 4423 free_backoff(backoff); 4424 4425 if (RB_EMPTY_ROOT(&spg->backoffs)) { 4426 erase_spg_mapping(&osd->o_backoff_mappings, spg); 4427 free_spg_mapping(spg); 4428 } 4429 4430 for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) { 4431 struct ceph_osd_request *req = 4432 rb_entry(n, struct ceph_osd_request, r_node); 4433 4434 if (!ceph_spg_compare(&req->r_t.spgid, &m->spgid)) { 4435 /* 4436 * Match against @m, not @backoff -- the PG may 4437 * have split on the OSD. 4438 */ 4439 if (target_contained_by(&req->r_t, m->begin, m->end)) { 4440 /* 4441 * If no other installed backoff applies, 4442 * resend. 4443 */ 4444 send_request(req); 4445 } 4446 } 4447 } 4448 } 4449 4450 static void handle_backoff(struct ceph_osd *osd, struct ceph_msg *msg) 4451 { 4452 struct ceph_osd_client *osdc = osd->o_osdc; 4453 struct MOSDBackoff m; 4454 int ret; 4455 4456 down_read(&osdc->lock); 4457 if (!osd_registered(osd)) { 4458 dout("%s osd%d unknown\n", __func__, osd->o_osd); 4459 up_read(&osdc->lock); 4460 return; 4461 } 4462 WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num)); 4463 4464 mutex_lock(&osd->lock); 4465 ret = decode_MOSDBackoff(msg, &m); 4466 if (ret) { 4467 pr_err("failed to decode MOSDBackoff: %d\n", ret); 4468 ceph_msg_dump(msg); 4469 goto out_unlock; 4470 } 4471 4472 switch (m.op) { 4473 case CEPH_OSD_BACKOFF_OP_BLOCK: 4474 handle_backoff_block(osd, &m); 4475 break; 4476 case CEPH_OSD_BACKOFF_OP_UNBLOCK: 4477 handle_backoff_unblock(osd, &m); 4478 break; 4479 default: 4480 pr_err("%s osd%d unknown op %d\n", __func__, osd->o_osd, m.op); 4481 } 4482 4483 free_hoid(m.begin); 4484 free_hoid(m.end); 4485 4486 out_unlock: 4487 mutex_unlock(&osd->lock); 4488 up_read(&osdc->lock); 4489 } 4490 4491 /* 4492 * Process osd watch notifications 4493 */ 4494 static 
void handle_watch_notify(struct ceph_osd_client *osdc, 4495 struct ceph_msg *msg) 4496 { 4497 void *p = msg->front.iov_base; 4498 void *const end = p + msg->front.iov_len; 4499 struct ceph_osd_linger_request *lreq; 4500 struct linger_work *lwork; 4501 u8 proto_ver, opcode; 4502 u64 cookie, notify_id; 4503 u64 notifier_id = 0; 4504 s32 return_code = 0; 4505 void *payload = NULL; 4506 u32 payload_len = 0; 4507 4508 ceph_decode_8_safe(&p, end, proto_ver, bad); 4509 ceph_decode_8_safe(&p, end, opcode, bad); 4510 ceph_decode_64_safe(&p, end, cookie, bad); 4511 p += 8; /* skip ver */ 4512 ceph_decode_64_safe(&p, end, notify_id, bad); 4513 4514 if (proto_ver >= 1) { 4515 ceph_decode_32_safe(&p, end, payload_len, bad); 4516 ceph_decode_need(&p, end, payload_len, bad); 4517 payload = p; 4518 p += payload_len; 4519 } 4520 4521 if (le16_to_cpu(msg->hdr.version) >= 2) 4522 ceph_decode_32_safe(&p, end, return_code, bad); 4523 4524 if (le16_to_cpu(msg->hdr.version) >= 3) 4525 ceph_decode_64_safe(&p, end, notifier_id, bad); 4526 4527 down_read(&osdc->lock); 4528 lreq = lookup_linger_osdc(&osdc->linger_requests, cookie); 4529 if (!lreq) { 4530 dout("%s opcode %d cookie %llu dne\n", __func__, opcode, 4531 cookie); 4532 goto out_unlock_osdc; 4533 } 4534 4535 mutex_lock(&lreq->lock); 4536 dout("%s opcode %d cookie %llu lreq %p is_watch %d\n", __func__, 4537 opcode, cookie, lreq, lreq->is_watch); 4538 if (opcode == CEPH_WATCH_EVENT_DISCONNECT) { 4539 if (!lreq->last_error) { 4540 lreq->last_error = -ENOTCONN; 4541 queue_watch_error(lreq); 4542 } 4543 } else if (!lreq->is_watch) { 4544 /* CEPH_WATCH_EVENT_NOTIFY_COMPLETE */ 4545 if (lreq->notify_id && lreq->notify_id != notify_id) { 4546 dout("lreq %p notify_id %llu != %llu, ignoring\n", lreq, 4547 lreq->notify_id, notify_id); 4548 } else if (!completion_done(&lreq->notify_finish_wait)) { 4549 struct ceph_msg_data *data = 4550 msg->num_data_items ? 
&msg->data[0] : NULL; 4551 4552 if (data) { 4553 if (lreq->preply_pages) { 4554 WARN_ON(data->type != 4555 CEPH_MSG_DATA_PAGES); 4556 *lreq->preply_pages = data->pages; 4557 *lreq->preply_len = data->length; 4558 data->own_pages = false; 4559 } 4560 } 4561 lreq->notify_finish_error = return_code; 4562 complete_all(&lreq->notify_finish_wait); 4563 } 4564 } else { 4565 /* CEPH_WATCH_EVENT_NOTIFY */ 4566 lwork = lwork_alloc(lreq, do_watch_notify); 4567 if (!lwork) { 4568 pr_err("failed to allocate notify-lwork\n"); 4569 goto out_unlock_lreq; 4570 } 4571 4572 lwork->notify.notify_id = notify_id; 4573 lwork->notify.notifier_id = notifier_id; 4574 lwork->notify.payload = payload; 4575 lwork->notify.payload_len = payload_len; 4576 lwork->notify.msg = ceph_msg_get(msg); 4577 lwork_queue(lwork); 4578 } 4579 4580 out_unlock_lreq: 4581 mutex_unlock(&lreq->lock); 4582 out_unlock_osdc: 4583 up_read(&osdc->lock); 4584 return; 4585 4586 bad: 4587 pr_err("osdc handle_watch_notify corrupt msg\n"); 4588 } 4589 4590 /* 4591 * Register request, send initial attempt. 4592 */ 4593 void ceph_osdc_start_request(struct ceph_osd_client *osdc, 4594 struct ceph_osd_request *req) 4595 { 4596 down_read(&osdc->lock); 4597 submit_request(req, false); 4598 up_read(&osdc->lock); 4599 } 4600 EXPORT_SYMBOL(ceph_osdc_start_request); 4601 4602 /* 4603 * Unregister request. If @req was registered, it isn't completed: 4604 * r_result isn't set and __complete_request() isn't invoked. 4605 * 4606 * If @req wasn't registered, this call may have raced with 4607 * handle_reply(), in which case r_result would already be set and 4608 * __complete_request() would be getting invoked, possibly even 4609 * concurrently with this call. 
4610 */ 4611 void ceph_osdc_cancel_request(struct ceph_osd_request *req) 4612 { 4613 struct ceph_osd_client *osdc = req->r_osdc; 4614 4615 down_write(&osdc->lock); 4616 if (req->r_osd) 4617 cancel_request(req); 4618 up_write(&osdc->lock); 4619 } 4620 EXPORT_SYMBOL(ceph_osdc_cancel_request); 4621 4622 /* 4623 * @timeout: in jiffies, 0 means "wait forever" 4624 */ 4625 static int wait_request_timeout(struct ceph_osd_request *req, 4626 unsigned long timeout) 4627 { 4628 long left; 4629 4630 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 4631 left = wait_for_completion_killable_timeout(&req->r_completion, 4632 ceph_timeout_jiffies(timeout)); 4633 if (left <= 0) { 4634 left = left ?: -ETIMEDOUT; 4635 ceph_osdc_cancel_request(req); 4636 } else { 4637 left = req->r_result; /* completed */ 4638 } 4639 4640 return left; 4641 } 4642 4643 /* 4644 * wait for a request to complete 4645 */ 4646 int ceph_osdc_wait_request(struct ceph_osd_client *osdc, 4647 struct ceph_osd_request *req) 4648 { 4649 return wait_request_timeout(req, 0); 4650 } 4651 EXPORT_SYMBOL(ceph_osdc_wait_request); 4652 4653 /* 4654 * sync - wait for all in-flight requests to flush. avoid starvation. 
 */
void ceph_osdc_sync(struct ceph_osd_client *osdc)
{
	struct rb_node *n, *p;
	/*
	 * Snapshot the newest tid at entry: only requests up to this tid
	 * are waited for, so a steady stream of new writes cannot keep
	 * this function from returning.
	 */
	u64 last_tid = atomic64_read(&osdc->last_tid);

again:
	down_read(&osdc->lock);
	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);

		mutex_lock(&osd->lock);
		for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
			struct ceph_osd_request *req =
			    rb_entry(p, struct ceph_osd_request, r_node);

			/*
			 * Stop scanning this osd at the first newer tid.
			 * Assumes o_requests is ordered by tid (it is
			 * looked up by tid elsewhere) -- TODO confirm.
			 */
			if (req->r_tid > last_tid)
				break;

			/* only writes need to be flushed */
			if (!(req->r_flags & CEPH_OSD_FLAG_WRITE))
				continue;

			/*
			 * Pin the request, drop both locks before sleeping,
			 * then rescan from scratch: the trees may have
			 * changed while unlocked.
			 */
			ceph_osdc_get_request(req);
			mutex_unlock(&osd->lock);
			up_read(&osdc->lock);
			dout("%s waiting on req %p tid %llu last_tid %llu\n",
			     __func__, req, req->r_tid, last_tid);
			wait_for_completion(&req->r_completion);
			ceph_osdc_put_request(req);
			goto again;
		}

		mutex_unlock(&osd->lock);
	}

	up_read(&osdc->lock);
	dout("%s done last_tid %llu\n", __func__, last_tid);
}
EXPORT_SYMBOL(ceph_osdc_sync);

/*
 * Returns a handle, caller owns a ref.
4697 */ 4698 struct ceph_osd_linger_request * 4699 ceph_osdc_watch(struct ceph_osd_client *osdc, 4700 struct ceph_object_id *oid, 4701 struct ceph_object_locator *oloc, 4702 rados_watchcb2_t wcb, 4703 rados_watcherrcb_t errcb, 4704 void *data) 4705 { 4706 struct ceph_osd_linger_request *lreq; 4707 int ret; 4708 4709 lreq = linger_alloc(osdc); 4710 if (!lreq) 4711 return ERR_PTR(-ENOMEM); 4712 4713 lreq->is_watch = true; 4714 lreq->wcb = wcb; 4715 lreq->errcb = errcb; 4716 lreq->data = data; 4717 lreq->watch_valid_thru = jiffies; 4718 4719 ceph_oid_copy(&lreq->t.base_oid, oid); 4720 ceph_oloc_copy(&lreq->t.base_oloc, oloc); 4721 lreq->t.flags = CEPH_OSD_FLAG_WRITE; 4722 ktime_get_real_ts64(&lreq->mtime); 4723 4724 linger_submit(lreq); 4725 ret = linger_reg_commit_wait(lreq); 4726 if (ret) { 4727 linger_cancel(lreq); 4728 goto err_put_lreq; 4729 } 4730 4731 return lreq; 4732 4733 err_put_lreq: 4734 linger_put(lreq); 4735 return ERR_PTR(ret); 4736 } 4737 EXPORT_SYMBOL(ceph_osdc_watch); 4738 4739 /* 4740 * Releases a ref. 4741 * 4742 * Times out after mount_timeout to preserve rbd unmap behaviour 4743 * introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap 4744 * with mount_timeout"). 
 */
int ceph_osdc_unwatch(struct ceph_osd_client *osdc,
		      struct ceph_osd_linger_request *lreq)
{
	struct ceph_options *opts = osdc->client->options;
	struct ceph_osd_request *req;
	int ret;

	/*
	 * NOTE(review): on the two early failures below (-ENOMEM, message
	 * allocation) @lreq is neither cancelled nor put, so the "Releases
	 * a ref" contract only holds once the UNWATCH request is started.
	 */
	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
	if (!req)
		return -ENOMEM;

	/* a single UNWATCH op aimed at the same object as the watch */
	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
	req->r_flags = CEPH_OSD_FLAG_WRITE;
	ktime_get_real_ts64(&req->r_mtime);
	osd_req_op_watch_init(req, 0, CEPH_OSD_WATCH_OP_UNWATCH,
			      lreq->linger_id, 0);

	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
	if (ret)
		goto out_put_req;

	/*
	 * Send the UNWATCH, then cancel the linger and drop the caller's
	 * ref before waiting (bounded by mount_timeout) for the reply.
	 */
	ceph_osdc_start_request(osdc, req);
	linger_cancel(lreq);
	linger_put(lreq);
	ret = wait_request_timeout(req, opts->mount_timeout);

out_put_req:
	ceph_osdc_put_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_osdc_unwatch);

/*
 * Initialise op @which of @req as a NOTIFY_ACK.
 *
 * The request data is a pagelist holding notify_id, cookie, and a
 * length-prefixed payload (a zero length when @payload is NULL).
 * Returns 0, or -ENOMEM if the pagelist cannot be allocated or filled.
 */
static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
				      u64 notify_id, u64 cookie, void *payload,
				      u32 payload_len)
{
	struct ceph_osd_req_op *op;
	struct ceph_pagelist *pl;
	int ret;

	op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);

	pl = ceph_pagelist_alloc(GFP_NOIO);
	if (!pl)
		return -ENOMEM;

	/* errors are OR-ed only to detect "any failure"; reported as -ENOMEM */
	ret = ceph_pagelist_encode_64(pl, notify_id);
	ret |= ceph_pagelist_encode_64(pl, cookie);
	if (payload) {
		ret |= ceph_pagelist_encode_32(pl, payload_len);
		ret |= ceph_pagelist_append(pl, payload, payload_len);
	} else {
		ret |= ceph_pagelist_encode_32(pl, 0);
	}
	if (ret) {
		ceph_pagelist_release(pl);
		return -ENOMEM;
	}

	ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl);
	op->indata_len = pl->length;
	return 0;
}

/*
 * Synchronously acknowledge notify @notify_id on @oid/@oloc for watch
 * @cookie, optionally carrying @payload back to the notifier.
 *
 * Returns the request result, or a negative error if the request could
 * not be built.
 */
int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
			 struct ceph_object_id *oid,
			 struct ceph_object_locator *oloc,
			 u64 notify_id,
			 u64 cookie,
			 void *payload,
			 u32 payload_len)
{
	struct ceph_osd_request *req;
	int ret;

	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
	if (!req)
		return -ENOMEM;

	ceph_oid_copy(&req->r_base_oid, oid);
	ceph_oloc_copy(&req->r_base_oloc, oloc);
	req->r_flags = CEPH_OSD_FLAG_READ;

	ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
					 payload_len);
	if (ret)
		goto out_put_req;

	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
	if (ret)
		goto out_put_req;

	ceph_osdc_start_request(osdc, req);
	ret = ceph_osdc_wait_request(osdc, req);

out_put_req:
	ceph_osdc_put_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_osdc_notify_ack);

/*
 * Send a notify with @payload to the watchers of @oid/@oloc and wait
 * for it to complete.
 *
 * @timeout: in seconds; the local wait for completion is bounded by
 *           twice this value
 *
 * @preply_{pages,len} are initialized both on success and error.
 * The caller is responsible for:
 *
 *     ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len))
 */
int ceph_osdc_notify(struct ceph_osd_client *osdc,
		     struct ceph_object_id *oid,
		     struct ceph_object_locator *oloc,
		     void *payload,
		     u32 payload_len,
		     u32 timeout,
		     struct page ***preply_pages,
		     size_t *preply_len)
{
	struct ceph_osd_linger_request *lreq;
	int ret;

	WARN_ON(!timeout);
	if (preply_pages) {
		*preply_pages = NULL;
		*preply_len = 0;
	}

	lreq = linger_alloc(osdc);
	if (!lreq)
		return -ENOMEM;

	lreq->request_pl = ceph_pagelist_alloc(GFP_NOIO);
	if (!lreq->request_pl) {
		ret = -ENOMEM;
		goto out_put_lreq;
	}

	/* any pagelist encode failure is reported as -ENOMEM */
	ret = ceph_pagelist_encode_32(lreq->request_pl, 1); /* prot_ver */
	ret |= ceph_pagelist_encode_32(lreq->request_pl, timeout);
	ret |= ceph_pagelist_encode_32(lreq->request_pl, payload_len);
	ret |= ceph_pagelist_append(lreq->request_pl, payload, payload_len);
	if (ret) {
		ret = -ENOMEM;
		goto out_put_lreq;
	}

	/* for notify_id */
	lreq->notify_id_pages = ceph_alloc_page_vector(1, GFP_NOIO);
	if (IS_ERR(lreq->notify_id_pages)) {
		ret = PTR_ERR(lreq->notify_id_pages);
		lreq->notify_id_pages = NULL;
		goto out_put_lreq;
	}

	/* filled in by the NOTIFY_COMPLETE handling when a reply arrives */
	lreq->preply_pages = preply_pages;
	lreq->preply_len = preply_len;

	ceph_oid_copy(&lreq->t.base_oid, oid);
	ceph_oloc_copy(&lreq->t.base_oloc, oloc);
	lreq->t.flags = CEPH_OSD_FLAG_READ;

	linger_submit(lreq);
	ret = linger_reg_commit_wait(lreq);
	if (!ret)
		ret = linger_notify_finish_wait(lreq,
				 msecs_to_jiffies(2 * timeout * MSEC_PER_SEC));
	else
		dout("lreq %p failed to initiate notify %d\n", lreq, ret);

	/* a submitted linger is always cancelled, success or not */
	linger_cancel(lreq);
out_put_lreq:
	linger_put(lreq);
	return ret;
}
EXPORT_SYMBOL(ceph_osdc_notify);

/*
 * Return the number of milliseconds since the watch was last
 * confirmed, or an error.  If there is an error, the watch is no
 * longer valid, and should be destroyed with ceph_osdc_unwatch().
4927 */ 4928 int ceph_osdc_watch_check(struct ceph_osd_client *osdc, 4929 struct ceph_osd_linger_request *lreq) 4930 { 4931 unsigned long stamp, age; 4932 int ret; 4933 4934 down_read(&osdc->lock); 4935 mutex_lock(&lreq->lock); 4936 stamp = lreq->watch_valid_thru; 4937 if (!list_empty(&lreq->pending_lworks)) { 4938 struct linger_work *lwork = 4939 list_first_entry(&lreq->pending_lworks, 4940 struct linger_work, 4941 pending_item); 4942 4943 if (time_before(lwork->queued_stamp, stamp)) 4944 stamp = lwork->queued_stamp; 4945 } 4946 age = jiffies - stamp; 4947 dout("%s lreq %p linger_id %llu age %lu last_error %d\n", __func__, 4948 lreq, lreq->linger_id, age, lreq->last_error); 4949 /* we are truncating to msecs, so return a safe upper bound */ 4950 ret = lreq->last_error ?: 1 + jiffies_to_msecs(age); 4951 4952 mutex_unlock(&lreq->lock); 4953 up_read(&osdc->lock); 4954 return ret; 4955 } 4956 4957 static int decode_watcher(void **p, void *end, struct ceph_watch_item *item) 4958 { 4959 u8 struct_v; 4960 u32 struct_len; 4961 int ret; 4962 4963 ret = ceph_start_decoding(p, end, 2, "watch_item_t", 4964 &struct_v, &struct_len); 4965 if (ret) 4966 goto bad; 4967 4968 ret = -EINVAL; 4969 ceph_decode_copy_safe(p, end, &item->name, sizeof(item->name), bad); 4970 ceph_decode_64_safe(p, end, item->cookie, bad); 4971 ceph_decode_skip_32(p, end, bad); /* skip timeout seconds */ 4972 4973 if (struct_v >= 2) { 4974 ret = ceph_decode_entity_addr(p, end, &item->addr); 4975 if (ret) 4976 goto bad; 4977 } else { 4978 ret = 0; 4979 } 4980 4981 dout("%s %s%llu cookie %llu addr %s\n", __func__, 4982 ENTITY_NAME(item->name), item->cookie, 4983 ceph_pr_addr(&item->addr)); 4984 bad: 4985 return ret; 4986 } 4987 4988 static int decode_watchers(void **p, void *end, 4989 struct ceph_watch_item **watchers, 4990 u32 *num_watchers) 4991 { 4992 u8 struct_v; 4993 u32 struct_len; 4994 int i; 4995 int ret; 4996 4997 ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t", 4998 &struct_v, 
&struct_len); 4999 if (ret) 5000 return ret; 5001 5002 *num_watchers = ceph_decode_32(p); 5003 *watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO); 5004 if (!*watchers) 5005 return -ENOMEM; 5006 5007 for (i = 0; i < *num_watchers; i++) { 5008 ret = decode_watcher(p, end, *watchers + i); 5009 if (ret) { 5010 kfree(*watchers); 5011 return ret; 5012 } 5013 } 5014 5015 return 0; 5016 } 5017 5018 /* 5019 * On success, the caller is responsible for: 5020 * 5021 * kfree(watchers); 5022 */ 5023 int ceph_osdc_list_watchers(struct ceph_osd_client *osdc, 5024 struct ceph_object_id *oid, 5025 struct ceph_object_locator *oloc, 5026 struct ceph_watch_item **watchers, 5027 u32 *num_watchers) 5028 { 5029 struct ceph_osd_request *req; 5030 struct page **pages; 5031 int ret; 5032 5033 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); 5034 if (!req) 5035 return -ENOMEM; 5036 5037 ceph_oid_copy(&req->r_base_oid, oid); 5038 ceph_oloc_copy(&req->r_base_oloc, oloc); 5039 req->r_flags = CEPH_OSD_FLAG_READ; 5040 5041 pages = ceph_alloc_page_vector(1, GFP_NOIO); 5042 if (IS_ERR(pages)) { 5043 ret = PTR_ERR(pages); 5044 goto out_put_req; 5045 } 5046 5047 osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0); 5048 ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers, 5049 response_data), 5050 pages, PAGE_SIZE, 0, false, true); 5051 5052 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 5053 if (ret) 5054 goto out_put_req; 5055 5056 ceph_osdc_start_request(osdc, req); 5057 ret = ceph_osdc_wait_request(osdc, req); 5058 if (ret >= 0) { 5059 void *p = page_address(pages[0]); 5060 void *const end = p + req->r_ops[0].outdata_len; 5061 5062 ret = decode_watchers(&p, end, watchers, num_watchers); 5063 } 5064 5065 out_put_req: 5066 ceph_osdc_put_request(req); 5067 return ret; 5068 } 5069 EXPORT_SYMBOL(ceph_osdc_list_watchers); 5070 5071 /* 5072 * Call all pending notify callbacks - for use after a watch is 5073 * unregistered, to make sure no more callbacks for it will 
be invoked 5074 */ 5075 void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc) 5076 { 5077 dout("%s osdc %p\n", __func__, osdc); 5078 flush_workqueue(osdc->notify_wq); 5079 } 5080 EXPORT_SYMBOL(ceph_osdc_flush_notifies); 5081 5082 void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc) 5083 { 5084 down_read(&osdc->lock); 5085 maybe_request_map(osdc); 5086 up_read(&osdc->lock); 5087 } 5088 EXPORT_SYMBOL(ceph_osdc_maybe_request_map); 5089 5090 /* 5091 * Execute an OSD class method on an object. 5092 * 5093 * @flags: CEPH_OSD_FLAG_* 5094 * @resp_len: in/out param for reply length 5095 */ 5096 int ceph_osdc_call(struct ceph_osd_client *osdc, 5097 struct ceph_object_id *oid, 5098 struct ceph_object_locator *oloc, 5099 const char *class, const char *method, 5100 unsigned int flags, 5101 struct page *req_page, size_t req_len, 5102 struct page **resp_pages, size_t *resp_len) 5103 { 5104 struct ceph_osd_request *req; 5105 int ret; 5106 5107 if (req_len > PAGE_SIZE) 5108 return -E2BIG; 5109 5110 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); 5111 if (!req) 5112 return -ENOMEM; 5113 5114 ceph_oid_copy(&req->r_base_oid, oid); 5115 ceph_oloc_copy(&req->r_base_oloc, oloc); 5116 req->r_flags = flags; 5117 5118 ret = osd_req_op_cls_init(req, 0, class, method); 5119 if (ret) 5120 goto out_put_req; 5121 5122 if (req_page) 5123 osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len, 5124 0, false, false); 5125 if (resp_pages) 5126 osd_req_op_cls_response_data_pages(req, 0, resp_pages, 5127 *resp_len, 0, false, false); 5128 5129 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 5130 if (ret) 5131 goto out_put_req; 5132 5133 ceph_osdc_start_request(osdc, req); 5134 ret = ceph_osdc_wait_request(osdc, req); 5135 if (ret >= 0) { 5136 ret = req->r_ops[0].rval; 5137 if (resp_pages) 5138 *resp_len = req->r_ops[0].outdata_len; 5139 } 5140 5141 out_put_req: 5142 ceph_osdc_put_request(req); 5143 return ret; 5144 } 5145 EXPORT_SYMBOL(ceph_osdc_call); 5146 5147 /* 
 * reset all osd connections
 */
void ceph_osdc_reopen_osds(struct ceph_osd_client *osdc)
{
	struct rb_node *n;

	down_write(&osdc->lock);
	for (n = rb_first(&osdc->osds); n; ) {
		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);

		/*
		 * Advance before touching @osd, presumably because
		 * reopen_osd() may remove it from osdc->osds.
		 */
		n = rb_next(n);
		if (!reopen_osd(osd))
			kick_osd_requests(osd);
	}
	up_write(&osdc->lock);
}

/*
 * init, shutdown
 *
 * ceph_osdc_init() sets up all osdc state, allocates the osdmap,
 * request mempool, op/reply msgpools and the two workqueues, then arms
 * the periodic timeout works.  On failure everything allocated so far
 * is released in reverse order and a negative error is returned.
 */
int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
{
	int err;

	dout("init\n");
	osdc->client = client;
	init_rwsem(&osdc->lock);
	osdc->osds = RB_ROOT;
	INIT_LIST_HEAD(&osdc->osd_lru);
	spin_lock_init(&osdc->osd_lru_lock);
	osd_init(&osdc->homeless_osd);
	osdc->homeless_osd.o_osdc = osdc;
	osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
	osdc->last_linger_id = CEPH_LINGER_ID_START;
	osdc->linger_requests = RB_ROOT;
	osdc->map_checks = RB_ROOT;
	osdc->linger_map_checks = RB_ROOT;
	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);

	err = -ENOMEM;
	osdc->osdmap = ceph_osdmap_alloc();
	if (!osdc->osdmap)
		goto out;

	osdc->req_mempool = mempool_create_slab_pool(10,
						     ceph_osd_request_cache);
	if (!osdc->req_mempool)
		goto out_map;

	err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
				PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, "osd_op");
	if (err < 0)
		goto out_mempool;
	err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
				PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10,
				"osd_op_reply");
	if (err < 0)
		goto out_msgpool;

	err = -ENOMEM;
	osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
	if (!osdc->notify_wq)
		goto out_msgpool_reply;

	osdc->completion_wq = create_singlethread_workqueue("ceph-completion");
	if (!osdc->completion_wq)
		goto out_notify_wq;

	/* periodic works are armed only once nothing else can fail */
	schedule_delayed_work(&osdc->timeout_work,
			      osdc->client->options->osd_keepalive_timeout);
	schedule_delayed_work(&osdc->osds_timeout_work,
	    round_jiffies_relative(osdc->client->options->osd_idle_ttl));

	return 0;

out_notify_wq:
	destroy_workqueue(osdc->notify_wq);
out_msgpool_reply:
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
out_msgpool:
	ceph_msgpool_destroy(&osdc->msgpool_op);
out_mempool:
	mempool_destroy(osdc->req_mempool);
out_map:
	ceph_osdmap_destroy(osdc->osdmap);
out:
	return err;
}

/*
 * Tear down what ceph_osdc_init() set up: workqueues and timeout works
 * first, then every registered osd, then the remaining resources.  The
 * WARN_ON()s flag state that should already be empty at this point.
 */
void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
	destroy_workqueue(osdc->completion_wq);
	destroy_workqueue(osdc->notify_wq);
	cancel_delayed_work_sync(&osdc->timeout_work);
	cancel_delayed_work_sync(&osdc->osds_timeout_work);

	down_write(&osdc->lock);
	/* terminates only because close_osd() unlinks @osd from osdc->osds */
	while (!RB_EMPTY_ROOT(&osdc->osds)) {
		struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
						struct ceph_osd, o_node);
		close_osd(osd);
	}
	up_write(&osdc->lock);
	WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1);
	osd_cleanup(&osdc->homeless_osd);

	WARN_ON(!list_empty(&osdc->osd_lru));
	WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests));
	WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks));
	WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks));
	WARN_ON(atomic_read(&osdc->num_requests));
	WARN_ON(atomic_read(&osdc->num_homeless));

	ceph_osdmap_destroy(osdc->osdmap);
	mempool_destroy(osdc->req_mempool);
	ceph_msgpool_destroy(&osdc->msgpool_op);
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}

/*
 * Initialise op 0 of @req as CEPH_OSD_OP_COPY_FROM2.
 *
 * The source oid, oloc, truncate_seq and truncate_size are encoded into
 * a freshly allocated page that the op takes ownership of.  Returns 0,
 * or the error from the page allocation.
 */
int osd_req_op_copy_from_init(struct ceph_osd_request *req,
			      u64 src_snapid, u64 src_version,
			      struct ceph_object_id *src_oid,
			      struct ceph_object_locator *src_oloc,
			      u32 src_fadvise_flags,
			      u32 dst_fadvise_flags,
			      u32 truncate_seq, u64 truncate_size,
			      u8 copy_from_flags)
{
	struct ceph_osd_req_op *op;
	struct page **pages;
	void *p, *end;

	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	op = osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM2,
			     dst_fadvise_flags);
	op->copy_from.snapid = src_snapid;
	op->copy_from.src_version = src_version;
	op->copy_from.flags = copy_from_flags;
	op->copy_from.src_fadvise_flags = src_fadvise_flags;

	/*
	 * ceph_encode_32()/ceph_encode_64() take no end pointer; this
	 * assumes oid + oloc leave room for them in one page -- TODO confirm.
	 */
	p = page_address(pages[0]);
	end = p + PAGE_SIZE;
	ceph_encode_string(&p, end, src_oid->name, src_oid->name_len);
	encode_oloc(&p, end, src_oloc);
	ceph_encode_32(&p, truncate_seq);
	ceph_encode_64(&p, truncate_size);
	/* bytes actually written into the page */
	op->indata_len = PAGE_SIZE - (end - p);

	ceph_osd_data_pages_init(&op->copy_from.osd_data, pages,
				 op->indata_len, 0, false, true);
	return 0;
}
EXPORT_SYMBOL(osd_req_op_copy_from_init);

/*
 * Create the slab cache for OSD requests, sized for a request with
 * CEPH_OSD_SLAB_OPS inline ops.  Returns 0 or -ENOMEM.
 */
int __init ceph_osdc_setup(void)
{
	size_t size = sizeof(struct ceph_osd_request) +
	    CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);

	BUG_ON(ceph_osd_request_cache);
	ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
						   0, 0, NULL);

	return ceph_osd_request_cache ? 0 : -ENOMEM;
}

/* Destroy the cache created by ceph_osdc_setup(). */
void ceph_osdc_cleanup(void)
{
	BUG_ON(!ceph_osd_request_cache);
	kmem_cache_destroy(ceph_osd_request_cache);
	ceph_osd_request_cache = NULL;
}

/*
 * handle incoming message
 *
 * Dispatches on the message type, then drops the reference the caller
 * handed us.  Handlers that keep @msg longer take their own reference
 * (handle_watch_notify() does so via ceph_msg_get()).
 */
static void osd_dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	int type = le16_to_cpu(msg->hdr.type);

	switch (type) {
	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(osdc, msg);
		break;
	case CEPH_MSG_OSD_OPREPLY:
		handle_reply(osd, msg);
		break;
	case CEPH_MSG_OSD_BACKOFF:
		handle_backoff(osd, msg);
		break;
	case CEPH_MSG_WATCH_NOTIFY:
		handle_watch_notify(osdc, msg);
		break;

	default:
		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}

	ceph_msg_put(msg);
}

/*
 * Lookup and return message for incoming reply.  Don't try to do
 * anything about a larger than preallocated data portion of the
 * message at the moment - for now, just skip the message.
 */
static struct ceph_msg *get_reply(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct ceph_msg *m = NULL;
	struct ceph_osd_request *req;
	int front_len = le32_to_cpu(hdr->front_len);
	int data_len = le32_to_cpu(hdr->data_len);
	u64 tid = le64_to_cpu(hdr->tid);

	/*
	 * Outcomes: a referenced reply message; NULL with *skip = 1
	 * (unknown osd/tid, or data too large); or NULL with *skip left
	 * untouched when a larger front cannot be allocated.
	 */
	down_read(&osdc->lock);
	if (!osd_registered(osd)) {
		dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd);
		*skip = 1;
		goto out_unlock_osdc;
	}
	WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num));

	mutex_lock(&osd->lock);
	req = lookup_request(&osd->o_requests, tid);
	if (!req) {
		dout("%s osd%d tid %llu unknown, skipping\n", __func__,
		     osd->o_osd, tid);
		*skip = 1;
		goto out_unlock_session;
	}

	ceph_msg_revoke_incoming(req->r_reply);

	/* front too big for the preallocated reply: swap in a bigger one */
	if (front_len > req->r_reply->front_alloc_len) {
		pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
			__func__, osd->o_osd, req->r_tid, front_len,
			req->r_reply->front_alloc_len);
		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
				 false);
		if (!m)
			goto out_unlock_session;
		ceph_msg_put(req->r_reply);
		req->r_reply = m;
	}

	/* data larger than the preallocated buffers is not handled: skip */
	if (data_len > req->r_reply->data_length) {
		pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
			__func__, osd->o_osd, req->r_tid, data_len,
			req->r_reply->data_length);
		m = NULL;
		*skip = 1;
		goto out_unlock_session;
	}

	/* the messenger gets its own reference; req keeps r_reply */
	m = ceph_msg_get(req->r_reply);
	dout("get_reply tid %lld %p\n", tid, m);

out_unlock_session:
	mutex_unlock(&osd->lock);
out_unlock_osdc:
	up_read(&osdc->lock);
	return m;
}

/*
 * Allocate a message sized from @hdr, with room for one data item.  If
 * the header announces data, a page vector large enough for it is
 * attached (owned by the message).  Returns NULL on allocation failure.
 */
static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
{
	struct ceph_msg *m;
	int type = le16_to_cpu(hdr->type);
	u32 front_len = le32_to_cpu(hdr->front_len);
	u32 data_len = le32_to_cpu(hdr->data_len);

	m = ceph_msg_new2(type, front_len, 1, GFP_NOIO, false);
	if (!m)
		return NULL;

	if (data_len) {
		struct page **pages;

		pages = ceph_alloc_page_vector(calc_pages_for(0, data_len),
					       GFP_NOIO);
		if (IS_ERR(pages)) {
			ceph_msg_put(m);
			return NULL;
		}

		ceph_msg_data_add_pages(m, pages, data_len, 0, true);
	}

	return m;
}

/*
 * Messenger alloc_msg callback: op replies reuse the request's
 * preallocated reply via get_reply(); map, backoff and watch-notify
 * messages get a fresh message; anything else is skipped.
 */
static struct ceph_msg *osd_alloc_msg(struct ceph_connection *con,
				      struct ceph_msg_header *hdr,
				      int *skip)
{
	struct ceph_osd *osd = con->private;
	int type = le16_to_cpu(hdr->type);

	*skip = 0;
	switch (type) {
	case CEPH_MSG_OSD_MAP:
	case CEPH_MSG_OSD_BACKOFF:
	case CEPH_MSG_WATCH_NOTIFY:
		return alloc_msg_with_page_vector(hdr);
	case CEPH_MSG_OSD_OPREPLY:
		return get_reply(con, hdr, skip);
	default:
		pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__,
			osd->o_osd, type);
		*skip = 1;
		return NULL;
	}
}

/*
 * Wrappers to refcount containing ceph_osd struct
 *
 * osd_get_con() returns NULL if get_osd() fails to take a reference.
 */
static struct ceph_connection *osd_get_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	if (get_osd(osd))
		return con;
	return NULL;
}

static void osd_put_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	put_osd(osd);
}

/*
 * authentication
 */

/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
5497 */ 5498 static struct ceph_auth_handshake * 5499 osd_get_authorizer(struct ceph_connection *con, int *proto, int force_new) 5500 { 5501 struct ceph_osd *o = con->private; 5502 struct ceph_osd_client *osdc = o->o_osdc; 5503 struct ceph_auth_client *ac = osdc->client->monc.auth; 5504 struct ceph_auth_handshake *auth = &o->o_auth; 5505 int ret; 5506 5507 ret = __ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_OSD, 5508 force_new, proto, NULL, NULL); 5509 if (ret) 5510 return ERR_PTR(ret); 5511 5512 return auth; 5513 } 5514 5515 static int osd_add_authorizer_challenge(struct ceph_connection *con, 5516 void *challenge_buf, int challenge_buf_len) 5517 { 5518 struct ceph_osd *o = con->private; 5519 struct ceph_osd_client *osdc = o->o_osdc; 5520 struct ceph_auth_client *ac = osdc->client->monc.auth; 5521 5522 return ceph_auth_add_authorizer_challenge(ac, o->o_auth.authorizer, 5523 challenge_buf, challenge_buf_len); 5524 } 5525 5526 static int osd_verify_authorizer_reply(struct ceph_connection *con) 5527 { 5528 struct ceph_osd *o = con->private; 5529 struct ceph_osd_client *osdc = o->o_osdc; 5530 struct ceph_auth_client *ac = osdc->client->monc.auth; 5531 struct ceph_auth_handshake *auth = &o->o_auth; 5532 5533 return ceph_auth_verify_authorizer_reply(ac, auth->authorizer, 5534 auth->authorizer_reply_buf, auth->authorizer_reply_buf_len, 5535 NULL, NULL, NULL, NULL); 5536 } 5537 5538 static int osd_invalidate_authorizer(struct ceph_connection *con) 5539 { 5540 struct ceph_osd *o = con->private; 5541 struct ceph_osd_client *osdc = o->o_osdc; 5542 struct ceph_auth_client *ac = osdc->client->monc.auth; 5543 5544 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); 5545 return ceph_monc_validate_auth(&osdc->client->monc); 5546 } 5547 5548 static int osd_get_auth_request(struct ceph_connection *con, 5549 void *buf, int *buf_len, 5550 void **authorizer, int *authorizer_len) 5551 { 5552 struct ceph_osd *o = con->private; 5553 struct ceph_auth_client *ac = 
o->o_osdc->client->monc.auth; 5554 struct ceph_auth_handshake *auth = &o->o_auth; 5555 int ret; 5556 5557 ret = ceph_auth_get_authorizer(ac, auth, CEPH_ENTITY_TYPE_OSD, 5558 buf, buf_len); 5559 if (ret) 5560 return ret; 5561 5562 *authorizer = auth->authorizer_buf; 5563 *authorizer_len = auth->authorizer_buf_len; 5564 return 0; 5565 } 5566 5567 static int osd_handle_auth_reply_more(struct ceph_connection *con, 5568 void *reply, int reply_len, 5569 void *buf, int *buf_len, 5570 void **authorizer, int *authorizer_len) 5571 { 5572 struct ceph_osd *o = con->private; 5573 struct ceph_auth_client *ac = o->o_osdc->client->monc.auth; 5574 struct ceph_auth_handshake *auth = &o->o_auth; 5575 int ret; 5576 5577 ret = ceph_auth_handle_svc_reply_more(ac, auth, reply, reply_len, 5578 buf, buf_len); 5579 if (ret) 5580 return ret; 5581 5582 *authorizer = auth->authorizer_buf; 5583 *authorizer_len = auth->authorizer_buf_len; 5584 return 0; 5585 } 5586 5587 static int osd_handle_auth_done(struct ceph_connection *con, 5588 u64 global_id, void *reply, int reply_len, 5589 u8 *session_key, int *session_key_len, 5590 u8 *con_secret, int *con_secret_len) 5591 { 5592 struct ceph_osd *o = con->private; 5593 struct ceph_auth_client *ac = o->o_osdc->client->monc.auth; 5594 struct ceph_auth_handshake *auth = &o->o_auth; 5595 5596 return ceph_auth_handle_svc_reply_done(ac, auth, reply, reply_len, 5597 session_key, session_key_len, 5598 con_secret, con_secret_len); 5599 } 5600 5601 static int osd_handle_auth_bad_method(struct ceph_connection *con, 5602 int used_proto, int result, 5603 const int *allowed_protos, int proto_cnt, 5604 const int *allowed_modes, int mode_cnt) 5605 { 5606 struct ceph_osd *o = con->private; 5607 struct ceph_mon_client *monc = &o->o_osdc->client->monc; 5608 int ret; 5609 5610 if (ceph_auth_handle_bad_authorizer(monc->auth, CEPH_ENTITY_TYPE_OSD, 5611 used_proto, result, 5612 allowed_protos, proto_cnt, 5613 allowed_modes, mode_cnt)) { 5614 ret = 
ceph_monc_validate_auth(monc); 5615 if (ret) 5616 return ret; 5617 } 5618 5619 return -EACCES; 5620 } 5621 5622 static void osd_reencode_message(struct ceph_msg *msg) 5623 { 5624 int type = le16_to_cpu(msg->hdr.type); 5625 5626 if (type == CEPH_MSG_OSD_OP) 5627 encode_request_finish(msg); 5628 } 5629 5630 static int osd_sign_message(struct ceph_msg *msg) 5631 { 5632 struct ceph_osd *o = msg->con->private; 5633 struct ceph_auth_handshake *auth = &o->o_auth; 5634 5635 return ceph_auth_sign_message(auth, msg); 5636 } 5637 5638 static int osd_check_message_signature(struct ceph_msg *msg) 5639 { 5640 struct ceph_osd *o = msg->con->private; 5641 struct ceph_auth_handshake *auth = &o->o_auth; 5642 5643 return ceph_auth_check_message_signature(auth, msg); 5644 } 5645 5646 static const struct ceph_connection_operations osd_con_ops = { 5647 .get = osd_get_con, 5648 .put = osd_put_con, 5649 .alloc_msg = osd_alloc_msg, 5650 .dispatch = osd_dispatch, 5651 .fault = osd_fault, 5652 .reencode_message = osd_reencode_message, 5653 .get_authorizer = osd_get_authorizer, 5654 .add_authorizer_challenge = osd_add_authorizer_challenge, 5655 .verify_authorizer_reply = osd_verify_authorizer_reply, 5656 .invalidate_authorizer = osd_invalidate_authorizer, 5657 .sign_message = osd_sign_message, 5658 .check_message_signature = osd_check_message_signature, 5659 .get_auth_request = osd_get_auth_request, 5660 .handle_auth_reply_more = osd_handle_auth_reply_more, 5661 .handle_auth_done = osd_handle_auth_done, 5662 .handle_auth_bad_method = osd_handle_auth_bad_method, 5663 }; 5664