1 2 #include <linux/ceph/ceph_debug.h> 3 4 #include <linux/module.h> 5 #include <linux/err.h> 6 #include <linux/highmem.h> 7 #include <linux/mm.h> 8 #include <linux/pagemap.h> 9 #include <linux/slab.h> 10 #include <linux/uaccess.h> 11 #ifdef CONFIG_BLOCK 12 #include <linux/bio.h> 13 #endif 14 15 #include <linux/ceph/libceph.h> 16 #include <linux/ceph/osd_client.h> 17 #include <linux/ceph/messenger.h> 18 #include <linux/ceph/decode.h> 19 #include <linux/ceph/auth.h> 20 #include <linux/ceph/pagelist.h> 21 22 #define OSD_OPREPLY_FRONT_LEN 512 23 24 static struct kmem_cache *ceph_osd_request_cache; 25 26 static const struct ceph_connection_operations osd_con_ops; 27 28 /* 29 * Implement client access to distributed object storage cluster. 30 * 31 * All data objects are stored within a cluster/cloud of OSDs, or 32 * "object storage devices." (Note that Ceph OSDs have _nothing_ to 33 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply 34 * remote daemons serving up and coordinating consistent and safe 35 * access to storage. 36 * 37 * Cluster membership and the mapping of data objects onto storage devices 38 * are described by the osd map. 39 * 40 * We keep track of pending OSD requests (read, write), resubmit 41 * requests to different OSDs when the cluster topology/data layout 42 * change, or retry the affected requests when the communications 43 * channel with an OSD is reset. 
44 */ 45 46 static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req); 47 static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req); 48 static void link_linger(struct ceph_osd *osd, 49 struct ceph_osd_linger_request *lreq); 50 static void unlink_linger(struct ceph_osd *osd, 51 struct ceph_osd_linger_request *lreq); 52 53 #if 1 54 static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem) 55 { 56 bool wrlocked = true; 57 58 if (unlikely(down_read_trylock(sem))) { 59 wrlocked = false; 60 up_read(sem); 61 } 62 63 return wrlocked; 64 } 65 static inline void verify_osdc_locked(struct ceph_osd_client *osdc) 66 { 67 WARN_ON(!rwsem_is_locked(&osdc->lock)); 68 } 69 static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) 70 { 71 WARN_ON(!rwsem_is_wrlocked(&osdc->lock)); 72 } 73 static inline void verify_osd_locked(struct ceph_osd *osd) 74 { 75 struct ceph_osd_client *osdc = osd->o_osdc; 76 77 WARN_ON(!(mutex_is_locked(&osd->lock) && 78 rwsem_is_locked(&osdc->lock)) && 79 !rwsem_is_wrlocked(&osdc->lock)); 80 } 81 static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) 82 { 83 WARN_ON(!mutex_is_locked(&lreq->lock)); 84 } 85 #else 86 static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { } 87 static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { } 88 static inline void verify_osd_locked(struct ceph_osd *osd) { } 89 static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { } 90 #endif 91 92 /* 93 * calculate the mapping of a file extent onto an object, and fill out the 94 * request accordingly. shorten extent as necessary if it crosses an 95 * object boundary. 96 * 97 * fill osd op in request message. 98 */ 99 static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen, 100 u64 *objnum, u64 *objoff, u64 *objlen) 101 { 102 u64 orig_len = *plen; 103 int r; 104 105 /* object extent? 
*/ 106 r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum, 107 objoff, objlen); 108 if (r < 0) 109 return r; 110 if (*objlen < orig_len) { 111 *plen = *objlen; 112 dout(" skipping last %llu, final file extent %llu~%llu\n", 113 orig_len - *plen, off, *plen); 114 } 115 116 dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen); 117 118 return 0; 119 } 120 121 static void ceph_osd_data_init(struct ceph_osd_data *osd_data) 122 { 123 memset(osd_data, 0, sizeof (*osd_data)); 124 osd_data->type = CEPH_OSD_DATA_TYPE_NONE; 125 } 126 127 static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data, 128 struct page **pages, u64 length, u32 alignment, 129 bool pages_from_pool, bool own_pages) 130 { 131 osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; 132 osd_data->pages = pages; 133 osd_data->length = length; 134 osd_data->alignment = alignment; 135 osd_data->pages_from_pool = pages_from_pool; 136 osd_data->own_pages = own_pages; 137 } 138 139 static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data, 140 struct ceph_pagelist *pagelist) 141 { 142 osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST; 143 osd_data->pagelist = pagelist; 144 } 145 146 #ifdef CONFIG_BLOCK 147 static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data, 148 struct bio *bio, size_t bio_length) 149 { 150 osd_data->type = CEPH_OSD_DATA_TYPE_BIO; 151 osd_data->bio = bio; 152 osd_data->bio_length = bio_length; 153 } 154 #endif /* CONFIG_BLOCK */ 155 156 #define osd_req_op_data(oreq, whch, typ, fld) \ 157 ({ \ 158 struct ceph_osd_request *__oreq = (oreq); \ 159 unsigned int __whch = (whch); \ 160 BUG_ON(__whch >= __oreq->r_num_ops); \ 161 &__oreq->r_ops[__whch].typ.fld; \ 162 }) 163 164 static struct ceph_osd_data * 165 osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which) 166 { 167 BUG_ON(which >= osd_req->r_num_ops); 168 169 return &osd_req->r_ops[which].raw_data_in; 170 } 171 172 struct ceph_osd_data * 173 osd_req_op_extent_osd_data(struct 
ceph_osd_request *osd_req, 174 unsigned int which) 175 { 176 return osd_req_op_data(osd_req, which, extent, osd_data); 177 } 178 EXPORT_SYMBOL(osd_req_op_extent_osd_data); 179 180 void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req, 181 unsigned int which, struct page **pages, 182 u64 length, u32 alignment, 183 bool pages_from_pool, bool own_pages) 184 { 185 struct ceph_osd_data *osd_data; 186 187 osd_data = osd_req_op_raw_data_in(osd_req, which); 188 ceph_osd_data_pages_init(osd_data, pages, length, alignment, 189 pages_from_pool, own_pages); 190 } 191 EXPORT_SYMBOL(osd_req_op_raw_data_in_pages); 192 193 void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req, 194 unsigned int which, struct page **pages, 195 u64 length, u32 alignment, 196 bool pages_from_pool, bool own_pages) 197 { 198 struct ceph_osd_data *osd_data; 199 200 osd_data = osd_req_op_data(osd_req, which, extent, osd_data); 201 ceph_osd_data_pages_init(osd_data, pages, length, alignment, 202 pages_from_pool, own_pages); 203 } 204 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages); 205 206 void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req, 207 unsigned int which, struct ceph_pagelist *pagelist) 208 { 209 struct ceph_osd_data *osd_data; 210 211 osd_data = osd_req_op_data(osd_req, which, extent, osd_data); 212 ceph_osd_data_pagelist_init(osd_data, pagelist); 213 } 214 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist); 215 216 #ifdef CONFIG_BLOCK 217 void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req, 218 unsigned int which, struct bio *bio, size_t bio_length) 219 { 220 struct ceph_osd_data *osd_data; 221 222 osd_data = osd_req_op_data(osd_req, which, extent, osd_data); 223 ceph_osd_data_bio_init(osd_data, bio, bio_length); 224 } 225 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio); 226 #endif /* CONFIG_BLOCK */ 227 228 static void osd_req_op_cls_request_info_pagelist( 229 struct ceph_osd_request *osd_req, 230 unsigned int which, struct 
ceph_pagelist *pagelist) 231 { 232 struct ceph_osd_data *osd_data; 233 234 osd_data = osd_req_op_data(osd_req, which, cls, request_info); 235 ceph_osd_data_pagelist_init(osd_data, pagelist); 236 } 237 238 void osd_req_op_cls_request_data_pagelist( 239 struct ceph_osd_request *osd_req, 240 unsigned int which, struct ceph_pagelist *pagelist) 241 { 242 struct ceph_osd_data *osd_data; 243 244 osd_data = osd_req_op_data(osd_req, which, cls, request_data); 245 ceph_osd_data_pagelist_init(osd_data, pagelist); 246 osd_req->r_ops[which].cls.indata_len += pagelist->length; 247 osd_req->r_ops[which].indata_len += pagelist->length; 248 } 249 EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist); 250 251 void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req, 252 unsigned int which, struct page **pages, u64 length, 253 u32 alignment, bool pages_from_pool, bool own_pages) 254 { 255 struct ceph_osd_data *osd_data; 256 257 osd_data = osd_req_op_data(osd_req, which, cls, request_data); 258 ceph_osd_data_pages_init(osd_data, pages, length, alignment, 259 pages_from_pool, own_pages); 260 osd_req->r_ops[which].cls.indata_len += length; 261 osd_req->r_ops[which].indata_len += length; 262 } 263 EXPORT_SYMBOL(osd_req_op_cls_request_data_pages); 264 265 void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req, 266 unsigned int which, struct page **pages, u64 length, 267 u32 alignment, bool pages_from_pool, bool own_pages) 268 { 269 struct ceph_osd_data *osd_data; 270 271 osd_data = osd_req_op_data(osd_req, which, cls, response_data); 272 ceph_osd_data_pages_init(osd_data, pages, length, alignment, 273 pages_from_pool, own_pages); 274 } 275 EXPORT_SYMBOL(osd_req_op_cls_response_data_pages); 276 277 static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data) 278 { 279 switch (osd_data->type) { 280 case CEPH_OSD_DATA_TYPE_NONE: 281 return 0; 282 case CEPH_OSD_DATA_TYPE_PAGES: 283 return osd_data->length; 284 case CEPH_OSD_DATA_TYPE_PAGELIST: 285 return 
(u64)osd_data->pagelist->length; 286 #ifdef CONFIG_BLOCK 287 case CEPH_OSD_DATA_TYPE_BIO: 288 return (u64)osd_data->bio_length; 289 #endif /* CONFIG_BLOCK */ 290 default: 291 WARN(true, "unrecognized data type %d\n", (int)osd_data->type); 292 return 0; 293 } 294 } 295 296 static void ceph_osd_data_release(struct ceph_osd_data *osd_data) 297 { 298 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) { 299 int num_pages; 300 301 num_pages = calc_pages_for((u64)osd_data->alignment, 302 (u64)osd_data->length); 303 ceph_release_page_vector(osd_data->pages, num_pages); 304 } 305 ceph_osd_data_init(osd_data); 306 } 307 308 static void osd_req_op_data_release(struct ceph_osd_request *osd_req, 309 unsigned int which) 310 { 311 struct ceph_osd_req_op *op; 312 313 BUG_ON(which >= osd_req->r_num_ops); 314 op = &osd_req->r_ops[which]; 315 316 switch (op->op) { 317 case CEPH_OSD_OP_READ: 318 case CEPH_OSD_OP_WRITE: 319 case CEPH_OSD_OP_WRITEFULL: 320 ceph_osd_data_release(&op->extent.osd_data); 321 break; 322 case CEPH_OSD_OP_CALL: 323 ceph_osd_data_release(&op->cls.request_info); 324 ceph_osd_data_release(&op->cls.request_data); 325 ceph_osd_data_release(&op->cls.response_data); 326 break; 327 case CEPH_OSD_OP_SETXATTR: 328 case CEPH_OSD_OP_CMPXATTR: 329 ceph_osd_data_release(&op->xattr.osd_data); 330 break; 331 case CEPH_OSD_OP_STAT: 332 ceph_osd_data_release(&op->raw_data_in); 333 break; 334 case CEPH_OSD_OP_NOTIFY_ACK: 335 ceph_osd_data_release(&op->notify_ack.request_data); 336 break; 337 case CEPH_OSD_OP_NOTIFY: 338 ceph_osd_data_release(&op->notify.request_data); 339 ceph_osd_data_release(&op->notify.response_data); 340 break; 341 case CEPH_OSD_OP_LIST_WATCHERS: 342 ceph_osd_data_release(&op->list_watchers.response_data); 343 break; 344 default: 345 break; 346 } 347 } 348 349 /* 350 * Assumes @t is zero-initialized. 
351 */ 352 static void target_init(struct ceph_osd_request_target *t) 353 { 354 ceph_oid_init(&t->base_oid); 355 ceph_oloc_init(&t->base_oloc); 356 ceph_oid_init(&t->target_oid); 357 ceph_oloc_init(&t->target_oloc); 358 359 ceph_osds_init(&t->acting); 360 ceph_osds_init(&t->up); 361 t->size = -1; 362 t->min_size = -1; 363 364 t->osd = CEPH_HOMELESS_OSD; 365 } 366 367 static void target_copy(struct ceph_osd_request_target *dest, 368 const struct ceph_osd_request_target *src) 369 { 370 ceph_oid_copy(&dest->base_oid, &src->base_oid); 371 ceph_oloc_copy(&dest->base_oloc, &src->base_oloc); 372 ceph_oid_copy(&dest->target_oid, &src->target_oid); 373 ceph_oloc_copy(&dest->target_oloc, &src->target_oloc); 374 375 dest->pgid = src->pgid; /* struct */ 376 dest->spgid = src->spgid; /* struct */ 377 dest->pg_num = src->pg_num; 378 dest->pg_num_mask = src->pg_num_mask; 379 ceph_osds_copy(&dest->acting, &src->acting); 380 ceph_osds_copy(&dest->up, &src->up); 381 dest->size = src->size; 382 dest->min_size = src->min_size; 383 dest->sort_bitwise = src->sort_bitwise; 384 385 dest->flags = src->flags; 386 dest->paused = src->paused; 387 388 dest->last_force_resend = src->last_force_resend; 389 390 dest->osd = src->osd; 391 } 392 393 static void target_destroy(struct ceph_osd_request_target *t) 394 { 395 ceph_oid_destroy(&t->base_oid); 396 ceph_oloc_destroy(&t->base_oloc); 397 ceph_oid_destroy(&t->target_oid); 398 ceph_oloc_destroy(&t->target_oloc); 399 } 400 401 /* 402 * requests 403 */ 404 static void request_release_checks(struct ceph_osd_request *req) 405 { 406 WARN_ON(!RB_EMPTY_NODE(&req->r_node)); 407 WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node)); 408 WARN_ON(!list_empty(&req->r_unsafe_item)); 409 WARN_ON(req->r_osd); 410 } 411 412 static void ceph_osdc_release_request(struct kref *kref) 413 { 414 struct ceph_osd_request *req = container_of(kref, 415 struct ceph_osd_request, r_kref); 416 unsigned int which; 417 418 dout("%s %p (r_request %p r_reply %p)\n", __func__, req, 419 
req->r_request, req->r_reply); 420 request_release_checks(req); 421 422 if (req->r_request) 423 ceph_msg_put(req->r_request); 424 if (req->r_reply) 425 ceph_msg_put(req->r_reply); 426 427 for (which = 0; which < req->r_num_ops; which++) 428 osd_req_op_data_release(req, which); 429 430 target_destroy(&req->r_t); 431 ceph_put_snap_context(req->r_snapc); 432 433 if (req->r_mempool) 434 mempool_free(req, req->r_osdc->req_mempool); 435 else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS) 436 kmem_cache_free(ceph_osd_request_cache, req); 437 else 438 kfree(req); 439 } 440 441 void ceph_osdc_get_request(struct ceph_osd_request *req) 442 { 443 dout("%s %p (was %d)\n", __func__, req, 444 kref_read(&req->r_kref)); 445 kref_get(&req->r_kref); 446 } 447 EXPORT_SYMBOL(ceph_osdc_get_request); 448 449 void ceph_osdc_put_request(struct ceph_osd_request *req) 450 { 451 if (req) { 452 dout("%s %p (was %d)\n", __func__, req, 453 kref_read(&req->r_kref)); 454 kref_put(&req->r_kref, ceph_osdc_release_request); 455 } 456 } 457 EXPORT_SYMBOL(ceph_osdc_put_request); 458 459 static void request_init(struct ceph_osd_request *req) 460 { 461 /* req only, each op is zeroed in _osd_req_op_init() */ 462 memset(req, 0, sizeof(*req)); 463 464 kref_init(&req->r_kref); 465 init_completion(&req->r_completion); 466 RB_CLEAR_NODE(&req->r_node); 467 RB_CLEAR_NODE(&req->r_mc_node); 468 INIT_LIST_HEAD(&req->r_unsafe_item); 469 470 target_init(&req->r_t); 471 } 472 473 /* 474 * This is ugly, but it allows us to reuse linger registration and ping 475 * requests, keeping the structure of the code around send_linger{_ping}() 476 * reasonable. Setting up a min_nr=2 mempool for each linger request 477 * and dealing with copying ops (this blasts req only, watch op remains 478 * intact) isn't any better. 
479 */ 480 static void request_reinit(struct ceph_osd_request *req) 481 { 482 struct ceph_osd_client *osdc = req->r_osdc; 483 bool mempool = req->r_mempool; 484 unsigned int num_ops = req->r_num_ops; 485 u64 snapid = req->r_snapid; 486 struct ceph_snap_context *snapc = req->r_snapc; 487 bool linger = req->r_linger; 488 struct ceph_msg *request_msg = req->r_request; 489 struct ceph_msg *reply_msg = req->r_reply; 490 491 dout("%s req %p\n", __func__, req); 492 WARN_ON(kref_read(&req->r_kref) != 1); 493 request_release_checks(req); 494 495 WARN_ON(kref_read(&request_msg->kref) != 1); 496 WARN_ON(kref_read(&reply_msg->kref) != 1); 497 target_destroy(&req->r_t); 498 499 request_init(req); 500 req->r_osdc = osdc; 501 req->r_mempool = mempool; 502 req->r_num_ops = num_ops; 503 req->r_snapid = snapid; 504 req->r_snapc = snapc; 505 req->r_linger = linger; 506 req->r_request = request_msg; 507 req->r_reply = reply_msg; 508 } 509 510 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 511 struct ceph_snap_context *snapc, 512 unsigned int num_ops, 513 bool use_mempool, 514 gfp_t gfp_flags) 515 { 516 struct ceph_osd_request *req; 517 518 if (use_mempool) { 519 BUG_ON(num_ops > CEPH_OSD_SLAB_OPS); 520 req = mempool_alloc(osdc->req_mempool, gfp_flags); 521 } else if (num_ops <= CEPH_OSD_SLAB_OPS) { 522 req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags); 523 } else { 524 BUG_ON(num_ops > CEPH_OSD_MAX_OPS); 525 req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]), 526 gfp_flags); 527 } 528 if (unlikely(!req)) 529 return NULL; 530 531 request_init(req); 532 req->r_osdc = osdc; 533 req->r_mempool = use_mempool; 534 req->r_num_ops = num_ops; 535 req->r_snapid = CEPH_NOSNAP; 536 req->r_snapc = ceph_get_snap_context(snapc); 537 538 dout("%s req %p\n", __func__, req); 539 return req; 540 } 541 EXPORT_SYMBOL(ceph_osdc_alloc_request); 542 543 static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc) 544 { 545 return 8 + 4 + 4 + 4 + 
(oloc->pool_ns ? oloc->pool_ns->len : 0); 546 } 547 548 int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) 549 { 550 struct ceph_osd_client *osdc = req->r_osdc; 551 struct ceph_msg *msg; 552 int msg_size; 553 554 WARN_ON(ceph_oid_empty(&req->r_base_oid)); 555 WARN_ON(ceph_oloc_empty(&req->r_base_oloc)); 556 557 /* create request message */ 558 msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */ 559 msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */ 560 msg_size += CEPH_ENCODING_START_BLK_LEN + 561 ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */ 562 msg_size += 1 + 8 + 4 + 4; /* pgid */ 563 msg_size += 4 + req->r_base_oid.name_len; /* oid */ 564 msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op); 565 msg_size += 8; /* snapid */ 566 msg_size += 8; /* snap_seq */ 567 msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0); 568 msg_size += 4; /* retry_attempt */ 569 570 if (req->r_mempool) 571 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 572 else 573 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true); 574 if (!msg) 575 return -ENOMEM; 576 577 memset(msg->front.iov_base, 0, msg->front.iov_len); 578 req->r_request = msg; 579 580 /* create reply message */ 581 msg_size = OSD_OPREPLY_FRONT_LEN; 582 msg_size += req->r_base_oid.name_len; 583 msg_size += req->r_num_ops * sizeof(struct ceph_osd_op); 584 585 if (req->r_mempool) 586 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 587 else 588 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true); 589 if (!msg) 590 return -ENOMEM; 591 592 req->r_reply = msg; 593 594 return 0; 595 } 596 EXPORT_SYMBOL(ceph_osdc_alloc_messages); 597 598 static bool osd_req_opcode_valid(u16 opcode) 599 { 600 switch (opcode) { 601 #define GENERATE_CASE(op, opcode, str) case CEPH_OSD_OP_##op: return true; 602 __CEPH_FORALL_OSD_OPS(GENERATE_CASE) 603 #undef GENERATE_CASE 604 default: 605 return false; 606 } 607 } 608 609 /* 610 * This is an osd op init function for opcodes that 
have no data or 611 * other information associated with them. It also serves as a 612 * common init routine for all the other init functions, below. 613 */ 614 static struct ceph_osd_req_op * 615 _osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which, 616 u16 opcode, u32 flags) 617 { 618 struct ceph_osd_req_op *op; 619 620 BUG_ON(which >= osd_req->r_num_ops); 621 BUG_ON(!osd_req_opcode_valid(opcode)); 622 623 op = &osd_req->r_ops[which]; 624 memset(op, 0, sizeof (*op)); 625 op->op = opcode; 626 op->flags = flags; 627 628 return op; 629 } 630 631 void osd_req_op_init(struct ceph_osd_request *osd_req, 632 unsigned int which, u16 opcode, u32 flags) 633 { 634 (void)_osd_req_op_init(osd_req, which, opcode, flags); 635 } 636 EXPORT_SYMBOL(osd_req_op_init); 637 638 void osd_req_op_extent_init(struct ceph_osd_request *osd_req, 639 unsigned int which, u16 opcode, 640 u64 offset, u64 length, 641 u64 truncate_size, u32 truncate_seq) 642 { 643 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 644 opcode, 0); 645 size_t payload_len = 0; 646 647 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && 648 opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO && 649 opcode != CEPH_OSD_OP_TRUNCATE); 650 651 op->extent.offset = offset; 652 op->extent.length = length; 653 op->extent.truncate_size = truncate_size; 654 op->extent.truncate_seq = truncate_seq; 655 if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL) 656 payload_len += length; 657 658 op->indata_len = payload_len; 659 } 660 EXPORT_SYMBOL(osd_req_op_extent_init); 661 662 void osd_req_op_extent_update(struct ceph_osd_request *osd_req, 663 unsigned int which, u64 length) 664 { 665 struct ceph_osd_req_op *op; 666 u64 previous; 667 668 BUG_ON(which >= osd_req->r_num_ops); 669 op = &osd_req->r_ops[which]; 670 previous = op->extent.length; 671 672 if (length == previous) 673 return; /* Nothing to do */ 674 BUG_ON(length > previous); 675 676 op->extent.length = length; 
677 if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL) 678 op->indata_len -= previous - length; 679 } 680 EXPORT_SYMBOL(osd_req_op_extent_update); 681 682 void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req, 683 unsigned int which, u64 offset_inc) 684 { 685 struct ceph_osd_req_op *op, *prev_op; 686 687 BUG_ON(which + 1 >= osd_req->r_num_ops); 688 689 prev_op = &osd_req->r_ops[which]; 690 op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags); 691 /* dup previous one */ 692 op->indata_len = prev_op->indata_len; 693 op->outdata_len = prev_op->outdata_len; 694 op->extent = prev_op->extent; 695 /* adjust offset */ 696 op->extent.offset += offset_inc; 697 op->extent.length -= offset_inc; 698 699 if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL) 700 op->indata_len -= offset_inc; 701 } 702 EXPORT_SYMBOL(osd_req_op_extent_dup_last); 703 704 void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, 705 u16 opcode, const char *class, const char *method) 706 { 707 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 708 opcode, 0); 709 struct ceph_pagelist *pagelist; 710 size_t payload_len = 0; 711 size_t size; 712 713 BUG_ON(opcode != CEPH_OSD_OP_CALL); 714 715 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); 716 BUG_ON(!pagelist); 717 ceph_pagelist_init(pagelist); 718 719 op->cls.class_name = class; 720 size = strlen(class); 721 BUG_ON(size > (size_t) U8_MAX); 722 op->cls.class_len = size; 723 ceph_pagelist_append(pagelist, class, size); 724 payload_len += size; 725 726 op->cls.method_name = method; 727 size = strlen(method); 728 BUG_ON(size > (size_t) U8_MAX); 729 op->cls.method_len = size; 730 ceph_pagelist_append(pagelist, method, size); 731 payload_len += size; 732 733 osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist); 734 735 op->indata_len = payload_len; 736 } 737 EXPORT_SYMBOL(osd_req_op_cls_init); 738 739 int osd_req_op_xattr_init(struct ceph_osd_request 
*osd_req, unsigned int which, 740 u16 opcode, const char *name, const void *value, 741 size_t size, u8 cmp_op, u8 cmp_mode) 742 { 743 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 744 opcode, 0); 745 struct ceph_pagelist *pagelist; 746 size_t payload_len; 747 748 BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR); 749 750 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); 751 if (!pagelist) 752 return -ENOMEM; 753 754 ceph_pagelist_init(pagelist); 755 756 payload_len = strlen(name); 757 op->xattr.name_len = payload_len; 758 ceph_pagelist_append(pagelist, name, payload_len); 759 760 op->xattr.value_len = size; 761 ceph_pagelist_append(pagelist, value, size); 762 payload_len += size; 763 764 op->xattr.cmp_op = cmp_op; 765 op->xattr.cmp_mode = cmp_mode; 766 767 ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist); 768 op->indata_len = payload_len; 769 return 0; 770 } 771 EXPORT_SYMBOL(osd_req_op_xattr_init); 772 773 /* 774 * @watch_opcode: CEPH_OSD_WATCH_OP_* 775 */ 776 static void osd_req_op_watch_init(struct ceph_osd_request *req, int which, 777 u64 cookie, u8 watch_opcode) 778 { 779 struct ceph_osd_req_op *op; 780 781 op = _osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0); 782 op->watch.cookie = cookie; 783 op->watch.op = watch_opcode; 784 op->watch.gen = 0; 785 } 786 787 void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req, 788 unsigned int which, 789 u64 expected_object_size, 790 u64 expected_write_size) 791 { 792 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 793 CEPH_OSD_OP_SETALLOCHINT, 794 0); 795 796 op->alloc_hint.expected_object_size = expected_object_size; 797 op->alloc_hint.expected_write_size = expected_write_size; 798 799 /* 800 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed 801 * not worth a feature bit. Set FAILOK per-op flag to make 802 * sure older osds don't trip over an unsupported opcode. 
803 */ 804 op->flags |= CEPH_OSD_OP_FLAG_FAILOK; 805 } 806 EXPORT_SYMBOL(osd_req_op_alloc_hint_init); 807 808 static void ceph_osdc_msg_data_add(struct ceph_msg *msg, 809 struct ceph_osd_data *osd_data) 810 { 811 u64 length = ceph_osd_data_length(osd_data); 812 813 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { 814 BUG_ON(length > (u64) SIZE_MAX); 815 if (length) 816 ceph_msg_data_add_pages(msg, osd_data->pages, 817 length, osd_data->alignment); 818 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) { 819 BUG_ON(!length); 820 ceph_msg_data_add_pagelist(msg, osd_data->pagelist); 821 #ifdef CONFIG_BLOCK 822 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) { 823 ceph_msg_data_add_bio(msg, osd_data->bio, length); 824 #endif 825 } else { 826 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE); 827 } 828 } 829 830 static u32 osd_req_encode_op(struct ceph_osd_op *dst, 831 const struct ceph_osd_req_op *src) 832 { 833 if (WARN_ON(!osd_req_opcode_valid(src->op))) { 834 pr_err("unrecognized osd opcode %d\n", src->op); 835 836 return 0; 837 } 838 839 switch (src->op) { 840 case CEPH_OSD_OP_STAT: 841 break; 842 case CEPH_OSD_OP_READ: 843 case CEPH_OSD_OP_WRITE: 844 case CEPH_OSD_OP_WRITEFULL: 845 case CEPH_OSD_OP_ZERO: 846 case CEPH_OSD_OP_TRUNCATE: 847 dst->extent.offset = cpu_to_le64(src->extent.offset); 848 dst->extent.length = cpu_to_le64(src->extent.length); 849 dst->extent.truncate_size = 850 cpu_to_le64(src->extent.truncate_size); 851 dst->extent.truncate_seq = 852 cpu_to_le32(src->extent.truncate_seq); 853 break; 854 case CEPH_OSD_OP_CALL: 855 dst->cls.class_len = src->cls.class_len; 856 dst->cls.method_len = src->cls.method_len; 857 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len); 858 break; 859 case CEPH_OSD_OP_STARTSYNC: 860 break; 861 case CEPH_OSD_OP_WATCH: 862 dst->watch.cookie = cpu_to_le64(src->watch.cookie); 863 dst->watch.ver = cpu_to_le64(0); 864 dst->watch.op = src->watch.op; 865 dst->watch.gen = cpu_to_le32(src->watch.gen); 866 break; 867 
case CEPH_OSD_OP_NOTIFY_ACK: 868 break; 869 case CEPH_OSD_OP_NOTIFY: 870 dst->notify.cookie = cpu_to_le64(src->notify.cookie); 871 break; 872 case CEPH_OSD_OP_LIST_WATCHERS: 873 break; 874 case CEPH_OSD_OP_SETALLOCHINT: 875 dst->alloc_hint.expected_object_size = 876 cpu_to_le64(src->alloc_hint.expected_object_size); 877 dst->alloc_hint.expected_write_size = 878 cpu_to_le64(src->alloc_hint.expected_write_size); 879 break; 880 case CEPH_OSD_OP_SETXATTR: 881 case CEPH_OSD_OP_CMPXATTR: 882 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len); 883 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len); 884 dst->xattr.cmp_op = src->xattr.cmp_op; 885 dst->xattr.cmp_mode = src->xattr.cmp_mode; 886 break; 887 case CEPH_OSD_OP_CREATE: 888 case CEPH_OSD_OP_DELETE: 889 break; 890 default: 891 pr_err("unsupported osd opcode %s\n", 892 ceph_osd_op_name(src->op)); 893 WARN_ON(1); 894 895 return 0; 896 } 897 898 dst->op = cpu_to_le16(src->op); 899 dst->flags = cpu_to_le32(src->flags); 900 dst->payload_len = cpu_to_le32(src->indata_len); 901 902 return src->indata_len; 903 } 904 905 /* 906 * build new request AND message, calculate layout, and adjust file 907 * extent as needed. 908 * 909 * if the file was recently truncated, we include information about its 910 * old and new size so that the object can be updated appropriately. (we 911 * avoid synchronously deleting truncated objects because it's slow.) 912 * 913 * if @do_sync, include a 'startsync' command so that the osd will flush 914 * data quickly. 
915 */ 916 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, 917 struct ceph_file_layout *layout, 918 struct ceph_vino vino, 919 u64 off, u64 *plen, 920 unsigned int which, int num_ops, 921 int opcode, int flags, 922 struct ceph_snap_context *snapc, 923 u32 truncate_seq, 924 u64 truncate_size, 925 bool use_mempool) 926 { 927 struct ceph_osd_request *req; 928 u64 objnum = 0; 929 u64 objoff = 0; 930 u64 objlen = 0; 931 int r; 932 933 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && 934 opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE && 935 opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE); 936 937 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool, 938 GFP_NOFS); 939 if (!req) { 940 r = -ENOMEM; 941 goto fail; 942 } 943 944 /* calculate max write size */ 945 r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen); 946 if (r) 947 goto fail; 948 949 if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) { 950 osd_req_op_init(req, which, opcode, 0); 951 } else { 952 u32 object_size = layout->object_size; 953 u32 object_base = off - objoff; 954 if (!(truncate_seq == 1 && truncate_size == -1ULL)) { 955 if (truncate_size <= object_base) { 956 truncate_size = 0; 957 } else { 958 truncate_size -= object_base; 959 if (truncate_size > object_size) 960 truncate_size = object_size; 961 } 962 } 963 osd_req_op_extent_init(req, which, opcode, objoff, objlen, 964 truncate_size, truncate_seq); 965 } 966 967 req->r_abort_on_full = true; 968 req->r_flags = flags; 969 req->r_base_oloc.pool = layout->pool_id; 970 req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns); 971 ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum); 972 973 req->r_snapid = vino.snap; 974 if (flags & CEPH_OSD_FLAG_WRITE) 975 req->r_data_offset = off; 976 977 r = ceph_osdc_alloc_messages(req, GFP_NOFS); 978 if (r) 979 goto fail; 980 981 return req; 982 983 fail: 984 
ceph_osdc_put_request(req); 985 return ERR_PTR(r); 986 } 987 EXPORT_SYMBOL(ceph_osdc_new_request); 988 989 /* 990 * We keep osd requests in an rbtree, sorted by ->r_tid. 991 */ 992 DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node) 993 DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node) 994 995 static bool osd_homeless(struct ceph_osd *osd) 996 { 997 return osd->o_osd == CEPH_HOMELESS_OSD; 998 } 999 1000 static bool osd_registered(struct ceph_osd *osd) 1001 { 1002 verify_osdc_locked(osd->o_osdc); 1003 1004 return !RB_EMPTY_NODE(&osd->o_node); 1005 } 1006 1007 /* 1008 * Assumes @osd is zero-initialized. 1009 */ 1010 static void osd_init(struct ceph_osd *osd) 1011 { 1012 refcount_set(&osd->o_ref, 1); 1013 RB_CLEAR_NODE(&osd->o_node); 1014 osd->o_requests = RB_ROOT; 1015 osd->o_linger_requests = RB_ROOT; 1016 INIT_LIST_HEAD(&osd->o_osd_lru); 1017 INIT_LIST_HEAD(&osd->o_keepalive_item); 1018 osd->o_incarnation = 1; 1019 mutex_init(&osd->lock); 1020 } 1021 1022 static void osd_cleanup(struct ceph_osd *osd) 1023 { 1024 WARN_ON(!RB_EMPTY_NODE(&osd->o_node)); 1025 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests)); 1026 WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests)); 1027 WARN_ON(!list_empty(&osd->o_osd_lru)); 1028 WARN_ON(!list_empty(&osd->o_keepalive_item)); 1029 1030 if (osd->o_auth.authorizer) { 1031 WARN_ON(osd_homeless(osd)); 1032 ceph_auth_destroy_authorizer(osd->o_auth.authorizer); 1033 } 1034 } 1035 1036 /* 1037 * Track open sessions with osds. 
1038 */ /* create_osd() - allocate and set up a session for osd @onum, which must not be CEPH_HOMELESS_OSD (warns otherwise). kzalloc() is passed __GFP_NOFAIL, so the result is used without a NULL check; osd_init() sets o_ref to 1, so the caller owns one reference. */ 1039 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum) 1040 { 1041 struct ceph_osd *osd; 1042 1043 WARN_ON(onum == CEPH_HOMELESS_OSD); 1044 1045 osd = kzalloc(sizeof(*osd), GFP_NOIO | __GFP_NOFAIL); 1046 osd_init(osd); 1047 osd->o_osdc = osdc; 1048 osd->o_osd = onum; 1049 1050 ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr); 1051 1052 return osd; 1053 } 1054 /* get_osd() - take a reference on @osd unless its refcount has already dropped to zero; returns @osd on success, NULL otherwise. */ 1055 static struct ceph_osd *get_osd(struct ceph_osd *osd) 1056 { 1057 if (refcount_inc_not_zero(&osd->o_ref)) { 1058 dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1, 1059 refcount_read(&osd->o_ref)); 1060 return osd; 1061 } else { 1062 dout("get_osd %p FAIL\n", osd); 1063 return NULL; 1064 } 1065 } 1066 /* put_osd() - drop a reference on @osd; dropping the last one runs osd_cleanup() and frees it. */ 1067 static void put_osd(struct ceph_osd *osd) 1068 { 1069 dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref), 1070 refcount_read(&osd->o_ref) - 1); 1071 if (refcount_dec_and_test(&osd->o_ref)) { 1072 osd_cleanup(osd); 1073 kfree(osd); 1074 } 1075 } 1076 /* rbtree helpers for sessions keyed by o_osd via o_node; close_osd() uses the generated erase_osd() on osdc->osds. */ 1077 DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node) 1078 /* __move_osd_to_lru() - append @osd (which must not already be on it) to osdc->osd_lru under osd_lru_lock, and set its lru_ttl to now + the osd_idle_ttl option. */ 1079 static void __move_osd_to_lru(struct ceph_osd *osd) 1080 { 1081 struct ceph_osd_client *osdc = osd->o_osdc; 1082 1083 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd); 1084 BUG_ON(!list_empty(&osd->o_osd_lru)); 1085 1086 spin_lock(&osdc->osd_lru_lock); 1087 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru); 1088 spin_unlock(&osdc->osd_lru_lock); 1089 1090 osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl; 1091 } 1092 /* maybe_move_osd_to_lru() - park @osd on the LRU once it has neither requests nor linger requests. */ 1093 static void maybe_move_osd_to_lru(struct ceph_osd *osd) 1094 { 1095 if (RB_EMPTY_ROOT(&osd->o_requests) && 1096 RB_EMPTY_ROOT(&osd->o_linger_requests)) 1097 __move_osd_to_lru(osd); 1098 } 1099 /* __remove_osd_from_lru() - take @osd off osdc->osd_lru if it is on it, under osd_lru_lock. */ 1100 static void __remove_osd_from_lru(struct ceph_osd *osd) 1101 { 1102 struct ceph_osd_client *osdc = osd->o_osdc; 1103 1104 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd); 1105 1106 spin_lock(&osdc->osd_lru_lock); 1107 if (!list_empty(&osd->o_osd_lru)) 1108 list_del_init(&osd->o_osd_lru);
1109 spin_unlock(&osdc->osd_lru_lock); 1110 } 1111 1112 /* 1113 * Close the connection and assign any leftover requests and linger 1114 * requests to the homeless session, then take @osd off the LRU, erase it from osdc->osds and drop a reference (presumably the one held on behalf of that tree). Requires osdc->lock held for write (checked by verify_osdc_wrlocked()). 1115 */ 1116 static void close_osd(struct ceph_osd *osd) 1117 { 1118 struct ceph_osd_client *osdc = osd->o_osdc; 1119 struct rb_node *n; 1120 1121 verify_osdc_wrlocked(osdc); 1122 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd); 1123 1124 ceph_con_close(&osd->o_con); 1125 1126 for (n = rb_first(&osd->o_requests); n; ) { 1127 struct ceph_osd_request *req = 1128 rb_entry(n, struct ceph_osd_request, r_node); 1129 1130 n = rb_next(n); /* advance first: unlink_request() removes req from this tree */ 1131 1132 dout(" reassigning req %p tid %llu\n", req, req->r_tid); 1133 unlink_request(osd, req); 1134 link_request(&osdc->homeless_osd, req); 1135 } 1136 for (n = rb_first(&osd->o_linger_requests); n; ) { 1137 struct ceph_osd_linger_request *lreq = 1138 rb_entry(n, struct ceph_osd_linger_request, node); 1139 1140 n = rb_next(n); /* advance first: unlink_linger() removes lreq from this tree */ 1141 1142 dout(" reassigning lreq %p linger_id %llu\n", lreq, 1143 lreq->linger_id); 1144 unlink_linger(osd, lreq); 1145 link_linger(&osdc->homeless_osd, lreq); 1146 } 1147 1148 __remove_osd_from_lru(osd); 1149 erase_osd(&osdc->osds, osd); 1150 put_osd(osd); 1151 } 1152 1153 /* 1154 * Reset the connection to @osd. If it has neither requests nor linger requests left, it is closed instead and -ENODEV is returned. 1155 */ 1156 static int reopen_osd(struct ceph_osd *osd) 1157 { 1158 struct ceph_entity_addr *peer_addr; 1159 1160 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd); 1161 1162 if (RB_EMPTY_ROOT(&osd->o_requests) && 1163 RB_EMPTY_ROOT(&osd->o_linger_requests)) { 1164 close_osd(osd); 1165 return -ENODEV; 1166 } 1167 1168 peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd]; 1169 if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) && 1170 !ceph_con_opened(&osd->o_con)) { 1171 struct rb_node *n; 1172 1173 dout("osd addr hasn't changed and connection never opened, " 1174 "letting msgr retry\n"); 1175 /* touch each r_stamp for handle_timeout()'s benefit */ 1176 for (n = rb_first(&osd->o_requests); n; n =
rb_next(n)) { 1177 struct ceph_osd_request *req = 1178 rb_entry(n, struct ceph_osd_request, r_node); 1179 req->r_stamp = jiffies; 1180 } 1181 1182 return -EAGAIN; 1183 } 1184 1185 ceph_con_close(&osd->o_con); 1186 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr); 1187 osd->o_incarnation++; 1188 1189 return 0; 1190 } 1191 1192 static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o, 1193 bool wrlocked) 1194 { 1195 struct ceph_osd *osd; 1196 1197 if (wrlocked) 1198 verify_osdc_wrlocked(osdc); 1199 else 1200 verify_osdc_locked(osdc); 1201 1202 if (o != CEPH_HOMELESS_OSD) 1203 osd = lookup_osd(&osdc->osds, o); 1204 else 1205 osd = &osdc->homeless_osd; 1206 if (!osd) { 1207 if (!wrlocked) 1208 return ERR_PTR(-EAGAIN); 1209 1210 osd = create_osd(osdc, o); 1211 insert_osd(&osdc->osds, osd); 1212 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, 1213 &osdc->osdmap->osd_addr[osd->o_osd]); 1214 } 1215 1216 dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd); 1217 return osd; 1218 } 1219 1220 /* 1221 * Create request <-> OSD session relation. 1222 * 1223 * @req has to be assigned a tid, @osd may be homeless. 
1224 */ 1225 static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req) 1226 { 1227 verify_osd_locked(osd); 1228 WARN_ON(!req->r_tid || req->r_osd); 1229 dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd, 1230 req, req->r_tid); 1231 1232 if (!osd_homeless(osd)) 1233 __remove_osd_from_lru(osd); 1234 else 1235 atomic_inc(&osd->o_osdc->num_homeless); 1236 1237 get_osd(osd); 1238 insert_request(&osd->o_requests, req); 1239 req->r_osd = osd; 1240 } 1241 1242 static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req) 1243 { 1244 verify_osd_locked(osd); 1245 WARN_ON(req->r_osd != osd); 1246 dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd, 1247 req, req->r_tid); 1248 1249 req->r_osd = NULL; 1250 erase_request(&osd->o_requests, req); 1251 put_osd(osd); 1252 1253 if (!osd_homeless(osd)) 1254 maybe_move_osd_to_lru(osd); 1255 else 1256 atomic_dec(&osd->o_osdc->num_homeless); 1257 } 1258 1259 static bool __pool_full(struct ceph_pg_pool_info *pi) 1260 { 1261 return pi->flags & CEPH_POOL_FLAG_FULL; 1262 } 1263 1264 static bool have_pool_full(struct ceph_osd_client *osdc) 1265 { 1266 struct rb_node *n; 1267 1268 for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) { 1269 struct ceph_pg_pool_info *pi = 1270 rb_entry(n, struct ceph_pg_pool_info, node); 1271 1272 if (__pool_full(pi)) 1273 return true; 1274 } 1275 1276 return false; 1277 } 1278 1279 static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id) 1280 { 1281 struct ceph_pg_pool_info *pi; 1282 1283 pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id); 1284 if (!pi) 1285 return false; 1286 1287 return __pool_full(pi); 1288 } 1289 1290 /* 1291 * Returns whether a request should be blocked from being sent 1292 * based on the current osdmap and osd_client settings. 
1293 */ 1294 static bool target_should_be_paused(struct ceph_osd_client *osdc, 1295 const struct ceph_osd_request_target *t, 1296 struct ceph_pg_pool_info *pi) 1297 { 1298 bool pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD); 1299 bool pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) || 1300 ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 1301 __pool_full(pi); 1302 1303 WARN_ON(pi->id != t->base_oloc.pool); 1304 return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) || 1305 ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) || 1306 (osdc->osdmap->epoch < osdc->epoch_barrier); 1307 } 1308 1309 enum calc_target_result { 1310 CALC_TARGET_NO_ACTION = 0, 1311 CALC_TARGET_NEED_RESEND, 1312 CALC_TARGET_POOL_DNE, 1313 }; 1314 1315 static enum calc_target_result calc_target(struct ceph_osd_client *osdc, 1316 struct ceph_osd_request_target *t, 1317 bool any_change) 1318 { 1319 struct ceph_pg_pool_info *pi; 1320 struct ceph_pg pgid, last_pgid; 1321 struct ceph_osds up, acting; 1322 bool force_resend = false; 1323 bool need_check_tiering = false; 1324 bool need_resend = false; 1325 bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE); 1326 enum calc_target_result ct_res; 1327 int ret; 1328 1329 pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool); 1330 if (!pi) { 1331 t->osd = CEPH_HOMELESS_OSD; 1332 ct_res = CALC_TARGET_POOL_DNE; 1333 goto out; 1334 } 1335 1336 if (osdc->osdmap->epoch == pi->last_force_request_resend) { 1337 if (t->last_force_resend < pi->last_force_request_resend) { 1338 t->last_force_resend = pi->last_force_request_resend; 1339 force_resend = true; 1340 } else if (t->last_force_resend == 0) { 1341 force_resend = true; 1342 } 1343 } 1344 if (ceph_oid_empty(&t->target_oid) || force_resend) { 1345 ceph_oid_copy(&t->target_oid, &t->base_oid); 1346 need_check_tiering = true; 1347 } 1348 if (ceph_oloc_empty(&t->target_oloc) || force_resend) { 1349 ceph_oloc_copy(&t->target_oloc, &t->base_oloc); 1350 need_check_tiering = true; 1351 } 1352 1353 if 
(need_check_tiering && 1354 (t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { 1355 if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0) 1356 t->target_oloc.pool = pi->read_tier; 1357 if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0) 1358 t->target_oloc.pool = pi->write_tier; 1359 } 1360 1361 ret = ceph_object_locator_to_pg(osdc->osdmap, &t->target_oid, 1362 &t->target_oloc, &pgid); 1363 if (ret) { 1364 WARN_ON(ret != -ENOENT); 1365 t->osd = CEPH_HOMELESS_OSD; 1366 ct_res = CALC_TARGET_POOL_DNE; 1367 goto out; 1368 } 1369 last_pgid.pool = pgid.pool; 1370 last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask); 1371 1372 ceph_pg_to_up_acting_osds(osdc->osdmap, &pgid, &up, &acting); 1373 if (any_change && 1374 ceph_is_new_interval(&t->acting, 1375 &acting, 1376 &t->up, 1377 &up, 1378 t->size, 1379 pi->size, 1380 t->min_size, 1381 pi->min_size, 1382 t->pg_num, 1383 pi->pg_num, 1384 t->sort_bitwise, 1385 sort_bitwise, 1386 &last_pgid)) 1387 force_resend = true; 1388 1389 if (t->paused && !target_should_be_paused(osdc, t, pi)) { 1390 t->paused = false; 1391 need_resend = true; 1392 } 1393 1394 if (ceph_pg_compare(&t->pgid, &pgid) || 1395 ceph_osds_changed(&t->acting, &acting, any_change) || 1396 force_resend) { 1397 t->pgid = pgid; /* struct */ 1398 ceph_pg_to_primary_shard(osdc->osdmap, &pgid, &t->spgid); 1399 ceph_osds_copy(&t->acting, &acting); 1400 ceph_osds_copy(&t->up, &up); 1401 t->size = pi->size; 1402 t->min_size = pi->min_size; 1403 t->pg_num = pi->pg_num; 1404 t->pg_num_mask = pi->pg_num_mask; 1405 t->sort_bitwise = sort_bitwise; 1406 1407 t->osd = acting.primary; 1408 need_resend = true; 1409 } 1410 1411 ct_res = need_resend ? 
CALC_TARGET_NEED_RESEND : CALC_TARGET_NO_ACTION; 1412 out: 1413 dout("%s t %p -> ct_res %d osd %d\n", __func__, t, ct_res, t->osd); 1414 return ct_res; 1415 } 1416 1417 static void setup_request_data(struct ceph_osd_request *req, 1418 struct ceph_msg *msg) 1419 { 1420 u32 data_len = 0; 1421 int i; 1422 1423 if (!list_empty(&msg->data)) 1424 return; 1425 1426 WARN_ON(msg->data_length); 1427 for (i = 0; i < req->r_num_ops; i++) { 1428 struct ceph_osd_req_op *op = &req->r_ops[i]; 1429 1430 switch (op->op) { 1431 /* request */ 1432 case CEPH_OSD_OP_WRITE: 1433 case CEPH_OSD_OP_WRITEFULL: 1434 WARN_ON(op->indata_len != op->extent.length); 1435 ceph_osdc_msg_data_add(msg, &op->extent.osd_data); 1436 break; 1437 case CEPH_OSD_OP_SETXATTR: 1438 case CEPH_OSD_OP_CMPXATTR: 1439 WARN_ON(op->indata_len != op->xattr.name_len + 1440 op->xattr.value_len); 1441 ceph_osdc_msg_data_add(msg, &op->xattr.osd_data); 1442 break; 1443 case CEPH_OSD_OP_NOTIFY_ACK: 1444 ceph_osdc_msg_data_add(msg, 1445 &op->notify_ack.request_data); 1446 break; 1447 1448 /* reply */ 1449 case CEPH_OSD_OP_STAT: 1450 ceph_osdc_msg_data_add(req->r_reply, 1451 &op->raw_data_in); 1452 break; 1453 case CEPH_OSD_OP_READ: 1454 ceph_osdc_msg_data_add(req->r_reply, 1455 &op->extent.osd_data); 1456 break; 1457 case CEPH_OSD_OP_LIST_WATCHERS: 1458 ceph_osdc_msg_data_add(req->r_reply, 1459 &op->list_watchers.response_data); 1460 break; 1461 1462 /* both */ 1463 case CEPH_OSD_OP_CALL: 1464 WARN_ON(op->indata_len != op->cls.class_len + 1465 op->cls.method_len + 1466 op->cls.indata_len); 1467 ceph_osdc_msg_data_add(msg, &op->cls.request_info); 1468 /* optional, can be NONE */ 1469 ceph_osdc_msg_data_add(msg, &op->cls.request_data); 1470 /* optional, can be NONE */ 1471 ceph_osdc_msg_data_add(req->r_reply, 1472 &op->cls.response_data); 1473 break; 1474 case CEPH_OSD_OP_NOTIFY: 1475 ceph_osdc_msg_data_add(msg, 1476 &op->notify.request_data); 1477 ceph_osdc_msg_data_add(req->r_reply, 1478 &op->notify.response_data); 1479 
break; 1480 } 1481 1482 data_len += op->indata_len; 1483 } 1484 1485 WARN_ON(data_len != msg->data_length); 1486 } 1487 1488 static void encode_pgid(void **p, const struct ceph_pg *pgid) 1489 { 1490 ceph_encode_8(p, 1); 1491 ceph_encode_64(p, pgid->pool); 1492 ceph_encode_32(p, pgid->seed); 1493 ceph_encode_32(p, -1); /* preferred */ 1494 } 1495 1496 static void encode_oloc(void **p, void *end, 1497 const struct ceph_object_locator *oloc) 1498 { 1499 ceph_start_encoding(p, 5, 4, ceph_oloc_encoding_size(oloc)); 1500 ceph_encode_64(p, oloc->pool); 1501 ceph_encode_32(p, -1); /* preferred */ 1502 ceph_encode_32(p, 0); /* key len */ 1503 if (oloc->pool_ns) 1504 ceph_encode_string(p, end, oloc->pool_ns->str, 1505 oloc->pool_ns->len); 1506 else 1507 ceph_encode_32(p, 0); 1508 } 1509 1510 static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg) 1511 { 1512 void *p = msg->front.iov_base; 1513 void *const end = p + msg->front_alloc_len; 1514 u32 data_len = 0; 1515 int i; 1516 1517 if (req->r_flags & CEPH_OSD_FLAG_WRITE) { 1518 /* snapshots aren't writeable */ 1519 WARN_ON(req->r_snapid != CEPH_NOSNAP); 1520 } else { 1521 WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec || 1522 req->r_data_offset || req->r_snapc); 1523 } 1524 1525 setup_request_data(req, msg); 1526 1527 ceph_encode_32(&p, 1); /* client_inc, always 1 */ 1528 ceph_encode_32(&p, req->r_osdc->osdmap->epoch); 1529 ceph_encode_32(&p, req->r_flags); 1530 ceph_encode_timespec(p, &req->r_mtime); 1531 p += sizeof(struct ceph_timespec); 1532 1533 /* reassert_version */ 1534 memset(p, 0, sizeof(struct ceph_eversion)); 1535 p += sizeof(struct ceph_eversion); 1536 1537 encode_oloc(&p, end, &req->r_t.target_oloc); 1538 encode_pgid(&p, &req->r_t.pgid); 1539 ceph_encode_string(&p, end, req->r_t.target_oid.name, 1540 req->r_t.target_oid.name_len); 1541 1542 /* ops, can imply data */ 1543 ceph_encode_16(&p, req->r_num_ops); 1544 for (i = 0; i < req->r_num_ops; i++) { 1545 data_len += 
osd_req_encode_op(p, &req->r_ops[i]); 1546 p += sizeof(struct ceph_osd_op); 1547 } 1548 1549 ceph_encode_64(&p, req->r_snapid); /* snapid */ 1550 if (req->r_snapc) { 1551 ceph_encode_64(&p, req->r_snapc->seq); 1552 ceph_encode_32(&p, req->r_snapc->num_snaps); 1553 for (i = 0; i < req->r_snapc->num_snaps; i++) 1554 ceph_encode_64(&p, req->r_snapc->snaps[i]); 1555 } else { 1556 ceph_encode_64(&p, 0); /* snap_seq */ 1557 ceph_encode_32(&p, 0); /* snaps len */ 1558 } 1559 1560 ceph_encode_32(&p, req->r_attempts); /* retry_attempt */ 1561 1562 BUG_ON(p > end); 1563 msg->front.iov_len = p - msg->front.iov_base; 1564 msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */ 1565 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1566 msg->hdr.data_len = cpu_to_le32(data_len); 1567 /* 1568 * The header "data_off" is a hint to the receiver allowing it 1569 * to align received data into its buffers such that there's no 1570 * need to re-copy it before writing it to disk (direct I/O). 1571 */ 1572 msg->hdr.data_off = cpu_to_le16(req->r_data_offset); 1573 1574 dout("%s req %p oid %s oid_len %d front %zu data %u\n", __func__, 1575 req, req->r_t.target_oid.name, req->r_t.target_oid.name_len, 1576 msg->front.iov_len, data_len); 1577 } 1578 1579 /* 1580 * @req has to be assigned a tid and registered. 1581 */ 1582 static void send_request(struct ceph_osd_request *req) 1583 { 1584 struct ceph_osd *osd = req->r_osd; 1585 1586 verify_osd_locked(osd); 1587 WARN_ON(osd->o_osd != req->r_t.osd); 1588 1589 /* 1590 * We may have a previously queued request message hanging 1591 * around. Cancel it to avoid corrupting the msgr. 
1592 */ 1593 if (req->r_sent) 1594 ceph_msg_revoke(req->r_request); 1595 1596 req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR; 1597 if (req->r_attempts) 1598 req->r_flags |= CEPH_OSD_FLAG_RETRY; 1599 else 1600 WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY); 1601 1602 encode_request(req, req->r_request); 1603 1604 dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d flags 0x%x attempt %d\n", 1605 __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed, 1606 req->r_t.spgid.pgid.pool, req->r_t.spgid.pgid.seed, 1607 req->r_t.spgid.shard, osd->o_osd, req->r_flags, req->r_attempts); 1608 1609 req->r_t.paused = false; 1610 req->r_stamp = jiffies; 1611 req->r_attempts++; 1612 1613 req->r_sent = osd->o_incarnation; 1614 req->r_request->hdr.tid = cpu_to_le64(req->r_tid); 1615 ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request)); 1616 } 1617 1618 static void maybe_request_map(struct ceph_osd_client *osdc) 1619 { 1620 bool continuous = false; 1621 1622 verify_osdc_locked(osdc); 1623 WARN_ON(!osdc->osdmap->epoch); 1624 1625 if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 1626 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD) || 1627 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) { 1628 dout("%s osdc %p continuous\n", __func__, osdc); 1629 continuous = true; 1630 } else { 1631 dout("%s osdc %p onetime\n", __func__, osdc); 1632 } 1633 1634 if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP, 1635 osdc->osdmap->epoch + 1, continuous)) 1636 ceph_monc_renew_subs(&osdc->client->monc); 1637 } 1638 1639 static void complete_request(struct ceph_osd_request *req, int err); 1640 static void send_map_check(struct ceph_osd_request *req); 1641 1642 static void __submit_request(struct ceph_osd_request *req, bool wrlocked) 1643 { 1644 struct ceph_osd_client *osdc = req->r_osdc; 1645 struct ceph_osd *osd; 1646 enum calc_target_result ct_res; 1647 bool need_send = false; 1648 bool promoted = false; 1649 bool need_abort = false; 1650 1651 WARN_ON(req->r_tid); 1652 dout("%s req %p 
wrlocked %d\n", __func__, req, wrlocked); 1653 1654 again: 1655 ct_res = calc_target(osdc, &req->r_t, false); 1656 if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked) 1657 goto promote; 1658 1659 osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked); 1660 if (IS_ERR(osd)) { 1661 WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked); 1662 goto promote; 1663 } 1664 1665 if (osdc->osdmap->epoch < osdc->epoch_barrier) { 1666 dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch, 1667 osdc->epoch_barrier); 1668 req->r_t.paused = true; 1669 maybe_request_map(osdc); 1670 } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && 1671 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) { 1672 dout("req %p pausewr\n", req); 1673 req->r_t.paused = true; 1674 maybe_request_map(osdc); 1675 } else if ((req->r_flags & CEPH_OSD_FLAG_READ) && 1676 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) { 1677 dout("req %p pauserd\n", req); 1678 req->r_t.paused = true; 1679 maybe_request_map(osdc); 1680 } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && 1681 !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY | 1682 CEPH_OSD_FLAG_FULL_FORCE)) && 1683 (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 1684 pool_full(osdc, req->r_t.base_oloc.pool))) { 1685 dout("req %p full/pool_full\n", req); 1686 pr_warn_ratelimited("FULL or reached pool quota\n"); 1687 req->r_t.paused = true; 1688 maybe_request_map(osdc); 1689 if (req->r_abort_on_full) 1690 need_abort = true; 1691 } else if (!osd_homeless(osd)) { 1692 need_send = true; 1693 } else { 1694 maybe_request_map(osdc); 1695 } 1696 1697 mutex_lock(&osd->lock); 1698 /* 1699 * Assign the tid atomically with send_request() to protect 1700 * multiple writes to the same object from racing with each 1701 * other, resulting in out of order ops on the OSDs. 
1702 */ 1703 req->r_tid = atomic64_inc_return(&osdc->last_tid); 1704 link_request(osd, req); 1705 if (need_send) 1706 send_request(req); 1707 else if (need_abort) 1708 complete_request(req, -ENOSPC); 1709 mutex_unlock(&osd->lock); 1710 1711 if (ct_res == CALC_TARGET_POOL_DNE) 1712 send_map_check(req); 1713 1714 if (promoted) 1715 downgrade_write(&osdc->lock); 1716 return; 1717 1718 promote: 1719 up_read(&osdc->lock); 1720 down_write(&osdc->lock); 1721 wrlocked = true; 1722 promoted = true; 1723 goto again; 1724 } 1725 1726 static void account_request(struct ceph_osd_request *req) 1727 { 1728 WARN_ON(req->r_flags & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK)); 1729 WARN_ON(!(req->r_flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE))); 1730 1731 req->r_flags |= CEPH_OSD_FLAG_ONDISK; 1732 atomic_inc(&req->r_osdc->num_requests); 1733 1734 req->r_start_stamp = jiffies; 1735 } 1736 1737 static void submit_request(struct ceph_osd_request *req, bool wrlocked) 1738 { 1739 ceph_osdc_get_request(req); 1740 account_request(req); 1741 __submit_request(req, wrlocked); 1742 } 1743 1744 static void finish_request(struct ceph_osd_request *req) 1745 { 1746 struct ceph_osd_client *osdc = req->r_osdc; 1747 struct ceph_osd *osd = req->r_osd; 1748 1749 verify_osd_locked(osd); 1750 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 1751 1752 WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid)); 1753 unlink_request(osd, req); 1754 atomic_dec(&osdc->num_requests); 1755 1756 /* 1757 * If an OSD has failed or returned and a request has been sent 1758 * twice, it's possible to get a reply and end up here while the 1759 * request message is queued for delivery. We will ignore the 1760 * reply, so not a big deal, but better to try and catch it. 
1761 */ 1762 ceph_msg_revoke(req->r_request); 1763 ceph_msg_revoke_incoming(req->r_reply); 1764 } 1765 1766 static void __complete_request(struct ceph_osd_request *req) 1767 { 1768 if (req->r_callback) { 1769 dout("%s req %p tid %llu cb %pf result %d\n", __func__, req, 1770 req->r_tid, req->r_callback, req->r_result); 1771 req->r_callback(req); 1772 } 1773 } 1774 1775 /* 1776 * This is open-coded in handle_reply(). 1777 */ 1778 static void complete_request(struct ceph_osd_request *req, int err) 1779 { 1780 dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err); 1781 1782 req->r_result = err; 1783 finish_request(req); 1784 __complete_request(req); 1785 complete_all(&req->r_completion); 1786 ceph_osdc_put_request(req); 1787 } 1788 1789 static void cancel_map_check(struct ceph_osd_request *req) 1790 { 1791 struct ceph_osd_client *osdc = req->r_osdc; 1792 struct ceph_osd_request *lookup_req; 1793 1794 verify_osdc_wrlocked(osdc); 1795 1796 lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid); 1797 if (!lookup_req) 1798 return; 1799 1800 WARN_ON(lookup_req != req); 1801 erase_request_mc(&osdc->map_checks, req); 1802 ceph_osdc_put_request(req); 1803 } 1804 1805 static void cancel_request(struct ceph_osd_request *req) 1806 { 1807 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 1808 1809 cancel_map_check(req); 1810 finish_request(req); 1811 complete_all(&req->r_completion); 1812 ceph_osdc_put_request(req); 1813 } 1814 1815 static void abort_request(struct ceph_osd_request *req, int err) 1816 { 1817 dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err); 1818 1819 cancel_map_check(req); 1820 complete_request(req, err); 1821 } 1822 1823 static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb) 1824 { 1825 if (likely(eb > osdc->epoch_barrier)) { 1826 dout("updating epoch_barrier from %u to %u\n", 1827 osdc->epoch_barrier, eb); 1828 osdc->epoch_barrier = eb; 1829 /* Request map if we're not to the barrier yet */ 1830 if 
(eb > osdc->osdmap->epoch) 1831 maybe_request_map(osdc); 1832 } 1833 } 1834 1835 void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb) 1836 { 1837 down_read(&osdc->lock); 1838 if (unlikely(eb > osdc->epoch_barrier)) { 1839 up_read(&osdc->lock); 1840 down_write(&osdc->lock); 1841 update_epoch_barrier(osdc, eb); 1842 up_write(&osdc->lock); 1843 } else { 1844 up_read(&osdc->lock); 1845 } 1846 } 1847 EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier); 1848 1849 /* 1850 * Drop all pending requests that are stalled waiting on a full condition to 1851 * clear, and complete them with ENOSPC as the return code. Set the 1852 * osdc->epoch_barrier to the latest map epoch that we've seen if any were 1853 * cancelled. 1854 */ 1855 static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc) 1856 { 1857 struct rb_node *n; 1858 bool victims = false; 1859 1860 dout("enter abort_on_full\n"); 1861 1862 if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc)) 1863 goto out; 1864 1865 /* Scan list and see if there is anything to abort */ 1866 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { 1867 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); 1868 struct rb_node *m; 1869 1870 m = rb_first(&osd->o_requests); 1871 while (m) { 1872 struct ceph_osd_request *req = rb_entry(m, 1873 struct ceph_osd_request, r_node); 1874 m = rb_next(m); 1875 1876 if (req->r_abort_on_full) { 1877 victims = true; 1878 break; 1879 } 1880 } 1881 if (victims) 1882 break; 1883 } 1884 1885 if (!victims) 1886 goto out; 1887 1888 /* 1889 * Update the barrier to current epoch if it's behind that point, 1890 * since we know we have some calls to be aborted in the tree. 
1891 */ 1892 update_epoch_barrier(osdc, osdc->osdmap->epoch); 1893 1894 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { 1895 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); 1896 struct rb_node *m; 1897 1898 m = rb_first(&osd->o_requests); 1899 while (m) { 1900 struct ceph_osd_request *req = rb_entry(m, 1901 struct ceph_osd_request, r_node); 1902 m = rb_next(m); 1903 1904 if (req->r_abort_on_full && 1905 (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 1906 pool_full(osdc, req->r_t.target_oloc.pool))) 1907 abort_request(req, -ENOSPC); 1908 } 1909 } 1910 out: 1911 dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier); 1912 } 1913 1914 static void check_pool_dne(struct ceph_osd_request *req) 1915 { 1916 struct ceph_osd_client *osdc = req->r_osdc; 1917 struct ceph_osdmap *map = osdc->osdmap; 1918 1919 verify_osdc_wrlocked(osdc); 1920 WARN_ON(!map->epoch); 1921 1922 if (req->r_attempts) { 1923 /* 1924 * We sent a request earlier, which means that 1925 * previously the pool existed, and now it does not 1926 * (i.e., it was deleted). 
1927 */ 1928 req->r_map_dne_bound = map->epoch; 1929 dout("%s req %p tid %llu pool disappeared\n", __func__, req, 1930 req->r_tid); 1931 } else { 1932 dout("%s req %p tid %llu map_dne_bound %u have %u\n", __func__, 1933 req, req->r_tid, req->r_map_dne_bound, map->epoch); 1934 } 1935 1936 if (req->r_map_dne_bound) { 1937 if (map->epoch >= req->r_map_dne_bound) { 1938 /* we had a new enough map */ 1939 pr_info_ratelimited("tid %llu pool does not exist\n", 1940 req->r_tid); 1941 complete_request(req, -ENOENT); 1942 } 1943 } else { 1944 send_map_check(req); 1945 } 1946 } 1947 1948 static void map_check_cb(struct ceph_mon_generic_request *greq) 1949 { 1950 struct ceph_osd_client *osdc = &greq->monc->client->osdc; 1951 struct ceph_osd_request *req; 1952 u64 tid = greq->private_data; 1953 1954 WARN_ON(greq->result || !greq->u.newest); 1955 1956 down_write(&osdc->lock); 1957 req = lookup_request_mc(&osdc->map_checks, tid); 1958 if (!req) { 1959 dout("%s tid %llu dne\n", __func__, tid); 1960 goto out_unlock; 1961 } 1962 1963 dout("%s req %p tid %llu map_dne_bound %u newest %llu\n", __func__, 1964 req, req->r_tid, req->r_map_dne_bound, greq->u.newest); 1965 if (!req->r_map_dne_bound) 1966 req->r_map_dne_bound = greq->u.newest; 1967 erase_request_mc(&osdc->map_checks, req); 1968 check_pool_dne(req); 1969 1970 ceph_osdc_put_request(req); 1971 out_unlock: 1972 up_write(&osdc->lock); 1973 } 1974 1975 static void send_map_check(struct ceph_osd_request *req) 1976 { 1977 struct ceph_osd_client *osdc = req->r_osdc; 1978 struct ceph_osd_request *lookup_req; 1979 int ret; 1980 1981 verify_osdc_wrlocked(osdc); 1982 1983 lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid); 1984 if (lookup_req) { 1985 WARN_ON(lookup_req != req); 1986 return; 1987 } 1988 1989 ceph_osdc_get_request(req); 1990 insert_request_mc(&osdc->map_checks, req); 1991 ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap", 1992 map_check_cb, req->r_tid); 1993 WARN_ON(ret); 1994 } 1995 1996 /* 1997 * 
lingering requests, watch/notify v2 infrastructure 1998 */ 1999 static void linger_release(struct kref *kref) 2000 { 2001 struct ceph_osd_linger_request *lreq = 2002 container_of(kref, struct ceph_osd_linger_request, kref); 2003 2004 dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq, 2005 lreq->reg_req, lreq->ping_req); 2006 WARN_ON(!RB_EMPTY_NODE(&lreq->node)); 2007 WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node)); 2008 WARN_ON(!RB_EMPTY_NODE(&lreq->mc_node)); 2009 WARN_ON(!list_empty(&lreq->scan_item)); 2010 WARN_ON(!list_empty(&lreq->pending_lworks)); 2011 WARN_ON(lreq->osd); 2012 2013 if (lreq->reg_req) 2014 ceph_osdc_put_request(lreq->reg_req); 2015 if (lreq->ping_req) 2016 ceph_osdc_put_request(lreq->ping_req); 2017 target_destroy(&lreq->t); 2018 kfree(lreq); 2019 } 2020 2021 static void linger_put(struct ceph_osd_linger_request *lreq) 2022 { 2023 if (lreq) 2024 kref_put(&lreq->kref, linger_release); 2025 } 2026 2027 static struct ceph_osd_linger_request * 2028 linger_get(struct ceph_osd_linger_request *lreq) 2029 { 2030 kref_get(&lreq->kref); 2031 return lreq; 2032 } 2033 2034 static struct ceph_osd_linger_request * 2035 linger_alloc(struct ceph_osd_client *osdc) 2036 { 2037 struct ceph_osd_linger_request *lreq; 2038 2039 lreq = kzalloc(sizeof(*lreq), GFP_NOIO); 2040 if (!lreq) 2041 return NULL; 2042 2043 kref_init(&lreq->kref); 2044 mutex_init(&lreq->lock); 2045 RB_CLEAR_NODE(&lreq->node); 2046 RB_CLEAR_NODE(&lreq->osdc_node); 2047 RB_CLEAR_NODE(&lreq->mc_node); 2048 INIT_LIST_HEAD(&lreq->scan_item); 2049 INIT_LIST_HEAD(&lreq->pending_lworks); 2050 init_completion(&lreq->reg_commit_wait); 2051 init_completion(&lreq->notify_finish_wait); 2052 2053 lreq->osdc = osdc; 2054 target_init(&lreq->t); 2055 2056 dout("%s lreq %p\n", __func__, lreq); 2057 return lreq; 2058 } 2059 2060 DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node) 2061 DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node) 2062 
/*
 * rbtree helpers insert_linger_mc(), erase_linger_mc() and
 * lookup_linger_mc(), keyed by linger_id; used below on
 * osdc->linger_map_checks.
 */
DEFINE_RB_FUNCS(linger_mc, struct ceph_osd_linger_request, linger_id, mc_node)

/*
 * Create linger request <-> OSD session relation.
 *
 * @lreq has to be registered, @osd may be homeless.
 *
 * Takes a session reference (get_osd()) that unlink_linger() drops.
 * A real session is taken off the idle LRU while it serves @lreq; a
 * link to the homeless session is counted in osdc->num_homeless instead.
 */
static void link_linger(struct ceph_osd *osd,
			struct ceph_osd_linger_request *lreq)
{
	verify_osd_locked(osd);
	WARN_ON(!lreq->linger_id || lreq->osd);
	dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
	     osd->o_osd, lreq, lreq->linger_id);

	if (!osd_homeless(osd))
		__remove_osd_from_lru(osd);
	else
		atomic_inc(&osd->o_osdc->num_homeless);

	get_osd(osd);
	insert_linger(&osd->o_linger_requests, lreq);
	lreq->osd = osd;
}

/*
 * Undo link_linger(): detach @lreq from @osd, drop the session reference
 * and then either hand the session to maybe_move_osd_to_lru() or, for
 * the homeless session, decrement osdc->num_homeless.
 */
static void unlink_linger(struct ceph_osd *osd,
			  struct ceph_osd_linger_request *lreq)
{
	verify_osd_locked(osd);
	WARN_ON(lreq->osd != osd);
	dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
	     osd->o_osd, lreq, lreq->linger_id);

	lreq->osd = NULL;
	erase_linger(&osd->o_linger_requests, lreq);
	/*
	 * NOTE(review): @osd is still dereferenced after put_osd() below;
	 * presumably another reference (membership in osdc->osds, or the
	 * embedded osdc->homeless_osd) keeps it alive - confirm.
	 */
	put_osd(osd);

	if (!osd_homeless(osd))
		maybe_move_osd_to_lru(osd);
	else
		atomic_dec(&osd->o_osdc->num_homeless);
}

/*
 * "Registered" means lreq->osdc_node is linked, i.e. linger_register()
 * inserted @lreq into osdc->linger_requests.  Caller must hold
 * osdc->lock (read or write).
 */
static bool __linger_registered(struct ceph_osd_linger_request *lreq)
{
	verify_osdc_locked(lreq->osdc);

	return !RB_EMPTY_NODE(&lreq->osdc_node);
}

/* Same as __linger_registered(), taking osdc->lock for read. */
static bool linger_registered(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;
	bool registered;

	down_read(&osdc->lock);
	registered = __linger_registered(lreq);
	up_read(&osdc->lock);

	return registered;
}

/*
 * Assign @lreq the next linger_id and insert it into
 * osdc->linger_requests.  The tree holds a reference (linger_get())
 * that linger_unregister() drops.  Caller must hold osdc->lock for write.
 */
static void linger_register(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;

	verify_osdc_wrlocked(osdc);
	WARN_ON(lreq->linger_id);

	linger_get(lreq);
	lreq->linger_id = ++osdc->last_linger_id;
	insert_linger_osdc(&osdc->linger_requests, lreq);
}

/*
 * Remove @lreq from osdc->linger_requests and drop the reference taken
 * by linger_register().  Caller must hold osdc->lock for write.
 */
static void linger_unregister(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;

	verify_osdc_wrlocked(osdc);

	erase_linger_osdc(&osdc->linger_requests, lreq);
	linger_put(lreq);
}

/*
 * Cancel an OSD request that was issued on behalf of a linger request
 * (reg_req or ping_req) and drop the lreq reference that was stashed in
 * req->r_priv when it was sent.
 */
static void cancel_linger_request(struct ceph_osd_request *req)
{
	struct ceph_osd_linger_request *lreq = req->r_priv;

	WARN_ON(!req->r_linger);
	cancel_request(req);
	linger_put(lreq);
}

/*
 * Deferred callback work for a linger request, run on osdc->notify_wq.
 * While queued it sits on lreq->pending_lworks.
 */
struct linger_work {
	struct work_struct work;
	struct ceph_osd_linger_request *lreq;	/* holds a reference */
	struct list_head pending_item;		/* on lreq->pending_lworks */
	unsigned long queued_stamp;		/* jiffies at lwork_queue() */

	union {
		/* do_watch_notify() */
		struct {
			u64 notify_id;
			u64 notifier_id;
			void *payload; /* points into @msg front */
			size_t payload_len;

			struct ceph_msg *msg; /* for ceph_msg_put() */
		} notify;
		/* do_watch_error() */
		struct {
			int err;
		} error;
	};
};

/*
 * Allocate a work item for @lreq that will run @workfn.  The item takes
 * an lreq reference, which lwork_free() drops.  GFP_NOIO: the allocation
 * must not initiate I/O.  Returns NULL on allocation failure.
 */
static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq,
				       work_func_t workfn)
{
	struct linger_work *lwork;

	lwork = kzalloc(sizeof(*lwork), GFP_NOIO);
	if (!lwork)
		return NULL;

	INIT_WORK(&lwork->work, workfn);
	INIT_LIST_HEAD(&lwork->pending_item);
	lwork->lreq = linger_get(lreq);

	return lwork;
}

/*
 * Remove @lwork from lreq->pending_lworks (under lreq->lock), drop its
 * lreq reference and free it.
 */
static void lwork_free(struct linger_work *lwork)
{
	struct ceph_osd_linger_request *lreq = lwork->lreq;

	mutex_lock(&lreq->lock);
	list_del(&lwork->pending_item);
	mutex_unlock(&lreq->lock);

	linger_put(lreq);
	kfree(lwork);
}

/*
 * Timestamp @lwork, track it on lreq->pending_lworks and queue it on
 * osdc->notify_wq.  Caller must hold lreq->lock.
 */
static void lwork_queue(struct linger_work *lwork)
{
	struct ceph_osd_linger_request *lreq = lwork->lreq;
	struct ceph_osd_client *osdc = lreq->osdc;

	verify_lreq_locked(lreq);
	WARN_ON(!list_empty(&lwork->pending_item));

	lwork->queued_stamp = jiffies;
	list_add_tail(&lwork->pending_item, &lreq->pending_lworks);
	queue_work(osdc->notify_wq, &lwork->work);
}

/*
 * Work function: deliver a watch notification to the user's wcb
 * callback, unless the linger request has been unregistered in the
 * meantime.  Always drops the message reference and frees @lwork.
 */
static void do_watch_notify(struct work_struct *w)
{
	struct linger_work *lwork = container_of(w, struct linger_work, work);
	struct ceph_osd_linger_request *lreq = lwork->lreq;

	if (!linger_registered(lreq)) {
		dout("%s lreq %p not registered\n", __func__, lreq);
		goto out;
	}

	WARN_ON(!lreq->is_watch);
	dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n",
	     __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id,
	     lwork->notify.payload_len);
	lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id,
		  lwork->notify.notifier_id, lwork->notify.payload,
		  lwork->notify.payload_len);

out:
	ceph_msg_put(lwork->notify.msg);
	lwork_free(lwork);
}

/*
 * Work function: report a watch error to the user's errcb callback,
 * unless the linger request has been unregistered in the meantime.
 */
static void do_watch_error(struct work_struct *w)
{
	struct linger_work *lwork = container_of(w, struct linger_work, work);
	struct ceph_osd_linger_request *lreq = lwork->lreq;

	if (!linger_registered(lreq)) {
		dout("%s lreq %p not registered\n", __func__, lreq);
		goto out;
	}

	dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err);
	lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err);

out:
	lwork_free(lwork);
}

/*
 * Queue a do_watch_error() item carrying lreq->last_error.  Caller must
 * hold lreq->lock (lwork_queue() verifies it).  On allocation failure
 * the error is only logged, not delivered.
 */
static void queue_watch_error(struct ceph_osd_linger_request *lreq)
{
	struct linger_work *lwork;

	lwork = lwork_alloc(lreq, do_watch_error);
	if (!lwork) {
		pr_err("failed to allocate error-lwork\n");
		return;
	}

	lwork->error.err = lreq->last_error;
	lwork_queue(lwork);
}

/*
 * Complete reg_commit_wait once: the first call records the result
 * (non-positive values as-is, positive values as 0) and wakes all
 * waiters; later calls are no-ops.
 */
static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq,
				       int result)
{
	if (!completion_done(&lreq->reg_commit_wait)) {
		lreq->reg_commit_error = (result <= 0 ? result : 0);
		complete_all(&lreq->reg_commit_wait);
	}
}

/*
 * Completion callback for the initial registration (WATCH or NOTIFY)
 * sent by send_linger().  Publishes the result to
 * linger_reg_commit_wait() waiters and marks @lreq committed.  For a
 * notify, the notify_id is decoded from the first 8 bytes of the reply
 * data when at least that much was returned.  Drops the lreq reference
 * taken in send_linger().
 */
static void linger_commit_cb(struct ceph_osd_request *req)
{
	struct ceph_osd_linger_request *lreq = req->r_priv;

	mutex_lock(&lreq->lock);
	dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq,
	     lreq->linger_id, req->r_result);
	linger_reg_commit_complete(lreq, req->r_result);
	lreq->committed = true;

	if (!lreq->is_watch) {
		struct ceph_osd_data *osd_data =
		    osd_req_op_data(req, 0, notify, response_data);
		void *p = page_address(osd_data->pages[0]);

		WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY ||
			osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);

		/* make note of the notify_id */
		if (req->r_ops[0].outdata_len >= sizeof(u64)) {
			lreq->notify_id = ceph_decode_64(&p);
			dout("lreq %p notify_id %llu\n", lreq,
			     lreq->notify_id);
		} else {
			dout("lreq %p no notify_id\n", lreq);
		}
	}

	mutex_unlock(&lreq->lock);
	linger_put(lreq);
}

static int normalize_watch_error(int err)
{
	/*
	 * Translate ENOENT -> ENOTCONN so that a delete->disconnection
	 * notification and a failure to reconnect because we raced with
	 * the delete appear the same to the user.
	 */
	if (err == -ENOENT)
		err = -ENOTCONN;

	return err;
}

/*
 * Completion callback for a watch RECONNECT sent by send_linger().  On
 * failure, the first error (normalized) is recorded in last_error and
 * reported via queue_watch_error(); while last_error is set, further
 * errors are ignored.  Drops the lreq reference taken in send_linger().
 */
static void linger_reconnect_cb(struct ceph_osd_request *req)
{
	struct ceph_osd_linger_request *lreq = req->r_priv;

	mutex_lock(&lreq->lock);
	dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__,
	     lreq, lreq->linger_id, req->r_result, lreq->last_error);
	if (req->r_result < 0) {
		if (!lreq->last_error) {
			lreq->last_error = normalize_watch_error(req->r_result);
			queue_watch_error(lreq);
		}
	}

	mutex_unlock(&lreq->lock);
	linger_put(lreq);
}

/*
 * (Re)send @lreq's registration request (lreq->reg_req).
 *
 * A watch that has already committed is re-established with a
 * RECONNECT carrying a freshly incremented register_gen, completed by
 * linger_reconnect_cb().  Otherwise the initial WATCH/NOTIFY is sent
 * (a notify's notify_id is reset first), completed by
 * linger_commit_cb().  Caller must hold osdc->lock for write.
 */
static void send_linger(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_request *req = lreq->reg_req;
	struct ceph_osd_req_op *op = &req->r_ops[0];

	verify_osdc_wrlocked(req->r_osdc);
	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);

	/* reg_req is reused: if still linked to a session, cancel it first */
	if (req->r_osd)
		cancel_linger_request(req);

	request_reinit(req);
	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
	req->r_flags = lreq->t.flags;
	req->r_mtime = lreq->mtime;

	mutex_lock(&lreq->lock);
	if (lreq->is_watch && lreq->committed) {
		WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
			op->watch.cookie != lreq->linger_id);
		op->watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
		op->watch.gen = ++lreq->register_gen;
		dout("lreq %p reconnect register_gen %u\n", lreq,
		     op->watch.gen);
		req->r_callback = linger_reconnect_cb;
	} else {
		if (!lreq->is_watch)
			lreq->notify_id = 0;
		else
			WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH);
		dout("lreq %p register\n", lreq);
		req->r_callback = linger_commit_cb;
	}
	mutex_unlock(&lreq->lock);

	/* dropped by the callback, or by cancel_linger_request() */
	req->r_priv = linger_get(lreq);
	req->r_linger = true;

	submit_request(req, true);
}

/*
 * Completion callback for a watch PING sent by send_linger_ping().
 * A reply for the current register_gen either advances
 * watch_valid_thru to the time the ping was sent (success) or records
 * and reports the first error; replies for an older register_gen are
 * ignored.  Drops the lreq reference taken in send_linger_ping().
 */
static void linger_ping_cb(struct
ceph_osd_request *req)
{
	struct ceph_osd_linger_request *lreq = req->r_priv;

	mutex_lock(&lreq->lock);
	dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n",
	     __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent,
	     lreq->last_error);
	/* only a pong for the current registration generation counts */
	if (lreq->register_gen == req->r_ops[0].watch.gen) {
		if (!req->r_result) {
			lreq->watch_valid_thru = lreq->ping_sent;
		} else if (!lreq->last_error) {
			lreq->last_error = normalize_watch_error(req->r_result);
			queue_watch_error(lreq);
		}
	} else {
		dout("lreq %p register_gen %u ignoring old pong %u\n", lreq,
		     lreq->register_gen, req->r_ops[0].watch.gen);
	}

	mutex_unlock(&lreq->lock);
	linger_put(lreq);
}

/*
 * Send a watch PING for @lreq on the session it is linked to, reusing
 * lreq->ping_req.  Nothing is sent while the osdmap has PAUSERD set.
 * ping_sent is recorded so linger_ping_cb() can advance
 * watch_valid_thru.
 *
 * Unlike send_linger(), this bypasses submit_request(): the request is
 * referenced, accounted, given a fresh tid and linked to lreq->osd
 * directly.  handle_timeout() calls this with osdc->lock held for write
 * and lreq->lock held.
 */
static void send_linger_ping(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;
	struct ceph_osd_request *req = lreq->ping_req;
	struct ceph_osd_req_op *op = &req->r_ops[0];

	if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
		dout("%s PAUSERD\n", __func__);
		return;
	}

	lreq->ping_sent = jiffies;
	dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n",
	     __func__, lreq, lreq->linger_id, lreq->ping_sent,
	     lreq->register_gen);

	/* a previous ping still linked to a session is cancelled first */
	if (req->r_osd)
		cancel_linger_request(req);

	request_reinit(req);
	target_copy(&req->r_t, &lreq->t);

	WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
		op->watch.cookie != lreq->linger_id ||
		op->watch.op != CEPH_OSD_WATCH_OP_PING);
	op->watch.gen = lreq->register_gen;
	req->r_callback = linger_ping_cb;
	req->r_priv = linger_get(lreq);
	req->r_linger = true;

	ceph_osdc_get_request(req);
	account_request(req);
	req->r_tid = atomic64_inc_return(&osdc->last_tid);
	link_request(lreq->osd, req);
	send_request(req);
}

/*
 * Compute @lreq's initial target, link it to the matching (possibly
 * homeless) session and send its registration.  send_linger() verifies
 * that osdc->lock is held for write.
 */
static void linger_submit(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;
	struct ceph_osd *osd;

	calc_target(osdc, &lreq->t, false);
	osd = lookup_create_osd(osdc, lreq->t.osd, true);
	link_linger(osd, lreq);

	send_linger(lreq);
}

/*
 * If a pool-existence map check is pending for @lreq, remove it from
 * osdc->linger_map_checks and drop the reference taken by
 * send_linger_map_check().  Caller must hold osdc->lock for write.
 */
static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;
	struct ceph_osd_linger_request *lookup_lreq;

	verify_osdc_wrlocked(osdc);

	lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
				       lreq->linger_id);
	if (!lookup_lreq)
		return;

	WARN_ON(lookup_lreq != lreq);
	erase_linger_mc(&osdc->linger_map_checks, lreq);
	linger_put(lreq);
}

/*
 * @lreq has to be both registered and linked.
 *
 * Cancel any in-flight ping and registration requests and any pending
 * map check, then unlink @lreq from its session and unregister it.
 */
static void __linger_cancel(struct ceph_osd_linger_request *lreq)
{
	if (lreq->is_watch && lreq->ping_req->r_osd)
		cancel_linger_request(lreq->ping_req);
	if (lreq->reg_req->r_osd)
		cancel_linger_request(lreq->reg_req);
	cancel_linger_map_check(lreq);
	unlink_linger(lreq->osd, lreq);
	linger_unregister(lreq);
}

/*
 * Take osdc->lock for write and cancel @lreq; a no-op if it is no
 * longer registered.
 */
static void linger_cancel(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;

	down_write(&osdc->lock);
	if (__linger_registered(lreq))
		__linger_cancel(lreq);
	up_write(&osdc->lock);
}

static void send_linger_map_check(struct ceph_osd_linger_request *lreq);

/*
 * Called when @lreq's pool is absent from the current osdmap.
 *
 * If lreq->register_gen is non-zero (send_linger() bumps it on every
 * watch reconnect), map_dne_bound is set to the current epoch.
 * Otherwise, if no bound is known yet, the monitor is asked for the
 * newest osdmap epoch (send_linger_map_check()).  Once our map is at
 * least as new as map_dne_bound, the registration is failed with
 * -ENOENT and @lreq is cancelled.  Caller must hold osdc->lock for
 * write.
 */
static void check_linger_pool_dne(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;
	struct ceph_osdmap *map = osdc->osdmap;

	verify_osdc_wrlocked(osdc);
	WARN_ON(!map->epoch);

	if (lreq->register_gen) {
		lreq->map_dne_bound = map->epoch;
		dout("%s lreq %p linger_id %llu pool disappeared\n", __func__,
		     lreq, lreq->linger_id);
	} else {
		dout("%s lreq %p linger_id %llu map_dne_bound %u have %u\n",
		     __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
		     map->epoch);
	}

	if (lreq->map_dne_bound) {
		if (map->epoch >= lreq->map_dne_bound) {
			/* we had a new enough map */
			pr_info("linger_id %llu pool does not exist\n",
				lreq->linger_id);
			linger_reg_commit_complete(lreq, -ENOENT);
			__linger_cancel(lreq);
		}
	} else {
		send_linger_map_check(lreq);
	}
}

/*
 * Monitor reply to send_linger_map_check(): greq->u.newest is the newest
 * osdmap epoch.  The linger request is looked up by id, so a check that
 * was cancelled in the meantime (cancel_linger_map_check()) is simply
 * ignored.  Otherwise the epoch becomes map_dne_bound (if none was set),
 * the check is removed, the pool is re-evaluated and the map-check
 * reference is dropped.
 */
static void linger_map_check_cb(struct ceph_mon_generic_request *greq)
{
	struct ceph_osd_client *osdc = &greq->monc->client->osdc;
	struct ceph_osd_linger_request *lreq;
	u64 linger_id = greq->private_data;

	WARN_ON(greq->result || !greq->u.newest);

	down_write(&osdc->lock);
	lreq = lookup_linger_mc(&osdc->linger_map_checks, linger_id);
	if (!lreq) {
		dout("%s linger_id %llu dne\n", __func__, linger_id);
		goto out_unlock;
	}

	dout("%s lreq %p linger_id %llu map_dne_bound %u newest %llu\n",
	     __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
	     greq->u.newest);
	if (!lreq->map_dne_bound)
		lreq->map_dne_bound = greq->u.newest;
	erase_linger_mc(&osdc->linger_map_checks, lreq);
	check_linger_pool_dne(lreq);

	linger_put(lreq);
out_unlock:
	up_write(&osdc->lock);
}

/*
 * Start a pool-existence map check for @lreq: at most one per linger
 * request, holding an lreq reference while it sits in
 * osdc->linger_map_checks.  The monitor's answer arrives in
 * linger_map_check_cb().
 *
 * NOTE(review): if ceph_monc_get_version_async() fails, only a WARN is
 * emitted; the entry and its reference stay in linger_map_checks until
 * cancel_linger_map_check() removes them.
 */
static void send_linger_map_check(struct ceph_osd_linger_request *lreq)
{
	struct ceph_osd_client *osdc = lreq->osdc;
	struct ceph_osd_linger_request *lookup_lreq;
	int ret;

	verify_osdc_wrlocked(osdc);

	lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
				       lreq->linger_id);
	if (lookup_lreq) {
		WARN_ON(lookup_lreq != lreq);
		return;
	}

	linger_get(lreq);
	insert_linger_mc(&osdc->linger_map_checks, lreq);
	ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
					  linger_map_check_cb, lreq->linger_id);
	WARN_ON(ret);
}

/*
 * Wait interruptibly for @lreq's registration to commit.  Returns the
 * wait error if interrupted, otherwise the recorded registration result
 * (reg_commit_error).
 */
static int linger_reg_commit_wait(struct
ceph_osd_linger_request *lreq) 2583 { 2584 int ret; 2585 2586 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id); 2587 ret = wait_for_completion_interruptible(&lreq->reg_commit_wait); 2588 return ret ?: lreq->reg_commit_error; 2589 } 2590 2591 static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq) 2592 { 2593 int ret; 2594 2595 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id); 2596 ret = wait_for_completion_interruptible(&lreq->notify_finish_wait); 2597 return ret ?: lreq->notify_finish_error; 2598 } 2599 2600 /* 2601 * Timeout callback, called every N seconds. When 1 or more OSD 2602 * requests has been active for more than N seconds, we send a keepalive 2603 * (tag + timestamp) to its OSD to ensure any communications channel 2604 * reset is detected. 2605 */ 2606 static void handle_timeout(struct work_struct *work) 2607 { 2608 struct ceph_osd_client *osdc = 2609 container_of(work, struct ceph_osd_client, timeout_work.work); 2610 struct ceph_options *opts = osdc->client->options; 2611 unsigned long cutoff = jiffies - opts->osd_keepalive_timeout; 2612 unsigned long expiry_cutoff = jiffies - opts->osd_request_timeout; 2613 LIST_HEAD(slow_osds); 2614 struct rb_node *n, *p; 2615 2616 dout("%s osdc %p\n", __func__, osdc); 2617 down_write(&osdc->lock); 2618 2619 /* 2620 * ping osds that are a bit slow. this ensures that if there 2621 * is a break in the TCP connection we will notice, and reopen 2622 * a connection with that osd (from the fault callback). 
2623 */ 2624 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { 2625 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); 2626 bool found = false; 2627 2628 for (p = rb_first(&osd->o_requests); p; ) { 2629 struct ceph_osd_request *req = 2630 rb_entry(p, struct ceph_osd_request, r_node); 2631 2632 p = rb_next(p); /* abort_request() */ 2633 2634 if (time_before(req->r_stamp, cutoff)) { 2635 dout(" req %p tid %llu on osd%d is laggy\n", 2636 req, req->r_tid, osd->o_osd); 2637 found = true; 2638 } 2639 if (opts->osd_request_timeout && 2640 time_before(req->r_start_stamp, expiry_cutoff)) { 2641 pr_err_ratelimited("tid %llu on osd%d timeout\n", 2642 req->r_tid, osd->o_osd); 2643 abort_request(req, -ETIMEDOUT); 2644 } 2645 } 2646 for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) { 2647 struct ceph_osd_linger_request *lreq = 2648 rb_entry(p, struct ceph_osd_linger_request, node); 2649 2650 dout(" lreq %p linger_id %llu is served by osd%d\n", 2651 lreq, lreq->linger_id, osd->o_osd); 2652 found = true; 2653 2654 mutex_lock(&lreq->lock); 2655 if (lreq->is_watch && lreq->committed && !lreq->last_error) 2656 send_linger_ping(lreq); 2657 mutex_unlock(&lreq->lock); 2658 } 2659 2660 if (found) 2661 list_move_tail(&osd->o_keepalive_item, &slow_osds); 2662 } 2663 2664 if (opts->osd_request_timeout) { 2665 for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) { 2666 struct ceph_osd_request *req = 2667 rb_entry(p, struct ceph_osd_request, r_node); 2668 2669 p = rb_next(p); /* abort_request() */ 2670 2671 if (time_before(req->r_start_stamp, expiry_cutoff)) { 2672 pr_err_ratelimited("tid %llu on osd%d timeout\n", 2673 req->r_tid, osdc->homeless_osd.o_osd); 2674 abort_request(req, -ETIMEDOUT); 2675 } 2676 } 2677 } 2678 2679 if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds)) 2680 maybe_request_map(osdc); 2681 2682 while (!list_empty(&slow_osds)) { 2683 struct ceph_osd *osd = list_first_entry(&slow_osds, 2684 struct ceph_osd, 2685 o_keepalive_item); 
2686 list_del_init(&osd->o_keepalive_item); 2687 ceph_con_keepalive(&osd->o_con); 2688 } 2689 2690 up_write(&osdc->lock); 2691 schedule_delayed_work(&osdc->timeout_work, 2692 osdc->client->options->osd_keepalive_timeout); 2693 } 2694 2695 static void handle_osds_timeout(struct work_struct *work) 2696 { 2697 struct ceph_osd_client *osdc = 2698 container_of(work, struct ceph_osd_client, 2699 osds_timeout_work.work); 2700 unsigned long delay = osdc->client->options->osd_idle_ttl / 4; 2701 struct ceph_osd *osd, *nosd; 2702 2703 dout("%s osdc %p\n", __func__, osdc); 2704 down_write(&osdc->lock); 2705 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) { 2706 if (time_before(jiffies, osd->lru_ttl)) 2707 break; 2708 2709 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests)); 2710 WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests)); 2711 close_osd(osd); 2712 } 2713 2714 up_write(&osdc->lock); 2715 schedule_delayed_work(&osdc->osds_timeout_work, 2716 round_jiffies_relative(delay)); 2717 } 2718 2719 static int ceph_oloc_decode(void **p, void *end, 2720 struct ceph_object_locator *oloc) 2721 { 2722 u8 struct_v, struct_cv; 2723 u32 len; 2724 void *struct_end; 2725 int ret = 0; 2726 2727 ceph_decode_need(p, end, 1 + 1 + 4, e_inval); 2728 struct_v = ceph_decode_8(p); 2729 struct_cv = ceph_decode_8(p); 2730 if (struct_v < 3) { 2731 pr_warn("got v %d < 3 cv %d of ceph_object_locator\n", 2732 struct_v, struct_cv); 2733 goto e_inval; 2734 } 2735 if (struct_cv > 6) { 2736 pr_warn("got v %d cv %d > 6 of ceph_object_locator\n", 2737 struct_v, struct_cv); 2738 goto e_inval; 2739 } 2740 len = ceph_decode_32(p); 2741 ceph_decode_need(p, end, len, e_inval); 2742 struct_end = *p + len; 2743 2744 oloc->pool = ceph_decode_64(p); 2745 *p += 4; /* skip preferred */ 2746 2747 len = ceph_decode_32(p); 2748 if (len > 0) { 2749 pr_warn("ceph_object_locator::key is set\n"); 2750 goto e_inval; 2751 } 2752 2753 if (struct_v >= 5) { 2754 bool changed = false; 2755 2756 len = ceph_decode_32(p); 2757 if (len 
> 0) { 2758 ceph_decode_need(p, end, len, e_inval); 2759 if (!oloc->pool_ns || 2760 ceph_compare_string(oloc->pool_ns, *p, len)) 2761 changed = true; 2762 *p += len; 2763 } else { 2764 if (oloc->pool_ns) 2765 changed = true; 2766 } 2767 if (changed) { 2768 /* redirect changes namespace */ 2769 pr_warn("ceph_object_locator::nspace is changed\n"); 2770 goto e_inval; 2771 } 2772 } 2773 2774 if (struct_v >= 6) { 2775 s64 hash = ceph_decode_64(p); 2776 if (hash != -1) { 2777 pr_warn("ceph_object_locator::hash is set\n"); 2778 goto e_inval; 2779 } 2780 } 2781 2782 /* skip the rest */ 2783 *p = struct_end; 2784 out: 2785 return ret; 2786 2787 e_inval: 2788 ret = -EINVAL; 2789 goto out; 2790 } 2791 2792 static int ceph_redirect_decode(void **p, void *end, 2793 struct ceph_request_redirect *redir) 2794 { 2795 u8 struct_v, struct_cv; 2796 u32 len; 2797 void *struct_end; 2798 int ret; 2799 2800 ceph_decode_need(p, end, 1 + 1 + 4, e_inval); 2801 struct_v = ceph_decode_8(p); 2802 struct_cv = ceph_decode_8(p); 2803 if (struct_cv > 1) { 2804 pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n", 2805 struct_v, struct_cv); 2806 goto e_inval; 2807 } 2808 len = ceph_decode_32(p); 2809 ceph_decode_need(p, end, len, e_inval); 2810 struct_end = *p + len; 2811 2812 ret = ceph_oloc_decode(p, end, &redir->oloc); 2813 if (ret) 2814 goto out; 2815 2816 len = ceph_decode_32(p); 2817 if (len > 0) { 2818 pr_warn("ceph_request_redirect::object_name is set\n"); 2819 goto e_inval; 2820 } 2821 2822 len = ceph_decode_32(p); 2823 *p += len; /* skip osd_instructions */ 2824 2825 /* skip the rest */ 2826 *p = struct_end; 2827 out: 2828 return ret; 2829 2830 e_inval: 2831 ret = -EINVAL; 2832 goto out; 2833 } 2834 2835 struct MOSDOpReply { 2836 struct ceph_pg pgid; 2837 u64 flags; 2838 int result; 2839 u32 epoch; 2840 int num_ops; 2841 u32 outdata_len[CEPH_OSD_MAX_OPS]; 2842 s32 rval[CEPH_OSD_MAX_OPS]; 2843 int retry_attempt; 2844 struct ceph_eversion replay_version; 2845 u64 user_version; 2846 struct 
ceph_request_redirect redirect; 2847 }; 2848 2849 static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m) 2850 { 2851 void *p = msg->front.iov_base; 2852 void *const end = p + msg->front.iov_len; 2853 u16 version = le16_to_cpu(msg->hdr.version); 2854 struct ceph_eversion bad_replay_version; 2855 u8 decode_redir; 2856 u32 len; 2857 int ret; 2858 int i; 2859 2860 ceph_decode_32_safe(&p, end, len, e_inval); 2861 ceph_decode_need(&p, end, len, e_inval); 2862 p += len; /* skip oid */ 2863 2864 ret = ceph_decode_pgid(&p, end, &m->pgid); 2865 if (ret) 2866 return ret; 2867 2868 ceph_decode_64_safe(&p, end, m->flags, e_inval); 2869 ceph_decode_32_safe(&p, end, m->result, e_inval); 2870 ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval); 2871 memcpy(&bad_replay_version, p, sizeof(bad_replay_version)); 2872 p += sizeof(bad_replay_version); 2873 ceph_decode_32_safe(&p, end, m->epoch, e_inval); 2874 2875 ceph_decode_32_safe(&p, end, m->num_ops, e_inval); 2876 if (m->num_ops > ARRAY_SIZE(m->outdata_len)) 2877 goto e_inval; 2878 2879 ceph_decode_need(&p, end, m->num_ops * sizeof(struct ceph_osd_op), 2880 e_inval); 2881 for (i = 0; i < m->num_ops; i++) { 2882 struct ceph_osd_op *op = p; 2883 2884 m->outdata_len[i] = le32_to_cpu(op->payload_len); 2885 p += sizeof(*op); 2886 } 2887 2888 ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval); 2889 for (i = 0; i < m->num_ops; i++) 2890 ceph_decode_32_safe(&p, end, m->rval[i], e_inval); 2891 2892 if (version >= 5) { 2893 ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval); 2894 memcpy(&m->replay_version, p, sizeof(m->replay_version)); 2895 p += sizeof(m->replay_version); 2896 ceph_decode_64_safe(&p, end, m->user_version, e_inval); 2897 } else { 2898 m->replay_version = bad_replay_version; /* struct */ 2899 m->user_version = le64_to_cpu(m->replay_version.version); 2900 } 2901 2902 if (version >= 6) { 2903 if (version >= 7) 2904 ceph_decode_8_safe(&p, end, decode_redir, e_inval); 2905 else 
2906 decode_redir = 1; 2907 } else { 2908 decode_redir = 0; 2909 } 2910 2911 if (decode_redir) { 2912 ret = ceph_redirect_decode(&p, end, &m->redirect); 2913 if (ret) 2914 return ret; 2915 } else { 2916 ceph_oloc_init(&m->redirect.oloc); 2917 } 2918 2919 return 0; 2920 2921 e_inval: 2922 return -EINVAL; 2923 } 2924 2925 /* 2926 * Handle MOSDOpReply. Set ->r_result and call the callback if it is 2927 * specified. 2928 */ 2929 static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) 2930 { 2931 struct ceph_osd_client *osdc = osd->o_osdc; 2932 struct ceph_osd_request *req; 2933 struct MOSDOpReply m; 2934 u64 tid = le64_to_cpu(msg->hdr.tid); 2935 u32 data_len = 0; 2936 int ret; 2937 int i; 2938 2939 dout("%s msg %p tid %llu\n", __func__, msg, tid); 2940 2941 down_read(&osdc->lock); 2942 if (!osd_registered(osd)) { 2943 dout("%s osd%d unknown\n", __func__, osd->o_osd); 2944 goto out_unlock_osdc; 2945 } 2946 WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num)); 2947 2948 mutex_lock(&osd->lock); 2949 req = lookup_request(&osd->o_requests, tid); 2950 if (!req) { 2951 dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid); 2952 goto out_unlock_session; 2953 } 2954 2955 m.redirect.oloc.pool_ns = req->r_t.target_oloc.pool_ns; 2956 ret = decode_MOSDOpReply(msg, &m); 2957 m.redirect.oloc.pool_ns = NULL; 2958 if (ret) { 2959 pr_err("failed to decode MOSDOpReply for tid %llu: %d\n", 2960 req->r_tid, ret); 2961 ceph_msg_dump(msg); 2962 goto fail_request; 2963 } 2964 dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n", 2965 __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed, 2966 m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch), 2967 le64_to_cpu(m.replay_version.version), m.user_version); 2968 2969 if (m.retry_attempt >= 0) { 2970 if (m.retry_attempt != req->r_attempts - 1) { 2971 dout("req %p tid %llu retry_attempt %d != %d, ignoring\n", 2972 req, req->r_tid, m.retry_attempt, 2973 req->r_attempts - 
1); 2974 goto out_unlock_session; 2975 } 2976 } else { 2977 WARN_ON(1); /* MOSDOpReply v4 is assumed */ 2978 } 2979 2980 if (!ceph_oloc_empty(&m.redirect.oloc)) { 2981 dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid, 2982 m.redirect.oloc.pool); 2983 unlink_request(osd, req); 2984 mutex_unlock(&osd->lock); 2985 2986 /* 2987 * Not ceph_oloc_copy() - changing pool_ns is not 2988 * supported. 2989 */ 2990 req->r_t.target_oloc.pool = m.redirect.oloc.pool; 2991 req->r_flags |= CEPH_OSD_FLAG_REDIRECTED; 2992 req->r_tid = 0; 2993 __submit_request(req, false); 2994 goto out_unlock_osdc; 2995 } 2996 2997 if (m.num_ops != req->r_num_ops) { 2998 pr_err("num_ops %d != %d for tid %llu\n", m.num_ops, 2999 req->r_num_ops, req->r_tid); 3000 goto fail_request; 3001 } 3002 for (i = 0; i < req->r_num_ops; i++) { 3003 dout(" req %p tid %llu op %d rval %d len %u\n", req, 3004 req->r_tid, i, m.rval[i], m.outdata_len[i]); 3005 req->r_ops[i].rval = m.rval[i]; 3006 req->r_ops[i].outdata_len = m.outdata_len[i]; 3007 data_len += m.outdata_len[i]; 3008 } 3009 if (data_len != le32_to_cpu(msg->hdr.data_len)) { 3010 pr_err("sum of lens %u != %u for tid %llu\n", data_len, 3011 le32_to_cpu(msg->hdr.data_len), req->r_tid); 3012 goto fail_request; 3013 } 3014 dout("%s req %p tid %llu result %d data_len %u\n", __func__, 3015 req, req->r_tid, m.result, data_len); 3016 3017 /* 3018 * Since we only ever request ONDISK, we should only ever get 3019 * one (type of) reply back. 
3020 */ 3021 WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK)); 3022 req->r_result = m.result ?: data_len; 3023 finish_request(req); 3024 mutex_unlock(&osd->lock); 3025 up_read(&osdc->lock); 3026 3027 __complete_request(req); 3028 complete_all(&req->r_completion); 3029 ceph_osdc_put_request(req); 3030 return; 3031 3032 fail_request: 3033 complete_request(req, -EIO); 3034 out_unlock_session: 3035 mutex_unlock(&osd->lock); 3036 out_unlock_osdc: 3037 up_read(&osdc->lock); 3038 } 3039 3040 static void set_pool_was_full(struct ceph_osd_client *osdc) 3041 { 3042 struct rb_node *n; 3043 3044 for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) { 3045 struct ceph_pg_pool_info *pi = 3046 rb_entry(n, struct ceph_pg_pool_info, node); 3047 3048 pi->was_full = __pool_full(pi); 3049 } 3050 } 3051 3052 static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id) 3053 { 3054 struct ceph_pg_pool_info *pi; 3055 3056 pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id); 3057 if (!pi) 3058 return false; 3059 3060 return pi->was_full && !__pool_full(pi); 3061 } 3062 3063 static enum calc_target_result 3064 recalc_linger_target(struct ceph_osd_linger_request *lreq) 3065 { 3066 struct ceph_osd_client *osdc = lreq->osdc; 3067 enum calc_target_result ct_res; 3068 3069 ct_res = calc_target(osdc, &lreq->t, true); 3070 if (ct_res == CALC_TARGET_NEED_RESEND) { 3071 struct ceph_osd *osd; 3072 3073 osd = lookup_create_osd(osdc, lreq->t.osd, true); 3074 if (osd != lreq->osd) { 3075 unlink_linger(lreq->osd, lreq); 3076 link_linger(osd, lreq); 3077 } 3078 } 3079 3080 return ct_res; 3081 } 3082 3083 /* 3084 * Requeue requests whose mapping to an OSD has changed. 
3085 */ 3086 static void scan_requests(struct ceph_osd *osd, 3087 bool force_resend, 3088 bool cleared_full, 3089 bool check_pool_cleared_full, 3090 struct rb_root *need_resend, 3091 struct list_head *need_resend_linger) 3092 { 3093 struct ceph_osd_client *osdc = osd->o_osdc; 3094 struct rb_node *n; 3095 bool force_resend_writes; 3096 3097 for (n = rb_first(&osd->o_linger_requests); n; ) { 3098 struct ceph_osd_linger_request *lreq = 3099 rb_entry(n, struct ceph_osd_linger_request, node); 3100 enum calc_target_result ct_res; 3101 3102 n = rb_next(n); /* recalc_linger_target() */ 3103 3104 dout("%s lreq %p linger_id %llu\n", __func__, lreq, 3105 lreq->linger_id); 3106 ct_res = recalc_linger_target(lreq); 3107 switch (ct_res) { 3108 case CALC_TARGET_NO_ACTION: 3109 force_resend_writes = cleared_full || 3110 (check_pool_cleared_full && 3111 pool_cleared_full(osdc, lreq->t.base_oloc.pool)); 3112 if (!force_resend && !force_resend_writes) 3113 break; 3114 3115 /* fall through */ 3116 case CALC_TARGET_NEED_RESEND: 3117 cancel_linger_map_check(lreq); 3118 /* 3119 * scan_requests() for the previous epoch(s) 3120 * may have already added it to the list, since 3121 * it's not unlinked here. 
3122 */ 3123 if (list_empty(&lreq->scan_item)) 3124 list_add_tail(&lreq->scan_item, need_resend_linger); 3125 break; 3126 case CALC_TARGET_POOL_DNE: 3127 check_linger_pool_dne(lreq); 3128 break; 3129 } 3130 } 3131 3132 for (n = rb_first(&osd->o_requests); n; ) { 3133 struct ceph_osd_request *req = 3134 rb_entry(n, struct ceph_osd_request, r_node); 3135 enum calc_target_result ct_res; 3136 3137 n = rb_next(n); /* unlink_request(), check_pool_dne() */ 3138 3139 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 3140 ct_res = calc_target(osdc, &req->r_t, false); 3141 switch (ct_res) { 3142 case CALC_TARGET_NO_ACTION: 3143 force_resend_writes = cleared_full || 3144 (check_pool_cleared_full && 3145 pool_cleared_full(osdc, req->r_t.base_oloc.pool)); 3146 if (!force_resend && 3147 (!(req->r_flags & CEPH_OSD_FLAG_WRITE) || 3148 !force_resend_writes)) 3149 break; 3150 3151 /* fall through */ 3152 case CALC_TARGET_NEED_RESEND: 3153 cancel_map_check(req); 3154 unlink_request(osd, req); 3155 insert_request(need_resend, req); 3156 break; 3157 case CALC_TARGET_POOL_DNE: 3158 check_pool_dne(req); 3159 break; 3160 } 3161 } 3162 } 3163 3164 static int handle_one_map(struct ceph_osd_client *osdc, 3165 void *p, void *end, bool incremental, 3166 struct rb_root *need_resend, 3167 struct list_head *need_resend_linger) 3168 { 3169 struct ceph_osdmap *newmap; 3170 struct rb_node *n; 3171 bool skipped_map = false; 3172 bool was_full; 3173 3174 was_full = ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL); 3175 set_pool_was_full(osdc); 3176 3177 if (incremental) 3178 newmap = osdmap_apply_incremental(&p, end, osdc->osdmap); 3179 else 3180 newmap = ceph_osdmap_decode(&p, end); 3181 if (IS_ERR(newmap)) 3182 return PTR_ERR(newmap); 3183 3184 if (newmap != osdc->osdmap) { 3185 /* 3186 * Preserve ->was_full before destroying the old map. 3187 * For pools that weren't in the old map, ->was_full 3188 * should be false. 
		 */
		for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) {
			struct ceph_pg_pool_info *pi =
				rb_entry(n, struct ceph_pg_pool_info, node);
			struct ceph_pg_pool_info *old_pi;

			old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id);
			if (old_pi)
				pi->was_full = old_pi->was_full;
			else
				WARN_ON(pi->was_full);
		}

		if (osdc->osdmap->epoch &&
		    osdc->osdmap->epoch + 1 < newmap->epoch) {
			WARN_ON(incremental);
			skipped_map = true;
		}

		ceph_osdmap_destroy(osdc->osdmap);
		osdc->osdmap = newmap;
	}

	was_full &= !ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
	scan_requests(&osdc->homeless_osd, skipped_map, was_full, true,
		      need_resend, need_resend_linger);

	for (n = rb_first(&osdc->osds); n; ) {
		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);

		n = rb_next(n); /* close_osd() */

		scan_requests(osd, skipped_map, was_full, true, need_resend,
			      need_resend_linger);
		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
		    memcmp(&osd->o_con.peer_addr,
			   ceph_osd_addr(osdc->osdmap, osd->o_osd),
			   sizeof(struct ceph_entity_addr)))
			close_osd(osd);
	}

	return 0;
}

/*
 * Re-target and resend the requests that scan_requests() collected while
 * one or more osdmaps were being applied by handle_one_map().
 *
 * @need_resend:        regular requests; each is erased from this tree,
 *                      its target recalculated with calc_target(), and
 *                      linked to the (possibly homeless) osd it now maps to
 * @need_resend_linger: linger requests whose registration is resent
 *
 * A regular request is sent only if its new osd is not the homeless
 * placeholder and its target is not paused.  Requests carrying ->r_linger
 * are cancelled here instead of being resent.  Every linger request is
 * removed from @need_resend_linger.
 *
 * Called from ceph_osdc_handle_map() with osdc->lock held for write.
 */
static void kick_requests(struct ceph_osd_client *osdc,
			  struct rb_root *need_resend,
			  struct list_head *need_resend_linger)
{
	struct ceph_osd_linger_request *lreq, *nlreq;
	struct rb_node *n;

	for (n = rb_first(need_resend); n; ) {
		struct ceph_osd_request *req =
		    rb_entry(n, struct ceph_osd_request, r_node);
		struct ceph_osd *osd;

		/* advance first: erase_request() invalidates this node */
		n = rb_next(n);
		erase_request(need_resend, req); /* before link_request() */

		WARN_ON(req->r_osd);
		calc_target(osdc, &req->r_t, false);
		osd = lookup_create_osd(osdc, req->r_t.osd, true);
		link_request(osd, req);
		if (!req->r_linger) {
			if (!osd_homeless(osd) && !req->r_t.paused)
				send_request(req);
		} else {
			cancel_linger_request(req);
		}
	}

	list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) {
		if (!osd_homeless(lreq->osd))
			send_linger(lreq);

		list_del_init(&lreq->scan_item);
	}
}

/*
 * Process updated osd map.
 *
 * The message contains any number of incremental and full maps, normally
 * indicating some sort of topology change in the cluster.  Kick requests
 * off to different OSDs as needed.
 *
 * Handling rules, all under osdc->lock held for write:
 *  - the message fsid must match ours, otherwise the message is rejected;
 *  - an incremental map is applied only if its epoch is exactly one past
 *    the current epoch; if any incremental was applied, the full maps in
 *    the same message are not looked at;
 *  - of the full maps, only the last one is considered, and only if it is
 *    newer than the current epoch;
 *  - a new map is requested if reads/writes were or are paused, the
 *    cluster or a pool is full, or we are still below osdc->epoch_barrier.
 *
 * A malformed message is logged and dumped.
 */
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
	void *p = msg->front.iov_base;
	void *const end = p + msg->front.iov_len;
	u32 nr_maps, maplen;
	u32 epoch;
	struct ceph_fsid fsid;
	struct rb_root need_resend = RB_ROOT;
	LIST_HEAD(need_resend_linger);
	bool handled_incremental = false;
	bool was_pauserd, was_pausewr;
	bool pauserd, pausewr;
	int err;

	dout("%s have %u\n", __func__, osdc->osdmap->epoch);
	down_write(&osdc->lock);

	/* verify fsid */
	ceph_decode_need(&p, end, sizeof(fsid), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(osdc->client, &fsid) < 0)
		goto bad;

	/* pause/full state before any map in this message is applied */
	was_pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
	was_pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
		      ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
		      have_pool_full(osdc);

	/* incremental maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d inc maps\n", nr_maps);
	while (nr_maps > 0) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		if (osdc->osdmap->epoch &&
		    osdc->osdmap->epoch + 1 == epoch) {
			dout("applying incremental map %u len %d\n",
			     epoch, maplen);
			err = handle_one_map(osdc, p, p + maplen, true,
					     &need_resend, &need_resend_linger);
			if (err)
				goto bad;
			handled_incremental = true;
		} else {
			dout("ignoring incremental map %u len %d\n",
			     epoch, maplen);
		}
		p += maplen;
		nr_maps--;
	}
	if (handled_incremental)
		goto done;

	/* full maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d full maps\n", nr_maps);
	while (nr_maps) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		if (nr_maps > 1) {
			dout("skipping non-latest full map %u len %d\n",
			     epoch, maplen);
		} else if (osdc->osdmap->epoch >= epoch) {
			dout("skipping full map %u len %d, "
			     "older than our %u\n", epoch, maplen,
			     osdc->osdmap->epoch);
		} else {
			dout("taking full map %u len %d\n", epoch, maplen);
			err = handle_one_map(osdc, p, p + maplen, false,
					     &need_resend, &need_resend_linger);
			if (err)
				goto bad;
		}
		p += maplen;
		nr_maps--;
	}

done:
	/*
	 * subscribe to subsequent osdmap updates if full to ensure
	 * we find out when we are no longer full and stop returning
	 * ENOSPC.
	 */
	pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
	pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
		  ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
		  have_pool_full(osdc);
	if (was_pauserd || was_pausewr || pauserd || pausewr ||
	    osdc->osdmap->epoch < osdc->epoch_barrier)
		maybe_request_map(osdc);

	kick_requests(osdc, &need_resend, &need_resend_linger);

	ceph_osdc_abort_on_full(osdc);
	ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
			  osdc->osdmap->epoch);
	up_write(&osdc->lock);
	wake_up_all(&osdc->client->auth_wq);
	return;

bad:
	/*
	 * NOTE(review): if handle_one_map() fails after an earlier map in
	 * this message was applied, requests already moved onto the local
	 * need_resend/need_resend_linger lists are not kicked on this path.
	 * Confirm they cannot be stranded.
	 */
	pr_err("osdc handle_map corrupt msg\n");
	ceph_msg_dump(msg);
	up_write(&osdc->lock);
}

/*
 * Resubmit requests pending on the given osd.
 *
 * Regular requests are resent unless their target is paused.  Requests
 * in o_requests that carry ->r_linger are cancelled instead.  Every
 * linger request attached to the osd is then resent with send_linger().
 */
static void kick_osd_requests(struct ceph_osd *osd)
{
	struct rb_node *n;

	for (n = rb_first(&osd->o_requests); n; ) {
		struct ceph_osd_request *req =
		    rb_entry(n, struct ceph_osd_request, r_node);

		n = rb_next(n); /* cancel_linger_request() */

		if (!req->r_linger) {
			if (!req->r_t.paused)
				send_request(req);
		} else {
			cancel_linger_request(req);
		}
	}
	for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
		struct ceph_osd_linger_request *lreq =
		    rb_entry(n, struct ceph_osd_linger_request, node);

		send_linger(lreq);
	}
}

/*
 * If the osd connection drops, we need to resubmit all requests.
 *
 * Connection fault callback: con->private is the struct ceph_osd.  Under
 * osdc->lock (write), an osd that is no longer registered is ignored.
 * Otherwise its requests are kicked if reopen_osd() returns 0, and a new
 * osdmap is requested whenever the osd is still registered.
 */
static void osd_fault(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;

	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

	down_write(&osdc->lock);
	if (!osd_registered(osd)) {
		dout("%s osd%d unknown\n", __func__, osd->o_osd);
		goto out_unlock;
	}

	if (!reopen_osd(osd))
		kick_osd_requests(osd);
	maybe_request_map(osdc);

out_unlock:
	up_write(&osdc->lock);
}

/*
 * Process osd watch notifications
 *
 * Wire layout decoded from the message front, in order:
 *   u8 proto_ver, u8 opcode, u64 cookie, u64 ver (skipped), u64 notify_id,
 *   [proto_ver >= 1]   u32 payload_len + payload bytes,
 *   [hdr.version >= 2] s32 return_code,
 *   [hdr.version >= 3] u64 notifier_id.
 *
 * The linger request is looked up by cookie under osdc->lock (read) and
 * then lreq->lock:
 *  - DISCONNECT: records -ENOTCONN and queues a watch error, but only if
 *    no error has been recorded yet;
 *  - notify (!is_watch) lreq: treated as NOTIFY_COMPLETE.  The reply data
 *    pages are handed to the caller via preply_pages (or released), and
 *    notify_finish_wait is completed with return_code;
 *  - watch lreq: a do_watch_notify work item is queued.  It holds its own
 *    msg reference because the payload points into the message front.
 *
 * A truncated message is only logged.
 */
static void handle_watch_notify(struct ceph_osd_client *osdc,
				struct ceph_msg *msg)
{
	void *p = msg->front.iov_base;
	void *const end = p + msg->front.iov_len;
	struct ceph_osd_linger_request *lreq;
	struct linger_work *lwork;
	u8 proto_ver, opcode;
	u64 cookie, notify_id;
	u64 notifier_id = 0;
	s32 return_code = 0;
	void *payload = NULL;
	u32 payload_len = 0;

	ceph_decode_8_safe(&p, end, proto_ver, bad);
	ceph_decode_8_safe(&p, end, opcode, bad);
	ceph_decode_64_safe(&p, end, cookie, bad);
	/* unchecked skip; the next _safe decode rejects p beyond end */
	p += 8; /* skip ver */
	ceph_decode_64_safe(&p, end, notify_id, bad);

	if (proto_ver >= 1) {
		ceph_decode_32_safe(&p, end, payload_len, bad);
		ceph_decode_need(&p, end, payload_len, bad);
		payload = p;
		p += payload_len;
	}

	if (le16_to_cpu(msg->hdr.version) >= 2)
		ceph_decode_32_safe(&p, end, return_code, bad);

	if (le16_to_cpu(msg->hdr.version) >= 3)
		ceph_decode_64_safe(&p, end, notifier_id, bad);

	down_read(&osdc->lock);
	lreq = lookup_linger_osdc(&osdc->linger_requests, cookie);
	if (!lreq) {
		dout("%s opcode %d cookie %llu dne\n", __func__, opcode,
		     cookie);
		goto out_unlock_osdc;
	}

	mutex_lock(&lreq->lock);
	dout("%s opcode %d cookie %llu lreq %p is_watch %d\n", __func__,
	     opcode, cookie, lreq, lreq->is_watch);
	if (opcode == CEPH_WATCH_EVENT_DISCONNECT) {
		/* only the first error is reported */
		if (!lreq->last_error) {
			lreq->last_error = -ENOTCONN;
			queue_watch_error(lreq);
		}
	} else if (!lreq->is_watch) {
		/* CEPH_WATCH_EVENT_NOTIFY_COMPLETE */
		if (lreq->notify_id && lreq->notify_id != notify_id) {
			dout("lreq %p notify_id %llu != %llu, ignoring\n", lreq,
			     lreq->notify_id, notify_id);
		} else if (!completion_done(&lreq->notify_finish_wait)) {
			struct ceph_msg_data *data =
			    list_first_entry_or_null(&msg->data,
						     struct ceph_msg_data,
						     links);

			if (data) {
				if (lreq->preply_pages) {
					/* ownership of the pages moves to the notifier */
					WARN_ON(data->type !=
							CEPH_MSG_DATA_PAGES);
					*lreq->preply_pages = data->pages;
					*lreq->preply_len = data->length;
				} else {
					ceph_release_page_vector(data->pages,
					       calc_pages_for(0, data->length));
				}
			}
			lreq->notify_finish_error = return_code;
			complete_all(&lreq->notify_finish_wait);
		}
	} else {
		/* CEPH_WATCH_EVENT_NOTIFY */
		lwork = lwork_alloc(lreq, do_watch_notify);
		if (!lwork) {
			pr_err("failed to allocate notify-lwork\n");
			goto out_unlock_lreq;
		}

		lwork->notify.notify_id = notify_id;
		lwork->notify.notifier_id = notifier_id;
		lwork->notify.payload = payload;
		lwork->notify.payload_len = payload_len;
		/* payload points into msg->front: keep msg alive for the work */
		lwork->notify.msg = ceph_msg_get(msg);
		lwork_queue(lwork);
	}

out_unlock_lreq:
	mutex_unlock(&lreq->lock);
out_unlock_osdc:
	up_read(&osdc->lock);
	return;

bad:
	pr_err("osdc handle_watch_notify corrupt msg\n");
}

/*
 * Register request, send initial attempt.
3542 */ 3543 int ceph_osdc_start_request(struct ceph_osd_client *osdc, 3544 struct ceph_osd_request *req, 3545 bool nofail) 3546 { 3547 down_read(&osdc->lock); 3548 submit_request(req, false); 3549 up_read(&osdc->lock); 3550 3551 return 0; 3552 } 3553 EXPORT_SYMBOL(ceph_osdc_start_request); 3554 3555 /* 3556 * Unregister a registered request. The request is not completed: 3557 * ->r_result isn't set and __complete_request() isn't called. 3558 */ 3559 void ceph_osdc_cancel_request(struct ceph_osd_request *req) 3560 { 3561 struct ceph_osd_client *osdc = req->r_osdc; 3562 3563 down_write(&osdc->lock); 3564 if (req->r_osd) 3565 cancel_request(req); 3566 up_write(&osdc->lock); 3567 } 3568 EXPORT_SYMBOL(ceph_osdc_cancel_request); 3569 3570 /* 3571 * @timeout: in jiffies, 0 means "wait forever" 3572 */ 3573 static int wait_request_timeout(struct ceph_osd_request *req, 3574 unsigned long timeout) 3575 { 3576 long left; 3577 3578 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 3579 left = wait_for_completion_killable_timeout(&req->r_completion, 3580 ceph_timeout_jiffies(timeout)); 3581 if (left <= 0) { 3582 left = left ?: -ETIMEDOUT; 3583 ceph_osdc_cancel_request(req); 3584 } else { 3585 left = req->r_result; /* completed */ 3586 } 3587 3588 return left; 3589 } 3590 3591 /* 3592 * wait for a request to complete 3593 */ 3594 int ceph_osdc_wait_request(struct ceph_osd_client *osdc, 3595 struct ceph_osd_request *req) 3596 { 3597 return wait_request_timeout(req, 0); 3598 } 3599 EXPORT_SYMBOL(ceph_osdc_wait_request); 3600 3601 /* 3602 * sync - wait for all in-flight requests to flush. avoid starvation. 
3603 */ 3604 void ceph_osdc_sync(struct ceph_osd_client *osdc) 3605 { 3606 struct rb_node *n, *p; 3607 u64 last_tid = atomic64_read(&osdc->last_tid); 3608 3609 again: 3610 down_read(&osdc->lock); 3611 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { 3612 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); 3613 3614 mutex_lock(&osd->lock); 3615 for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) { 3616 struct ceph_osd_request *req = 3617 rb_entry(p, struct ceph_osd_request, r_node); 3618 3619 if (req->r_tid > last_tid) 3620 break; 3621 3622 if (!(req->r_flags & CEPH_OSD_FLAG_WRITE)) 3623 continue; 3624 3625 ceph_osdc_get_request(req); 3626 mutex_unlock(&osd->lock); 3627 up_read(&osdc->lock); 3628 dout("%s waiting on req %p tid %llu last_tid %llu\n", 3629 __func__, req, req->r_tid, last_tid); 3630 wait_for_completion(&req->r_completion); 3631 ceph_osdc_put_request(req); 3632 goto again; 3633 } 3634 3635 mutex_unlock(&osd->lock); 3636 } 3637 3638 up_read(&osdc->lock); 3639 dout("%s done last_tid %llu\n", __func__, last_tid); 3640 } 3641 EXPORT_SYMBOL(ceph_osdc_sync); 3642 3643 static struct ceph_osd_request * 3644 alloc_linger_request(struct ceph_osd_linger_request *lreq) 3645 { 3646 struct ceph_osd_request *req; 3647 3648 req = ceph_osdc_alloc_request(lreq->osdc, NULL, 1, false, GFP_NOIO); 3649 if (!req) 3650 return NULL; 3651 3652 ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); 3653 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); 3654 3655 if (ceph_osdc_alloc_messages(req, GFP_NOIO)) { 3656 ceph_osdc_put_request(req); 3657 return NULL; 3658 } 3659 3660 return req; 3661 } 3662 3663 /* 3664 * Returns a handle, caller owns a ref. 
3665 */ 3666 struct ceph_osd_linger_request * 3667 ceph_osdc_watch(struct ceph_osd_client *osdc, 3668 struct ceph_object_id *oid, 3669 struct ceph_object_locator *oloc, 3670 rados_watchcb2_t wcb, 3671 rados_watcherrcb_t errcb, 3672 void *data) 3673 { 3674 struct ceph_osd_linger_request *lreq; 3675 int ret; 3676 3677 lreq = linger_alloc(osdc); 3678 if (!lreq) 3679 return ERR_PTR(-ENOMEM); 3680 3681 lreq->is_watch = true; 3682 lreq->wcb = wcb; 3683 lreq->errcb = errcb; 3684 lreq->data = data; 3685 lreq->watch_valid_thru = jiffies; 3686 3687 ceph_oid_copy(&lreq->t.base_oid, oid); 3688 ceph_oloc_copy(&lreq->t.base_oloc, oloc); 3689 lreq->t.flags = CEPH_OSD_FLAG_WRITE; 3690 ktime_get_real_ts(&lreq->mtime); 3691 3692 lreq->reg_req = alloc_linger_request(lreq); 3693 if (!lreq->reg_req) { 3694 ret = -ENOMEM; 3695 goto err_put_lreq; 3696 } 3697 3698 lreq->ping_req = alloc_linger_request(lreq); 3699 if (!lreq->ping_req) { 3700 ret = -ENOMEM; 3701 goto err_put_lreq; 3702 } 3703 3704 down_write(&osdc->lock); 3705 linger_register(lreq); /* before osd_req_op_* */ 3706 osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id, 3707 CEPH_OSD_WATCH_OP_WATCH); 3708 osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id, 3709 CEPH_OSD_WATCH_OP_PING); 3710 linger_submit(lreq); 3711 up_write(&osdc->lock); 3712 3713 ret = linger_reg_commit_wait(lreq); 3714 if (ret) { 3715 linger_cancel(lreq); 3716 goto err_put_lreq; 3717 } 3718 3719 return lreq; 3720 3721 err_put_lreq: 3722 linger_put(lreq); 3723 return ERR_PTR(ret); 3724 } 3725 EXPORT_SYMBOL(ceph_osdc_watch); 3726 3727 /* 3728 * Releases a ref. 3729 * 3730 * Times out after mount_timeout to preserve rbd unmap behaviour 3731 * introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap 3732 * with mount_timeout"). 
3733 */ 3734 int ceph_osdc_unwatch(struct ceph_osd_client *osdc, 3735 struct ceph_osd_linger_request *lreq) 3736 { 3737 struct ceph_options *opts = osdc->client->options; 3738 struct ceph_osd_request *req; 3739 int ret; 3740 3741 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); 3742 if (!req) 3743 return -ENOMEM; 3744 3745 ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); 3746 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); 3747 req->r_flags = CEPH_OSD_FLAG_WRITE; 3748 ktime_get_real_ts(&req->r_mtime); 3749 osd_req_op_watch_init(req, 0, lreq->linger_id, 3750 CEPH_OSD_WATCH_OP_UNWATCH); 3751 3752 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 3753 if (ret) 3754 goto out_put_req; 3755 3756 ceph_osdc_start_request(osdc, req, false); 3757 linger_cancel(lreq); 3758 linger_put(lreq); 3759 ret = wait_request_timeout(req, opts->mount_timeout); 3760 3761 out_put_req: 3762 ceph_osdc_put_request(req); 3763 return ret; 3764 } 3765 EXPORT_SYMBOL(ceph_osdc_unwatch); 3766 3767 static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which, 3768 u64 notify_id, u64 cookie, void *payload, 3769 size_t payload_len) 3770 { 3771 struct ceph_osd_req_op *op; 3772 struct ceph_pagelist *pl; 3773 int ret; 3774 3775 op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0); 3776 3777 pl = kmalloc(sizeof(*pl), GFP_NOIO); 3778 if (!pl) 3779 return -ENOMEM; 3780 3781 ceph_pagelist_init(pl); 3782 ret = ceph_pagelist_encode_64(pl, notify_id); 3783 ret |= ceph_pagelist_encode_64(pl, cookie); 3784 if (payload) { 3785 ret |= ceph_pagelist_encode_32(pl, payload_len); 3786 ret |= ceph_pagelist_append(pl, payload, payload_len); 3787 } else { 3788 ret |= ceph_pagelist_encode_32(pl, 0); 3789 } 3790 if (ret) { 3791 ceph_pagelist_release(pl); 3792 return -ENOMEM; 3793 } 3794 3795 ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl); 3796 op->indata_len = pl->length; 3797 return 0; 3798 } 3799 3800 int ceph_osdc_notify_ack(struct ceph_osd_client *osdc, 3801 
struct ceph_object_id *oid, 3802 struct ceph_object_locator *oloc, 3803 u64 notify_id, 3804 u64 cookie, 3805 void *payload, 3806 size_t payload_len) 3807 { 3808 struct ceph_osd_request *req; 3809 int ret; 3810 3811 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); 3812 if (!req) 3813 return -ENOMEM; 3814 3815 ceph_oid_copy(&req->r_base_oid, oid); 3816 ceph_oloc_copy(&req->r_base_oloc, oloc); 3817 req->r_flags = CEPH_OSD_FLAG_READ; 3818 3819 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 3820 if (ret) 3821 goto out_put_req; 3822 3823 ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload, 3824 payload_len); 3825 if (ret) 3826 goto out_put_req; 3827 3828 ceph_osdc_start_request(osdc, req, false); 3829 ret = ceph_osdc_wait_request(osdc, req); 3830 3831 out_put_req: 3832 ceph_osdc_put_request(req); 3833 return ret; 3834 } 3835 EXPORT_SYMBOL(ceph_osdc_notify_ack); 3836 3837 static int osd_req_op_notify_init(struct ceph_osd_request *req, int which, 3838 u64 cookie, u32 prot_ver, u32 timeout, 3839 void *payload, size_t payload_len) 3840 { 3841 struct ceph_osd_req_op *op; 3842 struct ceph_pagelist *pl; 3843 int ret; 3844 3845 op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0); 3846 op->notify.cookie = cookie; 3847 3848 pl = kmalloc(sizeof(*pl), GFP_NOIO); 3849 if (!pl) 3850 return -ENOMEM; 3851 3852 ceph_pagelist_init(pl); 3853 ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */ 3854 ret |= ceph_pagelist_encode_32(pl, timeout); 3855 ret |= ceph_pagelist_encode_32(pl, payload_len); 3856 ret |= ceph_pagelist_append(pl, payload, payload_len); 3857 if (ret) { 3858 ceph_pagelist_release(pl); 3859 return -ENOMEM; 3860 } 3861 3862 ceph_osd_data_pagelist_init(&op->notify.request_data, pl); 3863 op->indata_len = pl->length; 3864 return 0; 3865 } 3866 3867 /* 3868 * @timeout: in seconds 3869 * 3870 * @preply_{pages,len} are initialized both on success and error. 
3871 * The caller is responsible for: 3872 * 3873 * ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)) 3874 */ 3875 int ceph_osdc_notify(struct ceph_osd_client *osdc, 3876 struct ceph_object_id *oid, 3877 struct ceph_object_locator *oloc, 3878 void *payload, 3879 size_t payload_len, 3880 u32 timeout, 3881 struct page ***preply_pages, 3882 size_t *preply_len) 3883 { 3884 struct ceph_osd_linger_request *lreq; 3885 struct page **pages; 3886 int ret; 3887 3888 WARN_ON(!timeout); 3889 if (preply_pages) { 3890 *preply_pages = NULL; 3891 *preply_len = 0; 3892 } 3893 3894 lreq = linger_alloc(osdc); 3895 if (!lreq) 3896 return -ENOMEM; 3897 3898 lreq->preply_pages = preply_pages; 3899 lreq->preply_len = preply_len; 3900 3901 ceph_oid_copy(&lreq->t.base_oid, oid); 3902 ceph_oloc_copy(&lreq->t.base_oloc, oloc); 3903 lreq->t.flags = CEPH_OSD_FLAG_READ; 3904 3905 lreq->reg_req = alloc_linger_request(lreq); 3906 if (!lreq->reg_req) { 3907 ret = -ENOMEM; 3908 goto out_put_lreq; 3909 } 3910 3911 /* for notify_id */ 3912 pages = ceph_alloc_page_vector(1, GFP_NOIO); 3913 if (IS_ERR(pages)) { 3914 ret = PTR_ERR(pages); 3915 goto out_put_lreq; 3916 } 3917 3918 down_write(&osdc->lock); 3919 linger_register(lreq); /* before osd_req_op_* */ 3920 ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1, 3921 timeout, payload, payload_len); 3922 if (ret) { 3923 linger_unregister(lreq); 3924 up_write(&osdc->lock); 3925 ceph_release_page_vector(pages, 1); 3926 goto out_put_lreq; 3927 } 3928 ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify, 3929 response_data), 3930 pages, PAGE_SIZE, 0, false, true); 3931 linger_submit(lreq); 3932 up_write(&osdc->lock); 3933 3934 ret = linger_reg_commit_wait(lreq); 3935 if (!ret) 3936 ret = linger_notify_finish_wait(lreq); 3937 else 3938 dout("lreq %p failed to initiate notify %d\n", lreq, ret); 3939 3940 linger_cancel(lreq); 3941 out_put_lreq: 3942 linger_put(lreq); 3943 return ret; 3944 } 3945 
EXPORT_SYMBOL(ceph_osdc_notify); 3946 3947 /* 3948 * Return the number of milliseconds since the watch was last 3949 * confirmed, or an error. If there is an error, the watch is no 3950 * longer valid, and should be destroyed with ceph_osdc_unwatch(). 3951 */ 3952 int ceph_osdc_watch_check(struct ceph_osd_client *osdc, 3953 struct ceph_osd_linger_request *lreq) 3954 { 3955 unsigned long stamp, age; 3956 int ret; 3957 3958 down_read(&osdc->lock); 3959 mutex_lock(&lreq->lock); 3960 stamp = lreq->watch_valid_thru; 3961 if (!list_empty(&lreq->pending_lworks)) { 3962 struct linger_work *lwork = 3963 list_first_entry(&lreq->pending_lworks, 3964 struct linger_work, 3965 pending_item); 3966 3967 if (time_before(lwork->queued_stamp, stamp)) 3968 stamp = lwork->queued_stamp; 3969 } 3970 age = jiffies - stamp; 3971 dout("%s lreq %p linger_id %llu age %lu last_error %d\n", __func__, 3972 lreq, lreq->linger_id, age, lreq->last_error); 3973 /* we are truncating to msecs, so return a safe upper bound */ 3974 ret = lreq->last_error ?: 1 + jiffies_to_msecs(age); 3975 3976 mutex_unlock(&lreq->lock); 3977 up_read(&osdc->lock); 3978 return ret; 3979 } 3980 3981 static int decode_watcher(void **p, void *end, struct ceph_watch_item *item) 3982 { 3983 u8 struct_v; 3984 u32 struct_len; 3985 int ret; 3986 3987 ret = ceph_start_decoding(p, end, 2, "watch_item_t", 3988 &struct_v, &struct_len); 3989 if (ret) 3990 return ret; 3991 3992 ceph_decode_copy(p, &item->name, sizeof(item->name)); 3993 item->cookie = ceph_decode_64(p); 3994 *p += 4; /* skip timeout_seconds */ 3995 if (struct_v >= 2) { 3996 ceph_decode_copy(p, &item->addr, sizeof(item->addr)); 3997 ceph_decode_addr(&item->addr); 3998 } 3999 4000 dout("%s %s%llu cookie %llu addr %s\n", __func__, 4001 ENTITY_NAME(item->name), item->cookie, 4002 ceph_pr_addr(&item->addr.in_addr)); 4003 return 0; 4004 } 4005 4006 static int decode_watchers(void **p, void *end, 4007 struct ceph_watch_item **watchers, 4008 u32 *num_watchers) 4009 { 4010 u8 
struct_v; 4011 u32 struct_len; 4012 int i; 4013 int ret; 4014 4015 ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t", 4016 &struct_v, &struct_len); 4017 if (ret) 4018 return ret; 4019 4020 *num_watchers = ceph_decode_32(p); 4021 *watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO); 4022 if (!*watchers) 4023 return -ENOMEM; 4024 4025 for (i = 0; i < *num_watchers; i++) { 4026 ret = decode_watcher(p, end, *watchers + i); 4027 if (ret) { 4028 kfree(*watchers); 4029 return ret; 4030 } 4031 } 4032 4033 return 0; 4034 } 4035 4036 /* 4037 * On success, the caller is responsible for: 4038 * 4039 * kfree(watchers); 4040 */ 4041 int ceph_osdc_list_watchers(struct ceph_osd_client *osdc, 4042 struct ceph_object_id *oid, 4043 struct ceph_object_locator *oloc, 4044 struct ceph_watch_item **watchers, 4045 u32 *num_watchers) 4046 { 4047 struct ceph_osd_request *req; 4048 struct page **pages; 4049 int ret; 4050 4051 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); 4052 if (!req) 4053 return -ENOMEM; 4054 4055 ceph_oid_copy(&req->r_base_oid, oid); 4056 ceph_oloc_copy(&req->r_base_oloc, oloc); 4057 req->r_flags = CEPH_OSD_FLAG_READ; 4058 4059 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 4060 if (ret) 4061 goto out_put_req; 4062 4063 pages = ceph_alloc_page_vector(1, GFP_NOIO); 4064 if (IS_ERR(pages)) { 4065 ret = PTR_ERR(pages); 4066 goto out_put_req; 4067 } 4068 4069 osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0); 4070 ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers, 4071 response_data), 4072 pages, PAGE_SIZE, 0, false, true); 4073 4074 ceph_osdc_start_request(osdc, req, false); 4075 ret = ceph_osdc_wait_request(osdc, req); 4076 if (ret >= 0) { 4077 void *p = page_address(pages[0]); 4078 void *const end = p + req->r_ops[0].outdata_len; 4079 4080 ret = decode_watchers(&p, end, watchers, num_watchers); 4081 } 4082 4083 out_put_req: 4084 ceph_osdc_put_request(req); 4085 return ret; 4086 } 4087 
EXPORT_SYMBOL(ceph_osdc_list_watchers); 4088 4089 /* 4090 * Call all pending notify callbacks - for use after a watch is 4091 * unregistered, to make sure no more callbacks for it will be invoked 4092 */ 4093 void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc) 4094 { 4095 dout("%s osdc %p\n", __func__, osdc); 4096 flush_workqueue(osdc->notify_wq); 4097 } 4098 EXPORT_SYMBOL(ceph_osdc_flush_notifies); 4099 4100 void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc) 4101 { 4102 down_read(&osdc->lock); 4103 maybe_request_map(osdc); 4104 up_read(&osdc->lock); 4105 } 4106 EXPORT_SYMBOL(ceph_osdc_maybe_request_map); 4107 4108 /* 4109 * Execute an OSD class method on an object. 4110 * 4111 * @flags: CEPH_OSD_FLAG_* 4112 * @resp_len: in/out param for reply length 4113 */ 4114 int ceph_osdc_call(struct ceph_osd_client *osdc, 4115 struct ceph_object_id *oid, 4116 struct ceph_object_locator *oloc, 4117 const char *class, const char *method, 4118 unsigned int flags, 4119 struct page *req_page, size_t req_len, 4120 struct page *resp_page, size_t *resp_len) 4121 { 4122 struct ceph_osd_request *req; 4123 int ret; 4124 4125 if (req_len > PAGE_SIZE || (resp_page && *resp_len > PAGE_SIZE)) 4126 return -E2BIG; 4127 4128 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); 4129 if (!req) 4130 return -ENOMEM; 4131 4132 ceph_oid_copy(&req->r_base_oid, oid); 4133 ceph_oloc_copy(&req->r_base_oloc, oloc); 4134 req->r_flags = flags; 4135 4136 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 4137 if (ret) 4138 goto out_put_req; 4139 4140 osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method); 4141 if (req_page) 4142 osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len, 4143 0, false, false); 4144 if (resp_page) 4145 osd_req_op_cls_response_data_pages(req, 0, &resp_page, 4146 *resp_len, 0, false, false); 4147 4148 ceph_osdc_start_request(osdc, req, false); 4149 ret = ceph_osdc_wait_request(osdc, req); 4150 if (ret >= 0) { 4151 ret = req->r_ops[0].rval; 
4152 if (resp_page) 4153 *resp_len = req->r_ops[0].outdata_len; 4154 } 4155 4156 out_put_req: 4157 ceph_osdc_put_request(req); 4158 return ret; 4159 } 4160 EXPORT_SYMBOL(ceph_osdc_call); 4161 4162 /* 4163 * init, shutdown 4164 */ 4165 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) 4166 { 4167 int err; 4168 4169 dout("init\n"); 4170 osdc->client = client; 4171 init_rwsem(&osdc->lock); 4172 osdc->osds = RB_ROOT; 4173 INIT_LIST_HEAD(&osdc->osd_lru); 4174 spin_lock_init(&osdc->osd_lru_lock); 4175 osd_init(&osdc->homeless_osd); 4176 osdc->homeless_osd.o_osdc = osdc; 4177 osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD; 4178 osdc->last_linger_id = CEPH_LINGER_ID_START; 4179 osdc->linger_requests = RB_ROOT; 4180 osdc->map_checks = RB_ROOT; 4181 osdc->linger_map_checks = RB_ROOT; 4182 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 4183 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 4184 4185 err = -ENOMEM; 4186 osdc->osdmap = ceph_osdmap_alloc(); 4187 if (!osdc->osdmap) 4188 goto out; 4189 4190 osdc->req_mempool = mempool_create_slab_pool(10, 4191 ceph_osd_request_cache); 4192 if (!osdc->req_mempool) 4193 goto out_map; 4194 4195 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, 4196 PAGE_SIZE, 10, true, "osd_op"); 4197 if (err < 0) 4198 goto out_mempool; 4199 err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, 4200 PAGE_SIZE, 10, true, "osd_op_reply"); 4201 if (err < 0) 4202 goto out_msgpool; 4203 4204 err = -ENOMEM; 4205 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); 4206 if (!osdc->notify_wq) 4207 goto out_msgpool_reply; 4208 4209 schedule_delayed_work(&osdc->timeout_work, 4210 osdc->client->options->osd_keepalive_timeout); 4211 schedule_delayed_work(&osdc->osds_timeout_work, 4212 round_jiffies_relative(osdc->client->options->osd_idle_ttl)); 4213 4214 return 0; 4215 4216 out_msgpool_reply: 4217 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 4218 out_msgpool: 4219 
ceph_msgpool_destroy(&osdc->msgpool_op); 4220 out_mempool: 4221 mempool_destroy(osdc->req_mempool); 4222 out_map: 4223 ceph_osdmap_destroy(osdc->osdmap); 4224 out: 4225 return err; 4226 } 4227 4228 void ceph_osdc_stop(struct ceph_osd_client *osdc) 4229 { 4230 flush_workqueue(osdc->notify_wq); 4231 destroy_workqueue(osdc->notify_wq); 4232 cancel_delayed_work_sync(&osdc->timeout_work); 4233 cancel_delayed_work_sync(&osdc->osds_timeout_work); 4234 4235 down_write(&osdc->lock); 4236 while (!RB_EMPTY_ROOT(&osdc->osds)) { 4237 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), 4238 struct ceph_osd, o_node); 4239 close_osd(osd); 4240 } 4241 up_write(&osdc->lock); 4242 WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1); 4243 osd_cleanup(&osdc->homeless_osd); 4244 4245 WARN_ON(!list_empty(&osdc->osd_lru)); 4246 WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests)); 4247 WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks)); 4248 WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks)); 4249 WARN_ON(atomic_read(&osdc->num_requests)); 4250 WARN_ON(atomic_read(&osdc->num_homeless)); 4251 4252 ceph_osdmap_destroy(osdc->osdmap); 4253 mempool_destroy(osdc->req_mempool); 4254 ceph_msgpool_destroy(&osdc->msgpool_op); 4255 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 4256 } 4257 4258 /* 4259 * Read some contiguous pages. If we cross a stripe boundary, shorten 4260 * *plen. Return number of bytes read, or error. 
4261 */ 4262 int ceph_osdc_readpages(struct ceph_osd_client *osdc, 4263 struct ceph_vino vino, struct ceph_file_layout *layout, 4264 u64 off, u64 *plen, 4265 u32 truncate_seq, u64 truncate_size, 4266 struct page **pages, int num_pages, int page_align) 4267 { 4268 struct ceph_osd_request *req; 4269 int rc = 0; 4270 4271 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, 4272 vino.snap, off, *plen); 4273 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1, 4274 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 4275 NULL, truncate_seq, truncate_size, 4276 false); 4277 if (IS_ERR(req)) 4278 return PTR_ERR(req); 4279 4280 /* it may be a short read due to an object boundary */ 4281 osd_req_op_extent_osd_data_pages(req, 0, 4282 pages, *plen, page_align, false, false); 4283 4284 dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", 4285 off, *plen, *plen, page_align); 4286 4287 rc = ceph_osdc_start_request(osdc, req, false); 4288 if (!rc) 4289 rc = ceph_osdc_wait_request(osdc, req); 4290 4291 ceph_osdc_put_request(req); 4292 dout("readpages result %d\n", rc); 4293 return rc; 4294 } 4295 EXPORT_SYMBOL(ceph_osdc_readpages); 4296 4297 /* 4298 * do a synchronous write on N pages 4299 */ 4300 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, 4301 struct ceph_file_layout *layout, 4302 struct ceph_snap_context *snapc, 4303 u64 off, u64 len, 4304 u32 truncate_seq, u64 truncate_size, 4305 struct timespec *mtime, 4306 struct page **pages, int num_pages) 4307 { 4308 struct ceph_osd_request *req; 4309 int rc = 0; 4310 int page_align = off & ~PAGE_MASK; 4311 4312 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1, 4313 CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, 4314 snapc, truncate_seq, truncate_size, 4315 true); 4316 if (IS_ERR(req)) 4317 return PTR_ERR(req); 4318 4319 /* it may be a short write due to an object boundary */ 4320 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, 4321 false, false); 4322 
dout("writepages %llu~%llu (%llu bytes)\n", off, len, len); 4323 4324 req->r_mtime = *mtime; 4325 rc = ceph_osdc_start_request(osdc, req, true); 4326 if (!rc) 4327 rc = ceph_osdc_wait_request(osdc, req); 4328 4329 ceph_osdc_put_request(req); 4330 if (rc == 0) 4331 rc = len; 4332 dout("writepages result %d\n", rc); 4333 return rc; 4334 } 4335 EXPORT_SYMBOL(ceph_osdc_writepages); 4336 4337 int ceph_osdc_setup(void) 4338 { 4339 size_t size = sizeof(struct ceph_osd_request) + 4340 CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op); 4341 4342 BUG_ON(ceph_osd_request_cache); 4343 ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size, 4344 0, 0, NULL); 4345 4346 return ceph_osd_request_cache ? 0 : -ENOMEM; 4347 } 4348 EXPORT_SYMBOL(ceph_osdc_setup); 4349 4350 void ceph_osdc_cleanup(void) 4351 { 4352 BUG_ON(!ceph_osd_request_cache); 4353 kmem_cache_destroy(ceph_osd_request_cache); 4354 ceph_osd_request_cache = NULL; 4355 } 4356 EXPORT_SYMBOL(ceph_osdc_cleanup); 4357 4358 /* 4359 * handle incoming message 4360 */ 4361 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 4362 { 4363 struct ceph_osd *osd = con->private; 4364 struct ceph_osd_client *osdc = osd->o_osdc; 4365 int type = le16_to_cpu(msg->hdr.type); 4366 4367 switch (type) { 4368 case CEPH_MSG_OSD_MAP: 4369 ceph_osdc_handle_map(osdc, msg); 4370 break; 4371 case CEPH_MSG_OSD_OPREPLY: 4372 handle_reply(osd, msg); 4373 break; 4374 case CEPH_MSG_WATCH_NOTIFY: 4375 handle_watch_notify(osdc, msg); 4376 break; 4377 4378 default: 4379 pr_err("received unknown message type %d %s\n", type, 4380 ceph_msg_type_name(type)); 4381 } 4382 4383 ceph_msg_put(msg); 4384 } 4385 4386 /* 4387 * Lookup and return message for incoming reply. Don't try to do 4388 * anything about a larger than preallocated data portion of the 4389 * message at the moment - for now, just skip the message. 
 *
 * A front section larger than preallocated is handled by allocating a
 * new reply message and replacing req->r_reply with it.
 *
 * Runs under osdc->lock (read) and osd->lock.  Sets *skip when the osd
 * is no longer registered, the tid is unknown, or the data section is
 * too large.  Returns NULL without setting *skip if the replacement
 * front allocation fails.  Otherwise returns req->r_reply with an extra
 * reference for the messenger.
 */
static struct ceph_msg *get_reply(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct ceph_msg *m = NULL;
	struct ceph_osd_request *req;
	int front_len = le32_to_cpu(hdr->front_len);
	int data_len = le32_to_cpu(hdr->data_len);
	u64 tid = le64_to_cpu(hdr->tid);

	down_read(&osdc->lock);
	if (!osd_registered(osd)) {
		dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd);
		*skip = 1;
		goto out_unlock_osdc;
	}
	WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num));

	mutex_lock(&osd->lock);
	req = lookup_request(&osd->o_requests, tid);
	if (!req) {
		dout("%s osd%d tid %llu unknown, skipping\n", __func__,
		     osd->o_osd, tid);
		*skip = 1;
		goto out_unlock_session;
	}

	/* presumably detaches r_reply from any receive already in progress */
	ceph_msg_revoke_incoming(req->r_reply);

	if (front_len > req->r_reply->front_alloc_len) {
		pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
			__func__, osd->o_osd, req->r_tid, front_len,
			req->r_reply->front_alloc_len);
		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
				 false);
		if (!m)
			goto out_unlock_session;
		/* the new message (ref from ceph_msg_new()) becomes r_reply */
		ceph_msg_put(req->r_reply);
		req->r_reply = m;
	}

	if (data_len > req->r_reply->data_length) {
		pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
			__func__, osd->o_osd, req->r_tid, data_len,
			req->r_reply->data_length);
		m = NULL;
		*skip = 1;
		goto out_unlock_session;
	}

	/* extra reference for the caller; r_reply keeps its own */
	m = ceph_msg_get(req->r_reply);
	dout("get_reply tid %lld %p\n", tid, m);

out_unlock_session:
	mutex_unlock(&osd->lock);
out_unlock_osdc:
	up_read(&osdc->lock);
	return m;
}

/*
 * TODO: switch to a msg-owned pagelist
 *
 * Allocate a message of the header's type with a front of front_len
 * bytes.  If data_len is nonzero, a page vector large enough for it is
 * attached as the message data.  The trailing false/false flags passed to
 * ceph_osd_data_pages_init() presumably mark the pages as not pool-backed
 * and not message-owned, so the consumer must release them
 * (handle_watch_notify() does so for notify-complete replies).  Verify
 * this against ceph_osd_data_pages_init().
 * Returns NULL on allocation failure.
 */
static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
{
	struct ceph_msg *m;
	int type = le16_to_cpu(hdr->type);
	u32 front_len = le32_to_cpu(hdr->front_len);
	u32 data_len = le32_to_cpu(hdr->data_len);

	m = ceph_msg_new(type, front_len, GFP_NOIO, false);
	if (!m)
		return NULL;

	if (data_len) {
		struct page **pages;
		struct ceph_osd_data osd_data;

		pages = ceph_alloc_page_vector(calc_pages_for(0, data_len),
					       GFP_NOIO);
		if (IS_ERR(pages)) {
			ceph_msg_put(m);
			return NULL;
		}

		ceph_osd_data_pages_init(&osd_data, pages, data_len, 0, false,
					 false);
		ceph_osdc_msg_data_add(m, &osd_data);
	}

	return m;
}

/*
 * Messenger alloc_msg callback.  Map and watch-notify messages get a
 * freshly allocated message; op replies reuse the request's preallocated
 * reply via get_reply().  Unknown types are skipped.
 */
static struct ceph_msg *alloc_msg(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	int type = le16_to_cpu(hdr->type);

	*skip = 0;
	switch (type) {
	case CEPH_MSG_OSD_MAP:
	case CEPH_MSG_WATCH_NOTIFY:
		return alloc_msg_with_page_vector(hdr);
	case CEPH_MSG_OSD_OPREPLY:
		return get_reply(con, hdr, skip);
	default:
		pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__,
			osd->o_osd, type);
		*skip = 1;
		return NULL;
	}
}

/*
 * Wrappers to refcount containing ceph_osd struct
 *
 * get_osd_con() returns NULL when get_osd() fails to take a reference.
 */
static struct ceph_connection *get_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	if (get_osd(osd))
		return con;
	return NULL;
}

static void put_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	put_osd(osd);
}

/*
 * authentication
 */
/*
 * Note: returned pointer is the address of a structure that's
 * managed separately.  Caller must *not* attempt to free it.
4531 */ 4532 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 4533 int *proto, int force_new) 4534 { 4535 struct ceph_osd *o = con->private; 4536 struct ceph_osd_client *osdc = o->o_osdc; 4537 struct ceph_auth_client *ac = osdc->client->monc.auth; 4538 struct ceph_auth_handshake *auth = &o->o_auth; 4539 4540 if (force_new && auth->authorizer) { 4541 ceph_auth_destroy_authorizer(auth->authorizer); 4542 auth->authorizer = NULL; 4543 } 4544 if (!auth->authorizer) { 4545 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 4546 auth); 4547 if (ret) 4548 return ERR_PTR(ret); 4549 } else { 4550 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 4551 auth); 4552 if (ret) 4553 return ERR_PTR(ret); 4554 } 4555 *proto = ac->protocol; 4556 4557 return auth; 4558 } 4559 4560 4561 static int verify_authorizer_reply(struct ceph_connection *con) 4562 { 4563 struct ceph_osd *o = con->private; 4564 struct ceph_osd_client *osdc = o->o_osdc; 4565 struct ceph_auth_client *ac = osdc->client->monc.auth; 4566 4567 return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer); 4568 } 4569 4570 static int invalidate_authorizer(struct ceph_connection *con) 4571 { 4572 struct ceph_osd *o = con->private; 4573 struct ceph_osd_client *osdc = o->o_osdc; 4574 struct ceph_auth_client *ac = osdc->client->monc.auth; 4575 4576 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); 4577 return ceph_monc_validate_auth(&osdc->client->monc); 4578 } 4579 4580 static int osd_sign_message(struct ceph_msg *msg) 4581 { 4582 struct ceph_osd *o = msg->con->private; 4583 struct ceph_auth_handshake *auth = &o->o_auth; 4584 4585 return ceph_auth_sign_message(auth, msg); 4586 } 4587 4588 static int osd_check_message_signature(struct ceph_msg *msg) 4589 { 4590 struct ceph_osd *o = msg->con->private; 4591 struct ceph_auth_handshake *auth = &o->o_auth; 4592 4593 return ceph_auth_check_message_signature(auth, msg); 4594 } 4595 4596 static const struct 
ceph_connection_operations osd_con_ops = { 4597 .get = get_osd_con, 4598 .put = put_osd_con, 4599 .dispatch = dispatch, 4600 .get_authorizer = get_authorizer, 4601 .verify_authorizer_reply = verify_authorizer_reply, 4602 .invalidate_authorizer = invalidate_authorizer, 4603 .alloc_msg = alloc_msg, 4604 .sign_message = osd_sign_message, 4605 .check_message_signature = osd_check_message_signature, 4606 .fault = osd_fault, 4607 }; 4608