// SPDX-License-Identifier: GPL-2.0

#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/striper.h>

#define OSD_OPREPLY_FRONT_LEN	512

static struct kmem_cache	*ceph_osd_request_cache;

static const struct ceph_connection_operations osd_con_ops;

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */

static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void link_linger(struct ceph_osd *osd,
			struct ceph_osd_linger_request *lreq);
static void unlink_linger(struct ceph_osd *osd,
			  struct ceph_osd_linger_request *lreq);
static void clear_backoffs(struct ceph_osd *osd);

#if 1
static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
{
	bool wrlocked = true;

	if (unlikely(down_read_trylock(sem))) {
		wrlocked = false;
		up_read(sem);
	}

	return wrlocked;
}
static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
{
	WARN_ON(!rwsem_is_locked(&osdc->lock));
}
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
{
	WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
}
static inline void verify_osd_locked(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;

	WARN_ON(!(mutex_is_locked(&osd->lock) &&
		  rwsem_is_locked(&osdc->lock)) &&
		!rwsem_is_wrlocked(&osdc->lock));
}
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq)
{
	WARN_ON(!mutex_is_locked(&lreq->lock));
}
#else
static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
static inline void verify_osd_locked(struct ceph_osd *osd) { }
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { }
#endif

/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */
static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
			u64 *objnum, u64 *objoff, u64 *objlen)
{
	u64 orig_len = *plen;
	u32 xlen;

	/* object extent? */
	ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
				      objoff, &xlen);
	*objlen = xlen;
	if (*objlen < orig_len) {
		*plen = *objlen;
		dout(" skipping last %llu, final file extent %llu~%llu\n",
		     orig_len - *plen, off, *plen);
	}

	dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);
	return 0;
}

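/*
 * Illustrative example (not from this file), assuming a simple layout
 * with 4 MiB objects and stripe_count == 1: a request for
 * off=4190208 (4 MiB - 4 KiB), *plen=16384 maps to objnum=0,
 * objoff=4190208 and is shortened to *plen = *objlen = 4096 so that it
 * does not cross into object 1; the caller issues the remainder as a
 * separate request.
 */
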
static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
{
	memset(osd_data, 0, sizeof (*osd_data));
	osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
}

/*
 * Consumes @pages if @own_pages is true.
 */
static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
			struct page **pages, u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
	osd_data->pages = pages;
	osd_data->length = length;
	osd_data->alignment = alignment;
	osd_data->pages_from_pool = pages_from_pool;
	osd_data->own_pages = own_pages;
}

/*
 * Consumes a ref on @pagelist.
 */
static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
			struct ceph_pagelist *pagelist)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
	osd_data->pagelist = pagelist;
}

#ifdef CONFIG_BLOCK
static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
				   struct ceph_bio_iter *bio_pos,
				   u32 bio_length)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
	osd_data->bio_pos = *bio_pos;
	osd_data->bio_length = bio_length;
}
#endif /* CONFIG_BLOCK */

static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
				     struct ceph_bvec_iter *bvec_pos,
				     u32 num_bvecs)
{
	osd_data->type = CEPH_OSD_DATA_TYPE_BVECS;
	osd_data->bvec_pos = *bvec_pos;
	osd_data->num_bvecs = num_bvecs;
}

static struct ceph_osd_data *
osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
{
	BUG_ON(which >= osd_req->r_num_ops);

	return &osd_req->r_ops[which].raw_data_in;
}

struct ceph_osd_data *
osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
			   unsigned int which)
{
	return osd_req_op_data(osd_req, which, extent, osd_data);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data);

void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages,
			u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_raw_data_in(osd_req, which);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);

void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages,
			u64 length, u32 alignment,
			bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);

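/*
 * Illustrative sketch (hypothetical caller, not part of this file):
 * reading an extent into a page vector typically pairs
 * osd_req_op_extent_init() (defined further down) with the pages
 * helper above, e.g.
 *
 *	osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, off, len, 0, 0);
 *	osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
 *
 * The attached osd_data is only wired into the message later, in
 * setup_request_data().
 */
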
void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);

#ifdef CONFIG_BLOCK
void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
				    unsigned int which,
				    struct ceph_bio_iter *bio_pos,
				    u32 bio_length)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_bio_init(osd_data, bio_pos, bio_length);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
#endif /* CONFIG_BLOCK */

void osd_req_op_extent_osd_data_bvecs(struct ceph_osd_request *osd_req,
				      unsigned int which,
				      struct bio_vec *bvecs, u32 num_bvecs,
				      u32 bytes)
{
	struct ceph_osd_data *osd_data;
	struct ceph_bvec_iter it = {
		.bvecs = bvecs,
		.iter = { .bi_size = bytes },
	};

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvecs);

void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
					 unsigned int which,
					 struct ceph_bvec_iter *bvec_pos)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
	ceph_osd_data_bvecs_init(osd_data, bvec_pos, 0);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);

static void osd_req_op_cls_request_info_pagelist(
			struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_info);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
}

void osd_req_op_cls_request_data_pagelist(
			struct ceph_osd_request *osd_req,
			unsigned int which, struct ceph_pagelist *pagelist)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_pagelist_init(osd_data, pagelist);
	osd_req->r_ops[which].cls.indata_len += pagelist->length;
	osd_req->r_ops[which].indata_len += pagelist->length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);

void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages, u64 length,
			u32 alignment, bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
	osd_req->r_ops[which].cls.indata_len += length;
	osd_req->r_ops[which].indata_len += length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);

void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
				       unsigned int which,
				       struct bio_vec *bvecs, u32 num_bvecs,
				       u32 bytes)
{
	struct ceph_osd_data *osd_data;
	struct ceph_bvec_iter it = {
		.bvecs = bvecs,
		.iter = { .bi_size = bytes },
	};

	osd_data = osd_req_op_data(osd_req, which, cls, request_data);
	ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
	osd_req->r_ops[which].cls.indata_len += bytes;
	osd_req->r_ops[which].indata_len += bytes;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs);

void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
			unsigned int which, struct page **pages, u64 length,
			u32 alignment, bool pages_from_pool, bool own_pages)
{
	struct ceph_osd_data *osd_data;

	osd_data = osd_req_op_data(osd_req, which, cls, response_data);
	ceph_osd_data_pages_init(osd_data, pages, length, alignment,
				pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);

static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
{
	switch (osd_data->type) {
	case CEPH_OSD_DATA_TYPE_NONE:
		return 0;
	case CEPH_OSD_DATA_TYPE_PAGES:
		return osd_data->length;
	case CEPH_OSD_DATA_TYPE_PAGELIST:
		return (u64)osd_data->pagelist->length;
#ifdef CONFIG_BLOCK
	case CEPH_OSD_DATA_TYPE_BIO:
		return (u64)osd_data->bio_length;
#endif /* CONFIG_BLOCK */
	case CEPH_OSD_DATA_TYPE_BVECS:
		return osd_data->bvec_pos.iter.bi_size;
	default:
		WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
		return 0;
	}
}

static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
{
	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
		int num_pages;

		num_pages = calc_pages_for((u64)osd_data->alignment,
					   (u64)osd_data->length);
		ceph_release_page_vector(osd_data->pages, num_pages);
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
		ceph_pagelist_release(osd_data->pagelist);
	}
	ceph_osd_data_init(osd_data);
}

static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
			unsigned int which)
{
	struct ceph_osd_req_op *op;

	BUG_ON(which >= osd_req->r_num_ops);
	op = &osd_req->r_ops[which];

	switch (op->op) {
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
		ceph_osd_data_release(&op->extent.osd_data);
		break;
	case CEPH_OSD_OP_CALL:
		ceph_osd_data_release(&op->cls.request_info);
		ceph_osd_data_release(&op->cls.request_data);
		ceph_osd_data_release(&op->cls.response_data);
		break;
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		ceph_osd_data_release(&op->xattr.osd_data);
		break;
	case CEPH_OSD_OP_STAT:
		ceph_osd_data_release(&op->raw_data_in);
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
		ceph_osd_data_release(&op->notify_ack.request_data);
		break;
	case CEPH_OSD_OP_NOTIFY:
		ceph_osd_data_release(&op->notify.request_data);
		ceph_osd_data_release(&op->notify.response_data);
		break;
	case CEPH_OSD_OP_LIST_WATCHERS:
		ceph_osd_data_release(&op->list_watchers.response_data);
		break;
	case CEPH_OSD_OP_COPY_FROM:
		ceph_osd_data_release(&op->copy_from.osd_data);
		break;
	default:
		break;
	}
}

/*
 * Assumes @t is zero-initialized.
 */
static void target_init(struct ceph_osd_request_target *t)
{
	ceph_oid_init(&t->base_oid);
	ceph_oloc_init(&t->base_oloc);
	ceph_oid_init(&t->target_oid);
	ceph_oloc_init(&t->target_oloc);

	ceph_osds_init(&t->acting);
	ceph_osds_init(&t->up);
	t->size = -1;
	t->min_size = -1;

	t->osd = CEPH_HOMELESS_OSD;
}

static void target_copy(struct ceph_osd_request_target *dest,
			const struct ceph_osd_request_target *src)
{
	ceph_oid_copy(&dest->base_oid, &src->base_oid);
	ceph_oloc_copy(&dest->base_oloc, &src->base_oloc);
	ceph_oid_copy(&dest->target_oid, &src->target_oid);
	ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);

	dest->pgid = src->pgid; /* struct */
	dest->spgid = src->spgid; /* struct */
	dest->pg_num = src->pg_num;
	dest->pg_num_mask = src->pg_num_mask;
	ceph_osds_copy(&dest->acting, &src->acting);
	ceph_osds_copy(&dest->up, &src->up);
	dest->size = src->size;
	dest->min_size = src->min_size;
	dest->sort_bitwise = src->sort_bitwise;

	dest->flags = src->flags;
	dest->paused = src->paused;

	dest->epoch = src->epoch;
	dest->last_force_resend = src->last_force_resend;

	dest->osd = src->osd;
}

static void target_destroy(struct ceph_osd_request_target *t)
{
	ceph_oid_destroy(&t->base_oid);
	ceph_oloc_destroy(&t->base_oloc);
	ceph_oid_destroy(&t->target_oid);
	ceph_oloc_destroy(&t->target_oloc);
}

/*
 * requests
 */
static void request_release_checks(struct ceph_osd_request *req)
{
	WARN_ON(!RB_EMPTY_NODE(&req->r_node));
	WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
	WARN_ON(!list_empty(&req->r_private_item));
	WARN_ON(req->r_osd);
}

static void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req = container_of(kref,
					    struct ceph_osd_request, r_kref);
	unsigned int which;

	dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
	     req->r_request, req->r_reply);
	request_release_checks(req);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);

	for (which = 0; which < req->r_num_ops; which++)
		osd_req_op_data_release(req, which);

	target_destroy(&req->r_t);
	ceph_put_snap_context(req->r_snapc);

	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
		kmem_cache_free(ceph_osd_request_cache, req);
	else
		kfree(req);
}

void ceph_osdc_get_request(struct ceph_osd_request *req)
{
	dout("%s %p (was %d)\n", __func__, req,
	     kref_read(&req->r_kref));
	kref_get(&req->r_kref);
}
EXPORT_SYMBOL(ceph_osdc_get_request);

void ceph_osdc_put_request(struct ceph_osd_request *req)
{
	if (req) {
		dout("%s %p (was %d)\n", __func__, req,
		     kref_read(&req->r_kref));
		kref_put(&req->r_kref, ceph_osdc_release_request);
	}
}
EXPORT_SYMBOL(ceph_osdc_put_request);

static void request_init(struct ceph_osd_request *req)
{
	/* req only, each op is zeroed in _osd_req_op_init() */
	memset(req, 0, sizeof(*req));

	kref_init(&req->r_kref);
	init_completion(&req->r_completion);
	RB_CLEAR_NODE(&req->r_node);
	RB_CLEAR_NODE(&req->r_mc_node);
	INIT_LIST_HEAD(&req->r_private_item);

	target_init(&req->r_t);
}

/*
 * This is ugly, but it allows us to reuse linger registration and ping
 * requests, keeping the structure of the code around send_linger{_ping}()
 * reasonable.  Setting up a min_nr=2 mempool for each linger request
 * and dealing with copying ops (this blasts req only, watch op remains
 * intact) isn't any better.
 */
static void request_reinit(struct ceph_osd_request *req)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	bool mempool = req->r_mempool;
	unsigned int num_ops = req->r_num_ops;
	u64 snapid = req->r_snapid;
	struct ceph_snap_context *snapc = req->r_snapc;
	bool linger = req->r_linger;
	struct ceph_msg *request_msg = req->r_request;
	struct ceph_msg *reply_msg = req->r_reply;

	dout("%s req %p\n", __func__, req);
	WARN_ON(kref_read(&req->r_kref) != 1);
	request_release_checks(req);

	WARN_ON(kref_read(&request_msg->kref) != 1);
	WARN_ON(kref_read(&reply_msg->kref) != 1);
	target_destroy(&req->r_t);

	request_init(req);
	req->r_osdc = osdc;
	req->r_mempool = mempool;
	req->r_num_ops = num_ops;
	req->r_snapid = snapid;
	req->r_snapc = snapc;
	req->r_linger = linger;
	req->r_request = request_msg;
	req->r_reply = reply_msg;
}

struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
					       struct ceph_snap_context *snapc,
					       unsigned int num_ops,
					       bool use_mempool,
					       gfp_t gfp_flags)
{
	struct ceph_osd_request *req;

	if (use_mempool) {
		BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
		req = mempool_alloc(osdc->req_mempool, gfp_flags);
	} else if (num_ops <= CEPH_OSD_SLAB_OPS) {
		req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
	} else {
		BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
		req = kmalloc(struct_size(req, r_ops, num_ops), gfp_flags);
	}
	if (unlikely(!req))
		return NULL;

	request_init(req);
	req->r_osdc = osdc;
	req->r_mempool = use_mempool;
	req->r_num_ops = num_ops;
	req->r_snapid = CEPH_NOSNAP;
	req->r_snapc = ceph_get_snap_context(snapc);

	dout("%s req %p\n", __func__, req);
	return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);

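/*
 * Illustrative flow (hypothetical caller, not part of this file): a
 * request is usually built in three steps --
 *
 *	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOFS);
 *	ceph_oid_printf(&req->r_base_oid, "%s", "some_object");
 *	ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
 *
 * with r_base_oloc and the op(s) also filled in before the last call:
 * oid, oloc and the opcodes must be set between allocating the request
 * and allocating its messages (see ceph_osdc_alloc_messages() below).
 */
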
static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
{
	return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
}

static int __ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp,
				      int num_request_data_items,
				      int num_reply_data_items)
{
	struct ceph_osd_client *osdc = req->r_osdc;
	struct ceph_msg *msg;
	int msg_size;

	WARN_ON(req->r_request || req->r_reply);
	WARN_ON(ceph_oid_empty(&req->r_base_oid));
	WARN_ON(ceph_oloc_empty(&req->r_base_oloc));

	/* create request message */
	msg_size = CEPH_ENCODING_START_BLK_LEN +
			CEPH_PGID_ENCODING_LEN + 1; /* spgid */
	msg_size += 4 + 4 + 4; /* hash, osdmap_epoch, flags */
	msg_size += CEPH_ENCODING_START_BLK_LEN +
			sizeof(struct ceph_osd_reqid); /* reqid */
	msg_size += sizeof(struct ceph_blkin_trace_info); /* trace */
	msg_size += 4 + sizeof(struct ceph_timespec); /* client_inc, mtime */
	msg_size += CEPH_ENCODING_START_BLK_LEN +
			ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
	msg_size += 4 + req->r_base_oid.name_len; /* oid */
	msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
	msg_size += 8; /* snapid */
	msg_size += 8; /* snap_seq */
	msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
	msg_size += 4 + 8; /* retry_attempt, features */

	if (req->r_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op, msg_size,
				       num_request_data_items);
	else
		msg = ceph_msg_new2(CEPH_MSG_OSD_OP, msg_size,
				    num_request_data_items, gfp, true);
	if (!msg)
		return -ENOMEM;

	memset(msg->front.iov_base, 0, msg->front.iov_len);
	req->r_request = msg;

	/* create reply message */
	msg_size = OSD_OPREPLY_FRONT_LEN;
	msg_size += req->r_base_oid.name_len;
	msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);

	if (req->r_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, msg_size,
				       num_reply_data_items);
	else
		msg = ceph_msg_new2(CEPH_MSG_OSD_OPREPLY, msg_size,
				    num_reply_data_items, gfp, true);
	if (!msg)
		return -ENOMEM;

	req->r_reply = msg;

	return 0;
}

static bool osd_req_opcode_valid(u16 opcode)
{
	switch (opcode) {
#define GENERATE_CASE(op, opcode, str)	case CEPH_OSD_OP_##op: return true;
__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
#undef GENERATE_CASE
	default:
		return false;
	}
}

static void get_num_data_items(struct ceph_osd_request *req,
			       int *num_request_data_items,
			       int *num_reply_data_items)
{
	struct ceph_osd_req_op *op;

	*num_request_data_items = 0;
	*num_reply_data_items = 0;

	for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
		switch (op->op) {
		/* request */
		case CEPH_OSD_OP_WRITE:
		case CEPH_OSD_OP_WRITEFULL:
		case CEPH_OSD_OP_SETXATTR:
		case CEPH_OSD_OP_CMPXATTR:
		case CEPH_OSD_OP_NOTIFY_ACK:
		case CEPH_OSD_OP_COPY_FROM:
			*num_request_data_items += 1;
			break;

		/* reply */
		case CEPH_OSD_OP_STAT:
		case CEPH_OSD_OP_READ:
		case CEPH_OSD_OP_LIST_WATCHERS:
			*num_reply_data_items += 1;
			break;

		/* both */
		case CEPH_OSD_OP_NOTIFY:
			*num_request_data_items += 1;
			*num_reply_data_items += 1;
			break;
		case CEPH_OSD_OP_CALL:
			*num_request_data_items += 2;
			*num_reply_data_items += 1;
			break;

		default:
			WARN_ON(!osd_req_opcode_valid(op->op));
			break;
		}
	}
}

/*
 * oid, oloc and OSD op opcode(s) must be filled in before this function
 * is called.
 */
int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
{
	int num_request_data_items, num_reply_data_items;

	get_num_data_items(req, &num_request_data_items, &num_reply_data_items);
	return __ceph_osdc_alloc_messages(req, gfp, num_request_data_items,
					  num_reply_data_items);
}
EXPORT_SYMBOL(ceph_osdc_alloc_messages);

/*
 * This is an osd op init function for opcodes that have no data or
 * other information associated with them.  It also serves as a
 * common init routine for all the other init functions, below.
 */
static struct ceph_osd_req_op *
_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
		 u16 opcode, u32 flags)
{
	struct ceph_osd_req_op *op;

	BUG_ON(which >= osd_req->r_num_ops);
	BUG_ON(!osd_req_opcode_valid(opcode));

	op = &osd_req->r_ops[which];
	memset(op, 0, sizeof (*op));
	op->op = opcode;
	op->flags = flags;

	return op;
}

void osd_req_op_init(struct ceph_osd_request *osd_req,
		     unsigned int which, u16 opcode, u32 flags)
{
	(void)_osd_req_op_init(osd_req, which, opcode, flags);
}
EXPORT_SYMBOL(osd_req_op_init);

void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
			    unsigned int which, u16 opcode,
			    u64 offset, u64 length,
			    u64 truncate_size, u32 truncate_seq)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      opcode, 0);
	size_t payload_len = 0;

	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
	       opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
	       opcode != CEPH_OSD_OP_TRUNCATE);

	op->extent.offset = offset;
	op->extent.length = length;
	op->extent.truncate_size = truncate_size;
	op->extent.truncate_seq = truncate_seq;
	if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
		payload_len += length;

	op->indata_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_extent_init);

void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
			      unsigned int which, u64 length)
{
	struct ceph_osd_req_op *op;
	u64 previous;

	BUG_ON(which >= osd_req->r_num_ops);
	op = &osd_req->r_ops[which];
	previous = op->extent.length;

	if (length == previous)
		return;		/* Nothing to do */
	BUG_ON(length > previous);

	op->extent.length = length;
	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
		op->indata_len -= previous - length;
}
EXPORT_SYMBOL(osd_req_op_extent_update);

void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
				unsigned int which, u64 offset_inc)
{
	struct ceph_osd_req_op *op, *prev_op;

	BUG_ON(which + 1 >= osd_req->r_num_ops);

	prev_op = &osd_req->r_ops[which];
	op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
	/* dup previous one */
	op->indata_len = prev_op->indata_len;
	op->outdata_len = prev_op->outdata_len;
	op->extent = prev_op->extent;
	/* adjust offset */
	op->extent.offset += offset_inc;
	op->extent.length -= offset_inc;

	if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
		op->indata_len -= offset_inc;
}
EXPORT_SYMBOL(osd_req_op_extent_dup_last);

int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
			const char *class, const char *method)
{
	struct ceph_osd_req_op *op;
	struct ceph_pagelist *pagelist;
	size_t payload_len = 0;
	size_t size;

	op = _osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);

	pagelist = ceph_pagelist_alloc(GFP_NOFS);
	if (!pagelist)
		return -ENOMEM;

	op->cls.class_name = class;
	size = strlen(class);
	BUG_ON(size > (size_t) U8_MAX);
	op->cls.class_len = size;
	ceph_pagelist_append(pagelist, class, size);
	payload_len += size;

	op->cls.method_name = method;
	size = strlen(method);
	BUG_ON(size > (size_t) U8_MAX);
	op->cls.method_len = size;
	ceph_pagelist_append(pagelist, method, size);
	payload_len += size;

	osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);

	op->indata_len = payload_len;
	return 0;
}
EXPORT_SYMBOL(osd_req_op_cls_init);

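/*
 * Illustrative sketch (hypothetical caller; "foo"/"bar" are just
 * example names): invoking an object class method with input data
 * attached looks roughly like
 *
 *	ret = osd_req_op_cls_init(req, 0, "foo", "bar");
 *	osd_req_op_cls_request_data_pages(req, 0, pages, len, 0, false,
 *					  false);
 *
 * osd_req_op_cls_init() accounts only for the class and method names;
 * the request_data helpers above add the input payload to indata_len.
 */
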
int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
			  u16 opcode, const char *name, const void *value,
			  size_t size, u8 cmp_op, u8 cmp_mode)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      opcode, 0);
	struct ceph_pagelist *pagelist;
	size_t payload_len;

	BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);

	pagelist = ceph_pagelist_alloc(GFP_NOFS);
	if (!pagelist)
		return -ENOMEM;

	payload_len = strlen(name);
	op->xattr.name_len = payload_len;
	ceph_pagelist_append(pagelist, name, payload_len);

	op->xattr.value_len = size;
	ceph_pagelist_append(pagelist, value, size);
	payload_len += size;

	op->xattr.cmp_op = cmp_op;
	op->xattr.cmp_mode = cmp_mode;

	ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
	op->indata_len = payload_len;
	return 0;
}
EXPORT_SYMBOL(osd_req_op_xattr_init);

/*
 * @watch_opcode: CEPH_OSD_WATCH_OP_*
 */
static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
				  u64 cookie, u8 watch_opcode)
{
	struct ceph_osd_req_op *op;

	op = _osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
	op->watch.cookie = cookie;
	op->watch.op = watch_opcode;
	op->watch.gen = 0;
}

void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
				unsigned int which,
				u64 expected_object_size,
				u64 expected_write_size)
{
	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
						      CEPH_OSD_OP_SETALLOCHINT,
						      0);

	op->alloc_hint.expected_object_size = expected_object_size;
	op->alloc_hint.expected_write_size = expected_write_size;

	/*
	 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
	 * not worth a feature bit.  Set FAILOK per-op flag to make
	 * sure older osds don't trip over an unsupported opcode.
	 */
	op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
}
EXPORT_SYMBOL(osd_req_op_alloc_hint_init);

static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
				   struct ceph_osd_data *osd_data)
{
	u64 length = ceph_osd_data_length(osd_data);

	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
		BUG_ON(length > (u64) SIZE_MAX);
		if (length)
			ceph_msg_data_add_pages(msg, osd_data->pages,
					length, osd_data->alignment);
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
		BUG_ON(!length);
		ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
#ifdef CONFIG_BLOCK
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
		ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
#endif
	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
		ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
	} else {
		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
	}
}

static u32 osd_req_encode_op(struct ceph_osd_op *dst,
			     const struct ceph_osd_req_op *src)
{
	switch (src->op) {
	case CEPH_OSD_OP_STAT:
		break;
	case CEPH_OSD_OP_READ:
	case CEPH_OSD_OP_WRITE:
	case CEPH_OSD_OP_WRITEFULL:
	case CEPH_OSD_OP_ZERO:
	case CEPH_OSD_OP_TRUNCATE:
		dst->extent.offset = cpu_to_le64(src->extent.offset);
		dst->extent.length = cpu_to_le64(src->extent.length);
		dst->extent.truncate_size =
			cpu_to_le64(src->extent.truncate_size);
		dst->extent.truncate_seq =
			cpu_to_le32(src->extent.truncate_seq);
		break;
	case CEPH_OSD_OP_CALL:
		dst->cls.class_len = src->cls.class_len;
		dst->cls.method_len = src->cls.method_len;
		dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
		break;
	case CEPH_OSD_OP_WATCH:
		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
		dst->watch.ver = cpu_to_le64(0);
		dst->watch.op = src->watch.op;
		dst->watch.gen = cpu_to_le32(src->watch.gen);
		break;
	case CEPH_OSD_OP_NOTIFY_ACK:
		break;
	case CEPH_OSD_OP_NOTIFY:
		dst->notify.cookie = cpu_to_le64(src->notify.cookie);
		break;
	case CEPH_OSD_OP_LIST_WATCHERS:
		break;
	case CEPH_OSD_OP_SETALLOCHINT:
		dst->alloc_hint.expected_object_size =
			cpu_to_le64(src->alloc_hint.expected_object_size);
		dst->alloc_hint.expected_write_size =
			cpu_to_le64(src->alloc_hint.expected_write_size);
		break;
	case CEPH_OSD_OP_SETXATTR:
	case CEPH_OSD_OP_CMPXATTR:
		dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
		dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
		dst->xattr.cmp_op = src->xattr.cmp_op;
		dst->xattr.cmp_mode = src->xattr.cmp_mode;
		break;
	case CEPH_OSD_OP_CREATE:
	case CEPH_OSD_OP_DELETE:
		break;
	case CEPH_OSD_OP_COPY_FROM:
		dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid);
		dst->copy_from.src_version =
			cpu_to_le64(src->copy_from.src_version);
		dst->copy_from.flags = src->copy_from.flags;
		dst->copy_from.src_fadvise_flags =
			cpu_to_le32(src->copy_from.src_fadvise_flags);
		break;
	default:
		pr_err("unsupported osd opcode %s\n",
			ceph_osd_op_name(src->op));
		WARN_ON(1);

		return 0;
	}

	dst->op = cpu_to_le16(src->op);
	dst->flags = cpu_to_le32(src->flags);
	dst->payload_len = cpu_to_le32(src->indata_len);

	return src->indata_len;
}

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
 */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
					       struct ceph_file_layout *layout,
					       struct ceph_vino vino,
					       u64 off, u64 *plen,
					       unsigned int which, int num_ops,
					       int opcode, int flags,
					       struct ceph_snap_context *snapc,
					       u32 truncate_seq,
					       u64 truncate_size,
					       bool use_mempool)
{
	struct ceph_osd_request *req;
	u64 objnum = 0;
	u64 objoff = 0;
	u64 objlen = 0;
	int r;

	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
	       opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
	       opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);

	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
				      GFP_NOFS);
	if (!req) {
		r = -ENOMEM;
		goto fail;
	}

	/* calculate max write size */
	r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
	if (r)
		goto fail;

	if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
		osd_req_op_init(req, which, opcode, 0);
	} else {
		u32 object_size = layout->object_size;
		u32 object_base = off - objoff;
		if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
			if (truncate_size <= object_base) {
				truncate_size = 0;
			} else {
				truncate_size -= object_base;
				if (truncate_size > object_size)
					truncate_size = object_size;
			}
		}
		osd_req_op_extent_init(req, which, opcode, objoff, objlen,
				       truncate_size, truncate_seq);
	}

	req->r_flags = flags;
	req->r_base_oloc.pool = layout->pool_id;
	req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
	ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);

	req->r_snapid = vino.snap;
	if (flags & CEPH_OSD_FLAG_WRITE)
		req->r_data_offset = off;

	if (num_ops > 1)
		/*
		 * This is a special case for ceph_writepages_start(), but it
		 * also covers ceph_uninline_data().  If more multi-op request
		 * use cases emerge, we will need a separate helper.
		 */
		r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_ops, 0);
	else
		r = ceph_osdc_alloc_messages(req, GFP_NOFS);
	if (r)
		goto fail;

	return req;

fail:
	ceph_osdc_put_request(req);
	return ERR_PTR(r);
}
EXPORT_SYMBOL(ceph_osdc_new_request);

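/*
 * Illustrative sketch (hypothetical caller): a read of @len bytes at
 * file offset @off would be set up as
 *
 *	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
 *				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
 *				    NULL, 0, 0, false);
 *
 * with @len possibly shortened by calc_layout() if the extent crosses
 * an object boundary.
 */
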
/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */
DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)

/*
 * Call @fn on each OSD request as long as @fn returns 0.
 */
static void for_each_request(struct ceph_osd_client *osdc,
			int (*fn)(struct ceph_osd_request *req, void *arg),
			void *arg)
{
	struct rb_node *n, *p;

	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);

		for (p = rb_first(&osd->o_requests); p; ) {
			struct ceph_osd_request *req =
			    rb_entry(p, struct ceph_osd_request, r_node);

			p = rb_next(p);
			if (fn(req, arg))
				return;
		}
	}

	for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
		struct ceph_osd_request *req =
		    rb_entry(p, struct ceph_osd_request, r_node);

		p = rb_next(p);
		if (fn(req, arg))
			return;
	}
}

static bool osd_homeless(struct ceph_osd *osd)
{
	return osd->o_osd == CEPH_HOMELESS_OSD;
}

static bool osd_registered(struct ceph_osd *osd)
{
	verify_osdc_locked(osd->o_osdc);

	return !RB_EMPTY_NODE(&osd->o_node);
}

/*
 * Assumes @osd is zero-initialized.
 */
static void osd_init(struct ceph_osd *osd)
{
	refcount_set(&osd->o_ref, 1);
	RB_CLEAR_NODE(&osd->o_node);
	osd->o_requests = RB_ROOT;
	osd->o_linger_requests = RB_ROOT;
	osd->o_backoff_mappings = RB_ROOT;
	osd->o_backoffs_by_id = RB_ROOT;
	INIT_LIST_HEAD(&osd->o_osd_lru);
	INIT_LIST_HEAD(&osd->o_keepalive_item);
	osd->o_incarnation = 1;
	mutex_init(&osd->lock);
}

static void osd_cleanup(struct ceph_osd *osd)
{
	WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
	WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
	WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
	WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoff_mappings));
	WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoffs_by_id));
	WARN_ON(!list_empty(&osd->o_osd_lru));
	WARN_ON(!list_empty(&osd->o_keepalive_item));

	if (osd->o_auth.authorizer) {
		WARN_ON(osd_homeless(osd));
		ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
	}
}

/*
 * Track open sessions with osds.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
{
	struct ceph_osd *osd;

	WARN_ON(onum == CEPH_HOMELESS_OSD);

	osd = kzalloc(sizeof(*osd), GFP_NOIO | __GFP_NOFAIL);
	osd_init(osd);
	osd->o_osdc = osdc;
	osd->o_osd = onum;

	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);

	return osd;
}

static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
	if (refcount_inc_not_zero(&osd->o_ref)) {
		dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1,
		     refcount_read(&osd->o_ref));
		return osd;
	} else {
		dout("get_osd %p FAIL\n", osd);
		return NULL;
	}
}

static void put_osd(struct ceph_osd *osd)
{
	dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref),
	     refcount_read(&osd->o_ref) - 1);
	if (refcount_dec_and_test(&osd->o_ref)) {
		osd_cleanup(osd);
		kfree(osd);
	}
}

DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)

static void __move_osd_to_lru(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;

	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
	BUG_ON(!list_empty(&osd->o_osd_lru));

	spin_lock(&osdc->osd_lru_lock);
	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
	spin_unlock(&osdc->osd_lru_lock);

	osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
}

static void maybe_move_osd_to_lru(struct ceph_osd *osd)
{
	if (RB_EMPTY_ROOT(&osd->o_requests) &&
	    RB_EMPTY_ROOT(&osd->o_linger_requests))
		__move_osd_to_lru(osd);
}

static void __remove_osd_from_lru(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;

	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

	spin_lock(&osdc->osd_lru_lock);
	if (!list_empty(&osd->o_osd_lru))
		list_del_init(&osd->o_osd_lru);
	spin_unlock(&osdc->osd_lru_lock);
}

/*
 * Close the connection and assign any leftover requests to the
 * homeless session.
 */
static void close_osd(struct ceph_osd *osd)
{
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct rb_node *n;

	verify_osdc_wrlocked(osdc);
	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

	ceph_con_close(&osd->o_con);

	for (n = rb_first(&osd->o_requests); n; ) {
		struct ceph_osd_request *req =
		    rb_entry(n, struct ceph_osd_request, r_node);

		n = rb_next(n); /* unlink_request() */

		dout(" reassigning req %p tid %llu\n", req, req->r_tid);
		unlink_request(osd, req);
		link_request(&osdc->homeless_osd, req);
	}
	for (n = rb_first(&osd->o_linger_requests); n; ) {
		struct ceph_osd_linger_request *lreq =
		    rb_entry(n, struct ceph_osd_linger_request, node);

		n = rb_next(n); /* unlink_linger() */

		dout(" reassigning lreq %p linger_id %llu\n", lreq,
		     lreq->linger_id);
		unlink_linger(osd, lreq);
		link_linger(&osdc->homeless_osd, lreq);
	}
	clear_backoffs(osd);

	__remove_osd_from_lru(osd);
	erase_osd(&osdc->osds, osd);
	put_osd(osd);
}

/*
 * reset osd connect
 */
static int reopen_osd(struct ceph_osd *osd)
{
	struct ceph_entity_addr *peer_addr;

	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

	if (RB_EMPTY_ROOT(&osd->o_requests) &&
	    RB_EMPTY_ROOT(&osd->o_linger_requests)) {
		close_osd(osd);
		return -ENODEV;
	}

	peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
	if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
	    !ceph_con_opened(&osd->o_con)) {
		struct rb_node *n;

		dout("osd addr hasn't changed and connection never opened, "
		     "letting msgr retry\n");
		/* touch each r_stamp for handle_timeout()'s benefit */
		for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
			struct ceph_osd_request *req =
			    rb_entry(n, struct ceph_osd_request, r_node);
			req->r_stamp = jiffies;
		}

		return -EAGAIN;
	}

	ceph_con_close(&osd->o_con);
	ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
	osd->o_incarnation++;

	return 0;
}

static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
					  bool wrlocked)
{
	struct ceph_osd *osd;

	if (wrlocked)
		verify_osdc_wrlocked(osdc);
	else
		verify_osdc_locked(osdc);

	if (o != CEPH_HOMELESS_OSD)
		osd = lookup_osd(&osdc->osds, o);
	else
		osd = &osdc->homeless_osd;
	if (!osd) {
		if (!wrlocked)
			return ERR_PTR(-EAGAIN);

		osd = create_osd(osdc, o);
		insert_osd(&osdc->osds, osd);
		ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
			      &osdc->osdmap->osd_addr[osd->o_osd]);
	}

	dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
	return osd;
}

/*
 * Create request <-> OSD session relation.
 *
 * @req has to be assigned a tid, @osd may be homeless.
 */
static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
{
	verify_osd_locked(osd);
	WARN_ON(!req->r_tid || req->r_osd);
	dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
	     req, req->r_tid);

	if (!osd_homeless(osd))
		__remove_osd_from_lru(osd);
	else
		atomic_inc(&osd->o_osdc->num_homeless);

	get_osd(osd);
	insert_request(&osd->o_requests, req);
	req->r_osd = osd;
}

static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
{
	verify_osd_locked(osd);
	WARN_ON(req->r_osd != osd);
	dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
	     req, req->r_tid);

	req->r_osd = NULL;
	erase_request(&osd->o_requests, req);
	put_osd(osd);

	if (!osd_homeless(osd))
		maybe_move_osd_to_lru(osd);
	else
		atomic_dec(&osd->o_osdc->num_homeless);
}

static bool __pool_full(struct ceph_pg_pool_info *pi)
{
	return pi->flags & CEPH_POOL_FLAG_FULL;
}

static bool have_pool_full(struct ceph_osd_client *osdc)
{
	struct rb_node *n;

	for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
		struct ceph_pg_pool_info *pi =
		    rb_entry(n, struct ceph_pg_pool_info, node);

		if (__pool_full(pi))
			return true;
	}

	return false;
}

static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id)
{
	struct ceph_pg_pool_info *pi;

	pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
	if (!pi)
		return false;

	return __pool_full(pi);
}

/*
 * Returns whether a request should be blocked from being sent
 * based on the current osdmap and osd_client settings.
 */
static bool target_should_be_paused(struct ceph_osd_client *osdc,
				    const struct ceph_osd_request_target *t,
				    struct ceph_pg_pool_info *pi)
{
	bool pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
	bool pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
		       ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
		       __pool_full(pi);

	WARN_ON(pi->id != t->target_oloc.pool);
	return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) ||
	       ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) ||
	       (osdc->osdmap->epoch < osdc->epoch_barrier);
}

enum calc_target_result {
	CALC_TARGET_NO_ACTION = 0,
	CALC_TARGET_NEED_RESEND,
	CALC_TARGET_POOL_DNE,
};

static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
					   struct ceph_osd_request_target *t,
					   struct ceph_connection *con,
					   bool any_change)
{
	struct ceph_pg_pool_info *pi;
	struct ceph_pg pgid, last_pgid;
	struct ceph_osds up, acting;
	bool force_resend = false;
	bool unpaused = false;
	bool legacy_change = false;
	bool split = false;
	bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE);
	bool recovery_deletes = ceph_osdmap_flag(osdc,
						 CEPH_OSDMAP_RECOVERY_DELETES);
	enum calc_target_result ct_res;

	t->epoch = osdc->osdmap->epoch;
	pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
	if (!pi) {
		t->osd = CEPH_HOMELESS_OSD;
		ct_res = CALC_TARGET_POOL_DNE;
		goto out;
	}

	if (osdc->osdmap->epoch == pi->last_force_request_resend) {
		if (t->last_force_resend < pi->last_force_request_resend) {
			t->last_force_resend = pi->last_force_request_resend;
			force_resend = true;
		} else if (t->last_force_resend == 0) {
			force_resend = true;
		}
	}

	/* apply tiering */
	ceph_oid_copy(&t->target_oid, &t->base_oid);
	ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
	if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
		if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0)
			t->target_oloc.pool = pi->read_tier;
		if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0)
			t->target_oloc.pool = pi->write_tier;

		pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool);
		if (!pi) {
			t->osd = CEPH_HOMELESS_OSD;
			ct_res = CALC_TARGET_POOL_DNE;
			goto out;
		}
	}

	__ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc, &pgid);
	last_pgid.pool = pgid.pool;
	last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);

	ceph_pg_to_up_acting_osds(osdc->osdmap, pi, &pgid, &up, &acting);
	if (any_change &&
	    ceph_is_new_interval(&t->acting,
				 &acting,
				 &t->up,
				 &up,
				 t->size,
				 pi->size,
				 t->min_size,
				 pi->min_size,
				 t->pg_num,
				 pi->pg_num,
				 t->sort_bitwise,
				 sort_bitwise,
				 t->recovery_deletes,
				 recovery_deletes,
				 &last_pgid))
		force_resend = true;

	if (t->paused && !target_should_be_paused(osdc, t, pi)) {
		t->paused = false;
		unpaused = true;
	}
	legacy_change = ceph_pg_compare(&t->pgid, &pgid) ||
			ceph_osds_changed(&t->acting, &acting, any_change);
	if (t->pg_num)
		split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num);

	if (legacy_change || force_resend || split) {
		t->pgid = pgid; /* struct */
		ceph_pg_to_primary_shard(osdc->osdmap, pi, &pgid, &t->spgid);
		ceph_osds_copy(&t->acting, &acting);
		ceph_osds_copy(&t->up, &up);
		t->size = pi->size;
		t->min_size = pi->min_size;
		t->pg_num = pi->pg_num;
		t->pg_num_mask = pi->pg_num_mask;
		t->sort_bitwise = sort_bitwise;
		t->recovery_deletes = recovery_deletes;

		t->osd = acting.primary;
	}

	if (unpaused || legacy_change || force_resend || split)
		ct_res = CALC_TARGET_NEED_RESEND;
	else
		ct_res = CALC_TARGET_NO_ACTION;

out:
	dout("%s t %p -> %d%d%d%d ct_res %d osd%d\n", __func__, t, unpaused,
	     legacy_change, force_resend, split, ct_res, t->osd);
	return ct_res;
}

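/*
 * To recap calc_target() above: it recomputes the mapping of the
 * (possibly tiered) target oloc/oid to a PG and its acting set and
 * returns CALC_TARGET_NEED_RESEND when the request was unpaused, the
 * PG or acting set changed, a forced resend is pending or the PG has
 * split, CALC_TARGET_POOL_DNE when the pool is gone, and
 * CALC_TARGET_NO_ACTION otherwise.
 */
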
static struct ceph_spg_mapping *alloc_spg_mapping(void)
{
	struct ceph_spg_mapping *spg;

	spg = kmalloc(sizeof(*spg), GFP_NOIO);
	if (!spg)
		return NULL;

	RB_CLEAR_NODE(&spg->node);
	spg->backoffs = RB_ROOT;
	return spg;
}

static void free_spg_mapping(struct ceph_spg_mapping *spg)
{
	WARN_ON(!RB_EMPTY_NODE(&spg->node));
	WARN_ON(!RB_EMPTY_ROOT(&spg->backoffs));

	kfree(spg);
}

/*
 * rbtree of ceph_spg_mapping for handling map<spg_t, ...>, similar to
 * ceph_pg_mapping.  Used to track OSD backoffs -- a backoff [range] is
 * defined only within a specific spgid; it does not pass anything to
 * children on split, or to another primary.
 */
DEFINE_RB_FUNCS2(spg_mapping, struct ceph_spg_mapping, spgid, ceph_spg_compare,
		 RB_BYPTR, const struct ceph_spg *, node)

static u64 hoid_get_bitwise_key(const struct ceph_hobject_id *hoid)
{
	return hoid->is_max ? 0x100000000ull : hoid->hash_reverse_bits;
}

static void hoid_get_effective_key(const struct ceph_hobject_id *hoid,
				   void **pkey, size_t *pkey_len)
{
	if (hoid->key_len) {
		*pkey = hoid->key;
		*pkey_len = hoid->key_len;
	} else {
		*pkey = hoid->oid;
		*pkey_len = hoid->oid_len;
	}
}

static int compare_names(const void *name1, size_t name1_len,
			 const void *name2, size_t name2_len)
{
	int ret;

	ret = memcmp(name1, name2, min(name1_len, name2_len));
	if (!ret) {
		if (name1_len < name2_len)
			ret = -1;
		else if (name1_len > name2_len)
			ret = 1;
	}
	return ret;
}

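/*
 * hoid_compare() below orders hobjects by is_max, pool, bitwise hash
 * key, namespace, effective key (locator key if set, object name
 * otherwise), object name and finally snapid -- intended to mirror the
 * OSD-side bitwise hobject_t sort.
 */
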
static int hoid_compare(const struct ceph_hobject_id *lhs,
			const struct ceph_hobject_id *rhs)
{
	void *effective_key1, *effective_key2;
	size_t effective_key1_len, effective_key2_len;
	int ret;

	if (lhs->is_max < rhs->is_max)
		return -1;
	if (lhs->is_max > rhs->is_max)
		return 1;

	if (lhs->pool < rhs->pool)
		return -1;
	if (lhs->pool > rhs->pool)
		return 1;

	if (hoid_get_bitwise_key(lhs) < hoid_get_bitwise_key(rhs))
		return -1;
	if (hoid_get_bitwise_key(lhs) > hoid_get_bitwise_key(rhs))
		return 1;

	ret = compare_names(lhs->nspace, lhs->nspace_len,
			    rhs->nspace, rhs->nspace_len);
	if (ret)
		return ret;

	hoid_get_effective_key(lhs, &effective_key1, &effective_key1_len);
	hoid_get_effective_key(rhs, &effective_key2, &effective_key2_len);
	ret = compare_names(effective_key1, effective_key1_len,
			    effective_key2, effective_key2_len);
	if (ret)
		return ret;

	ret = compare_names(lhs->oid, lhs->oid_len, rhs->oid, rhs->oid_len);
	if (ret)
		return ret;

	if (lhs->snapid < rhs->snapid)
		return -1;
	if (lhs->snapid > rhs->snapid)
		return 1;

	return 0;
}

/*
 * For decoding ->begin and ->end of MOSDBackoff only -- no MIN/MAX
 * compat stuff here.
 *
 * Assumes @hoid is zero-initialized.
 */
static int decode_hoid(void **p, void *end, struct ceph_hobject_id *hoid)
{
	u8 struct_v;
	u32 struct_len;
	int ret;

	ret = ceph_start_decoding(p, end, 4, "hobject_t", &struct_v,
				  &struct_len);
	if (ret)
		return ret;

	if (struct_v < 4) {
		pr_err("got struct_v %d < 4 of hobject_t\n", struct_v);
		goto e_inval;
	}

	hoid->key = ceph_extract_encoded_string(p, end, &hoid->key_len,
						GFP_NOIO);
	if (IS_ERR(hoid->key)) {
		ret = PTR_ERR(hoid->key);
		hoid->key = NULL;
		return ret;
	}

	hoid->oid = ceph_extract_encoded_string(p, end, &hoid->oid_len,
						GFP_NOIO);
	if (IS_ERR(hoid->oid)) {
		ret = PTR_ERR(hoid->oid);
		hoid->oid = NULL;
		return ret;
	}

	ceph_decode_64_safe(p, end, hoid->snapid, e_inval);
	ceph_decode_32_safe(p, end, hoid->hash, e_inval);
	ceph_decode_8_safe(p, end, hoid->is_max, e_inval);

	hoid->nspace = ceph_extract_encoded_string(p, end, &hoid->nspace_len,
						   GFP_NOIO);
	if (IS_ERR(hoid->nspace)) {
		ret = PTR_ERR(hoid->nspace);
		hoid->nspace = NULL;
		return ret;
	}

	ceph_decode_64_safe(p, end, hoid->pool, e_inval);

	ceph_hoid_build_hash_cache(hoid);
	return 0;

e_inval:
	return -EINVAL;
}

static int hoid_encoding_size(const struct ceph_hobject_id *hoid)
{
	return 8 + 4 + 1 + 8 + /* snapid, hash, is_max, pool */
	       4 + hoid->key_len + 4 + hoid->oid_len + 4 + hoid->nspace_len;
}

static void encode_hoid(void **p, void *end, const struct ceph_hobject_id *hoid)
{
	ceph_start_encoding(p, 4, 3, hoid_encoding_size(hoid));
	ceph_encode_string(p, end, hoid->key, hoid->key_len);
	ceph_encode_string(p, end, hoid->oid, hoid->oid_len);
	ceph_encode_64(p, hoid->snapid);
	ceph_encode_32(p, hoid->hash);
	ceph_encode_8(p, hoid->is_max);
	ceph_encode_string(p, end, hoid->nspace, hoid->nspace_len);
	ceph_encode_64(p, hoid->pool);
}

static void free_hoid(struct ceph_hobject_id *hoid)
{
	if (hoid) {
		kfree(hoid->key);
		kfree(hoid->oid);
		kfree(hoid->nspace);
		kfree(hoid);
	}
}

static struct ceph_osd_backoff *alloc_backoff(void)
{
	struct ceph_osd_backoff *backoff;

	backoff = kzalloc(sizeof(*backoff), GFP_NOIO);
	if (!backoff)
		return NULL;

	RB_CLEAR_NODE(&backoff->spg_node);
	RB_CLEAR_NODE(&backoff->id_node);
	return backoff;
}

static void free_backoff(struct ceph_osd_backoff *backoff)
{
	WARN_ON(!RB_EMPTY_NODE(&backoff->spg_node));
	WARN_ON(!RB_EMPTY_NODE(&backoff->id_node));

	free_hoid(backoff->begin);
	free_hoid(backoff->end);
	kfree(backoff);
}

/*
 * Within a specific spgid, backoffs are managed by ->begin hoid.
 */
DEFINE_RB_INSDEL_FUNCS2(backoff, struct ceph_osd_backoff, begin, hoid_compare,
			RB_BYVAL, spg_node);

static struct ceph_osd_backoff *lookup_containing_backoff(struct rb_root *root,
					const struct ceph_hobject_id *hoid)
{
	struct rb_node *n = root->rb_node;

	while (n) {
		struct ceph_osd_backoff *cur =
		    rb_entry(n, struct ceph_osd_backoff, spg_node);
		int cmp;

		cmp = hoid_compare(hoid, cur->begin);
		if (cmp < 0) {
			n = n->rb_left;
		} else if (cmp > 0) {
			if (hoid_compare(hoid, cur->end) < 0)
				return cur;

			n = n->rb_right;
		} else {
			return cur;
		}
	}

	return NULL;
}

/*
 * Each backoff has a unique id within its OSD session.
 */
DEFINE_RB_FUNCS(backoff_by_id, struct ceph_osd_backoff, id, id_node)

static void clear_backoffs(struct ceph_osd *osd)
{
	while (!RB_EMPTY_ROOT(&osd->o_backoff_mappings)) {
		struct ceph_spg_mapping *spg =
		    rb_entry(rb_first(&osd->o_backoff_mappings),
			     struct ceph_spg_mapping, node);

		while (!RB_EMPTY_ROOT(&spg->backoffs)) {
			struct ceph_osd_backoff *backoff =
			    rb_entry(rb_first(&spg->backoffs),
				     struct ceph_osd_backoff, spg_node);

			erase_backoff(&spg->backoffs, backoff);
			erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
			free_backoff(backoff);
		}
		erase_spg_mapping(&osd->o_backoff_mappings, spg);
		free_spg_mapping(spg);
	}
}

/*
 * Set up a temporary, non-owning view into @t.
 */
static void hoid_fill_from_target(struct ceph_hobject_id *hoid,
				  const struct ceph_osd_request_target *t)
{
	hoid->key = NULL;
	hoid->key_len = 0;
	hoid->oid = t->target_oid.name;
	hoid->oid_len = t->target_oid.name_len;
	hoid->snapid = CEPH_NOSNAP;
	hoid->hash = t->pgid.seed;
	hoid->is_max = false;
	if (t->target_oloc.pool_ns) {
		hoid->nspace = t->target_oloc.pool_ns->str;
		hoid->nspace_len = t->target_oloc.pool_ns->len;
	} else {
		hoid->nspace = NULL;
		hoid->nspace_len = 0;
	}
	hoid->pool = t->target_oloc.pool;
	ceph_hoid_build_hash_cache(hoid);
}

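/*
 * Determine whether @req should be held back ("plugged") because a
 * backoff installed by the OSD covers its target hobject on this
 * session.
 */
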
1922 */ 1923 static void setup_request_data(struct ceph_osd_request *req) 1924 { 1925 struct ceph_msg *request_msg = req->r_request; 1926 struct ceph_msg *reply_msg = req->r_reply; 1927 struct ceph_osd_req_op *op; 1928 1929 if (req->r_request->num_data_items || req->r_reply->num_data_items) 1930 return; 1931 1932 WARN_ON(request_msg->data_length || reply_msg->data_length); 1933 for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) { 1934 switch (op->op) { 1935 /* request */ 1936 case CEPH_OSD_OP_WRITE: 1937 case CEPH_OSD_OP_WRITEFULL: 1938 WARN_ON(op->indata_len != op->extent.length); 1939 ceph_osdc_msg_data_add(request_msg, 1940 &op->extent.osd_data); 1941 break; 1942 case CEPH_OSD_OP_SETXATTR: 1943 case CEPH_OSD_OP_CMPXATTR: 1944 WARN_ON(op->indata_len != op->xattr.name_len + 1945 op->xattr.value_len); 1946 ceph_osdc_msg_data_add(request_msg, 1947 &op->xattr.osd_data); 1948 break; 1949 case CEPH_OSD_OP_NOTIFY_ACK: 1950 ceph_osdc_msg_data_add(request_msg, 1951 &op->notify_ack.request_data); 1952 break; 1953 case CEPH_OSD_OP_COPY_FROM: 1954 ceph_osdc_msg_data_add(request_msg, 1955 &op->copy_from.osd_data); 1956 break; 1957 1958 /* reply */ 1959 case CEPH_OSD_OP_STAT: 1960 ceph_osdc_msg_data_add(reply_msg, 1961 &op->raw_data_in); 1962 break; 1963 case CEPH_OSD_OP_READ: 1964 ceph_osdc_msg_data_add(reply_msg, 1965 &op->extent.osd_data); 1966 break; 1967 case CEPH_OSD_OP_LIST_WATCHERS: 1968 ceph_osdc_msg_data_add(reply_msg, 1969 &op->list_watchers.response_data); 1970 break; 1971 1972 /* both */ 1973 case CEPH_OSD_OP_CALL: 1974 WARN_ON(op->indata_len != op->cls.class_len + 1975 op->cls.method_len + 1976 op->cls.indata_len); 1977 ceph_osdc_msg_data_add(request_msg, 1978 &op->cls.request_info); 1979 /* optional, can be NONE */ 1980 ceph_osdc_msg_data_add(request_msg, 1981 &op->cls.request_data); 1982 /* optional, can be NONE */ 1983 ceph_osdc_msg_data_add(reply_msg, 1984 &op->cls.response_data); 1985 break; 1986 case CEPH_OSD_OP_NOTIFY: 1987 ceph_osdc_msg_data_add(request_msg, 1988 &op->notify.request_data); 1989 ceph_osdc_msg_data_add(reply_msg, 1990 &op->notify.response_data); 1991 break; 1992 } 1993 } 1994 } 1995 1996 static void encode_pgid(void **p, const struct ceph_pg *pgid) 1997 { 1998 ceph_encode_8(p, 1); 1999 ceph_encode_64(p, pgid->pool); 2000 ceph_encode_32(p, pgid->seed); 2001 ceph_encode_32(p, -1); /* preferred */ 2002 } 2003 2004 static void encode_spgid(void **p, const struct ceph_spg *spgid) 2005 { 2006 ceph_start_encoding(p, 1, 1, CEPH_PGID_ENCODING_LEN + 1); 2007 encode_pgid(p, &spgid->pgid); 2008 ceph_encode_8(p, spgid->shard); 2009 } 2010 2011 static void encode_oloc(void **p, void *end, 2012 const struct ceph_object_locator *oloc) 2013 { 2014 ceph_start_encoding(p, 5, 4, ceph_oloc_encoding_size(oloc)); 2015 ceph_encode_64(p, oloc->pool); 2016 ceph_encode_32(p, -1); /* preferred */ 2017 ceph_encode_32(p, 0); /* key len */ 2018 if (oloc->pool_ns) 2019 ceph_encode_string(p, end, oloc->pool_ns->str, 2020 oloc->pool_ns->len); 2021 else 2022 ceph_encode_32(p, 0); 2023 } 2024 2025 static void encode_request_partial(struct ceph_osd_request *req, 2026 struct ceph_msg *msg) 2027 { 2028 void *p = msg->front.iov_base; 2029 void *const end = p + msg->front_alloc_len; 2030 u32 data_len = 0; 2031 int i; 2032 2033 if (req->r_flags & CEPH_OSD_FLAG_WRITE) { 2034 /* snapshots aren't writeable */ 2035 WARN_ON(req->r_snapid != CEPH_NOSNAP); 2036 } else { 2037 WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec || 2038 req->r_data_offset || req->r_snapc); 2039 } 2040 2041 
setup_request_data(req); 2042 2043 encode_spgid(&p, &req->r_t.spgid); /* actual spg */ 2044 ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */ 2045 ceph_encode_32(&p, req->r_osdc->osdmap->epoch); 2046 ceph_encode_32(&p, req->r_flags); 2047 2048 /* reqid */ 2049 ceph_start_encoding(&p, 2, 2, sizeof(struct ceph_osd_reqid)); 2050 memset(p, 0, sizeof(struct ceph_osd_reqid)); 2051 p += sizeof(struct ceph_osd_reqid); 2052 2053 /* trace */ 2054 memset(p, 0, sizeof(struct ceph_blkin_trace_info)); 2055 p += sizeof(struct ceph_blkin_trace_info); 2056 2057 ceph_encode_32(&p, 0); /* client_inc, always 0 */ 2058 ceph_encode_timespec64(p, &req->r_mtime); 2059 p += sizeof(struct ceph_timespec); 2060 2061 encode_oloc(&p, end, &req->r_t.target_oloc); 2062 ceph_encode_string(&p, end, req->r_t.target_oid.name, 2063 req->r_t.target_oid.name_len); 2064 2065 /* ops, can imply data */ 2066 ceph_encode_16(&p, req->r_num_ops); 2067 for (i = 0; i < req->r_num_ops; i++) { 2068 data_len += osd_req_encode_op(p, &req->r_ops[i]); 2069 p += sizeof(struct ceph_osd_op); 2070 } 2071 2072 ceph_encode_64(&p, req->r_snapid); /* snapid */ 2073 if (req->r_snapc) { 2074 ceph_encode_64(&p, req->r_snapc->seq); 2075 ceph_encode_32(&p, req->r_snapc->num_snaps); 2076 for (i = 0; i < req->r_snapc->num_snaps; i++) 2077 ceph_encode_64(&p, req->r_snapc->snaps[i]); 2078 } else { 2079 ceph_encode_64(&p, 0); /* snap_seq */ 2080 ceph_encode_32(&p, 0); /* snaps len */ 2081 } 2082 2083 ceph_encode_32(&p, req->r_attempts); /* retry_attempt */ 2084 BUG_ON(p > end - 8); /* space for features */ 2085 2086 msg->hdr.version = cpu_to_le16(8); /* MOSDOp v8 */ 2087 /* front_len is finalized in encode_request_finish() */ 2088 msg->front.iov_len = p - msg->front.iov_base; 2089 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2090 msg->hdr.data_len = cpu_to_le32(data_len); 2091 /* 2092 * The header "data_off" is a hint to the receiver allowing it 2093 * to align received data into its buffers such that there's no 2094 * need to re-copy it before writing it to disk (direct I/O). 2095 */ 2096 msg->hdr.data_off = cpu_to_le16(req->r_data_offset); 2097 2098 dout("%s req %p msg %p oid %s oid_len %d\n", __func__, req, msg, 2099 req->r_t.target_oid.name, req->r_t.target_oid.name_len); 2100 } 2101 2102 static void encode_request_finish(struct ceph_msg *msg) 2103 { 2104 void *p = msg->front.iov_base; 2105 void *const partial_end = p + msg->front.iov_len; 2106 void *const end = p + msg->front_alloc_len; 2107 2108 if (CEPH_HAVE_FEATURE(msg->con->peer_features, RESEND_ON_SPLIT)) { 2109 /* luminous OSD -- encode features and be done */ 2110 p = partial_end; 2111 ceph_encode_64(&p, msg->con->peer_features); 2112 } else { 2113 struct { 2114 char spgid[CEPH_ENCODING_START_BLK_LEN + 2115 CEPH_PGID_ENCODING_LEN + 1]; 2116 __le32 hash; 2117 __le32 epoch; 2118 __le32 flags; 2119 char reqid[CEPH_ENCODING_START_BLK_LEN + 2120 sizeof(struct ceph_osd_reqid)]; 2121 char trace[sizeof(struct ceph_blkin_trace_info)]; 2122 __le32 client_inc; 2123 struct ceph_timespec mtime; 2124 } __packed head; 2125 struct ceph_pg pgid; 2126 void *oloc, *oid, *tail; 2127 int oloc_len, oid_len, tail_len; 2128 int len; 2129 2130 /* 2131 * Pre-luminous OSD -- reencode v8 into v4 using @head 2132 * as a temporary buffer. Encode the raw PG; the rest 2133 * is just a matter of moving oloc, oid and tail blobs 2134 * around. 
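 *
 * Roughly, in terms of front field order only:
 *
 *   v8: spgid hash epoch flags reqid trace client_inc mtime oloc oid <tail>
 *   v4: client_inc epoch flags mtime reassert_version oloc pgid oid <tail>
 *
 * where <tail> (ops, snapid, snapc, retry_attempt) is carried over
 * unchanged and the raw pgid is rebuilt from the pool id in the oloc
 * blob plus the hash saved in @head.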
2135 */ 2136 memcpy(&head, p, sizeof(head)); 2137 p += sizeof(head); 2138 2139 oloc = p; 2140 p += CEPH_ENCODING_START_BLK_LEN; 2141 pgid.pool = ceph_decode_64(&p); 2142 p += 4 + 4; /* preferred, key len */ 2143 len = ceph_decode_32(&p); 2144 p += len; /* nspace */ 2145 oloc_len = p - oloc; 2146 2147 oid = p; 2148 len = ceph_decode_32(&p); 2149 p += len; 2150 oid_len = p - oid; 2151 2152 tail = p; 2153 tail_len = partial_end - p; 2154 2155 p = msg->front.iov_base; 2156 ceph_encode_copy(&p, &head.client_inc, sizeof(head.client_inc)); 2157 ceph_encode_copy(&p, &head.epoch, sizeof(head.epoch)); 2158 ceph_encode_copy(&p, &head.flags, sizeof(head.flags)); 2159 ceph_encode_copy(&p, &head.mtime, sizeof(head.mtime)); 2160 2161 /* reassert_version */ 2162 memset(p, 0, sizeof(struct ceph_eversion)); 2163 p += sizeof(struct ceph_eversion); 2164 2165 BUG_ON(p >= oloc); 2166 memmove(p, oloc, oloc_len); 2167 p += oloc_len; 2168 2169 pgid.seed = le32_to_cpu(head.hash); 2170 encode_pgid(&p, &pgid); /* raw pg */ 2171 2172 BUG_ON(p >= oid); 2173 memmove(p, oid, oid_len); 2174 p += oid_len; 2175 2176 /* tail -- ops, snapid, snapc, retry_attempt */ 2177 BUG_ON(p >= tail); 2178 memmove(p, tail, tail_len); 2179 p += tail_len; 2180 2181 msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */ 2182 } 2183 2184 BUG_ON(p > end); 2185 msg->front.iov_len = p - msg->front.iov_base; 2186 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2187 2188 dout("%s msg %p tid %llu %u+%u+%u v%d\n", __func__, msg, 2189 le64_to_cpu(msg->hdr.tid), le32_to_cpu(msg->hdr.front_len), 2190 le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len), 2191 le16_to_cpu(msg->hdr.version)); 2192 } 2193 2194 /* 2195 * @req has to be assigned a tid and registered. 2196 */ 2197 static void send_request(struct ceph_osd_request *req) 2198 { 2199 struct ceph_osd *osd = req->r_osd; 2200 2201 verify_osd_locked(osd); 2202 WARN_ON(osd->o_osd != req->r_t.osd); 2203 2204 /* backoff? */ 2205 if (should_plug_request(req)) 2206 return; 2207 2208 /* 2209 * We may have a previously queued request message hanging 2210 * around. Cancel it to avoid corrupting the msgr. 
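 *
 * This matters on resends: the same req->r_request message is reused
 * for every attempt, so if an earlier attempt is still sitting on a
 * connection's out queue, ceph_msg_revoke() pulls it back before the
 * message is re-encoded and queued again below.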
2211 */ 2212 if (req->r_sent) 2213 ceph_msg_revoke(req->r_request); 2214 2215 req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR; 2216 if (req->r_attempts) 2217 req->r_flags |= CEPH_OSD_FLAG_RETRY; 2218 else 2219 WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY); 2220 2221 encode_request_partial(req, req->r_request); 2222 2223 dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d e%u flags 0x%x attempt %d\n", 2224 __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed, 2225 req->r_t.spgid.pgid.pool, req->r_t.spgid.pgid.seed, 2226 req->r_t.spgid.shard, osd->o_osd, req->r_t.epoch, req->r_flags, 2227 req->r_attempts); 2228 2229 req->r_t.paused = false; 2230 req->r_stamp = jiffies; 2231 req->r_attempts++; 2232 2233 req->r_sent = osd->o_incarnation; 2234 req->r_request->hdr.tid = cpu_to_le64(req->r_tid); 2235 ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request)); 2236 } 2237 2238 static void maybe_request_map(struct ceph_osd_client *osdc) 2239 { 2240 bool continuous = false; 2241 2242 verify_osdc_locked(osdc); 2243 WARN_ON(!osdc->osdmap->epoch); 2244 2245 if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 2246 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD) || 2247 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) { 2248 dout("%s osdc %p continuous\n", __func__, osdc); 2249 continuous = true; 2250 } else { 2251 dout("%s osdc %p onetime\n", __func__, osdc); 2252 } 2253 2254 if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP, 2255 osdc->osdmap->epoch + 1, continuous)) 2256 ceph_monc_renew_subs(&osdc->client->monc); 2257 } 2258 2259 static void complete_request(struct ceph_osd_request *req, int err); 2260 static void send_map_check(struct ceph_osd_request *req); 2261 2262 static void __submit_request(struct ceph_osd_request *req, bool wrlocked) 2263 { 2264 struct ceph_osd_client *osdc = req->r_osdc; 2265 struct ceph_osd *osd; 2266 enum calc_target_result ct_res; 2267 int err = 0; 2268 bool need_send = false; 2269 bool promoted = false; 2270 2271 WARN_ON(req->r_tid); 2272 dout("%s req %p wrlocked %d\n", __func__, req, wrlocked); 2273 2274 again: 2275 ct_res = calc_target(osdc, &req->r_t, NULL, false); 2276 if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked) 2277 goto promote; 2278 2279 osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked); 2280 if (IS_ERR(osd)) { 2281 WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked); 2282 goto promote; 2283 } 2284 2285 if (osdc->abort_err) { 2286 dout("req %p abort_err %d\n", req, osdc->abort_err); 2287 err = osdc->abort_err; 2288 } else if (osdc->osdmap->epoch < osdc->epoch_barrier) { 2289 dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch, 2290 osdc->epoch_barrier); 2291 req->r_t.paused = true; 2292 maybe_request_map(osdc); 2293 } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && 2294 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) { 2295 dout("req %p pausewr\n", req); 2296 req->r_t.paused = true; 2297 maybe_request_map(osdc); 2298 } else if ((req->r_flags & CEPH_OSD_FLAG_READ) && 2299 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) { 2300 dout("req %p pauserd\n", req); 2301 req->r_t.paused = true; 2302 maybe_request_map(osdc); 2303 } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && 2304 !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY | 2305 CEPH_OSD_FLAG_FULL_FORCE)) && 2306 (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 2307 pool_full(osdc, req->r_t.base_oloc.pool))) { 2308 dout("req %p full/pool_full\n", req); 2309 if (ceph_test_opt(osdc->client, ABORT_ON_FULL)) { 2310 err = -ENOSPC; 2311 } else { 2312 pr_warn_ratelimited("FULL or reached pool quota\n"); 2313 
req->r_t.paused = true; 2314 maybe_request_map(osdc); 2315 } 2316 } else if (!osd_homeless(osd)) { 2317 need_send = true; 2318 } else { 2319 maybe_request_map(osdc); 2320 } 2321 2322 mutex_lock(&osd->lock); 2323 /* 2324 * Assign the tid atomically with send_request() to protect 2325 * multiple writes to the same object from racing with each 2326 * other, resulting in out of order ops on the OSDs. 2327 */ 2328 req->r_tid = atomic64_inc_return(&osdc->last_tid); 2329 link_request(osd, req); 2330 if (need_send) 2331 send_request(req); 2332 else if (err) 2333 complete_request(req, err); 2334 mutex_unlock(&osd->lock); 2335 2336 if (!err && ct_res == CALC_TARGET_POOL_DNE) 2337 send_map_check(req); 2338 2339 if (promoted) 2340 downgrade_write(&osdc->lock); 2341 return; 2342 2343 promote: 2344 up_read(&osdc->lock); 2345 down_write(&osdc->lock); 2346 wrlocked = true; 2347 promoted = true; 2348 goto again; 2349 } 2350 2351 static void account_request(struct ceph_osd_request *req) 2352 { 2353 WARN_ON(req->r_flags & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK)); 2354 WARN_ON(!(req->r_flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE))); 2355 2356 req->r_flags |= CEPH_OSD_FLAG_ONDISK; 2357 atomic_inc(&req->r_osdc->num_requests); 2358 2359 req->r_start_stamp = jiffies; 2360 } 2361 2362 static void submit_request(struct ceph_osd_request *req, bool wrlocked) 2363 { 2364 ceph_osdc_get_request(req); 2365 account_request(req); 2366 __submit_request(req, wrlocked); 2367 } 2368 2369 static void finish_request(struct ceph_osd_request *req) 2370 { 2371 struct ceph_osd_client *osdc = req->r_osdc; 2372 2373 WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid)); 2374 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 2375 2376 if (req->r_osd) 2377 unlink_request(req->r_osd, req); 2378 atomic_dec(&osdc->num_requests); 2379 2380 /* 2381 * If an OSD has failed or returned and a request has been sent 2382 * twice, it's possible to get a reply and end up here while the 2383 * request message is queued for delivery. We will ignore the 2384 * reply, so not a big deal, but better to try and catch it. 2385 */ 2386 ceph_msg_revoke(req->r_request); 2387 ceph_msg_revoke_incoming(req->r_reply); 2388 } 2389 2390 static void __complete_request(struct ceph_osd_request *req) 2391 { 2392 dout("%s req %p tid %llu cb %ps result %d\n", __func__, req, 2393 req->r_tid, req->r_callback, req->r_result); 2394 2395 if (req->r_callback) 2396 req->r_callback(req); 2397 complete_all(&req->r_completion); 2398 ceph_osdc_put_request(req); 2399 } 2400 2401 static void complete_request_workfn(struct work_struct *work) 2402 { 2403 struct ceph_osd_request *req = 2404 container_of(work, struct ceph_osd_request, r_complete_work); 2405 2406 __complete_request(req); 2407 } 2408 2409 /* 2410 * This is open-coded in handle_reply(). 
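 *
 * In other words the error path bundles what handle_reply() does by
 * hand on success, roughly:
 *
 *   req->r_result = err;
 *   finish_request(req);      unlink from the OSD, revoke messages
 *   __complete_request(req);  callback, complete_all(), final put
 *
 * except that here __complete_request() is deferred to completion_wq
 * via complete_request_workfn().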
2411 */ 2412 static void complete_request(struct ceph_osd_request *req, int err) 2413 { 2414 dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err); 2415 2416 req->r_result = err; 2417 finish_request(req); 2418 2419 INIT_WORK(&req->r_complete_work, complete_request_workfn); 2420 queue_work(req->r_osdc->completion_wq, &req->r_complete_work); 2421 } 2422 2423 static void cancel_map_check(struct ceph_osd_request *req) 2424 { 2425 struct ceph_osd_client *osdc = req->r_osdc; 2426 struct ceph_osd_request *lookup_req; 2427 2428 verify_osdc_wrlocked(osdc); 2429 2430 lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid); 2431 if (!lookup_req) 2432 return; 2433 2434 WARN_ON(lookup_req != req); 2435 erase_request_mc(&osdc->map_checks, req); 2436 ceph_osdc_put_request(req); 2437 } 2438 2439 static void cancel_request(struct ceph_osd_request *req) 2440 { 2441 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 2442 2443 cancel_map_check(req); 2444 finish_request(req); 2445 complete_all(&req->r_completion); 2446 ceph_osdc_put_request(req); 2447 } 2448 2449 static void abort_request(struct ceph_osd_request *req, int err) 2450 { 2451 dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err); 2452 2453 cancel_map_check(req); 2454 complete_request(req, err); 2455 } 2456 2457 static int abort_fn(struct ceph_osd_request *req, void *arg) 2458 { 2459 int err = *(int *)arg; 2460 2461 abort_request(req, err); 2462 return 0; /* continue iteration */ 2463 } 2464 2465 /* 2466 * Abort all in-flight requests with @err and arrange for all future 2467 * requests to be failed immediately. 2468 */ 2469 void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err) 2470 { 2471 dout("%s osdc %p err %d\n", __func__, osdc, err); 2472 down_write(&osdc->lock); 2473 for_each_request(osdc, abort_fn, &err); 2474 osdc->abort_err = err; 2475 up_write(&osdc->lock); 2476 } 2477 EXPORT_SYMBOL(ceph_osdc_abort_requests); 2478 2479 static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb) 2480 { 2481 if (likely(eb > osdc->epoch_barrier)) { 2482 dout("updating epoch_barrier from %u to %u\n", 2483 osdc->epoch_barrier, eb); 2484 osdc->epoch_barrier = eb; 2485 /* Request map if we're not to the barrier yet */ 2486 if (eb > osdc->osdmap->epoch) 2487 maybe_request_map(osdc); 2488 } 2489 } 2490 2491 void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb) 2492 { 2493 down_read(&osdc->lock); 2494 if (unlikely(eb > osdc->epoch_barrier)) { 2495 up_read(&osdc->lock); 2496 down_write(&osdc->lock); 2497 update_epoch_barrier(osdc, eb); 2498 up_write(&osdc->lock); 2499 } else { 2500 up_read(&osdc->lock); 2501 } 2502 } 2503 EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier); 2504 2505 /* 2506 * We can end up releasing caps as a result of abort_request(). 2507 * In that case, we probably want to ensure that the cap release message 2508 * has an updated epoch barrier in it, so set the epoch barrier prior to 2509 * aborting the first request. 
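 *
 * abort_on_full_fn() therefore bumps the barrier to the current map
 * epoch exactly once, guarded by *victims, before the first request is
 * aborted with -ENOSPC; update_epoch_barrier() only ever moves the
 * barrier forward.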
2510 */ 2511 static int abort_on_full_fn(struct ceph_osd_request *req, void *arg) 2512 { 2513 struct ceph_osd_client *osdc = req->r_osdc; 2514 bool *victims = arg; 2515 2516 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && 2517 (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 2518 pool_full(osdc, req->r_t.base_oloc.pool))) { 2519 if (!*victims) { 2520 update_epoch_barrier(osdc, osdc->osdmap->epoch); 2521 *victims = true; 2522 } 2523 abort_request(req, -ENOSPC); 2524 } 2525 2526 return 0; /* continue iteration */ 2527 } 2528 2529 /* 2530 * Drop all pending requests that are stalled waiting on a full condition to 2531 * clear, and complete them with ENOSPC as the return code. Set the 2532 * osdc->epoch_barrier to the latest map epoch that we've seen if any were 2533 * cancelled. 2534 */ 2535 static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc) 2536 { 2537 bool victims = false; 2538 2539 if (ceph_test_opt(osdc->client, ABORT_ON_FULL) && 2540 (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || have_pool_full(osdc))) 2541 for_each_request(osdc, abort_on_full_fn, &victims); 2542 } 2543 2544 static void check_pool_dne(struct ceph_osd_request *req) 2545 { 2546 struct ceph_osd_client *osdc = req->r_osdc; 2547 struct ceph_osdmap *map = osdc->osdmap; 2548 2549 verify_osdc_wrlocked(osdc); 2550 WARN_ON(!map->epoch); 2551 2552 if (req->r_attempts) { 2553 /* 2554 * We sent a request earlier, which means that 2555 * previously the pool existed, and now it does not 2556 * (i.e., it was deleted). 2557 */ 2558 req->r_map_dne_bound = map->epoch; 2559 dout("%s req %p tid %llu pool disappeared\n", __func__, req, 2560 req->r_tid); 2561 } else { 2562 dout("%s req %p tid %llu map_dne_bound %u have %u\n", __func__, 2563 req, req->r_tid, req->r_map_dne_bound, map->epoch); 2564 } 2565 2566 if (req->r_map_dne_bound) { 2567 if (map->epoch >= req->r_map_dne_bound) { 2568 /* we had a new enough map */ 2569 pr_info_ratelimited("tid %llu pool does not exist\n", 2570 req->r_tid); 2571 complete_request(req, -ENOENT); 2572 } 2573 } else { 2574 send_map_check(req); 2575 } 2576 } 2577 2578 static void map_check_cb(struct ceph_mon_generic_request *greq) 2579 { 2580 struct ceph_osd_client *osdc = &greq->monc->client->osdc; 2581 struct ceph_osd_request *req; 2582 u64 tid = greq->private_data; 2583 2584 WARN_ON(greq->result || !greq->u.newest); 2585 2586 down_write(&osdc->lock); 2587 req = lookup_request_mc(&osdc->map_checks, tid); 2588 if (!req) { 2589 dout("%s tid %llu dne\n", __func__, tid); 2590 goto out_unlock; 2591 } 2592 2593 dout("%s req %p tid %llu map_dne_bound %u newest %llu\n", __func__, 2594 req, req->r_tid, req->r_map_dne_bound, greq->u.newest); 2595 if (!req->r_map_dne_bound) 2596 req->r_map_dne_bound = greq->u.newest; 2597 erase_request_mc(&osdc->map_checks, req); 2598 check_pool_dne(req); 2599 2600 ceph_osdc_put_request(req); 2601 out_unlock: 2602 up_write(&osdc->lock); 2603 } 2604 2605 static void send_map_check(struct ceph_osd_request *req) 2606 { 2607 struct ceph_osd_client *osdc = req->r_osdc; 2608 struct ceph_osd_request *lookup_req; 2609 int ret; 2610 2611 verify_osdc_wrlocked(osdc); 2612 2613 lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid); 2614 if (lookup_req) { 2615 WARN_ON(lookup_req != req); 2616 return; 2617 } 2618 2619 ceph_osdc_get_request(req); 2620 insert_request_mc(&osdc->map_checks, req); 2621 ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap", 2622 map_check_cb, req->r_tid); 2623 WARN_ON(ret); 2624 } 2625 2626 /* 2627 * lingering requests, watch/notify v2 infrastructure 
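 *
 * A linger request (lreq) is kref-counted and outlives any individual
 * OSD request.  Rough lifecycle, see linger_submit() and friends:
 *
 *   linger_alloc()    - allocate, init completions and rbtree nodes
 *   linger_register() - assign a linger_id, hang it off the osdc
 *   link_linger()     - tie it to an OSD session (possibly homeless)
 *   send_linger()     - (re)send reg_req; ping_req is driven from
 *                       handle_timeout() via send_linger_ping()
 *   linger_cancel()   - unlink_linger() + linger_unregister()
 *   linger_put()      - linger_release() once the last ref drops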
2628 */ 2629 static void linger_release(struct kref *kref) 2630 { 2631 struct ceph_osd_linger_request *lreq = 2632 container_of(kref, struct ceph_osd_linger_request, kref); 2633 2634 dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq, 2635 lreq->reg_req, lreq->ping_req); 2636 WARN_ON(!RB_EMPTY_NODE(&lreq->node)); 2637 WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node)); 2638 WARN_ON(!RB_EMPTY_NODE(&lreq->mc_node)); 2639 WARN_ON(!list_empty(&lreq->scan_item)); 2640 WARN_ON(!list_empty(&lreq->pending_lworks)); 2641 WARN_ON(lreq->osd); 2642 2643 if (lreq->reg_req) 2644 ceph_osdc_put_request(lreq->reg_req); 2645 if (lreq->ping_req) 2646 ceph_osdc_put_request(lreq->ping_req); 2647 target_destroy(&lreq->t); 2648 kfree(lreq); 2649 } 2650 2651 static void linger_put(struct ceph_osd_linger_request *lreq) 2652 { 2653 if (lreq) 2654 kref_put(&lreq->kref, linger_release); 2655 } 2656 2657 static struct ceph_osd_linger_request * 2658 linger_get(struct ceph_osd_linger_request *lreq) 2659 { 2660 kref_get(&lreq->kref); 2661 return lreq; 2662 } 2663 2664 static struct ceph_osd_linger_request * 2665 linger_alloc(struct ceph_osd_client *osdc) 2666 { 2667 struct ceph_osd_linger_request *lreq; 2668 2669 lreq = kzalloc(sizeof(*lreq), GFP_NOIO); 2670 if (!lreq) 2671 return NULL; 2672 2673 kref_init(&lreq->kref); 2674 mutex_init(&lreq->lock); 2675 RB_CLEAR_NODE(&lreq->node); 2676 RB_CLEAR_NODE(&lreq->osdc_node); 2677 RB_CLEAR_NODE(&lreq->mc_node); 2678 INIT_LIST_HEAD(&lreq->scan_item); 2679 INIT_LIST_HEAD(&lreq->pending_lworks); 2680 init_completion(&lreq->reg_commit_wait); 2681 init_completion(&lreq->notify_finish_wait); 2682 2683 lreq->osdc = osdc; 2684 target_init(&lreq->t); 2685 2686 dout("%s lreq %p\n", __func__, lreq); 2687 return lreq; 2688 } 2689 2690 DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node) 2691 DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node) 2692 DEFINE_RB_FUNCS(linger_mc, struct ceph_osd_linger_request, linger_id, mc_node) 2693 2694 /* 2695 * Create linger request <-> OSD session relation. 2696 * 2697 * @lreq has to be registered, @osd may be homeless. 
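 *
 * Linking pins the session: get_osd() plus removal from the idle LRU,
 * or a num_homeless bump if @osd is the homeless placeholder.
 * unlink_linger() below is the inverse.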
2698 */ 2699 static void link_linger(struct ceph_osd *osd, 2700 struct ceph_osd_linger_request *lreq) 2701 { 2702 verify_osd_locked(osd); 2703 WARN_ON(!lreq->linger_id || lreq->osd); 2704 dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd, 2705 osd->o_osd, lreq, lreq->linger_id); 2706 2707 if (!osd_homeless(osd)) 2708 __remove_osd_from_lru(osd); 2709 else 2710 atomic_inc(&osd->o_osdc->num_homeless); 2711 2712 get_osd(osd); 2713 insert_linger(&osd->o_linger_requests, lreq); 2714 lreq->osd = osd; 2715 } 2716 2717 static void unlink_linger(struct ceph_osd *osd, 2718 struct ceph_osd_linger_request *lreq) 2719 { 2720 verify_osd_locked(osd); 2721 WARN_ON(lreq->osd != osd); 2722 dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd, 2723 osd->o_osd, lreq, lreq->linger_id); 2724 2725 lreq->osd = NULL; 2726 erase_linger(&osd->o_linger_requests, lreq); 2727 put_osd(osd); 2728 2729 if (!osd_homeless(osd)) 2730 maybe_move_osd_to_lru(osd); 2731 else 2732 atomic_dec(&osd->o_osdc->num_homeless); 2733 } 2734 2735 static bool __linger_registered(struct ceph_osd_linger_request *lreq) 2736 { 2737 verify_osdc_locked(lreq->osdc); 2738 2739 return !RB_EMPTY_NODE(&lreq->osdc_node); 2740 } 2741 2742 static bool linger_registered(struct ceph_osd_linger_request *lreq) 2743 { 2744 struct ceph_osd_client *osdc = lreq->osdc; 2745 bool registered; 2746 2747 down_read(&osdc->lock); 2748 registered = __linger_registered(lreq); 2749 up_read(&osdc->lock); 2750 2751 return registered; 2752 } 2753 2754 static void linger_register(struct ceph_osd_linger_request *lreq) 2755 { 2756 struct ceph_osd_client *osdc = lreq->osdc; 2757 2758 verify_osdc_wrlocked(osdc); 2759 WARN_ON(lreq->linger_id); 2760 2761 linger_get(lreq); 2762 lreq->linger_id = ++osdc->last_linger_id; 2763 insert_linger_osdc(&osdc->linger_requests, lreq); 2764 } 2765 2766 static void linger_unregister(struct ceph_osd_linger_request *lreq) 2767 { 2768 struct ceph_osd_client *osdc = lreq->osdc; 2769 2770 verify_osdc_wrlocked(osdc); 2771 2772 erase_linger_osdc(&osdc->linger_requests, lreq); 2773 linger_put(lreq); 2774 } 2775 2776 static void cancel_linger_request(struct ceph_osd_request *req) 2777 { 2778 struct ceph_osd_linger_request *lreq = req->r_priv; 2779 2780 WARN_ON(!req->r_linger); 2781 cancel_request(req); 2782 linger_put(lreq); 2783 } 2784 2785 struct linger_work { 2786 struct work_struct work; 2787 struct ceph_osd_linger_request *lreq; 2788 struct list_head pending_item; 2789 unsigned long queued_stamp; 2790 2791 union { 2792 struct { 2793 u64 notify_id; 2794 u64 notifier_id; 2795 void *payload; /* points into @msg front */ 2796 size_t payload_len; 2797 2798 struct ceph_msg *msg; /* for ceph_msg_put() */ 2799 } notify; 2800 struct { 2801 int err; 2802 } error; 2803 }; 2804 }; 2805 2806 static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq, 2807 work_func_t workfn) 2808 { 2809 struct linger_work *lwork; 2810 2811 lwork = kzalloc(sizeof(*lwork), GFP_NOIO); 2812 if (!lwork) 2813 return NULL; 2814 2815 INIT_WORK(&lwork->work, workfn); 2816 INIT_LIST_HEAD(&lwork->pending_item); 2817 lwork->lreq = linger_get(lreq); 2818 2819 return lwork; 2820 } 2821 2822 static void lwork_free(struct linger_work *lwork) 2823 { 2824 struct ceph_osd_linger_request *lreq = lwork->lreq; 2825 2826 mutex_lock(&lreq->lock); 2827 list_del(&lwork->pending_item); 2828 mutex_unlock(&lreq->lock); 2829 2830 linger_put(lreq); 2831 kfree(lwork); 2832 } 2833 2834 static void lwork_queue(struct linger_work *lwork) 2835 { 2836 struct 
ceph_osd_linger_request *lreq = lwork->lreq; 2837 struct ceph_osd_client *osdc = lreq->osdc; 2838 2839 verify_lreq_locked(lreq); 2840 WARN_ON(!list_empty(&lwork->pending_item)); 2841 2842 lwork->queued_stamp = jiffies; 2843 list_add_tail(&lwork->pending_item, &lreq->pending_lworks); 2844 queue_work(osdc->notify_wq, &lwork->work); 2845 } 2846 2847 static void do_watch_notify(struct work_struct *w) 2848 { 2849 struct linger_work *lwork = container_of(w, struct linger_work, work); 2850 struct ceph_osd_linger_request *lreq = lwork->lreq; 2851 2852 if (!linger_registered(lreq)) { 2853 dout("%s lreq %p not registered\n", __func__, lreq); 2854 goto out; 2855 } 2856 2857 WARN_ON(!lreq->is_watch); 2858 dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n", 2859 __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id, 2860 lwork->notify.payload_len); 2861 lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id, 2862 lwork->notify.notifier_id, lwork->notify.payload, 2863 lwork->notify.payload_len); 2864 2865 out: 2866 ceph_msg_put(lwork->notify.msg); 2867 lwork_free(lwork); 2868 } 2869 2870 static void do_watch_error(struct work_struct *w) 2871 { 2872 struct linger_work *lwork = container_of(w, struct linger_work, work); 2873 struct ceph_osd_linger_request *lreq = lwork->lreq; 2874 2875 if (!linger_registered(lreq)) { 2876 dout("%s lreq %p not registered\n", __func__, lreq); 2877 goto out; 2878 } 2879 2880 dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err); 2881 lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err); 2882 2883 out: 2884 lwork_free(lwork); 2885 } 2886 2887 static void queue_watch_error(struct ceph_osd_linger_request *lreq) 2888 { 2889 struct linger_work *lwork; 2890 2891 lwork = lwork_alloc(lreq, do_watch_error); 2892 if (!lwork) { 2893 pr_err("failed to allocate error-lwork\n"); 2894 return; 2895 } 2896 2897 lwork->error.err = lreq->last_error; 2898 lwork_queue(lwork); 2899 } 2900 2901 static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq, 2902 int result) 2903 { 2904 if (!completion_done(&lreq->reg_commit_wait)) { 2905 lreq->reg_commit_error = (result <= 0 ? result : 0); 2906 complete_all(&lreq->reg_commit_wait); 2907 } 2908 } 2909 2910 static void linger_commit_cb(struct ceph_osd_request *req) 2911 { 2912 struct ceph_osd_linger_request *lreq = req->r_priv; 2913 2914 mutex_lock(&lreq->lock); 2915 dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq, 2916 lreq->linger_id, req->r_result); 2917 linger_reg_commit_complete(lreq, req->r_result); 2918 lreq->committed = true; 2919 2920 if (!lreq->is_watch) { 2921 struct ceph_osd_data *osd_data = 2922 osd_req_op_data(req, 0, notify, response_data); 2923 void *p = page_address(osd_data->pages[0]); 2924 2925 WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY || 2926 osd_data->type != CEPH_OSD_DATA_TYPE_PAGES); 2927 2928 /* make note of the notify_id */ 2929 if (req->r_ops[0].outdata_len >= sizeof(u64)) { 2930 lreq->notify_id = ceph_decode_64(&p); 2931 dout("lreq %p notify_id %llu\n", lreq, 2932 lreq->notify_id); 2933 } else { 2934 dout("lreq %p no notify_id\n", lreq); 2935 } 2936 } 2937 2938 mutex_unlock(&lreq->lock); 2939 linger_put(lreq); 2940 } 2941 2942 static int normalize_watch_error(int err) 2943 { 2944 /* 2945 * Translate ENOENT -> ENOTCONN so that a delete->disconnection 2946 * notification and a failure to reconnect because we raced with 2947 * the delete appear the same to the user. 
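 *
 * Both linger_reconnect_cb() and linger_ping_cb() run their errors
 * through this before queue_watch_error(), so the user's error
 * callback only ever sees the normalized value.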
2948 */ 2949 if (err == -ENOENT) 2950 err = -ENOTCONN; 2951 2952 return err; 2953 } 2954 2955 static void linger_reconnect_cb(struct ceph_osd_request *req) 2956 { 2957 struct ceph_osd_linger_request *lreq = req->r_priv; 2958 2959 mutex_lock(&lreq->lock); 2960 dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__, 2961 lreq, lreq->linger_id, req->r_result, lreq->last_error); 2962 if (req->r_result < 0) { 2963 if (!lreq->last_error) { 2964 lreq->last_error = normalize_watch_error(req->r_result); 2965 queue_watch_error(lreq); 2966 } 2967 } 2968 2969 mutex_unlock(&lreq->lock); 2970 linger_put(lreq); 2971 } 2972 2973 static void send_linger(struct ceph_osd_linger_request *lreq) 2974 { 2975 struct ceph_osd_request *req = lreq->reg_req; 2976 struct ceph_osd_req_op *op = &req->r_ops[0]; 2977 2978 verify_osdc_wrlocked(req->r_osdc); 2979 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id); 2980 2981 if (req->r_osd) 2982 cancel_linger_request(req); 2983 2984 request_reinit(req); 2985 ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); 2986 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); 2987 req->r_flags = lreq->t.flags; 2988 req->r_mtime = lreq->mtime; 2989 2990 mutex_lock(&lreq->lock); 2991 if (lreq->is_watch && lreq->committed) { 2992 WARN_ON(op->op != CEPH_OSD_OP_WATCH || 2993 op->watch.cookie != lreq->linger_id); 2994 op->watch.op = CEPH_OSD_WATCH_OP_RECONNECT; 2995 op->watch.gen = ++lreq->register_gen; 2996 dout("lreq %p reconnect register_gen %u\n", lreq, 2997 op->watch.gen); 2998 req->r_callback = linger_reconnect_cb; 2999 } else { 3000 if (!lreq->is_watch) 3001 lreq->notify_id = 0; 3002 else 3003 WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH); 3004 dout("lreq %p register\n", lreq); 3005 req->r_callback = linger_commit_cb; 3006 } 3007 mutex_unlock(&lreq->lock); 3008 3009 req->r_priv = linger_get(lreq); 3010 req->r_linger = true; 3011 3012 submit_request(req, true); 3013 } 3014 3015 static void linger_ping_cb(struct ceph_osd_request *req) 3016 { 3017 struct ceph_osd_linger_request *lreq = req->r_priv; 3018 3019 mutex_lock(&lreq->lock); 3020 dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n", 3021 __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent, 3022 lreq->last_error); 3023 if (lreq->register_gen == req->r_ops[0].watch.gen) { 3024 if (!req->r_result) { 3025 lreq->watch_valid_thru = lreq->ping_sent; 3026 } else if (!lreq->last_error) { 3027 lreq->last_error = normalize_watch_error(req->r_result); 3028 queue_watch_error(lreq); 3029 } 3030 } else { 3031 dout("lreq %p register_gen %u ignoring old pong %u\n", lreq, 3032 lreq->register_gen, req->r_ops[0].watch.gen); 3033 } 3034 3035 mutex_unlock(&lreq->lock); 3036 linger_put(lreq); 3037 } 3038 3039 static void send_linger_ping(struct ceph_osd_linger_request *lreq) 3040 { 3041 struct ceph_osd_client *osdc = lreq->osdc; 3042 struct ceph_osd_request *req = lreq->ping_req; 3043 struct ceph_osd_req_op *op = &req->r_ops[0]; 3044 3045 if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) { 3046 dout("%s PAUSERD\n", __func__); 3047 return; 3048 } 3049 3050 lreq->ping_sent = jiffies; 3051 dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n", 3052 __func__, lreq, lreq->linger_id, lreq->ping_sent, 3053 lreq->register_gen); 3054 3055 if (req->r_osd) 3056 cancel_linger_request(req); 3057 3058 request_reinit(req); 3059 target_copy(&req->r_t, &lreq->t); 3060 3061 WARN_ON(op->op != CEPH_OSD_OP_WATCH || 3062 op->watch.cookie != lreq->linger_id || 3063 op->watch.op != 
CEPH_OSD_WATCH_OP_PING); 3064 op->watch.gen = lreq->register_gen; 3065 req->r_callback = linger_ping_cb; 3066 req->r_priv = linger_get(lreq); 3067 req->r_linger = true; 3068 3069 ceph_osdc_get_request(req); 3070 account_request(req); 3071 req->r_tid = atomic64_inc_return(&osdc->last_tid); 3072 link_request(lreq->osd, req); 3073 send_request(req); 3074 } 3075 3076 static void linger_submit(struct ceph_osd_linger_request *lreq) 3077 { 3078 struct ceph_osd_client *osdc = lreq->osdc; 3079 struct ceph_osd *osd; 3080 3081 down_write(&osdc->lock); 3082 linger_register(lreq); 3083 if (lreq->is_watch) { 3084 lreq->reg_req->r_ops[0].watch.cookie = lreq->linger_id; 3085 lreq->ping_req->r_ops[0].watch.cookie = lreq->linger_id; 3086 } else { 3087 lreq->reg_req->r_ops[0].notify.cookie = lreq->linger_id; 3088 } 3089 3090 calc_target(osdc, &lreq->t, NULL, false); 3091 osd = lookup_create_osd(osdc, lreq->t.osd, true); 3092 link_linger(osd, lreq); 3093 3094 send_linger(lreq); 3095 up_write(&osdc->lock); 3096 } 3097 3098 static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq) 3099 { 3100 struct ceph_osd_client *osdc = lreq->osdc; 3101 struct ceph_osd_linger_request *lookup_lreq; 3102 3103 verify_osdc_wrlocked(osdc); 3104 3105 lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks, 3106 lreq->linger_id); 3107 if (!lookup_lreq) 3108 return; 3109 3110 WARN_ON(lookup_lreq != lreq); 3111 erase_linger_mc(&osdc->linger_map_checks, lreq); 3112 linger_put(lreq); 3113 } 3114 3115 /* 3116 * @lreq has to be both registered and linked. 3117 */ 3118 static void __linger_cancel(struct ceph_osd_linger_request *lreq) 3119 { 3120 if (lreq->is_watch && lreq->ping_req->r_osd) 3121 cancel_linger_request(lreq->ping_req); 3122 if (lreq->reg_req->r_osd) 3123 cancel_linger_request(lreq->reg_req); 3124 cancel_linger_map_check(lreq); 3125 unlink_linger(lreq->osd, lreq); 3126 linger_unregister(lreq); 3127 } 3128 3129 static void linger_cancel(struct ceph_osd_linger_request *lreq) 3130 { 3131 struct ceph_osd_client *osdc = lreq->osdc; 3132 3133 down_write(&osdc->lock); 3134 if (__linger_registered(lreq)) 3135 __linger_cancel(lreq); 3136 up_write(&osdc->lock); 3137 } 3138 3139 static void send_linger_map_check(struct ceph_osd_linger_request *lreq); 3140 3141 static void check_linger_pool_dne(struct ceph_osd_linger_request *lreq) 3142 { 3143 struct ceph_osd_client *osdc = lreq->osdc; 3144 struct ceph_osdmap *map = osdc->osdmap; 3145 3146 verify_osdc_wrlocked(osdc); 3147 WARN_ON(!map->epoch); 3148 3149 if (lreq->register_gen) { 3150 lreq->map_dne_bound = map->epoch; 3151 dout("%s lreq %p linger_id %llu pool disappeared\n", __func__, 3152 lreq, lreq->linger_id); 3153 } else { 3154 dout("%s lreq %p linger_id %llu map_dne_bound %u have %u\n", 3155 __func__, lreq, lreq->linger_id, lreq->map_dne_bound, 3156 map->epoch); 3157 } 3158 3159 if (lreq->map_dne_bound) { 3160 if (map->epoch >= lreq->map_dne_bound) { 3161 /* we had a new enough map */ 3162 pr_info("linger_id %llu pool does not exist\n", 3163 lreq->linger_id); 3164 linger_reg_commit_complete(lreq, -ENOENT); 3165 __linger_cancel(lreq); 3166 } 3167 } else { 3168 send_linger_map_check(lreq); 3169 } 3170 } 3171 3172 static void linger_map_check_cb(struct ceph_mon_generic_request *greq) 3173 { 3174 struct ceph_osd_client *osdc = &greq->monc->client->osdc; 3175 struct ceph_osd_linger_request *lreq; 3176 u64 linger_id = greq->private_data; 3177 3178 WARN_ON(greq->result || !greq->u.newest); 3179 3180 down_write(&osdc->lock); 3181 lreq = 
lookup_linger_mc(&osdc->linger_map_checks, linger_id); 3182 if (!lreq) { 3183 dout("%s linger_id %llu dne\n", __func__, linger_id); 3184 goto out_unlock; 3185 } 3186 3187 dout("%s lreq %p linger_id %llu map_dne_bound %u newest %llu\n", 3188 __func__, lreq, lreq->linger_id, lreq->map_dne_bound, 3189 greq->u.newest); 3190 if (!lreq->map_dne_bound) 3191 lreq->map_dne_bound = greq->u.newest; 3192 erase_linger_mc(&osdc->linger_map_checks, lreq); 3193 check_linger_pool_dne(lreq); 3194 3195 linger_put(lreq); 3196 out_unlock: 3197 up_write(&osdc->lock); 3198 } 3199 3200 static void send_linger_map_check(struct ceph_osd_linger_request *lreq) 3201 { 3202 struct ceph_osd_client *osdc = lreq->osdc; 3203 struct ceph_osd_linger_request *lookup_lreq; 3204 int ret; 3205 3206 verify_osdc_wrlocked(osdc); 3207 3208 lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks, 3209 lreq->linger_id); 3210 if (lookup_lreq) { 3211 WARN_ON(lookup_lreq != lreq); 3212 return; 3213 } 3214 3215 linger_get(lreq); 3216 insert_linger_mc(&osdc->linger_map_checks, lreq); 3217 ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap", 3218 linger_map_check_cb, lreq->linger_id); 3219 WARN_ON(ret); 3220 } 3221 3222 static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq) 3223 { 3224 int ret; 3225 3226 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id); 3227 ret = wait_for_completion_interruptible(&lreq->reg_commit_wait); 3228 return ret ?: lreq->reg_commit_error; 3229 } 3230 3231 static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq) 3232 { 3233 int ret; 3234 3235 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id); 3236 ret = wait_for_completion_interruptible(&lreq->notify_finish_wait); 3237 return ret ?: lreq->notify_finish_error; 3238 } 3239 3240 /* 3241 * Timeout callback, called every N seconds. When 1 or more OSD 3242 * requests has been active for more than N seconds, we send a keepalive 3243 * (tag + timestamp) to its OSD to ensure any communications channel 3244 * reset is detected. 3245 */ 3246 static void handle_timeout(struct work_struct *work) 3247 { 3248 struct ceph_osd_client *osdc = 3249 container_of(work, struct ceph_osd_client, timeout_work.work); 3250 struct ceph_options *opts = osdc->client->options; 3251 unsigned long cutoff = jiffies - opts->osd_keepalive_timeout; 3252 unsigned long expiry_cutoff = jiffies - opts->osd_request_timeout; 3253 LIST_HEAD(slow_osds); 3254 struct rb_node *n, *p; 3255 3256 dout("%s osdc %p\n", __func__, osdc); 3257 down_write(&osdc->lock); 3258 3259 /* 3260 * ping osds that are a bit slow. this ensures that if there 3261 * is a break in the TCP connection we will notice, and reopen 3262 * a connection with that osd (from the fault callback). 
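 *
 * "a bit slow" means r_stamp is older than osd_keepalive_timeout; such
 * sessions are collected on @slow_osds and sent a ceph_con_keepalive().
 * Independently, if osd_request_timeout is set, requests whose
 * r_start_stamp is older than that are aborted with -ETIMEDOUT, and
 * committed watches are pinged via send_linger_ping().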
3263 */ 3264 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { 3265 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); 3266 bool found = false; 3267 3268 for (p = rb_first(&osd->o_requests); p; ) { 3269 struct ceph_osd_request *req = 3270 rb_entry(p, struct ceph_osd_request, r_node); 3271 3272 p = rb_next(p); /* abort_request() */ 3273 3274 if (time_before(req->r_stamp, cutoff)) { 3275 dout(" req %p tid %llu on osd%d is laggy\n", 3276 req, req->r_tid, osd->o_osd); 3277 found = true; 3278 } 3279 if (opts->osd_request_timeout && 3280 time_before(req->r_start_stamp, expiry_cutoff)) { 3281 pr_err_ratelimited("tid %llu on osd%d timeout\n", 3282 req->r_tid, osd->o_osd); 3283 abort_request(req, -ETIMEDOUT); 3284 } 3285 } 3286 for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) { 3287 struct ceph_osd_linger_request *lreq = 3288 rb_entry(p, struct ceph_osd_linger_request, node); 3289 3290 dout(" lreq %p linger_id %llu is served by osd%d\n", 3291 lreq, lreq->linger_id, osd->o_osd); 3292 found = true; 3293 3294 mutex_lock(&lreq->lock); 3295 if (lreq->is_watch && lreq->committed && !lreq->last_error) 3296 send_linger_ping(lreq); 3297 mutex_unlock(&lreq->lock); 3298 } 3299 3300 if (found) 3301 list_move_tail(&osd->o_keepalive_item, &slow_osds); 3302 } 3303 3304 if (opts->osd_request_timeout) { 3305 for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) { 3306 struct ceph_osd_request *req = 3307 rb_entry(p, struct ceph_osd_request, r_node); 3308 3309 p = rb_next(p); /* abort_request() */ 3310 3311 if (time_before(req->r_start_stamp, expiry_cutoff)) { 3312 pr_err_ratelimited("tid %llu on osd%d timeout\n", 3313 req->r_tid, osdc->homeless_osd.o_osd); 3314 abort_request(req, -ETIMEDOUT); 3315 } 3316 } 3317 } 3318 3319 if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds)) 3320 maybe_request_map(osdc); 3321 3322 while (!list_empty(&slow_osds)) { 3323 struct ceph_osd *osd = list_first_entry(&slow_osds, 3324 struct ceph_osd, 3325 o_keepalive_item); 3326 list_del_init(&osd->o_keepalive_item); 3327 ceph_con_keepalive(&osd->o_con); 3328 } 3329 3330 up_write(&osdc->lock); 3331 schedule_delayed_work(&osdc->timeout_work, 3332 osdc->client->options->osd_keepalive_timeout); 3333 } 3334 3335 static void handle_osds_timeout(struct work_struct *work) 3336 { 3337 struct ceph_osd_client *osdc = 3338 container_of(work, struct ceph_osd_client, 3339 osds_timeout_work.work); 3340 unsigned long delay = osdc->client->options->osd_idle_ttl / 4; 3341 struct ceph_osd *osd, *nosd; 3342 3343 dout("%s osdc %p\n", __func__, osdc); 3344 down_write(&osdc->lock); 3345 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) { 3346 if (time_before(jiffies, osd->lru_ttl)) 3347 break; 3348 3349 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests)); 3350 WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests)); 3351 close_osd(osd); 3352 } 3353 3354 up_write(&osdc->lock); 3355 schedule_delayed_work(&osdc->osds_timeout_work, 3356 round_jiffies_relative(delay)); 3357 } 3358 3359 static int ceph_oloc_decode(void **p, void *end, 3360 struct ceph_object_locator *oloc) 3361 { 3362 u8 struct_v, struct_cv; 3363 u32 len; 3364 void *struct_end; 3365 int ret = 0; 3366 3367 ceph_decode_need(p, end, 1 + 1 + 4, e_inval); 3368 struct_v = ceph_decode_8(p); 3369 struct_cv = ceph_decode_8(p); 3370 if (struct_v < 3) { 3371 pr_warn("got v %d < 3 cv %d of ceph_object_locator\n", 3372 struct_v, struct_cv); 3373 goto e_inval; 3374 } 3375 if (struct_cv > 6) { 3376 pr_warn("got v %d cv %d > 6 of ceph_object_locator\n", 3377 struct_v, struct_cv); 
3378 goto e_inval; 3379 } 3380 len = ceph_decode_32(p); 3381 ceph_decode_need(p, end, len, e_inval); 3382 struct_end = *p + len; 3383 3384 oloc->pool = ceph_decode_64(p); 3385 *p += 4; /* skip preferred */ 3386 3387 len = ceph_decode_32(p); 3388 if (len > 0) { 3389 pr_warn("ceph_object_locator::key is set\n"); 3390 goto e_inval; 3391 } 3392 3393 if (struct_v >= 5) { 3394 bool changed = false; 3395 3396 len = ceph_decode_32(p); 3397 if (len > 0) { 3398 ceph_decode_need(p, end, len, e_inval); 3399 if (!oloc->pool_ns || 3400 ceph_compare_string(oloc->pool_ns, *p, len)) 3401 changed = true; 3402 *p += len; 3403 } else { 3404 if (oloc->pool_ns) 3405 changed = true; 3406 } 3407 if (changed) { 3408 /* redirect changes namespace */ 3409 pr_warn("ceph_object_locator::nspace is changed\n"); 3410 goto e_inval; 3411 } 3412 } 3413 3414 if (struct_v >= 6) { 3415 s64 hash = ceph_decode_64(p); 3416 if (hash != -1) { 3417 pr_warn("ceph_object_locator::hash is set\n"); 3418 goto e_inval; 3419 } 3420 } 3421 3422 /* skip the rest */ 3423 *p = struct_end; 3424 out: 3425 return ret; 3426 3427 e_inval: 3428 ret = -EINVAL; 3429 goto out; 3430 } 3431 3432 static int ceph_redirect_decode(void **p, void *end, 3433 struct ceph_request_redirect *redir) 3434 { 3435 u8 struct_v, struct_cv; 3436 u32 len; 3437 void *struct_end; 3438 int ret; 3439 3440 ceph_decode_need(p, end, 1 + 1 + 4, e_inval); 3441 struct_v = ceph_decode_8(p); 3442 struct_cv = ceph_decode_8(p); 3443 if (struct_cv > 1) { 3444 pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n", 3445 struct_v, struct_cv); 3446 goto e_inval; 3447 } 3448 len = ceph_decode_32(p); 3449 ceph_decode_need(p, end, len, e_inval); 3450 struct_end = *p + len; 3451 3452 ret = ceph_oloc_decode(p, end, &redir->oloc); 3453 if (ret) 3454 goto out; 3455 3456 len = ceph_decode_32(p); 3457 if (len > 0) { 3458 pr_warn("ceph_request_redirect::object_name is set\n"); 3459 goto e_inval; 3460 } 3461 3462 len = ceph_decode_32(p); 3463 *p += len; /* skip osd_instructions */ 3464 3465 /* skip the rest */ 3466 *p = struct_end; 3467 out: 3468 return ret; 3469 3470 e_inval: 3471 ret = -EINVAL; 3472 goto out; 3473 } 3474 3475 struct MOSDOpReply { 3476 struct ceph_pg pgid; 3477 u64 flags; 3478 int result; 3479 u32 epoch; 3480 int num_ops; 3481 u32 outdata_len[CEPH_OSD_MAX_OPS]; 3482 s32 rval[CEPH_OSD_MAX_OPS]; 3483 int retry_attempt; 3484 struct ceph_eversion replay_version; 3485 u64 user_version; 3486 struct ceph_request_redirect redirect; 3487 }; 3488 3489 static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m) 3490 { 3491 void *p = msg->front.iov_base; 3492 void *const end = p + msg->front.iov_len; 3493 u16 version = le16_to_cpu(msg->hdr.version); 3494 struct ceph_eversion bad_replay_version; 3495 u8 decode_redir; 3496 u32 len; 3497 int ret; 3498 int i; 3499 3500 ceph_decode_32_safe(&p, end, len, e_inval); 3501 ceph_decode_need(&p, end, len, e_inval); 3502 p += len; /* skip oid */ 3503 3504 ret = ceph_decode_pgid(&p, end, &m->pgid); 3505 if (ret) 3506 return ret; 3507 3508 ceph_decode_64_safe(&p, end, m->flags, e_inval); 3509 ceph_decode_32_safe(&p, end, m->result, e_inval); 3510 ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval); 3511 memcpy(&bad_replay_version, p, sizeof(bad_replay_version)); 3512 p += sizeof(bad_replay_version); 3513 ceph_decode_32_safe(&p, end, m->epoch, e_inval); 3514 3515 ceph_decode_32_safe(&p, end, m->num_ops, e_inval); 3516 if (m->num_ops > ARRAY_SIZE(m->outdata_len)) 3517 goto e_inval; 3518 3519 ceph_decode_need(&p, end, m->num_ops 
* sizeof(struct ceph_osd_op), 3520 e_inval); 3521 for (i = 0; i < m->num_ops; i++) { 3522 struct ceph_osd_op *op = p; 3523 3524 m->outdata_len[i] = le32_to_cpu(op->payload_len); 3525 p += sizeof(*op); 3526 } 3527 3528 ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval); 3529 for (i = 0; i < m->num_ops; i++) 3530 ceph_decode_32_safe(&p, end, m->rval[i], e_inval); 3531 3532 if (version >= 5) { 3533 ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval); 3534 memcpy(&m->replay_version, p, sizeof(m->replay_version)); 3535 p += sizeof(m->replay_version); 3536 ceph_decode_64_safe(&p, end, m->user_version, e_inval); 3537 } else { 3538 m->replay_version = bad_replay_version; /* struct */ 3539 m->user_version = le64_to_cpu(m->replay_version.version); 3540 } 3541 3542 if (version >= 6) { 3543 if (version >= 7) 3544 ceph_decode_8_safe(&p, end, decode_redir, e_inval); 3545 else 3546 decode_redir = 1; 3547 } else { 3548 decode_redir = 0; 3549 } 3550 3551 if (decode_redir) { 3552 ret = ceph_redirect_decode(&p, end, &m->redirect); 3553 if (ret) 3554 return ret; 3555 } else { 3556 ceph_oloc_init(&m->redirect.oloc); 3557 } 3558 3559 return 0; 3560 3561 e_inval: 3562 return -EINVAL; 3563 } 3564 3565 /* 3566 * Handle MOSDOpReply. Set ->r_result and call the callback if it is 3567 * specified. 3568 */ 3569 static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) 3570 { 3571 struct ceph_osd_client *osdc = osd->o_osdc; 3572 struct ceph_osd_request *req; 3573 struct MOSDOpReply m; 3574 u64 tid = le64_to_cpu(msg->hdr.tid); 3575 u32 data_len = 0; 3576 int ret; 3577 int i; 3578 3579 dout("%s msg %p tid %llu\n", __func__, msg, tid); 3580 3581 down_read(&osdc->lock); 3582 if (!osd_registered(osd)) { 3583 dout("%s osd%d unknown\n", __func__, osd->o_osd); 3584 goto out_unlock_osdc; 3585 } 3586 WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num)); 3587 3588 mutex_lock(&osd->lock); 3589 req = lookup_request(&osd->o_requests, tid); 3590 if (!req) { 3591 dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid); 3592 goto out_unlock_session; 3593 } 3594 3595 m.redirect.oloc.pool_ns = req->r_t.target_oloc.pool_ns; 3596 ret = decode_MOSDOpReply(msg, &m); 3597 m.redirect.oloc.pool_ns = NULL; 3598 if (ret) { 3599 pr_err("failed to decode MOSDOpReply for tid %llu: %d\n", 3600 req->r_tid, ret); 3601 ceph_msg_dump(msg); 3602 goto fail_request; 3603 } 3604 dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n", 3605 __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed, 3606 m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch), 3607 le64_to_cpu(m.replay_version.version), m.user_version); 3608 3609 if (m.retry_attempt >= 0) { 3610 if (m.retry_attempt != req->r_attempts - 1) { 3611 dout("req %p tid %llu retry_attempt %d != %d, ignoring\n", 3612 req, req->r_tid, m.retry_attempt, 3613 req->r_attempts - 1); 3614 goto out_unlock_session; 3615 } 3616 } else { 3617 WARN_ON(1); /* MOSDOpReply v4 is assumed */ 3618 } 3619 3620 if (!ceph_oloc_empty(&m.redirect.oloc)) { 3621 dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid, 3622 m.redirect.oloc.pool); 3623 unlink_request(osd, req); 3624 mutex_unlock(&osd->lock); 3625 3626 /* 3627 * Not ceph_oloc_copy() - changing pool_ns is not 3628 * supported. 
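 *
 * Only the redirect's pool id is applied; the request is then marked
 * REDIRECTED, its tid is cleared and it goes back through
 * __submit_request() to be remapped and assigned a new tid.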
3629 */ 3630 req->r_t.target_oloc.pool = m.redirect.oloc.pool; 3631 req->r_flags |= CEPH_OSD_FLAG_REDIRECTED; 3632 req->r_tid = 0; 3633 __submit_request(req, false); 3634 goto out_unlock_osdc; 3635 } 3636 3637 if (m.num_ops != req->r_num_ops) { 3638 pr_err("num_ops %d != %d for tid %llu\n", m.num_ops, 3639 req->r_num_ops, req->r_tid); 3640 goto fail_request; 3641 } 3642 for (i = 0; i < req->r_num_ops; i++) { 3643 dout(" req %p tid %llu op %d rval %d len %u\n", req, 3644 req->r_tid, i, m.rval[i], m.outdata_len[i]); 3645 req->r_ops[i].rval = m.rval[i]; 3646 req->r_ops[i].outdata_len = m.outdata_len[i]; 3647 data_len += m.outdata_len[i]; 3648 } 3649 if (data_len != le32_to_cpu(msg->hdr.data_len)) { 3650 pr_err("sum of lens %u != %u for tid %llu\n", data_len, 3651 le32_to_cpu(msg->hdr.data_len), req->r_tid); 3652 goto fail_request; 3653 } 3654 dout("%s req %p tid %llu result %d data_len %u\n", __func__, 3655 req, req->r_tid, m.result, data_len); 3656 3657 /* 3658 * Since we only ever request ONDISK, we should only ever get 3659 * one (type of) reply back. 3660 */ 3661 WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK)); 3662 req->r_result = m.result ?: data_len; 3663 finish_request(req); 3664 mutex_unlock(&osd->lock); 3665 up_read(&osdc->lock); 3666 3667 __complete_request(req); 3668 return; 3669 3670 fail_request: 3671 complete_request(req, -EIO); 3672 out_unlock_session: 3673 mutex_unlock(&osd->lock); 3674 out_unlock_osdc: 3675 up_read(&osdc->lock); 3676 } 3677 3678 static void set_pool_was_full(struct ceph_osd_client *osdc) 3679 { 3680 struct rb_node *n; 3681 3682 for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) { 3683 struct ceph_pg_pool_info *pi = 3684 rb_entry(n, struct ceph_pg_pool_info, node); 3685 3686 pi->was_full = __pool_full(pi); 3687 } 3688 } 3689 3690 static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id) 3691 { 3692 struct ceph_pg_pool_info *pi; 3693 3694 pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id); 3695 if (!pi) 3696 return false; 3697 3698 return pi->was_full && !__pool_full(pi); 3699 } 3700 3701 static enum calc_target_result 3702 recalc_linger_target(struct ceph_osd_linger_request *lreq) 3703 { 3704 struct ceph_osd_client *osdc = lreq->osdc; 3705 enum calc_target_result ct_res; 3706 3707 ct_res = calc_target(osdc, &lreq->t, NULL, true); 3708 if (ct_res == CALC_TARGET_NEED_RESEND) { 3709 struct ceph_osd *osd; 3710 3711 osd = lookup_create_osd(osdc, lreq->t.osd, true); 3712 if (osd != lreq->osd) { 3713 unlink_linger(lreq->osd, lreq); 3714 link_linger(osd, lreq); 3715 } 3716 } 3717 3718 return ct_res; 3719 } 3720 3721 /* 3722 * Requeue requests whose mapping to an OSD has changed. 
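 *
 * For each request the new map yields one of three calc_target()
 * results, roughly handled as:
 *
 *   CALC_TARGET_NO_ACTION   - leave it alone, unless a forced resend
 *                             (e.g. a pool that cleared full) applies
 *   CALC_TARGET_NEED_RESEND - unlink and park on @need_resend or
 *                             @need_resend_linger for kick_requests()
 *   CALC_TARGET_POOL_DNE    - hand off to the pool-existence check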
3723 */ 3724 static void scan_requests(struct ceph_osd *osd, 3725 bool force_resend, 3726 bool cleared_full, 3727 bool check_pool_cleared_full, 3728 struct rb_root *need_resend, 3729 struct list_head *need_resend_linger) 3730 { 3731 struct ceph_osd_client *osdc = osd->o_osdc; 3732 struct rb_node *n; 3733 bool force_resend_writes; 3734 3735 for (n = rb_first(&osd->o_linger_requests); n; ) { 3736 struct ceph_osd_linger_request *lreq = 3737 rb_entry(n, struct ceph_osd_linger_request, node); 3738 enum calc_target_result ct_res; 3739 3740 n = rb_next(n); /* recalc_linger_target() */ 3741 3742 dout("%s lreq %p linger_id %llu\n", __func__, lreq, 3743 lreq->linger_id); 3744 ct_res = recalc_linger_target(lreq); 3745 switch (ct_res) { 3746 case CALC_TARGET_NO_ACTION: 3747 force_resend_writes = cleared_full || 3748 (check_pool_cleared_full && 3749 pool_cleared_full(osdc, lreq->t.base_oloc.pool)); 3750 if (!force_resend && !force_resend_writes) 3751 break; 3752 3753 /* fall through */ 3754 case CALC_TARGET_NEED_RESEND: 3755 cancel_linger_map_check(lreq); 3756 /* 3757 * scan_requests() for the previous epoch(s) 3758 * may have already added it to the list, since 3759 * it's not unlinked here. 3760 */ 3761 if (list_empty(&lreq->scan_item)) 3762 list_add_tail(&lreq->scan_item, need_resend_linger); 3763 break; 3764 case CALC_TARGET_POOL_DNE: 3765 list_del_init(&lreq->scan_item); 3766 check_linger_pool_dne(lreq); 3767 break; 3768 } 3769 } 3770 3771 for (n = rb_first(&osd->o_requests); n; ) { 3772 struct ceph_osd_request *req = 3773 rb_entry(n, struct ceph_osd_request, r_node); 3774 enum calc_target_result ct_res; 3775 3776 n = rb_next(n); /* unlink_request(), check_pool_dne() */ 3777 3778 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 3779 ct_res = calc_target(osdc, &req->r_t, &req->r_osd->o_con, 3780 false); 3781 switch (ct_res) { 3782 case CALC_TARGET_NO_ACTION: 3783 force_resend_writes = cleared_full || 3784 (check_pool_cleared_full && 3785 pool_cleared_full(osdc, req->r_t.base_oloc.pool)); 3786 if (!force_resend && 3787 (!(req->r_flags & CEPH_OSD_FLAG_WRITE) || 3788 !force_resend_writes)) 3789 break; 3790 3791 /* fall through */ 3792 case CALC_TARGET_NEED_RESEND: 3793 cancel_map_check(req); 3794 unlink_request(osd, req); 3795 insert_request(need_resend, req); 3796 break; 3797 case CALC_TARGET_POOL_DNE: 3798 check_pool_dne(req); 3799 break; 3800 } 3801 } 3802 } 3803 3804 static int handle_one_map(struct ceph_osd_client *osdc, 3805 void *p, void *end, bool incremental, 3806 struct rb_root *need_resend, 3807 struct list_head *need_resend_linger) 3808 { 3809 struct ceph_osdmap *newmap; 3810 struct rb_node *n; 3811 bool skipped_map = false; 3812 bool was_full; 3813 3814 was_full = ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL); 3815 set_pool_was_full(osdc); 3816 3817 if (incremental) 3818 newmap = osdmap_apply_incremental(&p, end, osdc->osdmap); 3819 else 3820 newmap = ceph_osdmap_decode(&p, end); 3821 if (IS_ERR(newmap)) 3822 return PTR_ERR(newmap); 3823 3824 if (newmap != osdc->osdmap) { 3825 /* 3826 * Preserve ->was_full before destroying the old map. 3827 * For pools that weren't in the old map, ->was_full 3828 * should be false. 
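 *
 * ->was_full feeds pool_cleared_full(): a write parked on a full pool
 * is only force-resent once that pool goes from full in the old map to
 * not-full in the new one.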
3829 */ 3830 for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) { 3831 struct ceph_pg_pool_info *pi = 3832 rb_entry(n, struct ceph_pg_pool_info, node); 3833 struct ceph_pg_pool_info *old_pi; 3834 3835 old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id); 3836 if (old_pi) 3837 pi->was_full = old_pi->was_full; 3838 else 3839 WARN_ON(pi->was_full); 3840 } 3841 3842 if (osdc->osdmap->epoch && 3843 osdc->osdmap->epoch + 1 < newmap->epoch) { 3844 WARN_ON(incremental); 3845 skipped_map = true; 3846 } 3847 3848 ceph_osdmap_destroy(osdc->osdmap); 3849 osdc->osdmap = newmap; 3850 } 3851 3852 was_full &= !ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL); 3853 scan_requests(&osdc->homeless_osd, skipped_map, was_full, true, 3854 need_resend, need_resend_linger); 3855 3856 for (n = rb_first(&osdc->osds); n; ) { 3857 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); 3858 3859 n = rb_next(n); /* close_osd() */ 3860 3861 scan_requests(osd, skipped_map, was_full, true, need_resend, 3862 need_resend_linger); 3863 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || 3864 memcmp(&osd->o_con.peer_addr, 3865 ceph_osd_addr(osdc->osdmap, osd->o_osd), 3866 sizeof(struct ceph_entity_addr))) 3867 close_osd(osd); 3868 } 3869 3870 return 0; 3871 } 3872 3873 static void kick_requests(struct ceph_osd_client *osdc, 3874 struct rb_root *need_resend, 3875 struct list_head *need_resend_linger) 3876 { 3877 struct ceph_osd_linger_request *lreq, *nlreq; 3878 enum calc_target_result ct_res; 3879 struct rb_node *n; 3880 3881 /* make sure need_resend targets reflect latest map */ 3882 for (n = rb_first(need_resend); n; ) { 3883 struct ceph_osd_request *req = 3884 rb_entry(n, struct ceph_osd_request, r_node); 3885 3886 n = rb_next(n); 3887 3888 if (req->r_t.epoch < osdc->osdmap->epoch) { 3889 ct_res = calc_target(osdc, &req->r_t, NULL, false); 3890 if (ct_res == CALC_TARGET_POOL_DNE) { 3891 erase_request(need_resend, req); 3892 check_pool_dne(req); 3893 } 3894 } 3895 } 3896 3897 for (n = rb_first(need_resend); n; ) { 3898 struct ceph_osd_request *req = 3899 rb_entry(n, struct ceph_osd_request, r_node); 3900 struct ceph_osd *osd; 3901 3902 n = rb_next(n); 3903 erase_request(need_resend, req); /* before link_request() */ 3904 3905 osd = lookup_create_osd(osdc, req->r_t.osd, true); 3906 link_request(osd, req); 3907 if (!req->r_linger) { 3908 if (!osd_homeless(osd) && !req->r_t.paused) 3909 send_request(req); 3910 } else { 3911 cancel_linger_request(req); 3912 } 3913 } 3914 3915 list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) { 3916 if (!osd_homeless(lreq->osd)) 3917 send_linger(lreq); 3918 3919 list_del_init(&lreq->scan_item); 3920 } 3921 } 3922 3923 /* 3924 * Process updated osd map. 3925 * 3926 * The message contains any number of incremental and full maps, normally 3927 * indicating some sort of topology change in the cluster. Kick requests 3928 * off to different OSDs as needed. 
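 *
 * The message front carries the fsid, then a count of incremental maps
 * (each as epoch + length + data), then a count of full maps in the
 * same format.  An incremental map is applied only if it is exactly one
 * past our current epoch; otherwise the newest full map is decoded
 * instead, provided it is newer than what we already have.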
3929 */ 3930 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) 3931 { 3932 void *p = msg->front.iov_base; 3933 void *const end = p + msg->front.iov_len; 3934 u32 nr_maps, maplen; 3935 u32 epoch; 3936 struct ceph_fsid fsid; 3937 struct rb_root need_resend = RB_ROOT; 3938 LIST_HEAD(need_resend_linger); 3939 bool handled_incremental = false; 3940 bool was_pauserd, was_pausewr; 3941 bool pauserd, pausewr; 3942 int err; 3943 3944 dout("%s have %u\n", __func__, osdc->osdmap->epoch); 3945 down_write(&osdc->lock); 3946 3947 /* verify fsid */ 3948 ceph_decode_need(&p, end, sizeof(fsid), bad); 3949 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 3950 if (ceph_check_fsid(osdc->client, &fsid) < 0) 3951 goto bad; 3952 3953 was_pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD); 3954 was_pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) || 3955 ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 3956 have_pool_full(osdc); 3957 3958 /* incremental maps */ 3959 ceph_decode_32_safe(&p, end, nr_maps, bad); 3960 dout(" %d inc maps\n", nr_maps); 3961 while (nr_maps > 0) { 3962 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 3963 epoch = ceph_decode_32(&p); 3964 maplen = ceph_decode_32(&p); 3965 ceph_decode_need(&p, end, maplen, bad); 3966 if (osdc->osdmap->epoch && 3967 osdc->osdmap->epoch + 1 == epoch) { 3968 dout("applying incremental map %u len %d\n", 3969 epoch, maplen); 3970 err = handle_one_map(osdc, p, p + maplen, true, 3971 &need_resend, &need_resend_linger); 3972 if (err) 3973 goto bad; 3974 handled_incremental = true; 3975 } else { 3976 dout("ignoring incremental map %u len %d\n", 3977 epoch, maplen); 3978 } 3979 p += maplen; 3980 nr_maps--; 3981 } 3982 if (handled_incremental) 3983 goto done; 3984 3985 /* full maps */ 3986 ceph_decode_32_safe(&p, end, nr_maps, bad); 3987 dout(" %d full maps\n", nr_maps); 3988 while (nr_maps) { 3989 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 3990 epoch = ceph_decode_32(&p); 3991 maplen = ceph_decode_32(&p); 3992 ceph_decode_need(&p, end, maplen, bad); 3993 if (nr_maps > 1) { 3994 dout("skipping non-latest full map %u len %d\n", 3995 epoch, maplen); 3996 } else if (osdc->osdmap->epoch >= epoch) { 3997 dout("skipping full map %u len %d, " 3998 "older than our %u\n", epoch, maplen, 3999 osdc->osdmap->epoch); 4000 } else { 4001 dout("taking full map %u len %d\n", epoch, maplen); 4002 err = handle_one_map(osdc, p, p + maplen, false, 4003 &need_resend, &need_resend_linger); 4004 if (err) 4005 goto bad; 4006 } 4007 p += maplen; 4008 nr_maps--; 4009 } 4010 4011 done: 4012 /* 4013 * subscribe to subsequent osdmap updates if full to ensure 4014 * we find out when we are no longer full and stop returning 4015 * ENOSPC. 4016 */ 4017 pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD); 4018 pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) || 4019 ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 4020 have_pool_full(osdc); 4021 if (was_pauserd || was_pausewr || pauserd || pausewr || 4022 osdc->osdmap->epoch < osdc->epoch_barrier) 4023 maybe_request_map(osdc); 4024 4025 kick_requests(osdc, &need_resend, &need_resend_linger); 4026 4027 ceph_osdc_abort_on_full(osdc); 4028 ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP, 4029 osdc->osdmap->epoch); 4030 up_write(&osdc->lock); 4031 wake_up_all(&osdc->client->auth_wq); 4032 return; 4033 4034 bad: 4035 pr_err("osdc handle_map corrupt msg\n"); 4036 ceph_msg_dump(msg); 4037 up_write(&osdc->lock); 4038 } 4039 4040 /* 4041 * Resubmit requests pending on the given osd. 
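 *
 * Called once the session is re-established (see osd_fault()): stale
 * backoffs are cleared, unpaused regular requests are resent, in-flight
 * linger ops are cancelled, and each linger request is re-registered
 * via send_linger().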
4042 */ 4043 static void kick_osd_requests(struct ceph_osd *osd) 4044 { 4045 struct rb_node *n; 4046 4047 clear_backoffs(osd); 4048 4049 for (n = rb_first(&osd->o_requests); n; ) { 4050 struct ceph_osd_request *req = 4051 rb_entry(n, struct ceph_osd_request, r_node); 4052 4053 n = rb_next(n); /* cancel_linger_request() */ 4054 4055 if (!req->r_linger) { 4056 if (!req->r_t.paused) 4057 send_request(req); 4058 } else { 4059 cancel_linger_request(req); 4060 } 4061 } 4062 for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) { 4063 struct ceph_osd_linger_request *lreq = 4064 rb_entry(n, struct ceph_osd_linger_request, node); 4065 4066 send_linger(lreq); 4067 } 4068 } 4069 4070 /* 4071 * If the osd connection drops, we need to resubmit all requests. 4072 */ 4073 static void osd_fault(struct ceph_connection *con) 4074 { 4075 struct ceph_osd *osd = con->private; 4076 struct ceph_osd_client *osdc = osd->o_osdc; 4077 4078 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd); 4079 4080 down_write(&osdc->lock); 4081 if (!osd_registered(osd)) { 4082 dout("%s osd%d unknown\n", __func__, osd->o_osd); 4083 goto out_unlock; 4084 } 4085 4086 if (!reopen_osd(osd)) 4087 kick_osd_requests(osd); 4088 maybe_request_map(osdc); 4089 4090 out_unlock: 4091 up_write(&osdc->lock); 4092 } 4093 4094 struct MOSDBackoff { 4095 struct ceph_spg spgid; 4096 u32 map_epoch; 4097 u8 op; 4098 u64 id; 4099 struct ceph_hobject_id *begin; 4100 struct ceph_hobject_id *end; 4101 }; 4102 4103 static int decode_MOSDBackoff(const struct ceph_msg *msg, struct MOSDBackoff *m) 4104 { 4105 void *p = msg->front.iov_base; 4106 void *const end = p + msg->front.iov_len; 4107 u8 struct_v; 4108 u32 struct_len; 4109 int ret; 4110 4111 ret = ceph_start_decoding(&p, end, 1, "spg_t", &struct_v, &struct_len); 4112 if (ret) 4113 return ret; 4114 4115 ret = ceph_decode_pgid(&p, end, &m->spgid.pgid); 4116 if (ret) 4117 return ret; 4118 4119 ceph_decode_8_safe(&p, end, m->spgid.shard, e_inval); 4120 ceph_decode_32_safe(&p, end, m->map_epoch, e_inval); 4121 ceph_decode_8_safe(&p, end, m->op, e_inval); 4122 ceph_decode_64_safe(&p, end, m->id, e_inval); 4123 4124 m->begin = kzalloc(sizeof(*m->begin), GFP_NOIO); 4125 if (!m->begin) 4126 return -ENOMEM; 4127 4128 ret = decode_hoid(&p, end, m->begin); 4129 if (ret) { 4130 free_hoid(m->begin); 4131 return ret; 4132 } 4133 4134 m->end = kzalloc(sizeof(*m->end), GFP_NOIO); 4135 if (!m->end) { 4136 free_hoid(m->begin); 4137 return -ENOMEM; 4138 } 4139 4140 ret = decode_hoid(&p, end, m->end); 4141 if (ret) { 4142 free_hoid(m->begin); 4143 free_hoid(m->end); 4144 return ret; 4145 } 4146 4147 return 0; 4148 4149 e_inval: 4150 return -EINVAL; 4151 } 4152 4153 static struct ceph_msg *create_backoff_message( 4154 const struct ceph_osd_backoff *backoff, 4155 u32 map_epoch) 4156 { 4157 struct ceph_msg *msg; 4158 void *p, *end; 4159 int msg_size; 4160 4161 msg_size = CEPH_ENCODING_START_BLK_LEN + 4162 CEPH_PGID_ENCODING_LEN + 1; /* spgid */ 4163 msg_size += 4 + 1 + 8; /* map_epoch, op, id */ 4164 msg_size += CEPH_ENCODING_START_BLK_LEN + 4165 hoid_encoding_size(backoff->begin); 4166 msg_size += CEPH_ENCODING_START_BLK_LEN + 4167 hoid_encoding_size(backoff->end); 4168 4169 msg = ceph_msg_new(CEPH_MSG_OSD_BACKOFF, msg_size, GFP_NOIO, true); 4170 if (!msg) 4171 return NULL; 4172 4173 p = msg->front.iov_base; 4174 end = p + msg->front_alloc_len; 4175 4176 encode_spgid(&p, &backoff->spgid); 4177 ceph_encode_32(&p, map_epoch); 4178 ceph_encode_8(&p, CEPH_OSD_BACKOFF_OP_ACK_BLOCK); 4179 ceph_encode_64(&p, backoff->id); 
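	/*
	 * The hoid encodings below must consume exactly the space
	 * reserved in msg_size above; BUG_ON(p != end) verifies it.
	 */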
4180 encode_hoid(&p, end, backoff->begin); 4181 encode_hoid(&p, end, backoff->end); 4182 BUG_ON(p != end); 4183 4184 msg->front.iov_len = p - msg->front.iov_base; 4185 msg->hdr.version = cpu_to_le16(1); /* MOSDBackoff v1 */ 4186 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 4187 4188 return msg; 4189 } 4190 4191 static void handle_backoff_block(struct ceph_osd *osd, struct MOSDBackoff *m) 4192 { 4193 struct ceph_spg_mapping *spg; 4194 struct ceph_osd_backoff *backoff; 4195 struct ceph_msg *msg; 4196 4197 dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd, 4198 m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id); 4199 4200 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &m->spgid); 4201 if (!spg) { 4202 spg = alloc_spg_mapping(); 4203 if (!spg) { 4204 pr_err("%s failed to allocate spg\n", __func__); 4205 return; 4206 } 4207 spg->spgid = m->spgid; /* struct */ 4208 insert_spg_mapping(&osd->o_backoff_mappings, spg); 4209 } 4210 4211 backoff = alloc_backoff(); 4212 if (!backoff) { 4213 pr_err("%s failed to allocate backoff\n", __func__); 4214 return; 4215 } 4216 backoff->spgid = m->spgid; /* struct */ 4217 backoff->id = m->id; 4218 backoff->begin = m->begin; 4219 m->begin = NULL; /* backoff now owns this */ 4220 backoff->end = m->end; 4221 m->end = NULL; /* ditto */ 4222 4223 insert_backoff(&spg->backoffs, backoff); 4224 insert_backoff_by_id(&osd->o_backoffs_by_id, backoff); 4225 4226 /* 4227 * Ack with original backoff's epoch so that the OSD can 4228 * discard this if there was a PG split. 4229 */ 4230 msg = create_backoff_message(backoff, m->map_epoch); 4231 if (!msg) { 4232 pr_err("%s failed to allocate msg\n", __func__); 4233 return; 4234 } 4235 ceph_con_send(&osd->o_con, msg); 4236 } 4237 4238 static bool target_contained_by(const struct ceph_osd_request_target *t, 4239 const struct ceph_hobject_id *begin, 4240 const struct ceph_hobject_id *end) 4241 { 4242 struct ceph_hobject_id hoid; 4243 int cmp; 4244 4245 hoid_fill_from_target(&hoid, t); 4246 cmp = hoid_compare(&hoid, begin); 4247 return !cmp || (cmp > 0 && hoid_compare(&hoid, end) < 0); 4248 } 4249 4250 static void handle_backoff_unblock(struct ceph_osd *osd, 4251 const struct MOSDBackoff *m) 4252 { 4253 struct ceph_spg_mapping *spg; 4254 struct ceph_osd_backoff *backoff; 4255 struct rb_node *n; 4256 4257 dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd, 4258 m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id); 4259 4260 backoff = lookup_backoff_by_id(&osd->o_backoffs_by_id, m->id); 4261 if (!backoff) { 4262 pr_err("%s osd%d spgid %llu.%xs%d id %llu backoff dne\n", 4263 __func__, osd->o_osd, m->spgid.pgid.pool, 4264 m->spgid.pgid.seed, m->spgid.shard, m->id); 4265 return; 4266 } 4267 4268 if (hoid_compare(backoff->begin, m->begin) && 4269 hoid_compare(backoff->end, m->end)) { 4270 pr_err("%s osd%d spgid %llu.%xs%d id %llu bad range?\n", 4271 __func__, osd->o_osd, m->spgid.pgid.pool, 4272 m->spgid.pgid.seed, m->spgid.shard, m->id); 4273 /* unblock it anyway... 
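		 * the id matches, so presumably our record is just stale;
		 * drop it rather than leave requests blocked on a range the
		 * OSD no longer reports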
*/ 4274 } 4275 4276 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &backoff->spgid); 4277 BUG_ON(!spg); 4278 4279 erase_backoff(&spg->backoffs, backoff); 4280 erase_backoff_by_id(&osd->o_backoffs_by_id, backoff); 4281 free_backoff(backoff); 4282 4283 if (RB_EMPTY_ROOT(&spg->backoffs)) { 4284 erase_spg_mapping(&osd->o_backoff_mappings, spg); 4285 free_spg_mapping(spg); 4286 } 4287 4288 for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) { 4289 struct ceph_osd_request *req = 4290 rb_entry(n, struct ceph_osd_request, r_node); 4291 4292 if (!ceph_spg_compare(&req->r_t.spgid, &m->spgid)) { 4293 /* 4294 * Match against @m, not @backoff -- the PG may 4295 * have split on the OSD. 4296 */ 4297 if (target_contained_by(&req->r_t, m->begin, m->end)) { 4298 /* 4299 * If no other installed backoff applies, 4300 * resend. 4301 */ 4302 send_request(req); 4303 } 4304 } 4305 } 4306 } 4307 4308 static void handle_backoff(struct ceph_osd *osd, struct ceph_msg *msg) 4309 { 4310 struct ceph_osd_client *osdc = osd->o_osdc; 4311 struct MOSDBackoff m; 4312 int ret; 4313 4314 down_read(&osdc->lock); 4315 if (!osd_registered(osd)) { 4316 dout("%s osd%d unknown\n", __func__, osd->o_osd); 4317 up_read(&osdc->lock); 4318 return; 4319 } 4320 WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num)); 4321 4322 mutex_lock(&osd->lock); 4323 ret = decode_MOSDBackoff(msg, &m); 4324 if (ret) { 4325 pr_err("failed to decode MOSDBackoff: %d\n", ret); 4326 ceph_msg_dump(msg); 4327 goto out_unlock; 4328 } 4329 4330 switch (m.op) { 4331 case CEPH_OSD_BACKOFF_OP_BLOCK: 4332 handle_backoff_block(osd, &m); 4333 break; 4334 case CEPH_OSD_BACKOFF_OP_UNBLOCK: 4335 handle_backoff_unblock(osd, &m); 4336 break; 4337 default: 4338 pr_err("%s osd%d unknown op %d\n", __func__, osd->o_osd, m.op); 4339 } 4340 4341 free_hoid(m.begin); 4342 free_hoid(m.end); 4343 4344 out_unlock: 4345 mutex_unlock(&osd->lock); 4346 up_read(&osdc->lock); 4347 } 4348 4349 /* 4350 * Process osd watch notifications 4351 */ 4352 static void handle_watch_notify(struct ceph_osd_client *osdc, 4353 struct ceph_msg *msg) 4354 { 4355 void *p = msg->front.iov_base; 4356 void *const end = p + msg->front.iov_len; 4357 struct ceph_osd_linger_request *lreq; 4358 struct linger_work *lwork; 4359 u8 proto_ver, opcode; 4360 u64 cookie, notify_id; 4361 u64 notifier_id = 0; 4362 s32 return_code = 0; 4363 void *payload = NULL; 4364 u32 payload_len = 0; 4365 4366 ceph_decode_8_safe(&p, end, proto_ver, bad); 4367 ceph_decode_8_safe(&p, end, opcode, bad); 4368 ceph_decode_64_safe(&p, end, cookie, bad); 4369 p += 8; /* skip ver */ 4370 ceph_decode_64_safe(&p, end, notify_id, bad); 4371 4372 if (proto_ver >= 1) { 4373 ceph_decode_32_safe(&p, end, payload_len, bad); 4374 ceph_decode_need(&p, end, payload_len, bad); 4375 payload = p; 4376 p += payload_len; 4377 } 4378 4379 if (le16_to_cpu(msg->hdr.version) >= 2) 4380 ceph_decode_32_safe(&p, end, return_code, bad); 4381 4382 if (le16_to_cpu(msg->hdr.version) >= 3) 4383 ceph_decode_64_safe(&p, end, notifier_id, bad); 4384 4385 down_read(&osdc->lock); 4386 lreq = lookup_linger_osdc(&osdc->linger_requests, cookie); 4387 if (!lreq) { 4388 dout("%s opcode %d cookie %llu dne\n", __func__, opcode, 4389 cookie); 4390 goto out_unlock_osdc; 4391 } 4392 4393 mutex_lock(&lreq->lock); 4394 dout("%s opcode %d cookie %llu lreq %p is_watch %d\n", __func__, 4395 opcode, cookie, lreq, lreq->is_watch); 4396 if (opcode == CEPH_WATCH_EVENT_DISCONNECT) { 4397 if (!lreq->last_error) { 4398 lreq->last_error = -ENOTCONN; 4399 queue_watch_error(lreq); 4400 } 4401 } 
else if (!lreq->is_watch) { 4402 /* CEPH_WATCH_EVENT_NOTIFY_COMPLETE */ 4403 if (lreq->notify_id && lreq->notify_id != notify_id) { 4404 dout("lreq %p notify_id %llu != %llu, ignoring\n", lreq, 4405 lreq->notify_id, notify_id); 4406 } else if (!completion_done(&lreq->notify_finish_wait)) { 4407 struct ceph_msg_data *data = 4408 msg->num_data_items ? &msg->data[0] : NULL; 4409 4410 if (data) { 4411 if (lreq->preply_pages) { 4412 WARN_ON(data->type != 4413 CEPH_MSG_DATA_PAGES); 4414 *lreq->preply_pages = data->pages; 4415 *lreq->preply_len = data->length; 4416 } else { 4417 ceph_release_page_vector(data->pages, 4418 calc_pages_for(0, data->length)); 4419 } 4420 } 4421 lreq->notify_finish_error = return_code; 4422 complete_all(&lreq->notify_finish_wait); 4423 } 4424 } else { 4425 /* CEPH_WATCH_EVENT_NOTIFY */ 4426 lwork = lwork_alloc(lreq, do_watch_notify); 4427 if (!lwork) { 4428 pr_err("failed to allocate notify-lwork\n"); 4429 goto out_unlock_lreq; 4430 } 4431 4432 lwork->notify.notify_id = notify_id; 4433 lwork->notify.notifier_id = notifier_id; 4434 lwork->notify.payload = payload; 4435 lwork->notify.payload_len = payload_len; 4436 lwork->notify.msg = ceph_msg_get(msg); 4437 lwork_queue(lwork); 4438 } 4439 4440 out_unlock_lreq: 4441 mutex_unlock(&lreq->lock); 4442 out_unlock_osdc: 4443 up_read(&osdc->lock); 4444 return; 4445 4446 bad: 4447 pr_err("osdc handle_watch_notify corrupt msg\n"); 4448 } 4449 4450 /* 4451 * Register request, send initial attempt. 4452 */ 4453 int ceph_osdc_start_request(struct ceph_osd_client *osdc, 4454 struct ceph_osd_request *req, 4455 bool nofail) 4456 { 4457 down_read(&osdc->lock); 4458 submit_request(req, false); 4459 up_read(&osdc->lock); 4460 4461 return 0; 4462 } 4463 EXPORT_SYMBOL(ceph_osdc_start_request); 4464 4465 /* 4466 * Unregister a registered request. The request is not completed: 4467 * ->r_result isn't set and __complete_request() isn't called. 4468 */ 4469 void ceph_osdc_cancel_request(struct ceph_osd_request *req) 4470 { 4471 struct ceph_osd_client *osdc = req->r_osdc; 4472 4473 down_write(&osdc->lock); 4474 if (req->r_osd) 4475 cancel_request(req); 4476 up_write(&osdc->lock); 4477 } 4478 EXPORT_SYMBOL(ceph_osdc_cancel_request); 4479 4480 /* 4481 * @timeout: in jiffies, 0 means "wait forever" 4482 */ 4483 static int wait_request_timeout(struct ceph_osd_request *req, 4484 unsigned long timeout) 4485 { 4486 long left; 4487 4488 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 4489 left = wait_for_completion_killable_timeout(&req->r_completion, 4490 ceph_timeout_jiffies(timeout)); 4491 if (left <= 0) { 4492 left = left ?: -ETIMEDOUT; 4493 ceph_osdc_cancel_request(req); 4494 } else { 4495 left = req->r_result; /* completed */ 4496 } 4497 4498 return left; 4499 } 4500 4501 /* 4502 * wait for a request to complete 4503 */ 4504 int ceph_osdc_wait_request(struct ceph_osd_client *osdc, 4505 struct ceph_osd_request *req) 4506 { 4507 return wait_request_timeout(req, 0); 4508 } 4509 EXPORT_SYMBOL(ceph_osdc_wait_request); 4510 4511 /* 4512 * sync - wait for all in-flight requests to flush. avoid starvation. 
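 *
 * Only writes with a tid at or below the last_tid sampled on entry are
 * waited for, and the scan restarts from the top each time the locks
 * are dropped to wait, so requests submitted after this call are not
 * included.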
4513 */ 4514 void ceph_osdc_sync(struct ceph_osd_client *osdc) 4515 { 4516 struct rb_node *n, *p; 4517 u64 last_tid = atomic64_read(&osdc->last_tid); 4518 4519 again: 4520 down_read(&osdc->lock); 4521 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { 4522 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); 4523 4524 mutex_lock(&osd->lock); 4525 for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) { 4526 struct ceph_osd_request *req = 4527 rb_entry(p, struct ceph_osd_request, r_node); 4528 4529 if (req->r_tid > last_tid) 4530 break; 4531 4532 if (!(req->r_flags & CEPH_OSD_FLAG_WRITE)) 4533 continue; 4534 4535 ceph_osdc_get_request(req); 4536 mutex_unlock(&osd->lock); 4537 up_read(&osdc->lock); 4538 dout("%s waiting on req %p tid %llu last_tid %llu\n", 4539 __func__, req, req->r_tid, last_tid); 4540 wait_for_completion(&req->r_completion); 4541 ceph_osdc_put_request(req); 4542 goto again; 4543 } 4544 4545 mutex_unlock(&osd->lock); 4546 } 4547 4548 up_read(&osdc->lock); 4549 dout("%s done last_tid %llu\n", __func__, last_tid); 4550 } 4551 EXPORT_SYMBOL(ceph_osdc_sync); 4552 4553 static struct ceph_osd_request * 4554 alloc_linger_request(struct ceph_osd_linger_request *lreq) 4555 { 4556 struct ceph_osd_request *req; 4557 4558 req = ceph_osdc_alloc_request(lreq->osdc, NULL, 1, false, GFP_NOIO); 4559 if (!req) 4560 return NULL; 4561 4562 ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); 4563 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); 4564 return req; 4565 } 4566 4567 static struct ceph_osd_request * 4568 alloc_watch_request(struct ceph_osd_linger_request *lreq, u8 watch_opcode) 4569 { 4570 struct ceph_osd_request *req; 4571 4572 req = alloc_linger_request(lreq); 4573 if (!req) 4574 return NULL; 4575 4576 /* 4577 * Pass 0 for cookie because we don't know it yet, it will be 4578 * filled in by linger_submit(). 4579 */ 4580 osd_req_op_watch_init(req, 0, 0, watch_opcode); 4581 4582 if (ceph_osdc_alloc_messages(req, GFP_NOIO)) { 4583 ceph_osdc_put_request(req); 4584 return NULL; 4585 } 4586 4587 return req; 4588 } 4589 4590 /* 4591 * Returns a handle, caller owns a ref. 4592 */ 4593 struct ceph_osd_linger_request * 4594 ceph_osdc_watch(struct ceph_osd_client *osdc, 4595 struct ceph_object_id *oid, 4596 struct ceph_object_locator *oloc, 4597 rados_watchcb2_t wcb, 4598 rados_watcherrcb_t errcb, 4599 void *data) 4600 { 4601 struct ceph_osd_linger_request *lreq; 4602 int ret; 4603 4604 lreq = linger_alloc(osdc); 4605 if (!lreq) 4606 return ERR_PTR(-ENOMEM); 4607 4608 lreq->is_watch = true; 4609 lreq->wcb = wcb; 4610 lreq->errcb = errcb; 4611 lreq->data = data; 4612 lreq->watch_valid_thru = jiffies; 4613 4614 ceph_oid_copy(&lreq->t.base_oid, oid); 4615 ceph_oloc_copy(&lreq->t.base_oloc, oloc); 4616 lreq->t.flags = CEPH_OSD_FLAG_WRITE; 4617 ktime_get_real_ts64(&lreq->mtime); 4618 4619 lreq->reg_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_WATCH); 4620 if (!lreq->reg_req) { 4621 ret = -ENOMEM; 4622 goto err_put_lreq; 4623 } 4624 4625 lreq->ping_req = alloc_watch_request(lreq, CEPH_OSD_WATCH_OP_PING); 4626 if (!lreq->ping_req) { 4627 ret = -ENOMEM; 4628 goto err_put_lreq; 4629 } 4630 4631 linger_submit(lreq); 4632 ret = linger_reg_commit_wait(lreq); 4633 if (ret) { 4634 linger_cancel(lreq); 4635 goto err_put_lreq; 4636 } 4637 4638 return lreq; 4639 4640 err_put_lreq: 4641 linger_put(lreq); 4642 return ERR_PTR(ret); 4643 } 4644 EXPORT_SYMBOL(ceph_osdc_watch); 4645 4646 /* 4647 * Releases a ref. 
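 * That ref is the one handed to the caller by ceph_osdc_watch(); the
 * watch itself is torn down by sending a CEPH_OSD_WATCH_OP_UNWATCH op
 * and cancelling the linger request.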
4648 * 4649 * Times out after mount_timeout to preserve rbd unmap behaviour 4650 * introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap 4651 * with mount_timeout"). 4652 */ 4653 int ceph_osdc_unwatch(struct ceph_osd_client *osdc, 4654 struct ceph_osd_linger_request *lreq) 4655 { 4656 struct ceph_options *opts = osdc->client->options; 4657 struct ceph_osd_request *req; 4658 int ret; 4659 4660 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); 4661 if (!req) 4662 return -ENOMEM; 4663 4664 ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); 4665 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); 4666 req->r_flags = CEPH_OSD_FLAG_WRITE; 4667 ktime_get_real_ts64(&req->r_mtime); 4668 osd_req_op_watch_init(req, 0, lreq->linger_id, 4669 CEPH_OSD_WATCH_OP_UNWATCH); 4670 4671 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 4672 if (ret) 4673 goto out_put_req; 4674 4675 ceph_osdc_start_request(osdc, req, false); 4676 linger_cancel(lreq); 4677 linger_put(lreq); 4678 ret = wait_request_timeout(req, opts->mount_timeout); 4679 4680 out_put_req: 4681 ceph_osdc_put_request(req); 4682 return ret; 4683 } 4684 EXPORT_SYMBOL(ceph_osdc_unwatch); 4685 4686 static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which, 4687 u64 notify_id, u64 cookie, void *payload, 4688 u32 payload_len) 4689 { 4690 struct ceph_osd_req_op *op; 4691 struct ceph_pagelist *pl; 4692 int ret; 4693 4694 op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0); 4695 4696 pl = ceph_pagelist_alloc(GFP_NOIO); 4697 if (!pl) 4698 return -ENOMEM; 4699 4700 ret = ceph_pagelist_encode_64(pl, notify_id); 4701 ret |= ceph_pagelist_encode_64(pl, cookie); 4702 if (payload) { 4703 ret |= ceph_pagelist_encode_32(pl, payload_len); 4704 ret |= ceph_pagelist_append(pl, payload, payload_len); 4705 } else { 4706 ret |= ceph_pagelist_encode_32(pl, 0); 4707 } 4708 if (ret) { 4709 ceph_pagelist_release(pl); 4710 return -ENOMEM; 4711 } 4712 4713 ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl); 4714 op->indata_len = pl->length; 4715 return 0; 4716 } 4717 4718 int ceph_osdc_notify_ack(struct ceph_osd_client *osdc, 4719 struct ceph_object_id *oid, 4720 struct ceph_object_locator *oloc, 4721 u64 notify_id, 4722 u64 cookie, 4723 void *payload, 4724 u32 payload_len) 4725 { 4726 struct ceph_osd_request *req; 4727 int ret; 4728 4729 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); 4730 if (!req) 4731 return -ENOMEM; 4732 4733 ceph_oid_copy(&req->r_base_oid, oid); 4734 ceph_oloc_copy(&req->r_base_oloc, oloc); 4735 req->r_flags = CEPH_OSD_FLAG_READ; 4736 4737 ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload, 4738 payload_len); 4739 if (ret) 4740 goto out_put_req; 4741 4742 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 4743 if (ret) 4744 goto out_put_req; 4745 4746 ceph_osdc_start_request(osdc, req, false); 4747 ret = ceph_osdc_wait_request(osdc, req); 4748 4749 out_put_req: 4750 ceph_osdc_put_request(req); 4751 return ret; 4752 } 4753 EXPORT_SYMBOL(ceph_osdc_notify_ack); 4754 4755 static int osd_req_op_notify_init(struct ceph_osd_request *req, int which, 4756 u64 cookie, u32 prot_ver, u32 timeout, 4757 void *payload, u32 payload_len) 4758 { 4759 struct ceph_osd_req_op *op; 4760 struct ceph_pagelist *pl; 4761 int ret; 4762 4763 op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0); 4764 op->notify.cookie = cookie; 4765 4766 pl = ceph_pagelist_alloc(GFP_NOIO); 4767 if (!pl) 4768 return -ENOMEM; 4769 4770 ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */ 4771 ret |= 
ceph_pagelist_encode_32(pl, timeout); 4772 ret |= ceph_pagelist_encode_32(pl, payload_len); 4773 ret |= ceph_pagelist_append(pl, payload, payload_len); 4774 if (ret) { 4775 ceph_pagelist_release(pl); 4776 return -ENOMEM; 4777 } 4778 4779 ceph_osd_data_pagelist_init(&op->notify.request_data, pl); 4780 op->indata_len = pl->length; 4781 return 0; 4782 } 4783 4784 /* 4785 * @timeout: in seconds 4786 * 4787 * @preply_{pages,len} are initialized both on success and error. 4788 * The caller is responsible for: 4789 * 4790 * ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)) 4791 */ 4792 int ceph_osdc_notify(struct ceph_osd_client *osdc, 4793 struct ceph_object_id *oid, 4794 struct ceph_object_locator *oloc, 4795 void *payload, 4796 u32 payload_len, 4797 u32 timeout, 4798 struct page ***preply_pages, 4799 size_t *preply_len) 4800 { 4801 struct ceph_osd_linger_request *lreq; 4802 struct page **pages; 4803 int ret; 4804 4805 WARN_ON(!timeout); 4806 if (preply_pages) { 4807 *preply_pages = NULL; 4808 *preply_len = 0; 4809 } 4810 4811 lreq = linger_alloc(osdc); 4812 if (!lreq) 4813 return -ENOMEM; 4814 4815 lreq->preply_pages = preply_pages; 4816 lreq->preply_len = preply_len; 4817 4818 ceph_oid_copy(&lreq->t.base_oid, oid); 4819 ceph_oloc_copy(&lreq->t.base_oloc, oloc); 4820 lreq->t.flags = CEPH_OSD_FLAG_READ; 4821 4822 lreq->reg_req = alloc_linger_request(lreq); 4823 if (!lreq->reg_req) { 4824 ret = -ENOMEM; 4825 goto out_put_lreq; 4826 } 4827 4828 /* 4829 * Pass 0 for cookie because we don't know it yet, it will be 4830 * filled in by linger_submit(). 4831 */ 4832 ret = osd_req_op_notify_init(lreq->reg_req, 0, 0, 1, timeout, 4833 payload, payload_len); 4834 if (ret) 4835 goto out_put_lreq; 4836 4837 /* for notify_id */ 4838 pages = ceph_alloc_page_vector(1, GFP_NOIO); 4839 if (IS_ERR(pages)) { 4840 ret = PTR_ERR(pages); 4841 goto out_put_lreq; 4842 } 4843 ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify, 4844 response_data), 4845 pages, PAGE_SIZE, 0, false, true); 4846 4847 ret = ceph_osdc_alloc_messages(lreq->reg_req, GFP_NOIO); 4848 if (ret) 4849 goto out_put_lreq; 4850 4851 linger_submit(lreq); 4852 ret = linger_reg_commit_wait(lreq); 4853 if (!ret) 4854 ret = linger_notify_finish_wait(lreq); 4855 else 4856 dout("lreq %p failed to initiate notify %d\n", lreq, ret); 4857 4858 linger_cancel(lreq); 4859 out_put_lreq: 4860 linger_put(lreq); 4861 return ret; 4862 } 4863 EXPORT_SYMBOL(ceph_osdc_notify); 4864 4865 /* 4866 * Return the number of milliseconds since the watch was last 4867 * confirmed, or an error. If there is an error, the watch is no 4868 * longer valid, and should be destroyed with ceph_osdc_unwatch(). 
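 *
 * Illustrative usage, assuming a watch set up with ceph_osdc_watch()
 * (error handling trimmed):
 *
 *	lreq = ceph_osdc_watch(osdc, oid, oloc, wcb, errcb, data);
 *	...
 *	ret = ceph_osdc_watch_check(osdc, lreq);
 *	if (ret < 0)
 *		ceph_osdc_unwatch(osdc, lreq);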
4869 */ 4870 int ceph_osdc_watch_check(struct ceph_osd_client *osdc, 4871 struct ceph_osd_linger_request *lreq) 4872 { 4873 unsigned long stamp, age; 4874 int ret; 4875 4876 down_read(&osdc->lock); 4877 mutex_lock(&lreq->lock); 4878 stamp = lreq->watch_valid_thru; 4879 if (!list_empty(&lreq->pending_lworks)) { 4880 struct linger_work *lwork = 4881 list_first_entry(&lreq->pending_lworks, 4882 struct linger_work, 4883 pending_item); 4884 4885 if (time_before(lwork->queued_stamp, stamp)) 4886 stamp = lwork->queued_stamp; 4887 } 4888 age = jiffies - stamp; 4889 dout("%s lreq %p linger_id %llu age %lu last_error %d\n", __func__, 4890 lreq, lreq->linger_id, age, lreq->last_error); 4891 /* we are truncating to msecs, so return a safe upper bound */ 4892 ret = lreq->last_error ?: 1 + jiffies_to_msecs(age); 4893 4894 mutex_unlock(&lreq->lock); 4895 up_read(&osdc->lock); 4896 return ret; 4897 } 4898 4899 static int decode_watcher(void **p, void *end, struct ceph_watch_item *item) 4900 { 4901 u8 struct_v; 4902 u32 struct_len; 4903 int ret; 4904 4905 ret = ceph_start_decoding(p, end, 2, "watch_item_t", 4906 &struct_v, &struct_len); 4907 if (ret) 4908 goto bad; 4909 4910 ret = -EINVAL; 4911 ceph_decode_copy_safe(p, end, &item->name, sizeof(item->name), bad); 4912 ceph_decode_64_safe(p, end, item->cookie, bad); 4913 ceph_decode_skip_32(p, end, bad); /* skip timeout seconds */ 4914 4915 if (struct_v >= 2) { 4916 ret = ceph_decode_entity_addr(p, end, &item->addr); 4917 if (ret) 4918 goto bad; 4919 } else { 4920 ret = 0; 4921 } 4922 4923 dout("%s %s%llu cookie %llu addr %s\n", __func__, 4924 ENTITY_NAME(item->name), item->cookie, 4925 ceph_pr_addr(&item->addr)); 4926 bad: 4927 return ret; 4928 } 4929 4930 static int decode_watchers(void **p, void *end, 4931 struct ceph_watch_item **watchers, 4932 u32 *num_watchers) 4933 { 4934 u8 struct_v; 4935 u32 struct_len; 4936 int i; 4937 int ret; 4938 4939 ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t", 4940 &struct_v, &struct_len); 4941 if (ret) 4942 return ret; 4943 4944 *num_watchers = ceph_decode_32(p); 4945 *watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO); 4946 if (!*watchers) 4947 return -ENOMEM; 4948 4949 for (i = 0; i < *num_watchers; i++) { 4950 ret = decode_watcher(p, end, *watchers + i); 4951 if (ret) { 4952 kfree(*watchers); 4953 return ret; 4954 } 4955 } 4956 4957 return 0; 4958 } 4959 4960 /* 4961 * On success, the caller is responsible for: 4962 * 4963 * kfree(watchers); 4964 */ 4965 int ceph_osdc_list_watchers(struct ceph_osd_client *osdc, 4966 struct ceph_object_id *oid, 4967 struct ceph_object_locator *oloc, 4968 struct ceph_watch_item **watchers, 4969 u32 *num_watchers) 4970 { 4971 struct ceph_osd_request *req; 4972 struct page **pages; 4973 int ret; 4974 4975 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); 4976 if (!req) 4977 return -ENOMEM; 4978 4979 ceph_oid_copy(&req->r_base_oid, oid); 4980 ceph_oloc_copy(&req->r_base_oloc, oloc); 4981 req->r_flags = CEPH_OSD_FLAG_READ; 4982 4983 pages = ceph_alloc_page_vector(1, GFP_NOIO); 4984 if (IS_ERR(pages)) { 4985 ret = PTR_ERR(pages); 4986 goto out_put_req; 4987 } 4988 4989 osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0); 4990 ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers, 4991 response_data), 4992 pages, PAGE_SIZE, 0, false, true); 4993 4994 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 4995 if (ret) 4996 goto out_put_req; 4997 4998 ceph_osdc_start_request(osdc, req, false); 4999 ret = ceph_osdc_wait_request(osdc, req); 5000 if 
(ret >= 0) { 5001 void *p = page_address(pages[0]); 5002 void *const end = p + req->r_ops[0].outdata_len; 5003 5004 ret = decode_watchers(&p, end, watchers, num_watchers); 5005 } 5006 5007 out_put_req: 5008 ceph_osdc_put_request(req); 5009 return ret; 5010 } 5011 EXPORT_SYMBOL(ceph_osdc_list_watchers); 5012 5013 /* 5014 * Call all pending notify callbacks - for use after a watch is 5015 * unregistered, to make sure no more callbacks for it will be invoked 5016 */ 5017 void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc) 5018 { 5019 dout("%s osdc %p\n", __func__, osdc); 5020 flush_workqueue(osdc->notify_wq); 5021 } 5022 EXPORT_SYMBOL(ceph_osdc_flush_notifies); 5023 5024 void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc) 5025 { 5026 down_read(&osdc->lock); 5027 maybe_request_map(osdc); 5028 up_read(&osdc->lock); 5029 } 5030 EXPORT_SYMBOL(ceph_osdc_maybe_request_map); 5031 5032 /* 5033 * Execute an OSD class method on an object. 5034 * 5035 * @flags: CEPH_OSD_FLAG_* 5036 * @resp_len: in/out param for reply length 5037 */ 5038 int ceph_osdc_call(struct ceph_osd_client *osdc, 5039 struct ceph_object_id *oid, 5040 struct ceph_object_locator *oloc, 5041 const char *class, const char *method, 5042 unsigned int flags, 5043 struct page *req_page, size_t req_len, 5044 struct page **resp_pages, size_t *resp_len) 5045 { 5046 struct ceph_osd_request *req; 5047 int ret; 5048 5049 if (req_len > PAGE_SIZE) 5050 return -E2BIG; 5051 5052 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); 5053 if (!req) 5054 return -ENOMEM; 5055 5056 ceph_oid_copy(&req->r_base_oid, oid); 5057 ceph_oloc_copy(&req->r_base_oloc, oloc); 5058 req->r_flags = flags; 5059 5060 ret = osd_req_op_cls_init(req, 0, class, method); 5061 if (ret) 5062 goto out_put_req; 5063 5064 if (req_page) 5065 osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len, 5066 0, false, false); 5067 if (resp_pages) 5068 osd_req_op_cls_response_data_pages(req, 0, resp_pages, 5069 *resp_len, 0, false, false); 5070 5071 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 5072 if (ret) 5073 goto out_put_req; 5074 5075 ceph_osdc_start_request(osdc, req, false); 5076 ret = ceph_osdc_wait_request(osdc, req); 5077 if (ret >= 0) { 5078 ret = req->r_ops[0].rval; 5079 if (resp_pages) 5080 *resp_len = req->r_ops[0].outdata_len; 5081 } 5082 5083 out_put_req: 5084 ceph_osdc_put_request(req); 5085 return ret; 5086 } 5087 EXPORT_SYMBOL(ceph_osdc_call); 5088 5089 /* 5090 * init, shutdown 5091 */ 5092 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) 5093 { 5094 int err; 5095 5096 dout("init\n"); 5097 osdc->client = client; 5098 init_rwsem(&osdc->lock); 5099 osdc->osds = RB_ROOT; 5100 INIT_LIST_HEAD(&osdc->osd_lru); 5101 spin_lock_init(&osdc->osd_lru_lock); 5102 osd_init(&osdc->homeless_osd); 5103 osdc->homeless_osd.o_osdc = osdc; 5104 osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD; 5105 osdc->last_linger_id = CEPH_LINGER_ID_START; 5106 osdc->linger_requests = RB_ROOT; 5107 osdc->map_checks = RB_ROOT; 5108 osdc->linger_map_checks = RB_ROOT; 5109 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 5110 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 5111 5112 err = -ENOMEM; 5113 osdc->osdmap = ceph_osdmap_alloc(); 5114 if (!osdc->osdmap) 5115 goto out; 5116 5117 osdc->req_mempool = mempool_create_slab_pool(10, 5118 ceph_osd_request_cache); 5119 if (!osdc->req_mempool) 5120 goto out_map; 5121 5122 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, 5123 PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, 
"osd_op"); 5124 if (err < 0) 5125 goto out_mempool; 5126 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, 5127 PAGE_SIZE, CEPH_OSD_SLAB_OPS, 10, 5128 "osd_op_reply"); 5129 if (err < 0) 5130 goto out_msgpool; 5131 5132 err = -ENOMEM; 5133 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); 5134 if (!osdc->notify_wq) 5135 goto out_msgpool_reply; 5136 5137 osdc->completion_wq = create_singlethread_workqueue("ceph-completion"); 5138 if (!osdc->completion_wq) 5139 goto out_notify_wq; 5140 5141 schedule_delayed_work(&osdc->timeout_work, 5142 osdc->client->options->osd_keepalive_timeout); 5143 schedule_delayed_work(&osdc->osds_timeout_work, 5144 round_jiffies_relative(osdc->client->options->osd_idle_ttl)); 5145 5146 return 0; 5147 5148 out_notify_wq: 5149 destroy_workqueue(osdc->notify_wq); 5150 out_msgpool_reply: 5151 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 5152 out_msgpool: 5153 ceph_msgpool_destroy(&osdc->msgpool_op); 5154 out_mempool: 5155 mempool_destroy(osdc->req_mempool); 5156 out_map: 5157 ceph_osdmap_destroy(osdc->osdmap); 5158 out: 5159 return err; 5160 } 5161 5162 void ceph_osdc_stop(struct ceph_osd_client *osdc) 5163 { 5164 destroy_workqueue(osdc->completion_wq); 5165 destroy_workqueue(osdc->notify_wq); 5166 cancel_delayed_work_sync(&osdc->timeout_work); 5167 cancel_delayed_work_sync(&osdc->osds_timeout_work); 5168 5169 down_write(&osdc->lock); 5170 while (!RB_EMPTY_ROOT(&osdc->osds)) { 5171 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), 5172 struct ceph_osd, o_node); 5173 close_osd(osd); 5174 } 5175 up_write(&osdc->lock); 5176 WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1); 5177 osd_cleanup(&osdc->homeless_osd); 5178 5179 WARN_ON(!list_empty(&osdc->osd_lru)); 5180 WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests)); 5181 WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks)); 5182 WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks)); 5183 WARN_ON(atomic_read(&osdc->num_requests)); 5184 WARN_ON(atomic_read(&osdc->num_homeless)); 5185 5186 ceph_osdmap_destroy(osdc->osdmap); 5187 mempool_destroy(osdc->req_mempool); 5188 ceph_msgpool_destroy(&osdc->msgpool_op); 5189 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 5190 } 5191 5192 /* 5193 * Read some contiguous pages. If we cross a stripe boundary, shorten 5194 * *plen. Return number of bytes read, or error. 
5195 */ 5196 int ceph_osdc_readpages(struct ceph_osd_client *osdc, 5197 struct ceph_vino vino, struct ceph_file_layout *layout, 5198 u64 off, u64 *plen, 5199 u32 truncate_seq, u64 truncate_size, 5200 struct page **pages, int num_pages, int page_align) 5201 { 5202 struct ceph_osd_request *req; 5203 int rc = 0; 5204 5205 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, 5206 vino.snap, off, *plen); 5207 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1, 5208 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 5209 NULL, truncate_seq, truncate_size, 5210 false); 5211 if (IS_ERR(req)) 5212 return PTR_ERR(req); 5213 5214 /* it may be a short read due to an object boundary */ 5215 osd_req_op_extent_osd_data_pages(req, 0, 5216 pages, *plen, page_align, false, false); 5217 5218 dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", 5219 off, *plen, *plen, page_align); 5220 5221 rc = ceph_osdc_start_request(osdc, req, false); 5222 if (!rc) 5223 rc = ceph_osdc_wait_request(osdc, req); 5224 5225 ceph_osdc_put_request(req); 5226 dout("readpages result %d\n", rc); 5227 return rc; 5228 } 5229 EXPORT_SYMBOL(ceph_osdc_readpages); 5230 5231 /* 5232 * do a synchronous write on N pages 5233 */ 5234 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, 5235 struct ceph_file_layout *layout, 5236 struct ceph_snap_context *snapc, 5237 u64 off, u64 len, 5238 u32 truncate_seq, u64 truncate_size, 5239 struct timespec64 *mtime, 5240 struct page **pages, int num_pages) 5241 { 5242 struct ceph_osd_request *req; 5243 int rc = 0; 5244 int page_align = off & ~PAGE_MASK; 5245 5246 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1, 5247 CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, 5248 snapc, truncate_seq, truncate_size, 5249 true); 5250 if (IS_ERR(req)) 5251 return PTR_ERR(req); 5252 5253 /* it may be a short write due to an object boundary */ 5254 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, 5255 false, false); 5256 dout("writepages %llu~%llu (%llu bytes)\n", off, len, len); 5257 5258 req->r_mtime = *mtime; 5259 rc = ceph_osdc_start_request(osdc, req, true); 5260 if (!rc) 5261 rc = ceph_osdc_wait_request(osdc, req); 5262 5263 ceph_osdc_put_request(req); 5264 if (rc == 0) 5265 rc = len; 5266 dout("writepages result %d\n", rc); 5267 return rc; 5268 } 5269 EXPORT_SYMBOL(ceph_osdc_writepages); 5270 5271 static int osd_req_op_copy_from_init(struct ceph_osd_request *req, 5272 u64 src_snapid, u64 src_version, 5273 struct ceph_object_id *src_oid, 5274 struct ceph_object_locator *src_oloc, 5275 u32 src_fadvise_flags, 5276 u32 dst_fadvise_flags, 5277 u8 copy_from_flags) 5278 { 5279 struct ceph_osd_req_op *op; 5280 struct page **pages; 5281 void *p, *end; 5282 5283 pages = ceph_alloc_page_vector(1, GFP_KERNEL); 5284 if (IS_ERR(pages)) 5285 return PTR_ERR(pages); 5286 5287 op = _osd_req_op_init(req, 0, CEPH_OSD_OP_COPY_FROM, dst_fadvise_flags); 5288 op->copy_from.snapid = src_snapid; 5289 op->copy_from.src_version = src_version; 5290 op->copy_from.flags = copy_from_flags; 5291 op->copy_from.src_fadvise_flags = src_fadvise_flags; 5292 5293 p = page_address(pages[0]); 5294 end = p + PAGE_SIZE; 5295 ceph_encode_string(&p, end, src_oid->name, src_oid->name_len); 5296 encode_oloc(&p, end, src_oloc); 5297 op->indata_len = PAGE_SIZE - (end - p); 5298 5299 ceph_osd_data_pages_init(&op->copy_from.osd_data, pages, 5300 op->indata_len, 0, false, true); 5301 return 0; 5302 } 5303 5304 int ceph_osdc_copy_from(struct ceph_osd_client *osdc, 5305 u64 src_snapid, u64 
src_version, 5306 struct ceph_object_id *src_oid, 5307 struct ceph_object_locator *src_oloc, 5308 u32 src_fadvise_flags, 5309 struct ceph_object_id *dst_oid, 5310 struct ceph_object_locator *dst_oloc, 5311 u32 dst_fadvise_flags, 5312 u8 copy_from_flags) 5313 { 5314 struct ceph_osd_request *req; 5315 int ret; 5316 5317 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL); 5318 if (!req) 5319 return -ENOMEM; 5320 5321 req->r_flags = CEPH_OSD_FLAG_WRITE; 5322 5323 ceph_oloc_copy(&req->r_t.base_oloc, dst_oloc); 5324 ceph_oid_copy(&req->r_t.base_oid, dst_oid); 5325 5326 ret = osd_req_op_copy_from_init(req, src_snapid, src_version, src_oid, 5327 src_oloc, src_fadvise_flags, 5328 dst_fadvise_flags, copy_from_flags); 5329 if (ret) 5330 goto out; 5331 5332 ret = ceph_osdc_alloc_messages(req, GFP_KERNEL); 5333 if (ret) 5334 goto out; 5335 5336 ceph_osdc_start_request(osdc, req, false); 5337 ret = ceph_osdc_wait_request(osdc, req); 5338 5339 out: 5340 ceph_osdc_put_request(req); 5341 return ret; 5342 } 5343 EXPORT_SYMBOL(ceph_osdc_copy_from); 5344 5345 int __init ceph_osdc_setup(void) 5346 { 5347 size_t size = sizeof(struct ceph_osd_request) + 5348 CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op); 5349 5350 BUG_ON(ceph_osd_request_cache); 5351 ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size, 5352 0, 0, NULL); 5353 5354 return ceph_osd_request_cache ? 0 : -ENOMEM; 5355 } 5356 5357 void ceph_osdc_cleanup(void) 5358 { 5359 BUG_ON(!ceph_osd_request_cache); 5360 kmem_cache_destroy(ceph_osd_request_cache); 5361 ceph_osd_request_cache = NULL; 5362 } 5363 5364 /* 5365 * handle incoming message 5366 */ 5367 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 5368 { 5369 struct ceph_osd *osd = con->private; 5370 struct ceph_osd_client *osdc = osd->o_osdc; 5371 int type = le16_to_cpu(msg->hdr.type); 5372 5373 switch (type) { 5374 case CEPH_MSG_OSD_MAP: 5375 ceph_osdc_handle_map(osdc, msg); 5376 break; 5377 case CEPH_MSG_OSD_OPREPLY: 5378 handle_reply(osd, msg); 5379 break; 5380 case CEPH_MSG_OSD_BACKOFF: 5381 handle_backoff(osd, msg); 5382 break; 5383 case CEPH_MSG_WATCH_NOTIFY: 5384 handle_watch_notify(osdc, msg); 5385 break; 5386 5387 default: 5388 pr_err("received unknown message type %d %s\n", type, 5389 ceph_msg_type_name(type)); 5390 } 5391 5392 ceph_msg_put(msg); 5393 } 5394 5395 /* 5396 * Lookup and return message for incoming reply. Don't try to do 5397 * anything about a larger than preallocated data portion of the 5398 * message at the moment - for now, just skip the message. 
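 *
 * The reply message was preallocated along with the request
 * (req->r_reply); a larger front is handled by allocating a bigger
 * replacement, but an oversized data portion only sets *skip.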
5399 */ 5400 static struct ceph_msg *get_reply(struct ceph_connection *con, 5401 struct ceph_msg_header *hdr, 5402 int *skip) 5403 { 5404 struct ceph_osd *osd = con->private; 5405 struct ceph_osd_client *osdc = osd->o_osdc; 5406 struct ceph_msg *m = NULL; 5407 struct ceph_osd_request *req; 5408 int front_len = le32_to_cpu(hdr->front_len); 5409 int data_len = le32_to_cpu(hdr->data_len); 5410 u64 tid = le64_to_cpu(hdr->tid); 5411 5412 down_read(&osdc->lock); 5413 if (!osd_registered(osd)) { 5414 dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd); 5415 *skip = 1; 5416 goto out_unlock_osdc; 5417 } 5418 WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num)); 5419 5420 mutex_lock(&osd->lock); 5421 req = lookup_request(&osd->o_requests, tid); 5422 if (!req) { 5423 dout("%s osd%d tid %llu unknown, skipping\n", __func__, 5424 osd->o_osd, tid); 5425 *skip = 1; 5426 goto out_unlock_session; 5427 } 5428 5429 ceph_msg_revoke_incoming(req->r_reply); 5430 5431 if (front_len > req->r_reply->front_alloc_len) { 5432 pr_warn("%s osd%d tid %llu front %d > preallocated %d\n", 5433 __func__, osd->o_osd, req->r_tid, front_len, 5434 req->r_reply->front_alloc_len); 5435 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS, 5436 false); 5437 if (!m) 5438 goto out_unlock_session; 5439 ceph_msg_put(req->r_reply); 5440 req->r_reply = m; 5441 } 5442 5443 if (data_len > req->r_reply->data_length) { 5444 pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n", 5445 __func__, osd->o_osd, req->r_tid, data_len, 5446 req->r_reply->data_length); 5447 m = NULL; 5448 *skip = 1; 5449 goto out_unlock_session; 5450 } 5451 5452 m = ceph_msg_get(req->r_reply); 5453 dout("get_reply tid %lld %p\n", tid, m); 5454 5455 out_unlock_session: 5456 mutex_unlock(&osd->lock); 5457 out_unlock_osdc: 5458 up_read(&osdc->lock); 5459 return m; 5460 } 5461 5462 /* 5463 * TODO: switch to a msg-owned pagelist 5464 */ 5465 static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr) 5466 { 5467 struct ceph_msg *m; 5468 int type = le16_to_cpu(hdr->type); 5469 u32 front_len = le32_to_cpu(hdr->front_len); 5470 u32 data_len = le32_to_cpu(hdr->data_len); 5471 5472 m = ceph_msg_new2(type, front_len, 1, GFP_NOIO, false); 5473 if (!m) 5474 return NULL; 5475 5476 if (data_len) { 5477 struct page **pages; 5478 struct ceph_osd_data osd_data; 5479 5480 pages = ceph_alloc_page_vector(calc_pages_for(0, data_len), 5481 GFP_NOIO); 5482 if (IS_ERR(pages)) { 5483 ceph_msg_put(m); 5484 return NULL; 5485 } 5486 5487 ceph_osd_data_pages_init(&osd_data, pages, data_len, 0, false, 5488 false); 5489 ceph_osdc_msg_data_add(m, &osd_data); 5490 } 5491 5492 return m; 5493 } 5494 5495 static struct ceph_msg *alloc_msg(struct ceph_connection *con, 5496 struct ceph_msg_header *hdr, 5497 int *skip) 5498 { 5499 struct ceph_osd *osd = con->private; 5500 int type = le16_to_cpu(hdr->type); 5501 5502 *skip = 0; 5503 switch (type) { 5504 case CEPH_MSG_OSD_MAP: 5505 case CEPH_MSG_OSD_BACKOFF: 5506 case CEPH_MSG_WATCH_NOTIFY: 5507 return alloc_msg_with_page_vector(hdr); 5508 case CEPH_MSG_OSD_OPREPLY: 5509 return get_reply(con, hdr, skip); 5510 default: 5511 pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__, 5512 osd->o_osd, type); 5513 *skip = 1; 5514 return NULL; 5515 } 5516 } 5517 5518 /* 5519 * Wrappers to refcount containing ceph_osd struct 5520 */ 5521 static struct ceph_connection *get_osd_con(struct ceph_connection *con) 5522 { 5523 struct ceph_osd *osd = con->private; 5524 if (get_osd(osd)) 5525 return con; 5526 return NULL; 5527 } 
5528 5529 static void put_osd_con(struct ceph_connection *con) 5530 { 5531 struct ceph_osd *osd = con->private; 5532 put_osd(osd); 5533 } 5534 5535 /* 5536 * authentication 5537 */ 5538 /* 5539 * Note: returned pointer is the address of a structure that's 5540 * managed separately. Caller must *not* attempt to free it. 5541 */ 5542 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 5543 int *proto, int force_new) 5544 { 5545 struct ceph_osd *o = con->private; 5546 struct ceph_osd_client *osdc = o->o_osdc; 5547 struct ceph_auth_client *ac = osdc->client->monc.auth; 5548 struct ceph_auth_handshake *auth = &o->o_auth; 5549 5550 if (force_new && auth->authorizer) { 5551 ceph_auth_destroy_authorizer(auth->authorizer); 5552 auth->authorizer = NULL; 5553 } 5554 if (!auth->authorizer) { 5555 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 5556 auth); 5557 if (ret) 5558 return ERR_PTR(ret); 5559 } else { 5560 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 5561 auth); 5562 if (ret) 5563 return ERR_PTR(ret); 5564 } 5565 *proto = ac->protocol; 5566 5567 return auth; 5568 } 5569 5570 static int add_authorizer_challenge(struct ceph_connection *con, 5571 void *challenge_buf, int challenge_buf_len) 5572 { 5573 struct ceph_osd *o = con->private; 5574 struct ceph_osd_client *osdc = o->o_osdc; 5575 struct ceph_auth_client *ac = osdc->client->monc.auth; 5576 5577 return ceph_auth_add_authorizer_challenge(ac, o->o_auth.authorizer, 5578 challenge_buf, challenge_buf_len); 5579 } 5580 5581 static int verify_authorizer_reply(struct ceph_connection *con) 5582 { 5583 struct ceph_osd *o = con->private; 5584 struct ceph_osd_client *osdc = o->o_osdc; 5585 struct ceph_auth_client *ac = osdc->client->monc.auth; 5586 5587 return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer); 5588 } 5589 5590 static int invalidate_authorizer(struct ceph_connection *con) 5591 { 5592 struct ceph_osd *o = con->private; 5593 struct ceph_osd_client *osdc = o->o_osdc; 5594 struct ceph_auth_client *ac = osdc->client->monc.auth; 5595 5596 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); 5597 return ceph_monc_validate_auth(&osdc->client->monc); 5598 } 5599 5600 static void osd_reencode_message(struct ceph_msg *msg) 5601 { 5602 int type = le16_to_cpu(msg->hdr.type); 5603 5604 if (type == CEPH_MSG_OSD_OP) 5605 encode_request_finish(msg); 5606 } 5607 5608 static int osd_sign_message(struct ceph_msg *msg) 5609 { 5610 struct ceph_osd *o = msg->con->private; 5611 struct ceph_auth_handshake *auth = &o->o_auth; 5612 5613 return ceph_auth_sign_message(auth, msg); 5614 } 5615 5616 static int osd_check_message_signature(struct ceph_msg *msg) 5617 { 5618 struct ceph_osd *o = msg->con->private; 5619 struct ceph_auth_handshake *auth = &o->o_auth; 5620 5621 return ceph_auth_check_message_signature(auth, msg); 5622 } 5623 5624 static const struct ceph_connection_operations osd_con_ops = { 5625 .get = get_osd_con, 5626 .put = put_osd_con, 5627 .dispatch = dispatch, 5628 .get_authorizer = get_authorizer, 5629 .add_authorizer_challenge = add_authorizer_challenge, 5630 .verify_authorizer_reply = verify_authorizer_reply, 5631 .invalidate_authorizer = invalidate_authorizer, 5632 .alloc_msg = alloc_msg, 5633 .reencode_message = osd_reencode_message, 5634 .sign_message = osd_sign_message, 5635 .check_message_signature = osd_check_message_signature, 5636 .fault = osd_fault, 5637 }; 5638