#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>

#define OSD_OPREPLY_FRONT_LEN	512

static struct kmem_cache	*ceph_osd_request_cache;

static const struct ceph_connection_operations osd_con_ops;

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */

static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void link_linger(struct ceph_osd *osd,
                        struct ceph_osd_linger_request *lreq);
static void unlink_linger(struct ceph_osd *osd,
                          struct ceph_osd_linger_request *lreq);
static void clear_backoffs(struct ceph_osd *osd);

#if 1
static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
{
        bool wrlocked = true;

        if (unlikely(down_read_trylock(sem))) {
                wrlocked = false;
                up_read(sem);
        }

        return wrlocked;
}
static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
{
        WARN_ON(!rwsem_is_locked(&osdc->lock));
}
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
{
        WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
}
static inline void verify_osd_locked(struct ceph_osd *osd)
{
        struct ceph_osd_client *osdc = osd->o_osdc;

        WARN_ON(!(mutex_is_locked(&osd->lock) &&
                  rwsem_is_locked(&osdc->lock)) &&
                !rwsem_is_wrlocked(&osdc->lock));
}
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq)
{
        WARN_ON(!mutex_is_locked(&lreq->lock));
}
#else
static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
static inline void verify_osd_locked(struct ceph_osd *osd) { }
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { }
#endif

/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */
static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
                       u64 *objnum, u64 *objoff, u64 *objlen)
{
        u64 orig_len = *plen;
        int r;

        /* object extent?
*/ 108 r = ceph_calc_file_object_mapping(layout, off, orig_len, objnum, 109 objoff, objlen); 110 if (r < 0) 111 return r; 112 if (*objlen < orig_len) { 113 *plen = *objlen; 114 dout(" skipping last %llu, final file extent %llu~%llu\n", 115 orig_len - *plen, off, *plen); 116 } 117 118 dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen); 119 120 return 0; 121 } 122 123 static void ceph_osd_data_init(struct ceph_osd_data *osd_data) 124 { 125 memset(osd_data, 0, sizeof (*osd_data)); 126 osd_data->type = CEPH_OSD_DATA_TYPE_NONE; 127 } 128 129 static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data, 130 struct page **pages, u64 length, u32 alignment, 131 bool pages_from_pool, bool own_pages) 132 { 133 osd_data->type = CEPH_OSD_DATA_TYPE_PAGES; 134 osd_data->pages = pages; 135 osd_data->length = length; 136 osd_data->alignment = alignment; 137 osd_data->pages_from_pool = pages_from_pool; 138 osd_data->own_pages = own_pages; 139 } 140 141 static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data, 142 struct ceph_pagelist *pagelist) 143 { 144 osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST; 145 osd_data->pagelist = pagelist; 146 } 147 148 #ifdef CONFIG_BLOCK 149 static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data, 150 struct bio *bio, size_t bio_length) 151 { 152 osd_data->type = CEPH_OSD_DATA_TYPE_BIO; 153 osd_data->bio = bio; 154 osd_data->bio_length = bio_length; 155 } 156 #endif /* CONFIG_BLOCK */ 157 158 #define osd_req_op_data(oreq, whch, typ, fld) \ 159 ({ \ 160 struct ceph_osd_request *__oreq = (oreq); \ 161 unsigned int __whch = (whch); \ 162 BUG_ON(__whch >= __oreq->r_num_ops); \ 163 &__oreq->r_ops[__whch].typ.fld; \ 164 }) 165 166 static struct ceph_osd_data * 167 osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which) 168 { 169 BUG_ON(which >= osd_req->r_num_ops); 170 171 return &osd_req->r_ops[which].raw_data_in; 172 } 173 174 struct ceph_osd_data * 175 osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req, 176 unsigned int which) 177 { 178 return osd_req_op_data(osd_req, which, extent, osd_data); 179 } 180 EXPORT_SYMBOL(osd_req_op_extent_osd_data); 181 182 void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req, 183 unsigned int which, struct page **pages, 184 u64 length, u32 alignment, 185 bool pages_from_pool, bool own_pages) 186 { 187 struct ceph_osd_data *osd_data; 188 189 osd_data = osd_req_op_raw_data_in(osd_req, which); 190 ceph_osd_data_pages_init(osd_data, pages, length, alignment, 191 pages_from_pool, own_pages); 192 } 193 EXPORT_SYMBOL(osd_req_op_raw_data_in_pages); 194 195 void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req, 196 unsigned int which, struct page **pages, 197 u64 length, u32 alignment, 198 bool pages_from_pool, bool own_pages) 199 { 200 struct ceph_osd_data *osd_data; 201 202 osd_data = osd_req_op_data(osd_req, which, extent, osd_data); 203 ceph_osd_data_pages_init(osd_data, pages, length, alignment, 204 pages_from_pool, own_pages); 205 } 206 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages); 207 208 void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req, 209 unsigned int which, struct ceph_pagelist *pagelist) 210 { 211 struct ceph_osd_data *osd_data; 212 213 osd_data = osd_req_op_data(osd_req, which, extent, osd_data); 214 ceph_osd_data_pagelist_init(osd_data, pagelist); 215 } 216 EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist); 217 218 #ifdef CONFIG_BLOCK 219 void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req, 
220 unsigned int which, struct bio *bio, size_t bio_length) 221 { 222 struct ceph_osd_data *osd_data; 223 224 osd_data = osd_req_op_data(osd_req, which, extent, osd_data); 225 ceph_osd_data_bio_init(osd_data, bio, bio_length); 226 } 227 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio); 228 #endif /* CONFIG_BLOCK */ 229 230 static void osd_req_op_cls_request_info_pagelist( 231 struct ceph_osd_request *osd_req, 232 unsigned int which, struct ceph_pagelist *pagelist) 233 { 234 struct ceph_osd_data *osd_data; 235 236 osd_data = osd_req_op_data(osd_req, which, cls, request_info); 237 ceph_osd_data_pagelist_init(osd_data, pagelist); 238 } 239 240 void osd_req_op_cls_request_data_pagelist( 241 struct ceph_osd_request *osd_req, 242 unsigned int which, struct ceph_pagelist *pagelist) 243 { 244 struct ceph_osd_data *osd_data; 245 246 osd_data = osd_req_op_data(osd_req, which, cls, request_data); 247 ceph_osd_data_pagelist_init(osd_data, pagelist); 248 osd_req->r_ops[which].cls.indata_len += pagelist->length; 249 osd_req->r_ops[which].indata_len += pagelist->length; 250 } 251 EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist); 252 253 void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req, 254 unsigned int which, struct page **pages, u64 length, 255 u32 alignment, bool pages_from_pool, bool own_pages) 256 { 257 struct ceph_osd_data *osd_data; 258 259 osd_data = osd_req_op_data(osd_req, which, cls, request_data); 260 ceph_osd_data_pages_init(osd_data, pages, length, alignment, 261 pages_from_pool, own_pages); 262 osd_req->r_ops[which].cls.indata_len += length; 263 osd_req->r_ops[which].indata_len += length; 264 } 265 EXPORT_SYMBOL(osd_req_op_cls_request_data_pages); 266 267 void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req, 268 unsigned int which, struct page **pages, u64 length, 269 u32 alignment, bool pages_from_pool, bool own_pages) 270 { 271 struct ceph_osd_data *osd_data; 272 273 osd_data = osd_req_op_data(osd_req, which, cls, response_data); 274 ceph_osd_data_pages_init(osd_data, pages, length, alignment, 275 pages_from_pool, own_pages); 276 } 277 EXPORT_SYMBOL(osd_req_op_cls_response_data_pages); 278 279 static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data) 280 { 281 switch (osd_data->type) { 282 case CEPH_OSD_DATA_TYPE_NONE: 283 return 0; 284 case CEPH_OSD_DATA_TYPE_PAGES: 285 return osd_data->length; 286 case CEPH_OSD_DATA_TYPE_PAGELIST: 287 return (u64)osd_data->pagelist->length; 288 #ifdef CONFIG_BLOCK 289 case CEPH_OSD_DATA_TYPE_BIO: 290 return (u64)osd_data->bio_length; 291 #endif /* CONFIG_BLOCK */ 292 default: 293 WARN(true, "unrecognized data type %d\n", (int)osd_data->type); 294 return 0; 295 } 296 } 297 298 static void ceph_osd_data_release(struct ceph_osd_data *osd_data) 299 { 300 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) { 301 int num_pages; 302 303 num_pages = calc_pages_for((u64)osd_data->alignment, 304 (u64)osd_data->length); 305 ceph_release_page_vector(osd_data->pages, num_pages); 306 } 307 ceph_osd_data_init(osd_data); 308 } 309 310 static void osd_req_op_data_release(struct ceph_osd_request *osd_req, 311 unsigned int which) 312 { 313 struct ceph_osd_req_op *op; 314 315 BUG_ON(which >= osd_req->r_num_ops); 316 op = &osd_req->r_ops[which]; 317 318 switch (op->op) { 319 case CEPH_OSD_OP_READ: 320 case CEPH_OSD_OP_WRITE: 321 case CEPH_OSD_OP_WRITEFULL: 322 ceph_osd_data_release(&op->extent.osd_data); 323 break; 324 case CEPH_OSD_OP_CALL: 325 ceph_osd_data_release(&op->cls.request_info); 326 
                ceph_osd_data_release(&op->cls.request_data);
                ceph_osd_data_release(&op->cls.response_data);
                break;
        case CEPH_OSD_OP_SETXATTR:
        case CEPH_OSD_OP_CMPXATTR:
                ceph_osd_data_release(&op->xattr.osd_data);
                break;
        case CEPH_OSD_OP_STAT:
                ceph_osd_data_release(&op->raw_data_in);
                break;
        case CEPH_OSD_OP_NOTIFY_ACK:
                ceph_osd_data_release(&op->notify_ack.request_data);
                break;
        case CEPH_OSD_OP_NOTIFY:
                ceph_osd_data_release(&op->notify.request_data);
                ceph_osd_data_release(&op->notify.response_data);
                break;
        case CEPH_OSD_OP_LIST_WATCHERS:
                ceph_osd_data_release(&op->list_watchers.response_data);
                break;
        default:
                break;
        }
}

/*
 * Assumes @t is zero-initialized.
 */
static void target_init(struct ceph_osd_request_target *t)
{
        ceph_oid_init(&t->base_oid);
        ceph_oloc_init(&t->base_oloc);
        ceph_oid_init(&t->target_oid);
        ceph_oloc_init(&t->target_oloc);

        ceph_osds_init(&t->acting);
        ceph_osds_init(&t->up);
        t->size = -1;
        t->min_size = -1;

        t->osd = CEPH_HOMELESS_OSD;
}

static void target_copy(struct ceph_osd_request_target *dest,
                        const struct ceph_osd_request_target *src)
{
        ceph_oid_copy(&dest->base_oid, &src->base_oid);
        ceph_oloc_copy(&dest->base_oloc, &src->base_oloc);
        ceph_oid_copy(&dest->target_oid, &src->target_oid);
        ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);

        dest->pgid = src->pgid; /* struct */
        dest->spgid = src->spgid; /* struct */
        dest->pg_num = src->pg_num;
        dest->pg_num_mask = src->pg_num_mask;
        ceph_osds_copy(&dest->acting, &src->acting);
        ceph_osds_copy(&dest->up, &src->up);
        dest->size = src->size;
        dest->min_size = src->min_size;
        dest->sort_bitwise = src->sort_bitwise;

        dest->flags = src->flags;
        dest->paused = src->paused;

        dest->epoch = src->epoch;
        dest->last_force_resend = src->last_force_resend;

        dest->osd = src->osd;
}

static void target_destroy(struct ceph_osd_request_target *t)
{
        ceph_oid_destroy(&t->base_oid);
        ceph_oloc_destroy(&t->base_oloc);
        ceph_oid_destroy(&t->target_oid);
        ceph_oloc_destroy(&t->target_oloc);
}

/*
 * requests
 */
static void request_release_checks(struct ceph_osd_request *req)
{
        WARN_ON(!RB_EMPTY_NODE(&req->r_node));
        WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
        WARN_ON(!list_empty(&req->r_unsafe_item));
        WARN_ON(req->r_osd);
}

static void ceph_osdc_release_request(struct kref *kref)
{
        struct ceph_osd_request *req = container_of(kref,
                                            struct ceph_osd_request, r_kref);
        unsigned int which;

        dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
             req->r_request, req->r_reply);
        request_release_checks(req);

        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_reply)
                ceph_msg_put(req->r_reply);

        for (which = 0; which < req->r_num_ops; which++)
                osd_req_op_data_release(req, which);

        target_destroy(&req->r_t);
        ceph_put_snap_context(req->r_snapc);

        if (req->r_mempool)
                mempool_free(req, req->r_osdc->req_mempool);
        else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
                kmem_cache_free(ceph_osd_request_cache, req);
        else
                kfree(req);
}

void ceph_osdc_get_request(struct ceph_osd_request *req)
{
        dout("%s %p (was %d)\n", __func__, req,
             kref_read(&req->r_kref));
        kref_get(&req->r_kref);
}

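/*
 * Illustrative refcounting sketch (not a caller from this file): code that
 * hands a request to another context takes an extra reference and drops it
 * when done; ceph_osdc_put_request() accepts NULL.
 *
 *	ceph_osdc_get_request(req);
 *	... hand req to a completion handler or another thread ...
 *	ceph_osdc_put_request(req);	// may free req
 */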
EXPORT_SYMBOL(ceph_osdc_get_request); 451 452 void ceph_osdc_put_request(struct ceph_osd_request *req) 453 { 454 if (req) { 455 dout("%s %p (was %d)\n", __func__, req, 456 kref_read(&req->r_kref)); 457 kref_put(&req->r_kref, ceph_osdc_release_request); 458 } 459 } 460 EXPORT_SYMBOL(ceph_osdc_put_request); 461 462 static void request_init(struct ceph_osd_request *req) 463 { 464 /* req only, each op is zeroed in _osd_req_op_init() */ 465 memset(req, 0, sizeof(*req)); 466 467 kref_init(&req->r_kref); 468 init_completion(&req->r_completion); 469 RB_CLEAR_NODE(&req->r_node); 470 RB_CLEAR_NODE(&req->r_mc_node); 471 INIT_LIST_HEAD(&req->r_unsafe_item); 472 473 target_init(&req->r_t); 474 } 475 476 /* 477 * This is ugly, but it allows us to reuse linger registration and ping 478 * requests, keeping the structure of the code around send_linger{_ping}() 479 * reasonable. Setting up a min_nr=2 mempool for each linger request 480 * and dealing with copying ops (this blasts req only, watch op remains 481 * intact) isn't any better. 482 */ 483 static void request_reinit(struct ceph_osd_request *req) 484 { 485 struct ceph_osd_client *osdc = req->r_osdc; 486 bool mempool = req->r_mempool; 487 unsigned int num_ops = req->r_num_ops; 488 u64 snapid = req->r_snapid; 489 struct ceph_snap_context *snapc = req->r_snapc; 490 bool linger = req->r_linger; 491 struct ceph_msg *request_msg = req->r_request; 492 struct ceph_msg *reply_msg = req->r_reply; 493 494 dout("%s req %p\n", __func__, req); 495 WARN_ON(kref_read(&req->r_kref) != 1); 496 request_release_checks(req); 497 498 WARN_ON(kref_read(&request_msg->kref) != 1); 499 WARN_ON(kref_read(&reply_msg->kref) != 1); 500 target_destroy(&req->r_t); 501 502 request_init(req); 503 req->r_osdc = osdc; 504 req->r_mempool = mempool; 505 req->r_num_ops = num_ops; 506 req->r_snapid = snapid; 507 req->r_snapc = snapc; 508 req->r_linger = linger; 509 req->r_request = request_msg; 510 req->r_reply = reply_msg; 511 } 512 513 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 514 struct ceph_snap_context *snapc, 515 unsigned int num_ops, 516 bool use_mempool, 517 gfp_t gfp_flags) 518 { 519 struct ceph_osd_request *req; 520 521 if (use_mempool) { 522 BUG_ON(num_ops > CEPH_OSD_SLAB_OPS); 523 req = mempool_alloc(osdc->req_mempool, gfp_flags); 524 } else if (num_ops <= CEPH_OSD_SLAB_OPS) { 525 req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags); 526 } else { 527 BUG_ON(num_ops > CEPH_OSD_MAX_OPS); 528 req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]), 529 gfp_flags); 530 } 531 if (unlikely(!req)) 532 return NULL; 533 534 request_init(req); 535 req->r_osdc = osdc; 536 req->r_mempool = use_mempool; 537 req->r_num_ops = num_ops; 538 req->r_snapid = CEPH_NOSNAP; 539 req->r_snapc = ceph_get_snap_context(snapc); 540 541 dout("%s req %p\n", __func__, req); 542 return req; 543 } 544 EXPORT_SYMBOL(ceph_osdc_alloc_request); 545 546 static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc) 547 { 548 return 8 + 4 + 4 + 4 + (oloc->pool_ns ? 
oloc->pool_ns->len : 0); 549 } 550 551 int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp) 552 { 553 struct ceph_osd_client *osdc = req->r_osdc; 554 struct ceph_msg *msg; 555 int msg_size; 556 557 WARN_ON(ceph_oid_empty(&req->r_base_oid)); 558 WARN_ON(ceph_oloc_empty(&req->r_base_oloc)); 559 560 /* create request message */ 561 msg_size = CEPH_ENCODING_START_BLK_LEN + 562 CEPH_PGID_ENCODING_LEN + 1; /* spgid */ 563 msg_size += 4 + 4 + 4; /* hash, osdmap_epoch, flags */ 564 msg_size += CEPH_ENCODING_START_BLK_LEN + 565 sizeof(struct ceph_osd_reqid); /* reqid */ 566 msg_size += sizeof(struct ceph_blkin_trace_info); /* trace */ 567 msg_size += 4 + sizeof(struct ceph_timespec); /* client_inc, mtime */ 568 msg_size += CEPH_ENCODING_START_BLK_LEN + 569 ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */ 570 msg_size += 4 + req->r_base_oid.name_len; /* oid */ 571 msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op); 572 msg_size += 8; /* snapid */ 573 msg_size += 8; /* snap_seq */ 574 msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0); 575 msg_size += 4 + 8; /* retry_attempt, features */ 576 577 if (req->r_mempool) 578 msg = ceph_msgpool_get(&osdc->msgpool_op, 0); 579 else 580 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true); 581 if (!msg) 582 return -ENOMEM; 583 584 memset(msg->front.iov_base, 0, msg->front.iov_len); 585 req->r_request = msg; 586 587 /* create reply message */ 588 msg_size = OSD_OPREPLY_FRONT_LEN; 589 msg_size += req->r_base_oid.name_len; 590 msg_size += req->r_num_ops * sizeof(struct ceph_osd_op); 591 592 if (req->r_mempool) 593 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); 594 else 595 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true); 596 if (!msg) 597 return -ENOMEM; 598 599 req->r_reply = msg; 600 601 return 0; 602 } 603 EXPORT_SYMBOL(ceph_osdc_alloc_messages); 604 605 static bool osd_req_opcode_valid(u16 opcode) 606 { 607 switch (opcode) { 608 #define GENERATE_CASE(op, opcode, str) case CEPH_OSD_OP_##op: return true; 609 __CEPH_FORALL_OSD_OPS(GENERATE_CASE) 610 #undef GENERATE_CASE 611 default: 612 return false; 613 } 614 } 615 616 /* 617 * This is an osd op init function for opcodes that have no data or 618 * other information associated with them. It also serves as a 619 * common init routine for all the other init functions, below. 
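 *
 * A minimal usage sketch (hypothetical caller; assumes @req was allocated
 * with at least one op slot):
 *
 *	osd_req_op_init(req, 0, CEPH_OSD_OP_STAT, 0);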
620 */ 621 static struct ceph_osd_req_op * 622 _osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which, 623 u16 opcode, u32 flags) 624 { 625 struct ceph_osd_req_op *op; 626 627 BUG_ON(which >= osd_req->r_num_ops); 628 BUG_ON(!osd_req_opcode_valid(opcode)); 629 630 op = &osd_req->r_ops[which]; 631 memset(op, 0, sizeof (*op)); 632 op->op = opcode; 633 op->flags = flags; 634 635 return op; 636 } 637 638 void osd_req_op_init(struct ceph_osd_request *osd_req, 639 unsigned int which, u16 opcode, u32 flags) 640 { 641 (void)_osd_req_op_init(osd_req, which, opcode, flags); 642 } 643 EXPORT_SYMBOL(osd_req_op_init); 644 645 void osd_req_op_extent_init(struct ceph_osd_request *osd_req, 646 unsigned int which, u16 opcode, 647 u64 offset, u64 length, 648 u64 truncate_size, u32 truncate_seq) 649 { 650 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 651 opcode, 0); 652 size_t payload_len = 0; 653 654 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && 655 opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO && 656 opcode != CEPH_OSD_OP_TRUNCATE); 657 658 op->extent.offset = offset; 659 op->extent.length = length; 660 op->extent.truncate_size = truncate_size; 661 op->extent.truncate_seq = truncate_seq; 662 if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL) 663 payload_len += length; 664 665 op->indata_len = payload_len; 666 } 667 EXPORT_SYMBOL(osd_req_op_extent_init); 668 669 void osd_req_op_extent_update(struct ceph_osd_request *osd_req, 670 unsigned int which, u64 length) 671 { 672 struct ceph_osd_req_op *op; 673 u64 previous; 674 675 BUG_ON(which >= osd_req->r_num_ops); 676 op = &osd_req->r_ops[which]; 677 previous = op->extent.length; 678 679 if (length == previous) 680 return; /* Nothing to do */ 681 BUG_ON(length > previous); 682 683 op->extent.length = length; 684 if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL) 685 op->indata_len -= previous - length; 686 } 687 EXPORT_SYMBOL(osd_req_op_extent_update); 688 689 void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req, 690 unsigned int which, u64 offset_inc) 691 { 692 struct ceph_osd_req_op *op, *prev_op; 693 694 BUG_ON(which + 1 >= osd_req->r_num_ops); 695 696 prev_op = &osd_req->r_ops[which]; 697 op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags); 698 /* dup previous one */ 699 op->indata_len = prev_op->indata_len; 700 op->outdata_len = prev_op->outdata_len; 701 op->extent = prev_op->extent; 702 /* adjust offset */ 703 op->extent.offset += offset_inc; 704 op->extent.length -= offset_inc; 705 706 if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL) 707 op->indata_len -= offset_inc; 708 } 709 EXPORT_SYMBOL(osd_req_op_extent_dup_last); 710 711 void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, 712 u16 opcode, const char *class, const char *method) 713 { 714 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 715 opcode, 0); 716 struct ceph_pagelist *pagelist; 717 size_t payload_len = 0; 718 size_t size; 719 720 BUG_ON(opcode != CEPH_OSD_OP_CALL); 721 722 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); 723 BUG_ON(!pagelist); 724 ceph_pagelist_init(pagelist); 725 726 op->cls.class_name = class; 727 size = strlen(class); 728 BUG_ON(size > (size_t) U8_MAX); 729 op->cls.class_len = size; 730 ceph_pagelist_append(pagelist, class, size); 731 payload_len += size; 732 733 op->cls.method_name = method; 734 size = strlen(method); 735 BUG_ON(size > (size_t) U8_MAX); 736 op->cls.method_len = 
size; 737 ceph_pagelist_append(pagelist, method, size); 738 payload_len += size; 739 740 osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist); 741 742 op->indata_len = payload_len; 743 } 744 EXPORT_SYMBOL(osd_req_op_cls_init); 745 746 int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which, 747 u16 opcode, const char *name, const void *value, 748 size_t size, u8 cmp_op, u8 cmp_mode) 749 { 750 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 751 opcode, 0); 752 struct ceph_pagelist *pagelist; 753 size_t payload_len; 754 755 BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR); 756 757 pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); 758 if (!pagelist) 759 return -ENOMEM; 760 761 ceph_pagelist_init(pagelist); 762 763 payload_len = strlen(name); 764 op->xattr.name_len = payload_len; 765 ceph_pagelist_append(pagelist, name, payload_len); 766 767 op->xattr.value_len = size; 768 ceph_pagelist_append(pagelist, value, size); 769 payload_len += size; 770 771 op->xattr.cmp_op = cmp_op; 772 op->xattr.cmp_mode = cmp_mode; 773 774 ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist); 775 op->indata_len = payload_len; 776 return 0; 777 } 778 EXPORT_SYMBOL(osd_req_op_xattr_init); 779 780 /* 781 * @watch_opcode: CEPH_OSD_WATCH_OP_* 782 */ 783 static void osd_req_op_watch_init(struct ceph_osd_request *req, int which, 784 u64 cookie, u8 watch_opcode) 785 { 786 struct ceph_osd_req_op *op; 787 788 op = _osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0); 789 op->watch.cookie = cookie; 790 op->watch.op = watch_opcode; 791 op->watch.gen = 0; 792 } 793 794 void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req, 795 unsigned int which, 796 u64 expected_object_size, 797 u64 expected_write_size) 798 { 799 struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, 800 CEPH_OSD_OP_SETALLOCHINT, 801 0); 802 803 op->alloc_hint.expected_object_size = expected_object_size; 804 op->alloc_hint.expected_write_size = expected_write_size; 805 806 /* 807 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed 808 * not worth a feature bit. Set FAILOK per-op flag to make 809 * sure older osds don't trip over an unsupported opcode. 
810 */ 811 op->flags |= CEPH_OSD_OP_FLAG_FAILOK; 812 } 813 EXPORT_SYMBOL(osd_req_op_alloc_hint_init); 814 815 static void ceph_osdc_msg_data_add(struct ceph_msg *msg, 816 struct ceph_osd_data *osd_data) 817 { 818 u64 length = ceph_osd_data_length(osd_data); 819 820 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { 821 BUG_ON(length > (u64) SIZE_MAX); 822 if (length) 823 ceph_msg_data_add_pages(msg, osd_data->pages, 824 length, osd_data->alignment); 825 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) { 826 BUG_ON(!length); 827 ceph_msg_data_add_pagelist(msg, osd_data->pagelist); 828 #ifdef CONFIG_BLOCK 829 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) { 830 ceph_msg_data_add_bio(msg, osd_data->bio, length); 831 #endif 832 } else { 833 BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE); 834 } 835 } 836 837 static u32 osd_req_encode_op(struct ceph_osd_op *dst, 838 const struct ceph_osd_req_op *src) 839 { 840 if (WARN_ON(!osd_req_opcode_valid(src->op))) { 841 pr_err("unrecognized osd opcode %d\n", src->op); 842 843 return 0; 844 } 845 846 switch (src->op) { 847 case CEPH_OSD_OP_STAT: 848 break; 849 case CEPH_OSD_OP_READ: 850 case CEPH_OSD_OP_WRITE: 851 case CEPH_OSD_OP_WRITEFULL: 852 case CEPH_OSD_OP_ZERO: 853 case CEPH_OSD_OP_TRUNCATE: 854 dst->extent.offset = cpu_to_le64(src->extent.offset); 855 dst->extent.length = cpu_to_le64(src->extent.length); 856 dst->extent.truncate_size = 857 cpu_to_le64(src->extent.truncate_size); 858 dst->extent.truncate_seq = 859 cpu_to_le32(src->extent.truncate_seq); 860 break; 861 case CEPH_OSD_OP_CALL: 862 dst->cls.class_len = src->cls.class_len; 863 dst->cls.method_len = src->cls.method_len; 864 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len); 865 break; 866 case CEPH_OSD_OP_WATCH: 867 dst->watch.cookie = cpu_to_le64(src->watch.cookie); 868 dst->watch.ver = cpu_to_le64(0); 869 dst->watch.op = src->watch.op; 870 dst->watch.gen = cpu_to_le32(src->watch.gen); 871 break; 872 case CEPH_OSD_OP_NOTIFY_ACK: 873 break; 874 case CEPH_OSD_OP_NOTIFY: 875 dst->notify.cookie = cpu_to_le64(src->notify.cookie); 876 break; 877 case CEPH_OSD_OP_LIST_WATCHERS: 878 break; 879 case CEPH_OSD_OP_SETALLOCHINT: 880 dst->alloc_hint.expected_object_size = 881 cpu_to_le64(src->alloc_hint.expected_object_size); 882 dst->alloc_hint.expected_write_size = 883 cpu_to_le64(src->alloc_hint.expected_write_size); 884 break; 885 case CEPH_OSD_OP_SETXATTR: 886 case CEPH_OSD_OP_CMPXATTR: 887 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len); 888 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len); 889 dst->xattr.cmp_op = src->xattr.cmp_op; 890 dst->xattr.cmp_mode = src->xattr.cmp_mode; 891 break; 892 case CEPH_OSD_OP_CREATE: 893 case CEPH_OSD_OP_DELETE: 894 break; 895 default: 896 pr_err("unsupported osd opcode %s\n", 897 ceph_osd_op_name(src->op)); 898 WARN_ON(1); 899 900 return 0; 901 } 902 903 dst->op = cpu_to_le16(src->op); 904 dst->flags = cpu_to_le32(src->flags); 905 dst->payload_len = cpu_to_le32(src->indata_len); 906 907 return src->indata_len; 908 } 909 910 /* 911 * build new request AND message, calculate layout, and adjust file 912 * extent as needed. 913 * 914 * if the file was recently truncated, we include information about its 915 * old and new size so that the object can be updated appropriately. (we 916 * avoid synchronously deleting truncated objects because it's slow.) 
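 *
 * Illustrative sketch of a single-op read (hypothetical caller, error
 * handling trimmed; @osdc, @layout and @vino are assumed to exist):
 *
 *	u64 len = 4096;
 *	struct ceph_osd_request *req;
 *
 *	req = ceph_osdc_new_request(osdc, layout, vino, 0, &len, 0, 1,
 *				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
 *				    NULL, 0, 0, false);
 *	if (IS_ERR(req))
 *		return PTR_ERR(req);
 *
 * On success the returned request already has its request and reply
 * messages allocated (see ceph_osdc_alloc_messages() above) and must
 * eventually be dropped with ceph_osdc_put_request().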
917 */ 918 struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, 919 struct ceph_file_layout *layout, 920 struct ceph_vino vino, 921 u64 off, u64 *plen, 922 unsigned int which, int num_ops, 923 int opcode, int flags, 924 struct ceph_snap_context *snapc, 925 u32 truncate_seq, 926 u64 truncate_size, 927 bool use_mempool) 928 { 929 struct ceph_osd_request *req; 930 u64 objnum = 0; 931 u64 objoff = 0; 932 u64 objlen = 0; 933 int r; 934 935 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && 936 opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE && 937 opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE); 938 939 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool, 940 GFP_NOFS); 941 if (!req) { 942 r = -ENOMEM; 943 goto fail; 944 } 945 946 /* calculate max write size */ 947 r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen); 948 if (r) 949 goto fail; 950 951 if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) { 952 osd_req_op_init(req, which, opcode, 0); 953 } else { 954 u32 object_size = layout->object_size; 955 u32 object_base = off - objoff; 956 if (!(truncate_seq == 1 && truncate_size == -1ULL)) { 957 if (truncate_size <= object_base) { 958 truncate_size = 0; 959 } else { 960 truncate_size -= object_base; 961 if (truncate_size > object_size) 962 truncate_size = object_size; 963 } 964 } 965 osd_req_op_extent_init(req, which, opcode, objoff, objlen, 966 truncate_size, truncate_seq); 967 } 968 969 req->r_abort_on_full = true; 970 req->r_flags = flags; 971 req->r_base_oloc.pool = layout->pool_id; 972 req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns); 973 ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum); 974 975 req->r_snapid = vino.snap; 976 if (flags & CEPH_OSD_FLAG_WRITE) 977 req->r_data_offset = off; 978 979 r = ceph_osdc_alloc_messages(req, GFP_NOFS); 980 if (r) 981 goto fail; 982 983 return req; 984 985 fail: 986 ceph_osdc_put_request(req); 987 return ERR_PTR(r); 988 } 989 EXPORT_SYMBOL(ceph_osdc_new_request); 990 991 /* 992 * We keep osd requests in an rbtree, sorted by ->r_tid. 993 */ 994 DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node) 995 DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node) 996 997 static bool osd_homeless(struct ceph_osd *osd) 998 { 999 return osd->o_osd == CEPH_HOMELESS_OSD; 1000 } 1001 1002 static bool osd_registered(struct ceph_osd *osd) 1003 { 1004 verify_osdc_locked(osd->o_osdc); 1005 1006 return !RB_EMPTY_NODE(&osd->o_node); 1007 } 1008 1009 /* 1010 * Assumes @osd is zero-initialized. 
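 *
 * In this file that is arranged by create_osd(), which allocates the
 * struct with kzalloc() before calling osd_init() on it.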
1011 */ 1012 static void osd_init(struct ceph_osd *osd) 1013 { 1014 refcount_set(&osd->o_ref, 1); 1015 RB_CLEAR_NODE(&osd->o_node); 1016 osd->o_requests = RB_ROOT; 1017 osd->o_linger_requests = RB_ROOT; 1018 osd->o_backoff_mappings = RB_ROOT; 1019 osd->o_backoffs_by_id = RB_ROOT; 1020 INIT_LIST_HEAD(&osd->o_osd_lru); 1021 INIT_LIST_HEAD(&osd->o_keepalive_item); 1022 osd->o_incarnation = 1; 1023 mutex_init(&osd->lock); 1024 } 1025 1026 static void osd_cleanup(struct ceph_osd *osd) 1027 { 1028 WARN_ON(!RB_EMPTY_NODE(&osd->o_node)); 1029 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests)); 1030 WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests)); 1031 WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoff_mappings)); 1032 WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoffs_by_id)); 1033 WARN_ON(!list_empty(&osd->o_osd_lru)); 1034 WARN_ON(!list_empty(&osd->o_keepalive_item)); 1035 1036 if (osd->o_auth.authorizer) { 1037 WARN_ON(osd_homeless(osd)); 1038 ceph_auth_destroy_authorizer(osd->o_auth.authorizer); 1039 } 1040 } 1041 1042 /* 1043 * Track open sessions with osds. 1044 */ 1045 static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum) 1046 { 1047 struct ceph_osd *osd; 1048 1049 WARN_ON(onum == CEPH_HOMELESS_OSD); 1050 1051 osd = kzalloc(sizeof(*osd), GFP_NOIO | __GFP_NOFAIL); 1052 osd_init(osd); 1053 osd->o_osdc = osdc; 1054 osd->o_osd = onum; 1055 1056 ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr); 1057 1058 return osd; 1059 } 1060 1061 static struct ceph_osd *get_osd(struct ceph_osd *osd) 1062 { 1063 if (refcount_inc_not_zero(&osd->o_ref)) { 1064 dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1, 1065 refcount_read(&osd->o_ref)); 1066 return osd; 1067 } else { 1068 dout("get_osd %p FAIL\n", osd); 1069 return NULL; 1070 } 1071 } 1072 1073 static void put_osd(struct ceph_osd *osd) 1074 { 1075 dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref), 1076 refcount_read(&osd->o_ref) - 1); 1077 if (refcount_dec_and_test(&osd->o_ref)) { 1078 osd_cleanup(osd); 1079 kfree(osd); 1080 } 1081 } 1082 1083 DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node) 1084 1085 static void __move_osd_to_lru(struct ceph_osd *osd) 1086 { 1087 struct ceph_osd_client *osdc = osd->o_osdc; 1088 1089 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd); 1090 BUG_ON(!list_empty(&osd->o_osd_lru)); 1091 1092 spin_lock(&osdc->osd_lru_lock); 1093 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru); 1094 spin_unlock(&osdc->osd_lru_lock); 1095 1096 osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl; 1097 } 1098 1099 static void maybe_move_osd_to_lru(struct ceph_osd *osd) 1100 { 1101 if (RB_EMPTY_ROOT(&osd->o_requests) && 1102 RB_EMPTY_ROOT(&osd->o_linger_requests)) 1103 __move_osd_to_lru(osd); 1104 } 1105 1106 static void __remove_osd_from_lru(struct ceph_osd *osd) 1107 { 1108 struct ceph_osd_client *osdc = osd->o_osdc; 1109 1110 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd); 1111 1112 spin_lock(&osdc->osd_lru_lock); 1113 if (!list_empty(&osd->o_osd_lru)) 1114 list_del_init(&osd->o_osd_lru); 1115 spin_unlock(&osdc->osd_lru_lock); 1116 } 1117 1118 /* 1119 * Close the connection and assign any leftover requests to the 1120 * homeless session. 
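 *
 * Caller must hold osdc->lock for write (see the verify_osdc_wrlocked()
 * check below).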
 */
static void close_osd(struct ceph_osd *osd)
{
        struct ceph_osd_client *osdc = osd->o_osdc;
        struct rb_node *n;

        verify_osdc_wrlocked(osdc);
        dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

        ceph_con_close(&osd->o_con);

        for (n = rb_first(&osd->o_requests); n; ) {
                struct ceph_osd_request *req =
                    rb_entry(n, struct ceph_osd_request, r_node);

                n = rb_next(n); /* unlink_request() */

                dout(" reassigning req %p tid %llu\n", req, req->r_tid);
                unlink_request(osd, req);
                link_request(&osdc->homeless_osd, req);
        }
        for (n = rb_first(&osd->o_linger_requests); n; ) {
                struct ceph_osd_linger_request *lreq =
                    rb_entry(n, struct ceph_osd_linger_request, node);

                n = rb_next(n); /* unlink_linger() */

                dout(" reassigning lreq %p linger_id %llu\n", lreq,
                     lreq->linger_id);
                unlink_linger(osd, lreq);
                link_linger(&osdc->homeless_osd, lreq);
        }
        clear_backoffs(osd);

        __remove_osd_from_lru(osd);
        erase_osd(&osdc->osds, osd);
        put_osd(osd);
}

/*
 * reset the osd connection
 */
static int reopen_osd(struct ceph_osd *osd)
{
        struct ceph_entity_addr *peer_addr;

        dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

        if (RB_EMPTY_ROOT(&osd->o_requests) &&
            RB_EMPTY_ROOT(&osd->o_linger_requests)) {
                close_osd(osd);
                return -ENODEV;
        }

        peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
        if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
            !ceph_con_opened(&osd->o_con)) {
                struct rb_node *n;

                dout("osd addr hasn't changed and connection never opened, "
                     "letting msgr retry\n");
                /* touch each r_stamp for handle_timeout()'s benefit */
                for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
                        struct ceph_osd_request *req =
                            rb_entry(n, struct ceph_osd_request, r_node);
                        req->r_stamp = jiffies;
                }

                return -EAGAIN;
        }

        ceph_con_close(&osd->o_con);
        ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
        osd->o_incarnation++;

        return 0;
}

static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
                                          bool wrlocked)
{
        struct ceph_osd *osd;

        if (wrlocked)
                verify_osdc_wrlocked(osdc);
        else
                verify_osdc_locked(osdc);

        if (o != CEPH_HOMELESS_OSD)
                osd = lookup_osd(&osdc->osds, o);
        else
                osd = &osdc->homeless_osd;
        if (!osd) {
                if (!wrlocked)
                        return ERR_PTR(-EAGAIN);

                osd = create_osd(osdc, o);
                insert_osd(&osdc->osds, osd);
                ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
                              &osdc->osdmap->osd_addr[osd->o_osd]);
        }

        dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
        return osd;
}

/*
 * Create request <-> OSD session relation.
 *
 * @req has to be assigned a tid, @osd may be homeless.
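 *
 * Takes a reference on @osd and, for a real (non-homeless) OSD, removes
 * it from the idle LRU.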
1231 */ 1232 static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req) 1233 { 1234 verify_osd_locked(osd); 1235 WARN_ON(!req->r_tid || req->r_osd); 1236 dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd, 1237 req, req->r_tid); 1238 1239 if (!osd_homeless(osd)) 1240 __remove_osd_from_lru(osd); 1241 else 1242 atomic_inc(&osd->o_osdc->num_homeless); 1243 1244 get_osd(osd); 1245 insert_request(&osd->o_requests, req); 1246 req->r_osd = osd; 1247 } 1248 1249 static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req) 1250 { 1251 verify_osd_locked(osd); 1252 WARN_ON(req->r_osd != osd); 1253 dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd, 1254 req, req->r_tid); 1255 1256 req->r_osd = NULL; 1257 erase_request(&osd->o_requests, req); 1258 put_osd(osd); 1259 1260 if (!osd_homeless(osd)) 1261 maybe_move_osd_to_lru(osd); 1262 else 1263 atomic_dec(&osd->o_osdc->num_homeless); 1264 } 1265 1266 static bool __pool_full(struct ceph_pg_pool_info *pi) 1267 { 1268 return pi->flags & CEPH_POOL_FLAG_FULL; 1269 } 1270 1271 static bool have_pool_full(struct ceph_osd_client *osdc) 1272 { 1273 struct rb_node *n; 1274 1275 for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) { 1276 struct ceph_pg_pool_info *pi = 1277 rb_entry(n, struct ceph_pg_pool_info, node); 1278 1279 if (__pool_full(pi)) 1280 return true; 1281 } 1282 1283 return false; 1284 } 1285 1286 static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id) 1287 { 1288 struct ceph_pg_pool_info *pi; 1289 1290 pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id); 1291 if (!pi) 1292 return false; 1293 1294 return __pool_full(pi); 1295 } 1296 1297 /* 1298 * Returns whether a request should be blocked from being sent 1299 * based on the current osdmap and osd_client settings. 
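 *
 * Reads are paused by the PAUSERD osdmap flag; writes are paused by
 * PAUSEWR, the FULL flag or a full pool; everything is paused while the
 * osdmap epoch is behind the requested epoch barrier.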
1300 */ 1301 static bool target_should_be_paused(struct ceph_osd_client *osdc, 1302 const struct ceph_osd_request_target *t, 1303 struct ceph_pg_pool_info *pi) 1304 { 1305 bool pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD); 1306 bool pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) || 1307 ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 1308 __pool_full(pi); 1309 1310 WARN_ON(pi->id != t->target_oloc.pool); 1311 return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) || 1312 ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) || 1313 (osdc->osdmap->epoch < osdc->epoch_barrier); 1314 } 1315 1316 enum calc_target_result { 1317 CALC_TARGET_NO_ACTION = 0, 1318 CALC_TARGET_NEED_RESEND, 1319 CALC_TARGET_POOL_DNE, 1320 }; 1321 1322 static enum calc_target_result calc_target(struct ceph_osd_client *osdc, 1323 struct ceph_osd_request_target *t, 1324 struct ceph_connection *con, 1325 bool any_change) 1326 { 1327 struct ceph_pg_pool_info *pi; 1328 struct ceph_pg pgid, last_pgid; 1329 struct ceph_osds up, acting; 1330 bool force_resend = false; 1331 bool unpaused = false; 1332 bool legacy_change; 1333 bool split = false; 1334 bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE); 1335 bool recovery_deletes = ceph_osdmap_flag(osdc, 1336 CEPH_OSDMAP_RECOVERY_DELETES); 1337 enum calc_target_result ct_res; 1338 int ret; 1339 1340 t->epoch = osdc->osdmap->epoch; 1341 pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool); 1342 if (!pi) { 1343 t->osd = CEPH_HOMELESS_OSD; 1344 ct_res = CALC_TARGET_POOL_DNE; 1345 goto out; 1346 } 1347 1348 if (osdc->osdmap->epoch == pi->last_force_request_resend) { 1349 if (t->last_force_resend < pi->last_force_request_resend) { 1350 t->last_force_resend = pi->last_force_request_resend; 1351 force_resend = true; 1352 } else if (t->last_force_resend == 0) { 1353 force_resend = true; 1354 } 1355 } 1356 1357 /* apply tiering */ 1358 ceph_oid_copy(&t->target_oid, &t->base_oid); 1359 ceph_oloc_copy(&t->target_oloc, &t->base_oloc); 1360 if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { 1361 if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0) 1362 t->target_oloc.pool = pi->read_tier; 1363 if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0) 1364 t->target_oloc.pool = pi->write_tier; 1365 1366 pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool); 1367 if (!pi) { 1368 t->osd = CEPH_HOMELESS_OSD; 1369 ct_res = CALC_TARGET_POOL_DNE; 1370 goto out; 1371 } 1372 } 1373 1374 ret = __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc, 1375 &pgid); 1376 if (ret) { 1377 WARN_ON(ret != -ENOENT); 1378 t->osd = CEPH_HOMELESS_OSD; 1379 ct_res = CALC_TARGET_POOL_DNE; 1380 goto out; 1381 } 1382 last_pgid.pool = pgid.pool; 1383 last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask); 1384 1385 ceph_pg_to_up_acting_osds(osdc->osdmap, pi, &pgid, &up, &acting); 1386 if (any_change && 1387 ceph_is_new_interval(&t->acting, 1388 &acting, 1389 &t->up, 1390 &up, 1391 t->size, 1392 pi->size, 1393 t->min_size, 1394 pi->min_size, 1395 t->pg_num, 1396 pi->pg_num, 1397 t->sort_bitwise, 1398 sort_bitwise, 1399 t->recovery_deletes, 1400 recovery_deletes, 1401 &last_pgid)) 1402 force_resend = true; 1403 1404 if (t->paused && !target_should_be_paused(osdc, t, pi)) { 1405 t->paused = false; 1406 unpaused = true; 1407 } 1408 legacy_change = ceph_pg_compare(&t->pgid, &pgid) || 1409 ceph_osds_changed(&t->acting, &acting, any_change); 1410 if (t->pg_num) 1411 split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num); 1412 1413 if (legacy_change || 
force_resend || split) { 1414 t->pgid = pgid; /* struct */ 1415 ceph_pg_to_primary_shard(osdc->osdmap, pi, &pgid, &t->spgid); 1416 ceph_osds_copy(&t->acting, &acting); 1417 ceph_osds_copy(&t->up, &up); 1418 t->size = pi->size; 1419 t->min_size = pi->min_size; 1420 t->pg_num = pi->pg_num; 1421 t->pg_num_mask = pi->pg_num_mask; 1422 t->sort_bitwise = sort_bitwise; 1423 t->recovery_deletes = recovery_deletes; 1424 1425 t->osd = acting.primary; 1426 } 1427 1428 if (unpaused || legacy_change || force_resend || 1429 (split && con && CEPH_HAVE_FEATURE(con->peer_features, 1430 RESEND_ON_SPLIT))) 1431 ct_res = CALC_TARGET_NEED_RESEND; 1432 else 1433 ct_res = CALC_TARGET_NO_ACTION; 1434 1435 out: 1436 dout("%s t %p -> ct_res %d osd %d\n", __func__, t, ct_res, t->osd); 1437 return ct_res; 1438 } 1439 1440 static struct ceph_spg_mapping *alloc_spg_mapping(void) 1441 { 1442 struct ceph_spg_mapping *spg; 1443 1444 spg = kmalloc(sizeof(*spg), GFP_NOIO); 1445 if (!spg) 1446 return NULL; 1447 1448 RB_CLEAR_NODE(&spg->node); 1449 spg->backoffs = RB_ROOT; 1450 return spg; 1451 } 1452 1453 static void free_spg_mapping(struct ceph_spg_mapping *spg) 1454 { 1455 WARN_ON(!RB_EMPTY_NODE(&spg->node)); 1456 WARN_ON(!RB_EMPTY_ROOT(&spg->backoffs)); 1457 1458 kfree(spg); 1459 } 1460 1461 /* 1462 * rbtree of ceph_spg_mapping for handling map<spg_t, ...>, similar to 1463 * ceph_pg_mapping. Used to track OSD backoffs -- a backoff [range] is 1464 * defined only within a specific spgid; it does not pass anything to 1465 * children on split, or to another primary. 1466 */ 1467 DEFINE_RB_FUNCS2(spg_mapping, struct ceph_spg_mapping, spgid, ceph_spg_compare, 1468 RB_BYPTR, const struct ceph_spg *, node) 1469 1470 static u64 hoid_get_bitwise_key(const struct ceph_hobject_id *hoid) 1471 { 1472 return hoid->is_max ? 
0x100000000ull : hoid->hash_reverse_bits; 1473 } 1474 1475 static void hoid_get_effective_key(const struct ceph_hobject_id *hoid, 1476 void **pkey, size_t *pkey_len) 1477 { 1478 if (hoid->key_len) { 1479 *pkey = hoid->key; 1480 *pkey_len = hoid->key_len; 1481 } else { 1482 *pkey = hoid->oid; 1483 *pkey_len = hoid->oid_len; 1484 } 1485 } 1486 1487 static int compare_names(const void *name1, size_t name1_len, 1488 const void *name2, size_t name2_len) 1489 { 1490 int ret; 1491 1492 ret = memcmp(name1, name2, min(name1_len, name2_len)); 1493 if (!ret) { 1494 if (name1_len < name2_len) 1495 ret = -1; 1496 else if (name1_len > name2_len) 1497 ret = 1; 1498 } 1499 return ret; 1500 } 1501 1502 static int hoid_compare(const struct ceph_hobject_id *lhs, 1503 const struct ceph_hobject_id *rhs) 1504 { 1505 void *effective_key1, *effective_key2; 1506 size_t effective_key1_len, effective_key2_len; 1507 int ret; 1508 1509 if (lhs->is_max < rhs->is_max) 1510 return -1; 1511 if (lhs->is_max > rhs->is_max) 1512 return 1; 1513 1514 if (lhs->pool < rhs->pool) 1515 return -1; 1516 if (lhs->pool > rhs->pool) 1517 return 1; 1518 1519 if (hoid_get_bitwise_key(lhs) < hoid_get_bitwise_key(rhs)) 1520 return -1; 1521 if (hoid_get_bitwise_key(lhs) > hoid_get_bitwise_key(rhs)) 1522 return 1; 1523 1524 ret = compare_names(lhs->nspace, lhs->nspace_len, 1525 rhs->nspace, rhs->nspace_len); 1526 if (ret) 1527 return ret; 1528 1529 hoid_get_effective_key(lhs, &effective_key1, &effective_key1_len); 1530 hoid_get_effective_key(rhs, &effective_key2, &effective_key2_len); 1531 ret = compare_names(effective_key1, effective_key1_len, 1532 effective_key2, effective_key2_len); 1533 if (ret) 1534 return ret; 1535 1536 ret = compare_names(lhs->oid, lhs->oid_len, rhs->oid, rhs->oid_len); 1537 if (ret) 1538 return ret; 1539 1540 if (lhs->snapid < rhs->snapid) 1541 return -1; 1542 if (lhs->snapid > rhs->snapid) 1543 return 1; 1544 1545 return 0; 1546 } 1547 1548 /* 1549 * For decoding ->begin and ->end of MOSDBackoff only -- no MIN/MAX 1550 * compat stuff here. 1551 * 1552 * Assumes @hoid is zero-initialized. 
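 *
 * Wire order handled here (hobject_t v4+): key, oid, snapid, hash,
 * is_max, nspace, pool; the hash cache is rebuilt once everything has
 * been decoded.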
1553 */ 1554 static int decode_hoid(void **p, void *end, struct ceph_hobject_id *hoid) 1555 { 1556 u8 struct_v; 1557 u32 struct_len; 1558 int ret; 1559 1560 ret = ceph_start_decoding(p, end, 4, "hobject_t", &struct_v, 1561 &struct_len); 1562 if (ret) 1563 return ret; 1564 1565 if (struct_v < 4) { 1566 pr_err("got struct_v %d < 4 of hobject_t\n", struct_v); 1567 goto e_inval; 1568 } 1569 1570 hoid->key = ceph_extract_encoded_string(p, end, &hoid->key_len, 1571 GFP_NOIO); 1572 if (IS_ERR(hoid->key)) { 1573 ret = PTR_ERR(hoid->key); 1574 hoid->key = NULL; 1575 return ret; 1576 } 1577 1578 hoid->oid = ceph_extract_encoded_string(p, end, &hoid->oid_len, 1579 GFP_NOIO); 1580 if (IS_ERR(hoid->oid)) { 1581 ret = PTR_ERR(hoid->oid); 1582 hoid->oid = NULL; 1583 return ret; 1584 } 1585 1586 ceph_decode_64_safe(p, end, hoid->snapid, e_inval); 1587 ceph_decode_32_safe(p, end, hoid->hash, e_inval); 1588 ceph_decode_8_safe(p, end, hoid->is_max, e_inval); 1589 1590 hoid->nspace = ceph_extract_encoded_string(p, end, &hoid->nspace_len, 1591 GFP_NOIO); 1592 if (IS_ERR(hoid->nspace)) { 1593 ret = PTR_ERR(hoid->nspace); 1594 hoid->nspace = NULL; 1595 return ret; 1596 } 1597 1598 ceph_decode_64_safe(p, end, hoid->pool, e_inval); 1599 1600 ceph_hoid_build_hash_cache(hoid); 1601 return 0; 1602 1603 e_inval: 1604 return -EINVAL; 1605 } 1606 1607 static int hoid_encoding_size(const struct ceph_hobject_id *hoid) 1608 { 1609 return 8 + 4 + 1 + 8 + /* snapid, hash, is_max, pool */ 1610 4 + hoid->key_len + 4 + hoid->oid_len + 4 + hoid->nspace_len; 1611 } 1612 1613 static void encode_hoid(void **p, void *end, const struct ceph_hobject_id *hoid) 1614 { 1615 ceph_start_encoding(p, 4, 3, hoid_encoding_size(hoid)); 1616 ceph_encode_string(p, end, hoid->key, hoid->key_len); 1617 ceph_encode_string(p, end, hoid->oid, hoid->oid_len); 1618 ceph_encode_64(p, hoid->snapid); 1619 ceph_encode_32(p, hoid->hash); 1620 ceph_encode_8(p, hoid->is_max); 1621 ceph_encode_string(p, end, hoid->nspace, hoid->nspace_len); 1622 ceph_encode_64(p, hoid->pool); 1623 } 1624 1625 static void free_hoid(struct ceph_hobject_id *hoid) 1626 { 1627 if (hoid) { 1628 kfree(hoid->key); 1629 kfree(hoid->oid); 1630 kfree(hoid->nspace); 1631 kfree(hoid); 1632 } 1633 } 1634 1635 static struct ceph_osd_backoff *alloc_backoff(void) 1636 { 1637 struct ceph_osd_backoff *backoff; 1638 1639 backoff = kzalloc(sizeof(*backoff), GFP_NOIO); 1640 if (!backoff) 1641 return NULL; 1642 1643 RB_CLEAR_NODE(&backoff->spg_node); 1644 RB_CLEAR_NODE(&backoff->id_node); 1645 return backoff; 1646 } 1647 1648 static void free_backoff(struct ceph_osd_backoff *backoff) 1649 { 1650 WARN_ON(!RB_EMPTY_NODE(&backoff->spg_node)); 1651 WARN_ON(!RB_EMPTY_NODE(&backoff->id_node)); 1652 1653 free_hoid(backoff->begin); 1654 free_hoid(backoff->end); 1655 kfree(backoff); 1656 } 1657 1658 /* 1659 * Within a specific spgid, backoffs are managed by ->begin hoid. 
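 *
 * lookup_containing_backoff() below returns the backoff whose
 * [begin, end) range contains the given hoid, or NULL if there is none.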
1660 */ 1661 DEFINE_RB_INSDEL_FUNCS2(backoff, struct ceph_osd_backoff, begin, hoid_compare, 1662 RB_BYVAL, spg_node); 1663 1664 static struct ceph_osd_backoff *lookup_containing_backoff(struct rb_root *root, 1665 const struct ceph_hobject_id *hoid) 1666 { 1667 struct rb_node *n = root->rb_node; 1668 1669 while (n) { 1670 struct ceph_osd_backoff *cur = 1671 rb_entry(n, struct ceph_osd_backoff, spg_node); 1672 int cmp; 1673 1674 cmp = hoid_compare(hoid, cur->begin); 1675 if (cmp < 0) { 1676 n = n->rb_left; 1677 } else if (cmp > 0) { 1678 if (hoid_compare(hoid, cur->end) < 0) 1679 return cur; 1680 1681 n = n->rb_right; 1682 } else { 1683 return cur; 1684 } 1685 } 1686 1687 return NULL; 1688 } 1689 1690 /* 1691 * Each backoff has a unique id within its OSD session. 1692 */ 1693 DEFINE_RB_FUNCS(backoff_by_id, struct ceph_osd_backoff, id, id_node) 1694 1695 static void clear_backoffs(struct ceph_osd *osd) 1696 { 1697 while (!RB_EMPTY_ROOT(&osd->o_backoff_mappings)) { 1698 struct ceph_spg_mapping *spg = 1699 rb_entry(rb_first(&osd->o_backoff_mappings), 1700 struct ceph_spg_mapping, node); 1701 1702 while (!RB_EMPTY_ROOT(&spg->backoffs)) { 1703 struct ceph_osd_backoff *backoff = 1704 rb_entry(rb_first(&spg->backoffs), 1705 struct ceph_osd_backoff, spg_node); 1706 1707 erase_backoff(&spg->backoffs, backoff); 1708 erase_backoff_by_id(&osd->o_backoffs_by_id, backoff); 1709 free_backoff(backoff); 1710 } 1711 erase_spg_mapping(&osd->o_backoff_mappings, spg); 1712 free_spg_mapping(spg); 1713 } 1714 } 1715 1716 /* 1717 * Set up a temporary, non-owning view into @t. 1718 */ 1719 static void hoid_fill_from_target(struct ceph_hobject_id *hoid, 1720 const struct ceph_osd_request_target *t) 1721 { 1722 hoid->key = NULL; 1723 hoid->key_len = 0; 1724 hoid->oid = t->target_oid.name; 1725 hoid->oid_len = t->target_oid.name_len; 1726 hoid->snapid = CEPH_NOSNAP; 1727 hoid->hash = t->pgid.seed; 1728 hoid->is_max = false; 1729 if (t->target_oloc.pool_ns) { 1730 hoid->nspace = t->target_oloc.pool_ns->str; 1731 hoid->nspace_len = t->target_oloc.pool_ns->len; 1732 } else { 1733 hoid->nspace = NULL; 1734 hoid->nspace_len = 0; 1735 } 1736 hoid->pool = t->target_oloc.pool; 1737 ceph_hoid_build_hash_cache(hoid); 1738 } 1739 1740 static bool should_plug_request(struct ceph_osd_request *req) 1741 { 1742 struct ceph_osd *osd = req->r_osd; 1743 struct ceph_spg_mapping *spg; 1744 struct ceph_osd_backoff *backoff; 1745 struct ceph_hobject_id hoid; 1746 1747 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &req->r_t.spgid); 1748 if (!spg) 1749 return false; 1750 1751 hoid_fill_from_target(&hoid, &req->r_t); 1752 backoff = lookup_containing_backoff(&spg->backoffs, &hoid); 1753 if (!backoff) 1754 return false; 1755 1756 dout("%s req %p tid %llu backoff osd%d spgid %llu.%xs%d id %llu\n", 1757 __func__, req, req->r_tid, osd->o_osd, backoff->spgid.pgid.pool, 1758 backoff->spgid.pgid.seed, backoff->spgid.shard, backoff->id); 1759 return true; 1760 } 1761 1762 static void setup_request_data(struct ceph_osd_request *req, 1763 struct ceph_msg *msg) 1764 { 1765 u32 data_len = 0; 1766 int i; 1767 1768 if (!list_empty(&msg->data)) 1769 return; 1770 1771 WARN_ON(msg->data_length); 1772 for (i = 0; i < req->r_num_ops; i++) { 1773 struct ceph_osd_req_op *op = &req->r_ops[i]; 1774 1775 switch (op->op) { 1776 /* request */ 1777 case CEPH_OSD_OP_WRITE: 1778 case CEPH_OSD_OP_WRITEFULL: 1779 WARN_ON(op->indata_len != op->extent.length); 1780 ceph_osdc_msg_data_add(msg, &op->extent.osd_data); 1781 break; 1782 case CEPH_OSD_OP_SETXATTR: 1783 case 
CEPH_OSD_OP_CMPXATTR: 1784 WARN_ON(op->indata_len != op->xattr.name_len + 1785 op->xattr.value_len); 1786 ceph_osdc_msg_data_add(msg, &op->xattr.osd_data); 1787 break; 1788 case CEPH_OSD_OP_NOTIFY_ACK: 1789 ceph_osdc_msg_data_add(msg, 1790 &op->notify_ack.request_data); 1791 break; 1792 1793 /* reply */ 1794 case CEPH_OSD_OP_STAT: 1795 ceph_osdc_msg_data_add(req->r_reply, 1796 &op->raw_data_in); 1797 break; 1798 case CEPH_OSD_OP_READ: 1799 ceph_osdc_msg_data_add(req->r_reply, 1800 &op->extent.osd_data); 1801 break; 1802 case CEPH_OSD_OP_LIST_WATCHERS: 1803 ceph_osdc_msg_data_add(req->r_reply, 1804 &op->list_watchers.response_data); 1805 break; 1806 1807 /* both */ 1808 case CEPH_OSD_OP_CALL: 1809 WARN_ON(op->indata_len != op->cls.class_len + 1810 op->cls.method_len + 1811 op->cls.indata_len); 1812 ceph_osdc_msg_data_add(msg, &op->cls.request_info); 1813 /* optional, can be NONE */ 1814 ceph_osdc_msg_data_add(msg, &op->cls.request_data); 1815 /* optional, can be NONE */ 1816 ceph_osdc_msg_data_add(req->r_reply, 1817 &op->cls.response_data); 1818 break; 1819 case CEPH_OSD_OP_NOTIFY: 1820 ceph_osdc_msg_data_add(msg, 1821 &op->notify.request_data); 1822 ceph_osdc_msg_data_add(req->r_reply, 1823 &op->notify.response_data); 1824 break; 1825 } 1826 1827 data_len += op->indata_len; 1828 } 1829 1830 WARN_ON(data_len != msg->data_length); 1831 } 1832 1833 static void encode_pgid(void **p, const struct ceph_pg *pgid) 1834 { 1835 ceph_encode_8(p, 1); 1836 ceph_encode_64(p, pgid->pool); 1837 ceph_encode_32(p, pgid->seed); 1838 ceph_encode_32(p, -1); /* preferred */ 1839 } 1840 1841 static void encode_spgid(void **p, const struct ceph_spg *spgid) 1842 { 1843 ceph_start_encoding(p, 1, 1, CEPH_PGID_ENCODING_LEN + 1); 1844 encode_pgid(p, &spgid->pgid); 1845 ceph_encode_8(p, spgid->shard); 1846 } 1847 1848 static void encode_oloc(void **p, void *end, 1849 const struct ceph_object_locator *oloc) 1850 { 1851 ceph_start_encoding(p, 5, 4, ceph_oloc_encoding_size(oloc)); 1852 ceph_encode_64(p, oloc->pool); 1853 ceph_encode_32(p, -1); /* preferred */ 1854 ceph_encode_32(p, 0); /* key len */ 1855 if (oloc->pool_ns) 1856 ceph_encode_string(p, end, oloc->pool_ns->str, 1857 oloc->pool_ns->len); 1858 else 1859 ceph_encode_32(p, 0); 1860 } 1861 1862 static void encode_request_partial(struct ceph_osd_request *req, 1863 struct ceph_msg *msg) 1864 { 1865 void *p = msg->front.iov_base; 1866 void *const end = p + msg->front_alloc_len; 1867 u32 data_len = 0; 1868 int i; 1869 1870 if (req->r_flags & CEPH_OSD_FLAG_WRITE) { 1871 /* snapshots aren't writeable */ 1872 WARN_ON(req->r_snapid != CEPH_NOSNAP); 1873 } else { 1874 WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec || 1875 req->r_data_offset || req->r_snapc); 1876 } 1877 1878 setup_request_data(req, msg); 1879 1880 encode_spgid(&p, &req->r_t.spgid); /* actual spg */ 1881 ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */ 1882 ceph_encode_32(&p, req->r_osdc->osdmap->epoch); 1883 ceph_encode_32(&p, req->r_flags); 1884 1885 /* reqid */ 1886 ceph_start_encoding(&p, 2, 2, sizeof(struct ceph_osd_reqid)); 1887 memset(p, 0, sizeof(struct ceph_osd_reqid)); 1888 p += sizeof(struct ceph_osd_reqid); 1889 1890 /* trace */ 1891 memset(p, 0, sizeof(struct ceph_blkin_trace_info)); 1892 p += sizeof(struct ceph_blkin_trace_info); 1893 1894 ceph_encode_32(&p, 0); /* client_inc, always 0 */ 1895 ceph_encode_timespec(p, &req->r_mtime); 1896 p += sizeof(struct ceph_timespec); 1897 1898 encode_oloc(&p, end, &req->r_t.target_oloc); 1899 ceph_encode_string(&p, end, 
req->r_t.target_oid.name, 1900 req->r_t.target_oid.name_len); 1901 1902 /* ops, can imply data */ 1903 ceph_encode_16(&p, req->r_num_ops); 1904 for (i = 0; i < req->r_num_ops; i++) { 1905 data_len += osd_req_encode_op(p, &req->r_ops[i]); 1906 p += sizeof(struct ceph_osd_op); 1907 } 1908 1909 ceph_encode_64(&p, req->r_snapid); /* snapid */ 1910 if (req->r_snapc) { 1911 ceph_encode_64(&p, req->r_snapc->seq); 1912 ceph_encode_32(&p, req->r_snapc->num_snaps); 1913 for (i = 0; i < req->r_snapc->num_snaps; i++) 1914 ceph_encode_64(&p, req->r_snapc->snaps[i]); 1915 } else { 1916 ceph_encode_64(&p, 0); /* snap_seq */ 1917 ceph_encode_32(&p, 0); /* snaps len */ 1918 } 1919 1920 ceph_encode_32(&p, req->r_attempts); /* retry_attempt */ 1921 BUG_ON(p > end - 8); /* space for features */ 1922 1923 msg->hdr.version = cpu_to_le16(8); /* MOSDOp v8 */ 1924 /* front_len is finalized in encode_request_finish() */ 1925 msg->front.iov_len = p - msg->front.iov_base; 1926 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 1927 msg->hdr.data_len = cpu_to_le32(data_len); 1928 /* 1929 * The header "data_off" is a hint to the receiver allowing it 1930 * to align received data into its buffers such that there's no 1931 * need to re-copy it before writing it to disk (direct I/O). 1932 */ 1933 msg->hdr.data_off = cpu_to_le16(req->r_data_offset); 1934 1935 dout("%s req %p msg %p oid %s oid_len %d\n", __func__, req, msg, 1936 req->r_t.target_oid.name, req->r_t.target_oid.name_len); 1937 } 1938 1939 static void encode_request_finish(struct ceph_msg *msg) 1940 { 1941 void *p = msg->front.iov_base; 1942 void *const partial_end = p + msg->front.iov_len; 1943 void *const end = p + msg->front_alloc_len; 1944 1945 if (CEPH_HAVE_FEATURE(msg->con->peer_features, RESEND_ON_SPLIT)) { 1946 /* luminous OSD -- encode features and be done */ 1947 p = partial_end; 1948 ceph_encode_64(&p, msg->con->peer_features); 1949 } else { 1950 struct { 1951 char spgid[CEPH_ENCODING_START_BLK_LEN + 1952 CEPH_PGID_ENCODING_LEN + 1]; 1953 __le32 hash; 1954 __le32 epoch; 1955 __le32 flags; 1956 char reqid[CEPH_ENCODING_START_BLK_LEN + 1957 sizeof(struct ceph_osd_reqid)]; 1958 char trace[sizeof(struct ceph_blkin_trace_info)]; 1959 __le32 client_inc; 1960 struct ceph_timespec mtime; 1961 } __packed head; 1962 struct ceph_pg pgid; 1963 void *oloc, *oid, *tail; 1964 int oloc_len, oid_len, tail_len; 1965 int len; 1966 1967 /* 1968 * Pre-luminous OSD -- reencode v8 into v4 using @head 1969 * as a temporary buffer. Encode the raw PG; the rest 1970 * is just a matter of moving oloc, oid and tail blobs 1971 * around. 
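 *
 * Resulting v4 front layout (sketch of what the code below produces):
 * client_inc, osdmap epoch, flags, mtime, zeroed reassert_version,
 * oloc, raw pgid, oid, then the unchanged tail (ops, snapid, snapc,
 * retry_attempt).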
1972 */ 1973 memcpy(&head, p, sizeof(head)); 1974 p += sizeof(head); 1975 1976 oloc = p; 1977 p += CEPH_ENCODING_START_BLK_LEN; 1978 pgid.pool = ceph_decode_64(&p); 1979 p += 4 + 4; /* preferred, key len */ 1980 len = ceph_decode_32(&p); 1981 p += len; /* nspace */ 1982 oloc_len = p - oloc; 1983 1984 oid = p; 1985 len = ceph_decode_32(&p); 1986 p += len; 1987 oid_len = p - oid; 1988 1989 tail = p; 1990 tail_len = partial_end - p; 1991 1992 p = msg->front.iov_base; 1993 ceph_encode_copy(&p, &head.client_inc, sizeof(head.client_inc)); 1994 ceph_encode_copy(&p, &head.epoch, sizeof(head.epoch)); 1995 ceph_encode_copy(&p, &head.flags, sizeof(head.flags)); 1996 ceph_encode_copy(&p, &head.mtime, sizeof(head.mtime)); 1997 1998 /* reassert_version */ 1999 memset(p, 0, sizeof(struct ceph_eversion)); 2000 p += sizeof(struct ceph_eversion); 2001 2002 BUG_ON(p >= oloc); 2003 memmove(p, oloc, oloc_len); 2004 p += oloc_len; 2005 2006 pgid.seed = le32_to_cpu(head.hash); 2007 encode_pgid(&p, &pgid); /* raw pg */ 2008 2009 BUG_ON(p >= oid); 2010 memmove(p, oid, oid_len); 2011 p += oid_len; 2012 2013 /* tail -- ops, snapid, snapc, retry_attempt */ 2014 BUG_ON(p >= tail); 2015 memmove(p, tail, tail_len); 2016 p += tail_len; 2017 2018 msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */ 2019 } 2020 2021 BUG_ON(p > end); 2022 msg->front.iov_len = p - msg->front.iov_base; 2023 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 2024 2025 dout("%s msg %p tid %llu %u+%u+%u v%d\n", __func__, msg, 2026 le64_to_cpu(msg->hdr.tid), le32_to_cpu(msg->hdr.front_len), 2027 le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len), 2028 le16_to_cpu(msg->hdr.version)); 2029 } 2030 2031 /* 2032 * @req has to be assigned a tid and registered. 2033 */ 2034 static void send_request(struct ceph_osd_request *req) 2035 { 2036 struct ceph_osd *osd = req->r_osd; 2037 2038 verify_osd_locked(osd); 2039 WARN_ON(osd->o_osd != req->r_t.osd); 2040 2041 /* backoff? */ 2042 if (should_plug_request(req)) 2043 return; 2044 2045 /* 2046 * We may have a previously queued request message hanging 2047 * around. Cancel it to avoid corrupting the msgr. 
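 * ceph_msg_revoke() below drops the queued copy from the
 * connection's out queue so a stale encoding of this request is
 * not sent alongside the new one.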
2048 */ 2049 if (req->r_sent) 2050 ceph_msg_revoke(req->r_request); 2051 2052 req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR; 2053 if (req->r_attempts) 2054 req->r_flags |= CEPH_OSD_FLAG_RETRY; 2055 else 2056 WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY); 2057 2058 encode_request_partial(req, req->r_request); 2059 2060 dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d e%u flags 0x%x attempt %d\n", 2061 __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed, 2062 req->r_t.spgid.pgid.pool, req->r_t.spgid.pgid.seed, 2063 req->r_t.spgid.shard, osd->o_osd, req->r_t.epoch, req->r_flags, 2064 req->r_attempts); 2065 2066 req->r_t.paused = false; 2067 req->r_stamp = jiffies; 2068 req->r_attempts++; 2069 2070 req->r_sent = osd->o_incarnation; 2071 req->r_request->hdr.tid = cpu_to_le64(req->r_tid); 2072 ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request)); 2073 } 2074 2075 static void maybe_request_map(struct ceph_osd_client *osdc) 2076 { 2077 bool continuous = false; 2078 2079 verify_osdc_locked(osdc); 2080 WARN_ON(!osdc->osdmap->epoch); 2081 2082 if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 2083 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD) || 2084 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) { 2085 dout("%s osdc %p continuous\n", __func__, osdc); 2086 continuous = true; 2087 } else { 2088 dout("%s osdc %p onetime\n", __func__, osdc); 2089 } 2090 2091 if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP, 2092 osdc->osdmap->epoch + 1, continuous)) 2093 ceph_monc_renew_subs(&osdc->client->monc); 2094 } 2095 2096 static void complete_request(struct ceph_osd_request *req, int err); 2097 static void send_map_check(struct ceph_osd_request *req); 2098 2099 static void __submit_request(struct ceph_osd_request *req, bool wrlocked) 2100 { 2101 struct ceph_osd_client *osdc = req->r_osdc; 2102 struct ceph_osd *osd; 2103 enum calc_target_result ct_res; 2104 bool need_send = false; 2105 bool promoted = false; 2106 bool need_abort = false; 2107 2108 WARN_ON(req->r_tid); 2109 dout("%s req %p wrlocked %d\n", __func__, req, wrlocked); 2110 2111 again: 2112 ct_res = calc_target(osdc, &req->r_t, NULL, false); 2113 if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked) 2114 goto promote; 2115 2116 osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked); 2117 if (IS_ERR(osd)) { 2118 WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked); 2119 goto promote; 2120 } 2121 2122 if (osdc->osdmap->epoch < osdc->epoch_barrier) { 2123 dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch, 2124 osdc->epoch_barrier); 2125 req->r_t.paused = true; 2126 maybe_request_map(osdc); 2127 } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && 2128 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) { 2129 dout("req %p pausewr\n", req); 2130 req->r_t.paused = true; 2131 maybe_request_map(osdc); 2132 } else if ((req->r_flags & CEPH_OSD_FLAG_READ) && 2133 ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) { 2134 dout("req %p pauserd\n", req); 2135 req->r_t.paused = true; 2136 maybe_request_map(osdc); 2137 } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && 2138 !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY | 2139 CEPH_OSD_FLAG_FULL_FORCE)) && 2140 (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 2141 pool_full(osdc, req->r_t.base_oloc.pool))) { 2142 dout("req %p full/pool_full\n", req); 2143 pr_warn_ratelimited("FULL or reached pool quota\n"); 2144 req->r_t.paused = true; 2145 maybe_request_map(osdc); 2146 if (req->r_abort_on_full) 2147 need_abort = true; 2148 } else if (!osd_homeless(osd)) { 2149 need_send = true; 2150 } else { 2151 
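		/*
		 * Target currently maps to no usable OSD (homeless) --
		 * hold the request and ask for a newer osdmap.
		 */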
maybe_request_map(osdc); 2152 } 2153 2154 mutex_lock(&osd->lock); 2155 /* 2156 * Assign the tid atomically with send_request() to protect 2157 * multiple writes to the same object from racing with each 2158 * other, resulting in out of order ops on the OSDs. 2159 */ 2160 req->r_tid = atomic64_inc_return(&osdc->last_tid); 2161 link_request(osd, req); 2162 if (need_send) 2163 send_request(req); 2164 else if (need_abort) 2165 complete_request(req, -ENOSPC); 2166 mutex_unlock(&osd->lock); 2167 2168 if (ct_res == CALC_TARGET_POOL_DNE) 2169 send_map_check(req); 2170 2171 if (promoted) 2172 downgrade_write(&osdc->lock); 2173 return; 2174 2175 promote: 2176 up_read(&osdc->lock); 2177 down_write(&osdc->lock); 2178 wrlocked = true; 2179 promoted = true; 2180 goto again; 2181 } 2182 2183 static void account_request(struct ceph_osd_request *req) 2184 { 2185 WARN_ON(req->r_flags & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK)); 2186 WARN_ON(!(req->r_flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE))); 2187 2188 req->r_flags |= CEPH_OSD_FLAG_ONDISK; 2189 atomic_inc(&req->r_osdc->num_requests); 2190 2191 req->r_start_stamp = jiffies; 2192 } 2193 2194 static void submit_request(struct ceph_osd_request *req, bool wrlocked) 2195 { 2196 ceph_osdc_get_request(req); 2197 account_request(req); 2198 __submit_request(req, wrlocked); 2199 } 2200 2201 static void finish_request(struct ceph_osd_request *req) 2202 { 2203 struct ceph_osd_client *osdc = req->r_osdc; 2204 2205 WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid)); 2206 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 2207 2208 if (req->r_osd) 2209 unlink_request(req->r_osd, req); 2210 atomic_dec(&osdc->num_requests); 2211 2212 /* 2213 * If an OSD has failed or returned and a request has been sent 2214 * twice, it's possible to get a reply and end up here while the 2215 * request message is queued for delivery. We will ignore the 2216 * reply, so not a big deal, but better to try and catch it. 2217 */ 2218 ceph_msg_revoke(req->r_request); 2219 ceph_msg_revoke_incoming(req->r_reply); 2220 } 2221 2222 static void __complete_request(struct ceph_osd_request *req) 2223 { 2224 if (req->r_callback) { 2225 dout("%s req %p tid %llu cb %pf result %d\n", __func__, req, 2226 req->r_tid, req->r_callback, req->r_result); 2227 req->r_callback(req); 2228 } 2229 } 2230 2231 /* 2232 * This is open-coded in handle_reply(). 
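 * handle_reply() performs the same finish/callback/complete
 * sequence inline on its success path.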
2233 */ 2234 static void complete_request(struct ceph_osd_request *req, int err) 2235 { 2236 dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err); 2237 2238 req->r_result = err; 2239 finish_request(req); 2240 __complete_request(req); 2241 complete_all(&req->r_completion); 2242 ceph_osdc_put_request(req); 2243 } 2244 2245 static void cancel_map_check(struct ceph_osd_request *req) 2246 { 2247 struct ceph_osd_client *osdc = req->r_osdc; 2248 struct ceph_osd_request *lookup_req; 2249 2250 verify_osdc_wrlocked(osdc); 2251 2252 lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid); 2253 if (!lookup_req) 2254 return; 2255 2256 WARN_ON(lookup_req != req); 2257 erase_request_mc(&osdc->map_checks, req); 2258 ceph_osdc_put_request(req); 2259 } 2260 2261 static void cancel_request(struct ceph_osd_request *req) 2262 { 2263 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 2264 2265 cancel_map_check(req); 2266 finish_request(req); 2267 complete_all(&req->r_completion); 2268 ceph_osdc_put_request(req); 2269 } 2270 2271 static void abort_request(struct ceph_osd_request *req, int err) 2272 { 2273 dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err); 2274 2275 cancel_map_check(req); 2276 complete_request(req, err); 2277 } 2278 2279 static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb) 2280 { 2281 if (likely(eb > osdc->epoch_barrier)) { 2282 dout("updating epoch_barrier from %u to %u\n", 2283 osdc->epoch_barrier, eb); 2284 osdc->epoch_barrier = eb; 2285 /* Request map if we're not to the barrier yet */ 2286 if (eb > osdc->osdmap->epoch) 2287 maybe_request_map(osdc); 2288 } 2289 } 2290 2291 void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb) 2292 { 2293 down_read(&osdc->lock); 2294 if (unlikely(eb > osdc->epoch_barrier)) { 2295 up_read(&osdc->lock); 2296 down_write(&osdc->lock); 2297 update_epoch_barrier(osdc, eb); 2298 up_write(&osdc->lock); 2299 } else { 2300 up_read(&osdc->lock); 2301 } 2302 } 2303 EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier); 2304 2305 /* 2306 * Drop all pending requests that are stalled waiting on a full condition to 2307 * clear, and complete them with ENOSPC as the return code. Set the 2308 * osdc->epoch_barrier to the latest map epoch that we've seen if any were 2309 * cancelled. 2310 */ 2311 static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc) 2312 { 2313 struct rb_node *n; 2314 bool victims = false; 2315 2316 dout("enter abort_on_full\n"); 2317 2318 if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc)) 2319 goto out; 2320 2321 /* Scan list and see if there is anything to abort */ 2322 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { 2323 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); 2324 struct rb_node *m; 2325 2326 m = rb_first(&osd->o_requests); 2327 while (m) { 2328 struct ceph_osd_request *req = rb_entry(m, 2329 struct ceph_osd_request, r_node); 2330 m = rb_next(m); 2331 2332 if (req->r_abort_on_full) { 2333 victims = true; 2334 break; 2335 } 2336 } 2337 if (victims) 2338 break; 2339 } 2340 2341 if (!victims) 2342 goto out; 2343 2344 /* 2345 * Update the barrier to current epoch if it's behind that point, 2346 * since we know we have some calls to be aborted in the tree. 
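 * update_epoch_barrier() only ever moves the barrier forward, so
 * this is a no-op when the barrier already covers the current
 * epoch.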
2347 */ 2348 update_epoch_barrier(osdc, osdc->osdmap->epoch); 2349 2350 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { 2351 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); 2352 struct rb_node *m; 2353 2354 m = rb_first(&osd->o_requests); 2355 while (m) { 2356 struct ceph_osd_request *req = rb_entry(m, 2357 struct ceph_osd_request, r_node); 2358 m = rb_next(m); 2359 2360 if (req->r_abort_on_full && 2361 (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 2362 pool_full(osdc, req->r_t.target_oloc.pool))) 2363 abort_request(req, -ENOSPC); 2364 } 2365 } 2366 out: 2367 dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier); 2368 } 2369 2370 static void check_pool_dne(struct ceph_osd_request *req) 2371 { 2372 struct ceph_osd_client *osdc = req->r_osdc; 2373 struct ceph_osdmap *map = osdc->osdmap; 2374 2375 verify_osdc_wrlocked(osdc); 2376 WARN_ON(!map->epoch); 2377 2378 if (req->r_attempts) { 2379 /* 2380 * We sent a request earlier, which means that 2381 * previously the pool existed, and now it does not 2382 * (i.e., it was deleted). 2383 */ 2384 req->r_map_dne_bound = map->epoch; 2385 dout("%s req %p tid %llu pool disappeared\n", __func__, req, 2386 req->r_tid); 2387 } else { 2388 dout("%s req %p tid %llu map_dne_bound %u have %u\n", __func__, 2389 req, req->r_tid, req->r_map_dne_bound, map->epoch); 2390 } 2391 2392 if (req->r_map_dne_bound) { 2393 if (map->epoch >= req->r_map_dne_bound) { 2394 /* we had a new enough map */ 2395 pr_info_ratelimited("tid %llu pool does not exist\n", 2396 req->r_tid); 2397 complete_request(req, -ENOENT); 2398 } 2399 } else { 2400 send_map_check(req); 2401 } 2402 } 2403 2404 static void map_check_cb(struct ceph_mon_generic_request *greq) 2405 { 2406 struct ceph_osd_client *osdc = &greq->monc->client->osdc; 2407 struct ceph_osd_request *req; 2408 u64 tid = greq->private_data; 2409 2410 WARN_ON(greq->result || !greq->u.newest); 2411 2412 down_write(&osdc->lock); 2413 req = lookup_request_mc(&osdc->map_checks, tid); 2414 if (!req) { 2415 dout("%s tid %llu dne\n", __func__, tid); 2416 goto out_unlock; 2417 } 2418 2419 dout("%s req %p tid %llu map_dne_bound %u newest %llu\n", __func__, 2420 req, req->r_tid, req->r_map_dne_bound, greq->u.newest); 2421 if (!req->r_map_dne_bound) 2422 req->r_map_dne_bound = greq->u.newest; 2423 erase_request_mc(&osdc->map_checks, req); 2424 check_pool_dne(req); 2425 2426 ceph_osdc_put_request(req); 2427 out_unlock: 2428 up_write(&osdc->lock); 2429 } 2430 2431 static void send_map_check(struct ceph_osd_request *req) 2432 { 2433 struct ceph_osd_client *osdc = req->r_osdc; 2434 struct ceph_osd_request *lookup_req; 2435 int ret; 2436 2437 verify_osdc_wrlocked(osdc); 2438 2439 lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid); 2440 if (lookup_req) { 2441 WARN_ON(lookup_req != req); 2442 return; 2443 } 2444 2445 ceph_osdc_get_request(req); 2446 insert_request_mc(&osdc->map_checks, req); 2447 ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap", 2448 map_check_cb, req->r_tid); 2449 WARN_ON(ret); 2450 } 2451 2452 /* 2453 * lingering requests, watch/notify v2 infrastructure 2454 */ 2455 static void linger_release(struct kref *kref) 2456 { 2457 struct ceph_osd_linger_request *lreq = 2458 container_of(kref, struct ceph_osd_linger_request, kref); 2459 2460 dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq, 2461 lreq->reg_req, lreq->ping_req); 2462 WARN_ON(!RB_EMPTY_NODE(&lreq->node)); 2463 WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node)); 2464 WARN_ON(!RB_EMPTY_NODE(&lreq->mc_node)); 2465 
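	/* the linger request must be fully unlinked before the final put */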
WARN_ON(!list_empty(&lreq->scan_item)); 2466 WARN_ON(!list_empty(&lreq->pending_lworks)); 2467 WARN_ON(lreq->osd); 2468 2469 if (lreq->reg_req) 2470 ceph_osdc_put_request(lreq->reg_req); 2471 if (lreq->ping_req) 2472 ceph_osdc_put_request(lreq->ping_req); 2473 target_destroy(&lreq->t); 2474 kfree(lreq); 2475 } 2476 2477 static void linger_put(struct ceph_osd_linger_request *lreq) 2478 { 2479 if (lreq) 2480 kref_put(&lreq->kref, linger_release); 2481 } 2482 2483 static struct ceph_osd_linger_request * 2484 linger_get(struct ceph_osd_linger_request *lreq) 2485 { 2486 kref_get(&lreq->kref); 2487 return lreq; 2488 } 2489 2490 static struct ceph_osd_linger_request * 2491 linger_alloc(struct ceph_osd_client *osdc) 2492 { 2493 struct ceph_osd_linger_request *lreq; 2494 2495 lreq = kzalloc(sizeof(*lreq), GFP_NOIO); 2496 if (!lreq) 2497 return NULL; 2498 2499 kref_init(&lreq->kref); 2500 mutex_init(&lreq->lock); 2501 RB_CLEAR_NODE(&lreq->node); 2502 RB_CLEAR_NODE(&lreq->osdc_node); 2503 RB_CLEAR_NODE(&lreq->mc_node); 2504 INIT_LIST_HEAD(&lreq->scan_item); 2505 INIT_LIST_HEAD(&lreq->pending_lworks); 2506 init_completion(&lreq->reg_commit_wait); 2507 init_completion(&lreq->notify_finish_wait); 2508 2509 lreq->osdc = osdc; 2510 target_init(&lreq->t); 2511 2512 dout("%s lreq %p\n", __func__, lreq); 2513 return lreq; 2514 } 2515 2516 DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node) 2517 DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node) 2518 DEFINE_RB_FUNCS(linger_mc, struct ceph_osd_linger_request, linger_id, mc_node) 2519 2520 /* 2521 * Create linger request <-> OSD session relation. 2522 * 2523 * @lreq has to be registered, @osd may be homeless. 2524 */ 2525 static void link_linger(struct ceph_osd *osd, 2526 struct ceph_osd_linger_request *lreq) 2527 { 2528 verify_osd_locked(osd); 2529 WARN_ON(!lreq->linger_id || lreq->osd); 2530 dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd, 2531 osd->o_osd, lreq, lreq->linger_id); 2532 2533 if (!osd_homeless(osd)) 2534 __remove_osd_from_lru(osd); 2535 else 2536 atomic_inc(&osd->o_osdc->num_homeless); 2537 2538 get_osd(osd); 2539 insert_linger(&osd->o_linger_requests, lreq); 2540 lreq->osd = osd; 2541 } 2542 2543 static void unlink_linger(struct ceph_osd *osd, 2544 struct ceph_osd_linger_request *lreq) 2545 { 2546 verify_osd_locked(osd); 2547 WARN_ON(lreq->osd != osd); 2548 dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd, 2549 osd->o_osd, lreq, lreq->linger_id); 2550 2551 lreq->osd = NULL; 2552 erase_linger(&osd->o_linger_requests, lreq); 2553 put_osd(osd); 2554 2555 if (!osd_homeless(osd)) 2556 maybe_move_osd_to_lru(osd); 2557 else 2558 atomic_dec(&osd->o_osdc->num_homeless); 2559 } 2560 2561 static bool __linger_registered(struct ceph_osd_linger_request *lreq) 2562 { 2563 verify_osdc_locked(lreq->osdc); 2564 2565 return !RB_EMPTY_NODE(&lreq->osdc_node); 2566 } 2567 2568 static bool linger_registered(struct ceph_osd_linger_request *lreq) 2569 { 2570 struct ceph_osd_client *osdc = lreq->osdc; 2571 bool registered; 2572 2573 down_read(&osdc->lock); 2574 registered = __linger_registered(lreq); 2575 up_read(&osdc->lock); 2576 2577 return registered; 2578 } 2579 2580 static void linger_register(struct ceph_osd_linger_request *lreq) 2581 { 2582 struct ceph_osd_client *osdc = lreq->osdc; 2583 2584 verify_osdc_wrlocked(osdc); 2585 WARN_ON(lreq->linger_id); 2586 2587 linger_get(lreq); 2588 lreq->linger_id = ++osdc->last_linger_id; 2589 insert_linger_osdc(&osdc->linger_requests, 
lreq); 2590 } 2591 2592 static void linger_unregister(struct ceph_osd_linger_request *lreq) 2593 { 2594 struct ceph_osd_client *osdc = lreq->osdc; 2595 2596 verify_osdc_wrlocked(osdc); 2597 2598 erase_linger_osdc(&osdc->linger_requests, lreq); 2599 linger_put(lreq); 2600 } 2601 2602 static void cancel_linger_request(struct ceph_osd_request *req) 2603 { 2604 struct ceph_osd_linger_request *lreq = req->r_priv; 2605 2606 WARN_ON(!req->r_linger); 2607 cancel_request(req); 2608 linger_put(lreq); 2609 } 2610 2611 struct linger_work { 2612 struct work_struct work; 2613 struct ceph_osd_linger_request *lreq; 2614 struct list_head pending_item; 2615 unsigned long queued_stamp; 2616 2617 union { 2618 struct { 2619 u64 notify_id; 2620 u64 notifier_id; 2621 void *payload; /* points into @msg front */ 2622 size_t payload_len; 2623 2624 struct ceph_msg *msg; /* for ceph_msg_put() */ 2625 } notify; 2626 struct { 2627 int err; 2628 } error; 2629 }; 2630 }; 2631 2632 static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq, 2633 work_func_t workfn) 2634 { 2635 struct linger_work *lwork; 2636 2637 lwork = kzalloc(sizeof(*lwork), GFP_NOIO); 2638 if (!lwork) 2639 return NULL; 2640 2641 INIT_WORK(&lwork->work, workfn); 2642 INIT_LIST_HEAD(&lwork->pending_item); 2643 lwork->lreq = linger_get(lreq); 2644 2645 return lwork; 2646 } 2647 2648 static void lwork_free(struct linger_work *lwork) 2649 { 2650 struct ceph_osd_linger_request *lreq = lwork->lreq; 2651 2652 mutex_lock(&lreq->lock); 2653 list_del(&lwork->pending_item); 2654 mutex_unlock(&lreq->lock); 2655 2656 linger_put(lreq); 2657 kfree(lwork); 2658 } 2659 2660 static void lwork_queue(struct linger_work *lwork) 2661 { 2662 struct ceph_osd_linger_request *lreq = lwork->lreq; 2663 struct ceph_osd_client *osdc = lreq->osdc; 2664 2665 verify_lreq_locked(lreq); 2666 WARN_ON(!list_empty(&lwork->pending_item)); 2667 2668 lwork->queued_stamp = jiffies; 2669 list_add_tail(&lwork->pending_item, &lreq->pending_lworks); 2670 queue_work(osdc->notify_wq, &lwork->work); 2671 } 2672 2673 static void do_watch_notify(struct work_struct *w) 2674 { 2675 struct linger_work *lwork = container_of(w, struct linger_work, work); 2676 struct ceph_osd_linger_request *lreq = lwork->lreq; 2677 2678 if (!linger_registered(lreq)) { 2679 dout("%s lreq %p not registered\n", __func__, lreq); 2680 goto out; 2681 } 2682 2683 WARN_ON(!lreq->is_watch); 2684 dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n", 2685 __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id, 2686 lwork->notify.payload_len); 2687 lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id, 2688 lwork->notify.notifier_id, lwork->notify.payload, 2689 lwork->notify.payload_len); 2690 2691 out: 2692 ceph_msg_put(lwork->notify.msg); 2693 lwork_free(lwork); 2694 } 2695 2696 static void do_watch_error(struct work_struct *w) 2697 { 2698 struct linger_work *lwork = container_of(w, struct linger_work, work); 2699 struct ceph_osd_linger_request *lreq = lwork->lreq; 2700 2701 if (!linger_registered(lreq)) { 2702 dout("%s lreq %p not registered\n", __func__, lreq); 2703 goto out; 2704 } 2705 2706 dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err); 2707 lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err); 2708 2709 out: 2710 lwork_free(lwork); 2711 } 2712 2713 static void queue_watch_error(struct ceph_osd_linger_request *lreq) 2714 { 2715 struct linger_work *lwork; 2716 2717 lwork = lwork_alloc(lreq, do_watch_error); 2718 if (!lwork) { 2719 pr_err("failed to allocate 
error-lwork\n"); 2720 return; 2721 } 2722 2723 lwork->error.err = lreq->last_error; 2724 lwork_queue(lwork); 2725 } 2726 2727 static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq, 2728 int result) 2729 { 2730 if (!completion_done(&lreq->reg_commit_wait)) { 2731 lreq->reg_commit_error = (result <= 0 ? result : 0); 2732 complete_all(&lreq->reg_commit_wait); 2733 } 2734 } 2735 2736 static void linger_commit_cb(struct ceph_osd_request *req) 2737 { 2738 struct ceph_osd_linger_request *lreq = req->r_priv; 2739 2740 mutex_lock(&lreq->lock); 2741 dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq, 2742 lreq->linger_id, req->r_result); 2743 linger_reg_commit_complete(lreq, req->r_result); 2744 lreq->committed = true; 2745 2746 if (!lreq->is_watch) { 2747 struct ceph_osd_data *osd_data = 2748 osd_req_op_data(req, 0, notify, response_data); 2749 void *p = page_address(osd_data->pages[0]); 2750 2751 WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY || 2752 osd_data->type != CEPH_OSD_DATA_TYPE_PAGES); 2753 2754 /* make note of the notify_id */ 2755 if (req->r_ops[0].outdata_len >= sizeof(u64)) { 2756 lreq->notify_id = ceph_decode_64(&p); 2757 dout("lreq %p notify_id %llu\n", lreq, 2758 lreq->notify_id); 2759 } else { 2760 dout("lreq %p no notify_id\n", lreq); 2761 } 2762 } 2763 2764 mutex_unlock(&lreq->lock); 2765 linger_put(lreq); 2766 } 2767 2768 static int normalize_watch_error(int err) 2769 { 2770 /* 2771 * Translate ENOENT -> ENOTCONN so that a delete->disconnection 2772 * notification and a failure to reconnect because we raced with 2773 * the delete appear the same to the user. 2774 */ 2775 if (err == -ENOENT) 2776 err = -ENOTCONN; 2777 2778 return err; 2779 } 2780 2781 static void linger_reconnect_cb(struct ceph_osd_request *req) 2782 { 2783 struct ceph_osd_linger_request *lreq = req->r_priv; 2784 2785 mutex_lock(&lreq->lock); 2786 dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__, 2787 lreq, lreq->linger_id, req->r_result, lreq->last_error); 2788 if (req->r_result < 0) { 2789 if (!lreq->last_error) { 2790 lreq->last_error = normalize_watch_error(req->r_result); 2791 queue_watch_error(lreq); 2792 } 2793 } 2794 2795 mutex_unlock(&lreq->lock); 2796 linger_put(lreq); 2797 } 2798 2799 static void send_linger(struct ceph_osd_linger_request *lreq) 2800 { 2801 struct ceph_osd_request *req = lreq->reg_req; 2802 struct ceph_osd_req_op *op = &req->r_ops[0]; 2803 2804 verify_osdc_wrlocked(req->r_osdc); 2805 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id); 2806 2807 if (req->r_osd) 2808 cancel_linger_request(req); 2809 2810 request_reinit(req); 2811 ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); 2812 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); 2813 req->r_flags = lreq->t.flags; 2814 req->r_mtime = lreq->mtime; 2815 2816 mutex_lock(&lreq->lock); 2817 if (lreq->is_watch && lreq->committed) { 2818 WARN_ON(op->op != CEPH_OSD_OP_WATCH || 2819 op->watch.cookie != lreq->linger_id); 2820 op->watch.op = CEPH_OSD_WATCH_OP_RECONNECT; 2821 op->watch.gen = ++lreq->register_gen; 2822 dout("lreq %p reconnect register_gen %u\n", lreq, 2823 op->watch.gen); 2824 req->r_callback = linger_reconnect_cb; 2825 } else { 2826 if (!lreq->is_watch) 2827 lreq->notify_id = 0; 2828 else 2829 WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH); 2830 dout("lreq %p register\n", lreq); 2831 req->r_callback = linger_commit_cb; 2832 } 2833 mutex_unlock(&lreq->lock); 2834 2835 req->r_priv = linger_get(lreq); 2836 req->r_linger = true; 2837 2838 
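	/*
	 * The caller holds osdc->lock for write (see the
	 * verify_osdc_wrlocked() above), hence wrlocked == true.
	 */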
submit_request(req, true); 2839 } 2840 2841 static void linger_ping_cb(struct ceph_osd_request *req) 2842 { 2843 struct ceph_osd_linger_request *lreq = req->r_priv; 2844 2845 mutex_lock(&lreq->lock); 2846 dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n", 2847 __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent, 2848 lreq->last_error); 2849 if (lreq->register_gen == req->r_ops[0].watch.gen) { 2850 if (!req->r_result) { 2851 lreq->watch_valid_thru = lreq->ping_sent; 2852 } else if (!lreq->last_error) { 2853 lreq->last_error = normalize_watch_error(req->r_result); 2854 queue_watch_error(lreq); 2855 } 2856 } else { 2857 dout("lreq %p register_gen %u ignoring old pong %u\n", lreq, 2858 lreq->register_gen, req->r_ops[0].watch.gen); 2859 } 2860 2861 mutex_unlock(&lreq->lock); 2862 linger_put(lreq); 2863 } 2864 2865 static void send_linger_ping(struct ceph_osd_linger_request *lreq) 2866 { 2867 struct ceph_osd_client *osdc = lreq->osdc; 2868 struct ceph_osd_request *req = lreq->ping_req; 2869 struct ceph_osd_req_op *op = &req->r_ops[0]; 2870 2871 if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) { 2872 dout("%s PAUSERD\n", __func__); 2873 return; 2874 } 2875 2876 lreq->ping_sent = jiffies; 2877 dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n", 2878 __func__, lreq, lreq->linger_id, lreq->ping_sent, 2879 lreq->register_gen); 2880 2881 if (req->r_osd) 2882 cancel_linger_request(req); 2883 2884 request_reinit(req); 2885 target_copy(&req->r_t, &lreq->t); 2886 2887 WARN_ON(op->op != CEPH_OSD_OP_WATCH || 2888 op->watch.cookie != lreq->linger_id || 2889 op->watch.op != CEPH_OSD_WATCH_OP_PING); 2890 op->watch.gen = lreq->register_gen; 2891 req->r_callback = linger_ping_cb; 2892 req->r_priv = linger_get(lreq); 2893 req->r_linger = true; 2894 2895 ceph_osdc_get_request(req); 2896 account_request(req); 2897 req->r_tid = atomic64_inc_return(&osdc->last_tid); 2898 link_request(lreq->osd, req); 2899 send_request(req); 2900 } 2901 2902 static void linger_submit(struct ceph_osd_linger_request *lreq) 2903 { 2904 struct ceph_osd_client *osdc = lreq->osdc; 2905 struct ceph_osd *osd; 2906 2907 calc_target(osdc, &lreq->t, NULL, false); 2908 osd = lookup_create_osd(osdc, lreq->t.osd, true); 2909 link_linger(osd, lreq); 2910 2911 send_linger(lreq); 2912 } 2913 2914 static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq) 2915 { 2916 struct ceph_osd_client *osdc = lreq->osdc; 2917 struct ceph_osd_linger_request *lookup_lreq; 2918 2919 verify_osdc_wrlocked(osdc); 2920 2921 lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks, 2922 lreq->linger_id); 2923 if (!lookup_lreq) 2924 return; 2925 2926 WARN_ON(lookup_lreq != lreq); 2927 erase_linger_mc(&osdc->linger_map_checks, lreq); 2928 linger_put(lreq); 2929 } 2930 2931 /* 2932 * @lreq has to be both registered and linked. 
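 * Teardown roughly mirrors linger_submit() in reverse: cancel any
 * in-flight reg/ping requests, drop a pending map check, unlink
 * from the OSD and unregister from the client.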
2933 */ 2934 static void __linger_cancel(struct ceph_osd_linger_request *lreq) 2935 { 2936 if (lreq->is_watch && lreq->ping_req->r_osd) 2937 cancel_linger_request(lreq->ping_req); 2938 if (lreq->reg_req->r_osd) 2939 cancel_linger_request(lreq->reg_req); 2940 cancel_linger_map_check(lreq); 2941 unlink_linger(lreq->osd, lreq); 2942 linger_unregister(lreq); 2943 } 2944 2945 static void linger_cancel(struct ceph_osd_linger_request *lreq) 2946 { 2947 struct ceph_osd_client *osdc = lreq->osdc; 2948 2949 down_write(&osdc->lock); 2950 if (__linger_registered(lreq)) 2951 __linger_cancel(lreq); 2952 up_write(&osdc->lock); 2953 } 2954 2955 static void send_linger_map_check(struct ceph_osd_linger_request *lreq); 2956 2957 static void check_linger_pool_dne(struct ceph_osd_linger_request *lreq) 2958 { 2959 struct ceph_osd_client *osdc = lreq->osdc; 2960 struct ceph_osdmap *map = osdc->osdmap; 2961 2962 verify_osdc_wrlocked(osdc); 2963 WARN_ON(!map->epoch); 2964 2965 if (lreq->register_gen) { 2966 lreq->map_dne_bound = map->epoch; 2967 dout("%s lreq %p linger_id %llu pool disappeared\n", __func__, 2968 lreq, lreq->linger_id); 2969 } else { 2970 dout("%s lreq %p linger_id %llu map_dne_bound %u have %u\n", 2971 __func__, lreq, lreq->linger_id, lreq->map_dne_bound, 2972 map->epoch); 2973 } 2974 2975 if (lreq->map_dne_bound) { 2976 if (map->epoch >= lreq->map_dne_bound) { 2977 /* we had a new enough map */ 2978 pr_info("linger_id %llu pool does not exist\n", 2979 lreq->linger_id); 2980 linger_reg_commit_complete(lreq, -ENOENT); 2981 __linger_cancel(lreq); 2982 } 2983 } else { 2984 send_linger_map_check(lreq); 2985 } 2986 } 2987 2988 static void linger_map_check_cb(struct ceph_mon_generic_request *greq) 2989 { 2990 struct ceph_osd_client *osdc = &greq->monc->client->osdc; 2991 struct ceph_osd_linger_request *lreq; 2992 u64 linger_id = greq->private_data; 2993 2994 WARN_ON(greq->result || !greq->u.newest); 2995 2996 down_write(&osdc->lock); 2997 lreq = lookup_linger_mc(&osdc->linger_map_checks, linger_id); 2998 if (!lreq) { 2999 dout("%s linger_id %llu dne\n", __func__, linger_id); 3000 goto out_unlock; 3001 } 3002 3003 dout("%s lreq %p linger_id %llu map_dne_bound %u newest %llu\n", 3004 __func__, lreq, lreq->linger_id, lreq->map_dne_bound, 3005 greq->u.newest); 3006 if (!lreq->map_dne_bound) 3007 lreq->map_dne_bound = greq->u.newest; 3008 erase_linger_mc(&osdc->linger_map_checks, lreq); 3009 check_linger_pool_dne(lreq); 3010 3011 linger_put(lreq); 3012 out_unlock: 3013 up_write(&osdc->lock); 3014 } 3015 3016 static void send_linger_map_check(struct ceph_osd_linger_request *lreq) 3017 { 3018 struct ceph_osd_client *osdc = lreq->osdc; 3019 struct ceph_osd_linger_request *lookup_lreq; 3020 int ret; 3021 3022 verify_osdc_wrlocked(osdc); 3023 3024 lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks, 3025 lreq->linger_id); 3026 if (lookup_lreq) { 3027 WARN_ON(lookup_lreq != lreq); 3028 return; 3029 } 3030 3031 linger_get(lreq); 3032 insert_linger_mc(&osdc->linger_map_checks, lreq); 3033 ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap", 3034 linger_map_check_cb, lreq->linger_id); 3035 WARN_ON(ret); 3036 } 3037 3038 static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq) 3039 { 3040 int ret; 3041 3042 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id); 3043 ret = wait_for_completion_interruptible(&lreq->reg_commit_wait); 3044 return ret ?: lreq->reg_commit_error; 3045 } 3046 3047 static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq) 3048 { 
3049 int ret; 3050 3051 dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id); 3052 ret = wait_for_completion_interruptible(&lreq->notify_finish_wait); 3053 return ret ?: lreq->notify_finish_error; 3054 } 3055 3056 /* 3057 * Timeout callback, called every N seconds. When 1 or more OSD 3058 * requests has been active for more than N seconds, we send a keepalive 3059 * (tag + timestamp) to its OSD to ensure any communications channel 3060 * reset is detected. 3061 */ 3062 static void handle_timeout(struct work_struct *work) 3063 { 3064 struct ceph_osd_client *osdc = 3065 container_of(work, struct ceph_osd_client, timeout_work.work); 3066 struct ceph_options *opts = osdc->client->options; 3067 unsigned long cutoff = jiffies - opts->osd_keepalive_timeout; 3068 unsigned long expiry_cutoff = jiffies - opts->osd_request_timeout; 3069 LIST_HEAD(slow_osds); 3070 struct rb_node *n, *p; 3071 3072 dout("%s osdc %p\n", __func__, osdc); 3073 down_write(&osdc->lock); 3074 3075 /* 3076 * ping osds that are a bit slow. this ensures that if there 3077 * is a break in the TCP connection we will notice, and reopen 3078 * a connection with that osd (from the fault callback). 3079 */ 3080 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { 3081 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); 3082 bool found = false; 3083 3084 for (p = rb_first(&osd->o_requests); p; ) { 3085 struct ceph_osd_request *req = 3086 rb_entry(p, struct ceph_osd_request, r_node); 3087 3088 p = rb_next(p); /* abort_request() */ 3089 3090 if (time_before(req->r_stamp, cutoff)) { 3091 dout(" req %p tid %llu on osd%d is laggy\n", 3092 req, req->r_tid, osd->o_osd); 3093 found = true; 3094 } 3095 if (opts->osd_request_timeout && 3096 time_before(req->r_start_stamp, expiry_cutoff)) { 3097 pr_err_ratelimited("tid %llu on osd%d timeout\n", 3098 req->r_tid, osd->o_osd); 3099 abort_request(req, -ETIMEDOUT); 3100 } 3101 } 3102 for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) { 3103 struct ceph_osd_linger_request *lreq = 3104 rb_entry(p, struct ceph_osd_linger_request, node); 3105 3106 dout(" lreq %p linger_id %llu is served by osd%d\n", 3107 lreq, lreq->linger_id, osd->o_osd); 3108 found = true; 3109 3110 mutex_lock(&lreq->lock); 3111 if (lreq->is_watch && lreq->committed && !lreq->last_error) 3112 send_linger_ping(lreq); 3113 mutex_unlock(&lreq->lock); 3114 } 3115 3116 if (found) 3117 list_move_tail(&osd->o_keepalive_item, &slow_osds); 3118 } 3119 3120 if (opts->osd_request_timeout) { 3121 for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) { 3122 struct ceph_osd_request *req = 3123 rb_entry(p, struct ceph_osd_request, r_node); 3124 3125 p = rb_next(p); /* abort_request() */ 3126 3127 if (time_before(req->r_start_stamp, expiry_cutoff)) { 3128 pr_err_ratelimited("tid %llu on osd%d timeout\n", 3129 req->r_tid, osdc->homeless_osd.o_osd); 3130 abort_request(req, -ETIMEDOUT); 3131 } 3132 } 3133 } 3134 3135 if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds)) 3136 maybe_request_map(osdc); 3137 3138 while (!list_empty(&slow_osds)) { 3139 struct ceph_osd *osd = list_first_entry(&slow_osds, 3140 struct ceph_osd, 3141 o_keepalive_item); 3142 list_del_init(&osd->o_keepalive_item); 3143 ceph_con_keepalive(&osd->o_con); 3144 } 3145 3146 up_write(&osdc->lock); 3147 schedule_delayed_work(&osdc->timeout_work, 3148 osdc->client->options->osd_keepalive_timeout); 3149 } 3150 3151 static void handle_osds_timeout(struct work_struct *work) 3152 { 3153 struct ceph_osd_client *osdc = 3154 container_of(work, 
struct ceph_osd_client, 3155 osds_timeout_work.work); 3156 unsigned long delay = osdc->client->options->osd_idle_ttl / 4; 3157 struct ceph_osd *osd, *nosd; 3158 3159 dout("%s osdc %p\n", __func__, osdc); 3160 down_write(&osdc->lock); 3161 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) { 3162 if (time_before(jiffies, osd->lru_ttl)) 3163 break; 3164 3165 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests)); 3166 WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests)); 3167 close_osd(osd); 3168 } 3169 3170 up_write(&osdc->lock); 3171 schedule_delayed_work(&osdc->osds_timeout_work, 3172 round_jiffies_relative(delay)); 3173 } 3174 3175 static int ceph_oloc_decode(void **p, void *end, 3176 struct ceph_object_locator *oloc) 3177 { 3178 u8 struct_v, struct_cv; 3179 u32 len; 3180 void *struct_end; 3181 int ret = 0; 3182 3183 ceph_decode_need(p, end, 1 + 1 + 4, e_inval); 3184 struct_v = ceph_decode_8(p); 3185 struct_cv = ceph_decode_8(p); 3186 if (struct_v < 3) { 3187 pr_warn("got v %d < 3 cv %d of ceph_object_locator\n", 3188 struct_v, struct_cv); 3189 goto e_inval; 3190 } 3191 if (struct_cv > 6) { 3192 pr_warn("got v %d cv %d > 6 of ceph_object_locator\n", 3193 struct_v, struct_cv); 3194 goto e_inval; 3195 } 3196 len = ceph_decode_32(p); 3197 ceph_decode_need(p, end, len, e_inval); 3198 struct_end = *p + len; 3199 3200 oloc->pool = ceph_decode_64(p); 3201 *p += 4; /* skip preferred */ 3202 3203 len = ceph_decode_32(p); 3204 if (len > 0) { 3205 pr_warn("ceph_object_locator::key is set\n"); 3206 goto e_inval; 3207 } 3208 3209 if (struct_v >= 5) { 3210 bool changed = false; 3211 3212 len = ceph_decode_32(p); 3213 if (len > 0) { 3214 ceph_decode_need(p, end, len, e_inval); 3215 if (!oloc->pool_ns || 3216 ceph_compare_string(oloc->pool_ns, *p, len)) 3217 changed = true; 3218 *p += len; 3219 } else { 3220 if (oloc->pool_ns) 3221 changed = true; 3222 } 3223 if (changed) { 3224 /* redirect changes namespace */ 3225 pr_warn("ceph_object_locator::nspace is changed\n"); 3226 goto e_inval; 3227 } 3228 } 3229 3230 if (struct_v >= 6) { 3231 s64 hash = ceph_decode_64(p); 3232 if (hash != -1) { 3233 pr_warn("ceph_object_locator::hash is set\n"); 3234 goto e_inval; 3235 } 3236 } 3237 3238 /* skip the rest */ 3239 *p = struct_end; 3240 out: 3241 return ret; 3242 3243 e_inval: 3244 ret = -EINVAL; 3245 goto out; 3246 } 3247 3248 static int ceph_redirect_decode(void **p, void *end, 3249 struct ceph_request_redirect *redir) 3250 { 3251 u8 struct_v, struct_cv; 3252 u32 len; 3253 void *struct_end; 3254 int ret; 3255 3256 ceph_decode_need(p, end, 1 + 1 + 4, e_inval); 3257 struct_v = ceph_decode_8(p); 3258 struct_cv = ceph_decode_8(p); 3259 if (struct_cv > 1) { 3260 pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n", 3261 struct_v, struct_cv); 3262 goto e_inval; 3263 } 3264 len = ceph_decode_32(p); 3265 ceph_decode_need(p, end, len, e_inval); 3266 struct_end = *p + len; 3267 3268 ret = ceph_oloc_decode(p, end, &redir->oloc); 3269 if (ret) 3270 goto out; 3271 3272 len = ceph_decode_32(p); 3273 if (len > 0) { 3274 pr_warn("ceph_request_redirect::object_name is set\n"); 3275 goto e_inval; 3276 } 3277 3278 len = ceph_decode_32(p); 3279 *p += len; /* skip osd_instructions */ 3280 3281 /* skip the rest */ 3282 *p = struct_end; 3283 out: 3284 return ret; 3285 3286 e_inval: 3287 ret = -EINVAL; 3288 goto out; 3289 } 3290 3291 struct MOSDOpReply { 3292 struct ceph_pg pgid; 3293 u64 flags; 3294 int result; 3295 u32 epoch; 3296 int num_ops; 3297 u32 outdata_len[CEPH_OSD_MAX_OPS]; 3298 s32 rval[CEPH_OSD_MAX_OPS]; 3299 int 
retry_attempt; 3300 struct ceph_eversion replay_version; 3301 u64 user_version; 3302 struct ceph_request_redirect redirect; 3303 }; 3304 3305 static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m) 3306 { 3307 void *p = msg->front.iov_base; 3308 void *const end = p + msg->front.iov_len; 3309 u16 version = le16_to_cpu(msg->hdr.version); 3310 struct ceph_eversion bad_replay_version; 3311 u8 decode_redir; 3312 u32 len; 3313 int ret; 3314 int i; 3315 3316 ceph_decode_32_safe(&p, end, len, e_inval); 3317 ceph_decode_need(&p, end, len, e_inval); 3318 p += len; /* skip oid */ 3319 3320 ret = ceph_decode_pgid(&p, end, &m->pgid); 3321 if (ret) 3322 return ret; 3323 3324 ceph_decode_64_safe(&p, end, m->flags, e_inval); 3325 ceph_decode_32_safe(&p, end, m->result, e_inval); 3326 ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval); 3327 memcpy(&bad_replay_version, p, sizeof(bad_replay_version)); 3328 p += sizeof(bad_replay_version); 3329 ceph_decode_32_safe(&p, end, m->epoch, e_inval); 3330 3331 ceph_decode_32_safe(&p, end, m->num_ops, e_inval); 3332 if (m->num_ops > ARRAY_SIZE(m->outdata_len)) 3333 goto e_inval; 3334 3335 ceph_decode_need(&p, end, m->num_ops * sizeof(struct ceph_osd_op), 3336 e_inval); 3337 for (i = 0; i < m->num_ops; i++) { 3338 struct ceph_osd_op *op = p; 3339 3340 m->outdata_len[i] = le32_to_cpu(op->payload_len); 3341 p += sizeof(*op); 3342 } 3343 3344 ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval); 3345 for (i = 0; i < m->num_ops; i++) 3346 ceph_decode_32_safe(&p, end, m->rval[i], e_inval); 3347 3348 if (version >= 5) { 3349 ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval); 3350 memcpy(&m->replay_version, p, sizeof(m->replay_version)); 3351 p += sizeof(m->replay_version); 3352 ceph_decode_64_safe(&p, end, m->user_version, e_inval); 3353 } else { 3354 m->replay_version = bad_replay_version; /* struct */ 3355 m->user_version = le64_to_cpu(m->replay_version.version); 3356 } 3357 3358 if (version >= 6) { 3359 if (version >= 7) 3360 ceph_decode_8_safe(&p, end, decode_redir, e_inval); 3361 else 3362 decode_redir = 1; 3363 } else { 3364 decode_redir = 0; 3365 } 3366 3367 if (decode_redir) { 3368 ret = ceph_redirect_decode(&p, end, &m->redirect); 3369 if (ret) 3370 return ret; 3371 } else { 3372 ceph_oloc_init(&m->redirect.oloc); 3373 } 3374 3375 return 0; 3376 3377 e_inval: 3378 return -EINVAL; 3379 } 3380 3381 /* 3382 * Handle MOSDOpReply. Set ->r_result and call the callback if it is 3383 * specified. 
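 *
 * Flow: look the request up by tid under the OSD session lock,
 * decode the reply, resubmit if a pool redirect is present, copy
 * the per-op rvals and output lengths, then complete the request.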
3384 */ 3385 static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) 3386 { 3387 struct ceph_osd_client *osdc = osd->o_osdc; 3388 struct ceph_osd_request *req; 3389 struct MOSDOpReply m; 3390 u64 tid = le64_to_cpu(msg->hdr.tid); 3391 u32 data_len = 0; 3392 int ret; 3393 int i; 3394 3395 dout("%s msg %p tid %llu\n", __func__, msg, tid); 3396 3397 down_read(&osdc->lock); 3398 if (!osd_registered(osd)) { 3399 dout("%s osd%d unknown\n", __func__, osd->o_osd); 3400 goto out_unlock_osdc; 3401 } 3402 WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num)); 3403 3404 mutex_lock(&osd->lock); 3405 req = lookup_request(&osd->o_requests, tid); 3406 if (!req) { 3407 dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid); 3408 goto out_unlock_session; 3409 } 3410 3411 m.redirect.oloc.pool_ns = req->r_t.target_oloc.pool_ns; 3412 ret = decode_MOSDOpReply(msg, &m); 3413 m.redirect.oloc.pool_ns = NULL; 3414 if (ret) { 3415 pr_err("failed to decode MOSDOpReply for tid %llu: %d\n", 3416 req->r_tid, ret); 3417 ceph_msg_dump(msg); 3418 goto fail_request; 3419 } 3420 dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n", 3421 __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed, 3422 m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch), 3423 le64_to_cpu(m.replay_version.version), m.user_version); 3424 3425 if (m.retry_attempt >= 0) { 3426 if (m.retry_attempt != req->r_attempts - 1) { 3427 dout("req %p tid %llu retry_attempt %d != %d, ignoring\n", 3428 req, req->r_tid, m.retry_attempt, 3429 req->r_attempts - 1); 3430 goto out_unlock_session; 3431 } 3432 } else { 3433 WARN_ON(1); /* MOSDOpReply v4 is assumed */ 3434 } 3435 3436 if (!ceph_oloc_empty(&m.redirect.oloc)) { 3437 dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid, 3438 m.redirect.oloc.pool); 3439 unlink_request(osd, req); 3440 mutex_unlock(&osd->lock); 3441 3442 /* 3443 * Not ceph_oloc_copy() - changing pool_ns is not 3444 * supported. 3445 */ 3446 req->r_t.target_oloc.pool = m.redirect.oloc.pool; 3447 req->r_flags |= CEPH_OSD_FLAG_REDIRECTED; 3448 req->r_tid = 0; 3449 __submit_request(req, false); 3450 goto out_unlock_osdc; 3451 } 3452 3453 if (m.num_ops != req->r_num_ops) { 3454 pr_err("num_ops %d != %d for tid %llu\n", m.num_ops, 3455 req->r_num_ops, req->r_tid); 3456 goto fail_request; 3457 } 3458 for (i = 0; i < req->r_num_ops; i++) { 3459 dout(" req %p tid %llu op %d rval %d len %u\n", req, 3460 req->r_tid, i, m.rval[i], m.outdata_len[i]); 3461 req->r_ops[i].rval = m.rval[i]; 3462 req->r_ops[i].outdata_len = m.outdata_len[i]; 3463 data_len += m.outdata_len[i]; 3464 } 3465 if (data_len != le32_to_cpu(msg->hdr.data_len)) { 3466 pr_err("sum of lens %u != %u for tid %llu\n", data_len, 3467 le32_to_cpu(msg->hdr.data_len), req->r_tid); 3468 goto fail_request; 3469 } 3470 dout("%s req %p tid %llu result %d data_len %u\n", __func__, 3471 req, req->r_tid, m.result, data_len); 3472 3473 /* 3474 * Since we only ever request ONDISK, we should only ever get 3475 * one (type of) reply back. 
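 * r_result ends up as the op result when it is non-zero, otherwise
 * as the total number of returned data bytes.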
3476 */ 3477 WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK)); 3478 req->r_result = m.result ?: data_len; 3479 finish_request(req); 3480 mutex_unlock(&osd->lock); 3481 up_read(&osdc->lock); 3482 3483 __complete_request(req); 3484 complete_all(&req->r_completion); 3485 ceph_osdc_put_request(req); 3486 return; 3487 3488 fail_request: 3489 complete_request(req, -EIO); 3490 out_unlock_session: 3491 mutex_unlock(&osd->lock); 3492 out_unlock_osdc: 3493 up_read(&osdc->lock); 3494 } 3495 3496 static void set_pool_was_full(struct ceph_osd_client *osdc) 3497 { 3498 struct rb_node *n; 3499 3500 for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) { 3501 struct ceph_pg_pool_info *pi = 3502 rb_entry(n, struct ceph_pg_pool_info, node); 3503 3504 pi->was_full = __pool_full(pi); 3505 } 3506 } 3507 3508 static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id) 3509 { 3510 struct ceph_pg_pool_info *pi; 3511 3512 pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id); 3513 if (!pi) 3514 return false; 3515 3516 return pi->was_full && !__pool_full(pi); 3517 } 3518 3519 static enum calc_target_result 3520 recalc_linger_target(struct ceph_osd_linger_request *lreq) 3521 { 3522 struct ceph_osd_client *osdc = lreq->osdc; 3523 enum calc_target_result ct_res; 3524 3525 ct_res = calc_target(osdc, &lreq->t, NULL, true); 3526 if (ct_res == CALC_TARGET_NEED_RESEND) { 3527 struct ceph_osd *osd; 3528 3529 osd = lookup_create_osd(osdc, lreq->t.osd, true); 3530 if (osd != lreq->osd) { 3531 unlink_linger(lreq->osd, lreq); 3532 link_linger(osd, lreq); 3533 } 3534 } 3535 3536 return ct_res; 3537 } 3538 3539 /* 3540 * Requeue requests whose mapping to an OSD has changed. 3541 */ 3542 static void scan_requests(struct ceph_osd *osd, 3543 bool force_resend, 3544 bool cleared_full, 3545 bool check_pool_cleared_full, 3546 struct rb_root *need_resend, 3547 struct list_head *need_resend_linger) 3548 { 3549 struct ceph_osd_client *osdc = osd->o_osdc; 3550 struct rb_node *n; 3551 bool force_resend_writes; 3552 3553 for (n = rb_first(&osd->o_linger_requests); n; ) { 3554 struct ceph_osd_linger_request *lreq = 3555 rb_entry(n, struct ceph_osd_linger_request, node); 3556 enum calc_target_result ct_res; 3557 3558 n = rb_next(n); /* recalc_linger_target() */ 3559 3560 dout("%s lreq %p linger_id %llu\n", __func__, lreq, 3561 lreq->linger_id); 3562 ct_res = recalc_linger_target(lreq); 3563 switch (ct_res) { 3564 case CALC_TARGET_NO_ACTION: 3565 force_resend_writes = cleared_full || 3566 (check_pool_cleared_full && 3567 pool_cleared_full(osdc, lreq->t.base_oloc.pool)); 3568 if (!force_resend && !force_resend_writes) 3569 break; 3570 3571 /* fall through */ 3572 case CALC_TARGET_NEED_RESEND: 3573 cancel_linger_map_check(lreq); 3574 /* 3575 * scan_requests() for the previous epoch(s) 3576 * may have already added it to the list, since 3577 * it's not unlinked here. 
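 * Hence the list_empty() check below before adding it again.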
3578 */ 3579 if (list_empty(&lreq->scan_item)) 3580 list_add_tail(&lreq->scan_item, need_resend_linger); 3581 break; 3582 case CALC_TARGET_POOL_DNE: 3583 list_del_init(&lreq->scan_item); 3584 check_linger_pool_dne(lreq); 3585 break; 3586 } 3587 } 3588 3589 for (n = rb_first(&osd->o_requests); n; ) { 3590 struct ceph_osd_request *req = 3591 rb_entry(n, struct ceph_osd_request, r_node); 3592 enum calc_target_result ct_res; 3593 3594 n = rb_next(n); /* unlink_request(), check_pool_dne() */ 3595 3596 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 3597 ct_res = calc_target(osdc, &req->r_t, &req->r_osd->o_con, 3598 false); 3599 switch (ct_res) { 3600 case CALC_TARGET_NO_ACTION: 3601 force_resend_writes = cleared_full || 3602 (check_pool_cleared_full && 3603 pool_cleared_full(osdc, req->r_t.base_oloc.pool)); 3604 if (!force_resend && 3605 (!(req->r_flags & CEPH_OSD_FLAG_WRITE) || 3606 !force_resend_writes)) 3607 break; 3608 3609 /* fall through */ 3610 case CALC_TARGET_NEED_RESEND: 3611 cancel_map_check(req); 3612 unlink_request(osd, req); 3613 insert_request(need_resend, req); 3614 break; 3615 case CALC_TARGET_POOL_DNE: 3616 check_pool_dne(req); 3617 break; 3618 } 3619 } 3620 } 3621 3622 static int handle_one_map(struct ceph_osd_client *osdc, 3623 void *p, void *end, bool incremental, 3624 struct rb_root *need_resend, 3625 struct list_head *need_resend_linger) 3626 { 3627 struct ceph_osdmap *newmap; 3628 struct rb_node *n; 3629 bool skipped_map = false; 3630 bool was_full; 3631 3632 was_full = ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL); 3633 set_pool_was_full(osdc); 3634 3635 if (incremental) 3636 newmap = osdmap_apply_incremental(&p, end, osdc->osdmap); 3637 else 3638 newmap = ceph_osdmap_decode(&p, end); 3639 if (IS_ERR(newmap)) 3640 return PTR_ERR(newmap); 3641 3642 if (newmap != osdc->osdmap) { 3643 /* 3644 * Preserve ->was_full before destroying the old map. 3645 * For pools that weren't in the old map, ->was_full 3646 * should be false. 
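 * pool_cleared_full() later compares ->was_full against the new
 * map to decide whether paused writes must be resent.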
3647 */ 3648 for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) { 3649 struct ceph_pg_pool_info *pi = 3650 rb_entry(n, struct ceph_pg_pool_info, node); 3651 struct ceph_pg_pool_info *old_pi; 3652 3653 old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id); 3654 if (old_pi) 3655 pi->was_full = old_pi->was_full; 3656 else 3657 WARN_ON(pi->was_full); 3658 } 3659 3660 if (osdc->osdmap->epoch && 3661 osdc->osdmap->epoch + 1 < newmap->epoch) { 3662 WARN_ON(incremental); 3663 skipped_map = true; 3664 } 3665 3666 ceph_osdmap_destroy(osdc->osdmap); 3667 osdc->osdmap = newmap; 3668 } 3669 3670 was_full &= !ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL); 3671 scan_requests(&osdc->homeless_osd, skipped_map, was_full, true, 3672 need_resend, need_resend_linger); 3673 3674 for (n = rb_first(&osdc->osds); n; ) { 3675 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); 3676 3677 n = rb_next(n); /* close_osd() */ 3678 3679 scan_requests(osd, skipped_map, was_full, true, need_resend, 3680 need_resend_linger); 3681 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) || 3682 memcmp(&osd->o_con.peer_addr, 3683 ceph_osd_addr(osdc->osdmap, osd->o_osd), 3684 sizeof(struct ceph_entity_addr))) 3685 close_osd(osd); 3686 } 3687 3688 return 0; 3689 } 3690 3691 static void kick_requests(struct ceph_osd_client *osdc, 3692 struct rb_root *need_resend, 3693 struct list_head *need_resend_linger) 3694 { 3695 struct ceph_osd_linger_request *lreq, *nlreq; 3696 enum calc_target_result ct_res; 3697 struct rb_node *n; 3698 3699 /* make sure need_resend targets reflect latest map */ 3700 for (n = rb_first(need_resend); n; ) { 3701 struct ceph_osd_request *req = 3702 rb_entry(n, struct ceph_osd_request, r_node); 3703 3704 n = rb_next(n); 3705 3706 if (req->r_t.epoch < osdc->osdmap->epoch) { 3707 ct_res = calc_target(osdc, &req->r_t, NULL, false); 3708 if (ct_res == CALC_TARGET_POOL_DNE) { 3709 erase_request(need_resend, req); 3710 check_pool_dne(req); 3711 } 3712 } 3713 } 3714 3715 for (n = rb_first(need_resend); n; ) { 3716 struct ceph_osd_request *req = 3717 rb_entry(n, struct ceph_osd_request, r_node); 3718 struct ceph_osd *osd; 3719 3720 n = rb_next(n); 3721 erase_request(need_resend, req); /* before link_request() */ 3722 3723 osd = lookup_create_osd(osdc, req->r_t.osd, true); 3724 link_request(osd, req); 3725 if (!req->r_linger) { 3726 if (!osd_homeless(osd) && !req->r_t.paused) 3727 send_request(req); 3728 } else { 3729 cancel_linger_request(req); 3730 } 3731 } 3732 3733 list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) { 3734 if (!osd_homeless(lreq->osd)) 3735 send_linger(lreq); 3736 3737 list_del_init(&lreq->scan_item); 3738 } 3739 } 3740 3741 /* 3742 * Process updated osd map. 3743 * 3744 * The message contains any number of incremental and full maps, normally 3745 * indicating some sort of topology change in the cluster. Kick requests 3746 * off to different OSDs as needed. 
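 *
 * Wire layout handled below: fsid, a count of incremental maps
 * (each an epoch/length/data triple), then a count of full maps in
 * the same form.  Incremental maps that directly follow our current
 * epoch are applied in order; if none apply, the newest full map is
 * decoded instead.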
3747 */ 3748 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg) 3749 { 3750 void *p = msg->front.iov_base; 3751 void *const end = p + msg->front.iov_len; 3752 u32 nr_maps, maplen; 3753 u32 epoch; 3754 struct ceph_fsid fsid; 3755 struct rb_root need_resend = RB_ROOT; 3756 LIST_HEAD(need_resend_linger); 3757 bool handled_incremental = false; 3758 bool was_pauserd, was_pausewr; 3759 bool pauserd, pausewr; 3760 int err; 3761 3762 dout("%s have %u\n", __func__, osdc->osdmap->epoch); 3763 down_write(&osdc->lock); 3764 3765 /* verify fsid */ 3766 ceph_decode_need(&p, end, sizeof(fsid), bad); 3767 ceph_decode_copy(&p, &fsid, sizeof(fsid)); 3768 if (ceph_check_fsid(osdc->client, &fsid) < 0) 3769 goto bad; 3770 3771 was_pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD); 3772 was_pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) || 3773 ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 3774 have_pool_full(osdc); 3775 3776 /* incremental maps */ 3777 ceph_decode_32_safe(&p, end, nr_maps, bad); 3778 dout(" %d inc maps\n", nr_maps); 3779 while (nr_maps > 0) { 3780 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 3781 epoch = ceph_decode_32(&p); 3782 maplen = ceph_decode_32(&p); 3783 ceph_decode_need(&p, end, maplen, bad); 3784 if (osdc->osdmap->epoch && 3785 osdc->osdmap->epoch + 1 == epoch) { 3786 dout("applying incremental map %u len %d\n", 3787 epoch, maplen); 3788 err = handle_one_map(osdc, p, p + maplen, true, 3789 &need_resend, &need_resend_linger); 3790 if (err) 3791 goto bad; 3792 handled_incremental = true; 3793 } else { 3794 dout("ignoring incremental map %u len %d\n", 3795 epoch, maplen); 3796 } 3797 p += maplen; 3798 nr_maps--; 3799 } 3800 if (handled_incremental) 3801 goto done; 3802 3803 /* full maps */ 3804 ceph_decode_32_safe(&p, end, nr_maps, bad); 3805 dout(" %d full maps\n", nr_maps); 3806 while (nr_maps) { 3807 ceph_decode_need(&p, end, 2*sizeof(u32), bad); 3808 epoch = ceph_decode_32(&p); 3809 maplen = ceph_decode_32(&p); 3810 ceph_decode_need(&p, end, maplen, bad); 3811 if (nr_maps > 1) { 3812 dout("skipping non-latest full map %u len %d\n", 3813 epoch, maplen); 3814 } else if (osdc->osdmap->epoch >= epoch) { 3815 dout("skipping full map %u len %d, " 3816 "older than our %u\n", epoch, maplen, 3817 osdc->osdmap->epoch); 3818 } else { 3819 dout("taking full map %u len %d\n", epoch, maplen); 3820 err = handle_one_map(osdc, p, p + maplen, false, 3821 &need_resend, &need_resend_linger); 3822 if (err) 3823 goto bad; 3824 } 3825 p += maplen; 3826 nr_maps--; 3827 } 3828 3829 done: 3830 /* 3831 * subscribe to subsequent osdmap updates if full to ensure 3832 * we find out when we are no longer full and stop returning 3833 * ENOSPC. 3834 */ 3835 pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD); 3836 pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) || 3837 ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || 3838 have_pool_full(osdc); 3839 if (was_pauserd || was_pausewr || pauserd || pausewr || 3840 osdc->osdmap->epoch < osdc->epoch_barrier) 3841 maybe_request_map(osdc); 3842 3843 kick_requests(osdc, &need_resend, &need_resend_linger); 3844 3845 ceph_osdc_abort_on_full(osdc); 3846 ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP, 3847 osdc->osdmap->epoch); 3848 up_write(&osdc->lock); 3849 wake_up_all(&osdc->client->auth_wq); 3850 return; 3851 3852 bad: 3853 pr_err("osdc handle_map corrupt msg\n"); 3854 ceph_msg_dump(msg); 3855 up_write(&osdc->lock); 3856 } 3857 3858 /* 3859 * Resubmit requests pending on the given osd. 
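 * Called from osd_fault(): plain requests are re-sent and linger
 * requests are re-registered via send_linger().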
3860 */ 3861 static void kick_osd_requests(struct ceph_osd *osd) 3862 { 3863 struct rb_node *n; 3864 3865 clear_backoffs(osd); 3866 3867 for (n = rb_first(&osd->o_requests); n; ) { 3868 struct ceph_osd_request *req = 3869 rb_entry(n, struct ceph_osd_request, r_node); 3870 3871 n = rb_next(n); /* cancel_linger_request() */ 3872 3873 if (!req->r_linger) { 3874 if (!req->r_t.paused) 3875 send_request(req); 3876 } else { 3877 cancel_linger_request(req); 3878 } 3879 } 3880 for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) { 3881 struct ceph_osd_linger_request *lreq = 3882 rb_entry(n, struct ceph_osd_linger_request, node); 3883 3884 send_linger(lreq); 3885 } 3886 } 3887 3888 /* 3889 * If the osd connection drops, we need to resubmit all requests. 3890 */ 3891 static void osd_fault(struct ceph_connection *con) 3892 { 3893 struct ceph_osd *osd = con->private; 3894 struct ceph_osd_client *osdc = osd->o_osdc; 3895 3896 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd); 3897 3898 down_write(&osdc->lock); 3899 if (!osd_registered(osd)) { 3900 dout("%s osd%d unknown\n", __func__, osd->o_osd); 3901 goto out_unlock; 3902 } 3903 3904 if (!reopen_osd(osd)) 3905 kick_osd_requests(osd); 3906 maybe_request_map(osdc); 3907 3908 out_unlock: 3909 up_write(&osdc->lock); 3910 } 3911 3912 struct MOSDBackoff { 3913 struct ceph_spg spgid; 3914 u32 map_epoch; 3915 u8 op; 3916 u64 id; 3917 struct ceph_hobject_id *begin; 3918 struct ceph_hobject_id *end; 3919 }; 3920 3921 static int decode_MOSDBackoff(const struct ceph_msg *msg, struct MOSDBackoff *m) 3922 { 3923 void *p = msg->front.iov_base; 3924 void *const end = p + msg->front.iov_len; 3925 u8 struct_v; 3926 u32 struct_len; 3927 int ret; 3928 3929 ret = ceph_start_decoding(&p, end, 1, "spg_t", &struct_v, &struct_len); 3930 if (ret) 3931 return ret; 3932 3933 ret = ceph_decode_pgid(&p, end, &m->spgid.pgid); 3934 if (ret) 3935 return ret; 3936 3937 ceph_decode_8_safe(&p, end, m->spgid.shard, e_inval); 3938 ceph_decode_32_safe(&p, end, m->map_epoch, e_inval); 3939 ceph_decode_8_safe(&p, end, m->op, e_inval); 3940 ceph_decode_64_safe(&p, end, m->id, e_inval); 3941 3942 m->begin = kzalloc(sizeof(*m->begin), GFP_NOIO); 3943 if (!m->begin) 3944 return -ENOMEM; 3945 3946 ret = decode_hoid(&p, end, m->begin); 3947 if (ret) { 3948 free_hoid(m->begin); 3949 return ret; 3950 } 3951 3952 m->end = kzalloc(sizeof(*m->end), GFP_NOIO); 3953 if (!m->end) { 3954 free_hoid(m->begin); 3955 return -ENOMEM; 3956 } 3957 3958 ret = decode_hoid(&p, end, m->end); 3959 if (ret) { 3960 free_hoid(m->begin); 3961 free_hoid(m->end); 3962 return ret; 3963 } 3964 3965 return 0; 3966 3967 e_inval: 3968 return -EINVAL; 3969 } 3970 3971 static struct ceph_msg *create_backoff_message( 3972 const struct ceph_osd_backoff *backoff, 3973 u32 map_epoch) 3974 { 3975 struct ceph_msg *msg; 3976 void *p, *end; 3977 int msg_size; 3978 3979 msg_size = CEPH_ENCODING_START_BLK_LEN + 3980 CEPH_PGID_ENCODING_LEN + 1; /* spgid */ 3981 msg_size += 4 + 1 + 8; /* map_epoch, op, id */ 3982 msg_size += CEPH_ENCODING_START_BLK_LEN + 3983 hoid_encoding_size(backoff->begin); 3984 msg_size += CEPH_ENCODING_START_BLK_LEN + 3985 hoid_encoding_size(backoff->end); 3986 3987 msg = ceph_msg_new(CEPH_MSG_OSD_BACKOFF, msg_size, GFP_NOIO, true); 3988 if (!msg) 3989 return NULL; 3990 3991 p = msg->front.iov_base; 3992 end = p + msg->front_alloc_len; 3993 3994 encode_spgid(&p, &backoff->spgid); 3995 ceph_encode_32(&p, map_epoch); 3996 ceph_encode_8(&p, CEPH_OSD_BACKOFF_OP_ACK_BLOCK); 3997 ceph_encode_64(&p, backoff->id); 
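	/* begin/end hoids complete the ACK_BLOCK payload; their sizes were
	 * accounted for in msg_size, hence the BUG_ON(p != end) below. */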
3998 encode_hoid(&p, end, backoff->begin); 3999 encode_hoid(&p, end, backoff->end); 4000 BUG_ON(p != end); 4001 4002 msg->front.iov_len = p - msg->front.iov_base; 4003 msg->hdr.version = cpu_to_le16(1); /* MOSDBackoff v1 */ 4004 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); 4005 4006 return msg; 4007 } 4008 4009 static void handle_backoff_block(struct ceph_osd *osd, struct MOSDBackoff *m) 4010 { 4011 struct ceph_spg_mapping *spg; 4012 struct ceph_osd_backoff *backoff; 4013 struct ceph_msg *msg; 4014 4015 dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd, 4016 m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id); 4017 4018 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &m->spgid); 4019 if (!spg) { 4020 spg = alloc_spg_mapping(); 4021 if (!spg) { 4022 pr_err("%s failed to allocate spg\n", __func__); 4023 return; 4024 } 4025 spg->spgid = m->spgid; /* struct */ 4026 insert_spg_mapping(&osd->o_backoff_mappings, spg); 4027 } 4028 4029 backoff = alloc_backoff(); 4030 if (!backoff) { 4031 pr_err("%s failed to allocate backoff\n", __func__); 4032 return; 4033 } 4034 backoff->spgid = m->spgid; /* struct */ 4035 backoff->id = m->id; 4036 backoff->begin = m->begin; 4037 m->begin = NULL; /* backoff now owns this */ 4038 backoff->end = m->end; 4039 m->end = NULL; /* ditto */ 4040 4041 insert_backoff(&spg->backoffs, backoff); 4042 insert_backoff_by_id(&osd->o_backoffs_by_id, backoff); 4043 4044 /* 4045 * Ack with original backoff's epoch so that the OSD can 4046 * discard this if there was a PG split. 4047 */ 4048 msg = create_backoff_message(backoff, m->map_epoch); 4049 if (!msg) { 4050 pr_err("%s failed to allocate msg\n", __func__); 4051 return; 4052 } 4053 ceph_con_send(&osd->o_con, msg); 4054 } 4055 4056 static bool target_contained_by(const struct ceph_osd_request_target *t, 4057 const struct ceph_hobject_id *begin, 4058 const struct ceph_hobject_id *end) 4059 { 4060 struct ceph_hobject_id hoid; 4061 int cmp; 4062 4063 hoid_fill_from_target(&hoid, t); 4064 cmp = hoid_compare(&hoid, begin); 4065 return !cmp || (cmp > 0 && hoid_compare(&hoid, end) < 0); 4066 } 4067 4068 static void handle_backoff_unblock(struct ceph_osd *osd, 4069 const struct MOSDBackoff *m) 4070 { 4071 struct ceph_spg_mapping *spg; 4072 struct ceph_osd_backoff *backoff; 4073 struct rb_node *n; 4074 4075 dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd, 4076 m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id); 4077 4078 backoff = lookup_backoff_by_id(&osd->o_backoffs_by_id, m->id); 4079 if (!backoff) { 4080 pr_err("%s osd%d spgid %llu.%xs%d id %llu backoff dne\n", 4081 __func__, osd->o_osd, m->spgid.pgid.pool, 4082 m->spgid.pgid.seed, m->spgid.shard, m->id); 4083 return; 4084 } 4085 4086 if (hoid_compare(backoff->begin, m->begin) && 4087 hoid_compare(backoff->end, m->end)) { 4088 pr_err("%s osd%d spgid %llu.%xs%d id %llu bad range?\n", 4089 __func__, osd->o_osd, m->spgid.pgid.pool, 4090 m->spgid.pgid.seed, m->spgid.shard, m->id); 4091 /* unblock it anyway... 
*/ 4092 } 4093 4094 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &backoff->spgid); 4095 BUG_ON(!spg); 4096 4097 erase_backoff(&spg->backoffs, backoff); 4098 erase_backoff_by_id(&osd->o_backoffs_by_id, backoff); 4099 free_backoff(backoff); 4100 4101 if (RB_EMPTY_ROOT(&spg->backoffs)) { 4102 erase_spg_mapping(&osd->o_backoff_mappings, spg); 4103 free_spg_mapping(spg); 4104 } 4105 4106 for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) { 4107 struct ceph_osd_request *req = 4108 rb_entry(n, struct ceph_osd_request, r_node); 4109 4110 if (!ceph_spg_compare(&req->r_t.spgid, &m->spgid)) { 4111 /* 4112 * Match against @m, not @backoff -- the PG may 4113 * have split on the OSD. 4114 */ 4115 if (target_contained_by(&req->r_t, m->begin, m->end)) { 4116 /* 4117 * If no other installed backoff applies, 4118 * resend. 4119 */ 4120 send_request(req); 4121 } 4122 } 4123 } 4124 } 4125 4126 static void handle_backoff(struct ceph_osd *osd, struct ceph_msg *msg) 4127 { 4128 struct ceph_osd_client *osdc = osd->o_osdc; 4129 struct MOSDBackoff m; 4130 int ret; 4131 4132 down_read(&osdc->lock); 4133 if (!osd_registered(osd)) { 4134 dout("%s osd%d unknown\n", __func__, osd->o_osd); 4135 up_read(&osdc->lock); 4136 return; 4137 } 4138 WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num)); 4139 4140 mutex_lock(&osd->lock); 4141 ret = decode_MOSDBackoff(msg, &m); 4142 if (ret) { 4143 pr_err("failed to decode MOSDBackoff: %d\n", ret); 4144 ceph_msg_dump(msg); 4145 goto out_unlock; 4146 } 4147 4148 switch (m.op) { 4149 case CEPH_OSD_BACKOFF_OP_BLOCK: 4150 handle_backoff_block(osd, &m); 4151 break; 4152 case CEPH_OSD_BACKOFF_OP_UNBLOCK: 4153 handle_backoff_unblock(osd, &m); 4154 break; 4155 default: 4156 pr_err("%s osd%d unknown op %d\n", __func__, osd->o_osd, m.op); 4157 } 4158 4159 free_hoid(m.begin); 4160 free_hoid(m.end); 4161 4162 out_unlock: 4163 mutex_unlock(&osd->lock); 4164 up_read(&osdc->lock); 4165 } 4166 4167 /* 4168 * Process osd watch notifications 4169 */ 4170 static void handle_watch_notify(struct ceph_osd_client *osdc, 4171 struct ceph_msg *msg) 4172 { 4173 void *p = msg->front.iov_base; 4174 void *const end = p + msg->front.iov_len; 4175 struct ceph_osd_linger_request *lreq; 4176 struct linger_work *lwork; 4177 u8 proto_ver, opcode; 4178 u64 cookie, notify_id; 4179 u64 notifier_id = 0; 4180 s32 return_code = 0; 4181 void *payload = NULL; 4182 u32 payload_len = 0; 4183 4184 ceph_decode_8_safe(&p, end, proto_ver, bad); 4185 ceph_decode_8_safe(&p, end, opcode, bad); 4186 ceph_decode_64_safe(&p, end, cookie, bad); 4187 p += 8; /* skip ver */ 4188 ceph_decode_64_safe(&p, end, notify_id, bad); 4189 4190 if (proto_ver >= 1) { 4191 ceph_decode_32_safe(&p, end, payload_len, bad); 4192 ceph_decode_need(&p, end, payload_len, bad); 4193 payload = p; 4194 p += payload_len; 4195 } 4196 4197 if (le16_to_cpu(msg->hdr.version) >= 2) 4198 ceph_decode_32_safe(&p, end, return_code, bad); 4199 4200 if (le16_to_cpu(msg->hdr.version) >= 3) 4201 ceph_decode_64_safe(&p, end, notifier_id, bad); 4202 4203 down_read(&osdc->lock); 4204 lreq = lookup_linger_osdc(&osdc->linger_requests, cookie); 4205 if (!lreq) { 4206 dout("%s opcode %d cookie %llu dne\n", __func__, opcode, 4207 cookie); 4208 goto out_unlock_osdc; 4209 } 4210 4211 mutex_lock(&lreq->lock); 4212 dout("%s opcode %d cookie %llu lreq %p is_watch %d\n", __func__, 4213 opcode, cookie, lreq, lreq->is_watch); 4214 if (opcode == CEPH_WATCH_EVENT_DISCONNECT) { 4215 if (!lreq->last_error) { 4216 lreq->last_error = -ENOTCONN; 4217 queue_watch_error(lreq); 4218 } 4219 } 
else if (!lreq->is_watch) { 4220 /* CEPH_WATCH_EVENT_NOTIFY_COMPLETE */ 4221 if (lreq->notify_id && lreq->notify_id != notify_id) { 4222 dout("lreq %p notify_id %llu != %llu, ignoring\n", lreq, 4223 lreq->notify_id, notify_id); 4224 } else if (!completion_done(&lreq->notify_finish_wait)) { 4225 struct ceph_msg_data *data = 4226 list_first_entry_or_null(&msg->data, 4227 struct ceph_msg_data, 4228 links); 4229 4230 if (data) { 4231 if (lreq->preply_pages) { 4232 WARN_ON(data->type != 4233 CEPH_MSG_DATA_PAGES); 4234 *lreq->preply_pages = data->pages; 4235 *lreq->preply_len = data->length; 4236 } else { 4237 ceph_release_page_vector(data->pages, 4238 calc_pages_for(0, data->length)); 4239 } 4240 } 4241 lreq->notify_finish_error = return_code; 4242 complete_all(&lreq->notify_finish_wait); 4243 } 4244 } else { 4245 /* CEPH_WATCH_EVENT_NOTIFY */ 4246 lwork = lwork_alloc(lreq, do_watch_notify); 4247 if (!lwork) { 4248 pr_err("failed to allocate notify-lwork\n"); 4249 goto out_unlock_lreq; 4250 } 4251 4252 lwork->notify.notify_id = notify_id; 4253 lwork->notify.notifier_id = notifier_id; 4254 lwork->notify.payload = payload; 4255 lwork->notify.payload_len = payload_len; 4256 lwork->notify.msg = ceph_msg_get(msg); 4257 lwork_queue(lwork); 4258 } 4259 4260 out_unlock_lreq: 4261 mutex_unlock(&lreq->lock); 4262 out_unlock_osdc: 4263 up_read(&osdc->lock); 4264 return; 4265 4266 bad: 4267 pr_err("osdc handle_watch_notify corrupt msg\n"); 4268 } 4269 4270 /* 4271 * Register request, send initial attempt. 4272 */ 4273 int ceph_osdc_start_request(struct ceph_osd_client *osdc, 4274 struct ceph_osd_request *req, 4275 bool nofail) 4276 { 4277 down_read(&osdc->lock); 4278 submit_request(req, false); 4279 up_read(&osdc->lock); 4280 4281 return 0; 4282 } 4283 EXPORT_SYMBOL(ceph_osdc_start_request); 4284 4285 /* 4286 * Unregister a registered request. The request is not completed: 4287 * ->r_result isn't set and __complete_request() isn't called. 4288 */ 4289 void ceph_osdc_cancel_request(struct ceph_osd_request *req) 4290 { 4291 struct ceph_osd_client *osdc = req->r_osdc; 4292 4293 down_write(&osdc->lock); 4294 if (req->r_osd) 4295 cancel_request(req); 4296 up_write(&osdc->lock); 4297 } 4298 EXPORT_SYMBOL(ceph_osdc_cancel_request); 4299 4300 /* 4301 * @timeout: in jiffies, 0 means "wait forever" 4302 */ 4303 static int wait_request_timeout(struct ceph_osd_request *req, 4304 unsigned long timeout) 4305 { 4306 long left; 4307 4308 dout("%s req %p tid %llu\n", __func__, req, req->r_tid); 4309 left = wait_for_completion_killable_timeout(&req->r_completion, 4310 ceph_timeout_jiffies(timeout)); 4311 if (left <= 0) { 4312 left = left ?: -ETIMEDOUT; 4313 ceph_osdc_cancel_request(req); 4314 } else { 4315 left = req->r_result; /* completed */ 4316 } 4317 4318 return left; 4319 } 4320 4321 /* 4322 * wait for a request to complete 4323 */ 4324 int ceph_osdc_wait_request(struct ceph_osd_client *osdc, 4325 struct ceph_osd_request *req) 4326 { 4327 return wait_request_timeout(req, 0); 4328 } 4329 EXPORT_SYMBOL(ceph_osdc_wait_request); 4330 4331 /* 4332 * sync - wait for all in-flight requests to flush. avoid starvation. 
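 *
 * Only writes with a tid at or below the last_tid sampled on entry are
 * waited for; both osd->lock and osdc->lock are dropped around each
 * wait and the scan is restarted, so requests submitted after entry
 * are never waited on.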
4333 */ 4334 void ceph_osdc_sync(struct ceph_osd_client *osdc) 4335 { 4336 struct rb_node *n, *p; 4337 u64 last_tid = atomic64_read(&osdc->last_tid); 4338 4339 again: 4340 down_read(&osdc->lock); 4341 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { 4342 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); 4343 4344 mutex_lock(&osd->lock); 4345 for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) { 4346 struct ceph_osd_request *req = 4347 rb_entry(p, struct ceph_osd_request, r_node); 4348 4349 if (req->r_tid > last_tid) 4350 break; 4351 4352 if (!(req->r_flags & CEPH_OSD_FLAG_WRITE)) 4353 continue; 4354 4355 ceph_osdc_get_request(req); 4356 mutex_unlock(&osd->lock); 4357 up_read(&osdc->lock); 4358 dout("%s waiting on req %p tid %llu last_tid %llu\n", 4359 __func__, req, req->r_tid, last_tid); 4360 wait_for_completion(&req->r_completion); 4361 ceph_osdc_put_request(req); 4362 goto again; 4363 } 4364 4365 mutex_unlock(&osd->lock); 4366 } 4367 4368 up_read(&osdc->lock); 4369 dout("%s done last_tid %llu\n", __func__, last_tid); 4370 } 4371 EXPORT_SYMBOL(ceph_osdc_sync); 4372 4373 static struct ceph_osd_request * 4374 alloc_linger_request(struct ceph_osd_linger_request *lreq) 4375 { 4376 struct ceph_osd_request *req; 4377 4378 req = ceph_osdc_alloc_request(lreq->osdc, NULL, 1, false, GFP_NOIO); 4379 if (!req) 4380 return NULL; 4381 4382 ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); 4383 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); 4384 4385 if (ceph_osdc_alloc_messages(req, GFP_NOIO)) { 4386 ceph_osdc_put_request(req); 4387 return NULL; 4388 } 4389 4390 return req; 4391 } 4392 4393 /* 4394 * Returns a handle, caller owns a ref. 4395 */ 4396 struct ceph_osd_linger_request * 4397 ceph_osdc_watch(struct ceph_osd_client *osdc, 4398 struct ceph_object_id *oid, 4399 struct ceph_object_locator *oloc, 4400 rados_watchcb2_t wcb, 4401 rados_watcherrcb_t errcb, 4402 void *data) 4403 { 4404 struct ceph_osd_linger_request *lreq; 4405 int ret; 4406 4407 lreq = linger_alloc(osdc); 4408 if (!lreq) 4409 return ERR_PTR(-ENOMEM); 4410 4411 lreq->is_watch = true; 4412 lreq->wcb = wcb; 4413 lreq->errcb = errcb; 4414 lreq->data = data; 4415 lreq->watch_valid_thru = jiffies; 4416 4417 ceph_oid_copy(&lreq->t.base_oid, oid); 4418 ceph_oloc_copy(&lreq->t.base_oloc, oloc); 4419 lreq->t.flags = CEPH_OSD_FLAG_WRITE; 4420 ktime_get_real_ts(&lreq->mtime); 4421 4422 lreq->reg_req = alloc_linger_request(lreq); 4423 if (!lreq->reg_req) { 4424 ret = -ENOMEM; 4425 goto err_put_lreq; 4426 } 4427 4428 lreq->ping_req = alloc_linger_request(lreq); 4429 if (!lreq->ping_req) { 4430 ret = -ENOMEM; 4431 goto err_put_lreq; 4432 } 4433 4434 down_write(&osdc->lock); 4435 linger_register(lreq); /* before osd_req_op_* */ 4436 osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id, 4437 CEPH_OSD_WATCH_OP_WATCH); 4438 osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id, 4439 CEPH_OSD_WATCH_OP_PING); 4440 linger_submit(lreq); 4441 up_write(&osdc->lock); 4442 4443 ret = linger_reg_commit_wait(lreq); 4444 if (ret) { 4445 linger_cancel(lreq); 4446 goto err_put_lreq; 4447 } 4448 4449 return lreq; 4450 4451 err_put_lreq: 4452 linger_put(lreq); 4453 return ERR_PTR(ret); 4454 } 4455 EXPORT_SYMBOL(ceph_osdc_watch); 4456 4457 /* 4458 * Releases a ref. 4459 * 4460 * Times out after mount_timeout to preserve rbd unmap behaviour 4461 * introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap 4462 * with mount_timeout"). 
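 *
 * An illustrative watch/unwatch round-trip is sketched at the end of
 * this file.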
4463 */ 4464 int ceph_osdc_unwatch(struct ceph_osd_client *osdc, 4465 struct ceph_osd_linger_request *lreq) 4466 { 4467 struct ceph_options *opts = osdc->client->options; 4468 struct ceph_osd_request *req; 4469 int ret; 4470 4471 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); 4472 if (!req) 4473 return -ENOMEM; 4474 4475 ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); 4476 ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); 4477 req->r_flags = CEPH_OSD_FLAG_WRITE; 4478 ktime_get_real_ts(&req->r_mtime); 4479 osd_req_op_watch_init(req, 0, lreq->linger_id, 4480 CEPH_OSD_WATCH_OP_UNWATCH); 4481 4482 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 4483 if (ret) 4484 goto out_put_req; 4485 4486 ceph_osdc_start_request(osdc, req, false); 4487 linger_cancel(lreq); 4488 linger_put(lreq); 4489 ret = wait_request_timeout(req, opts->mount_timeout); 4490 4491 out_put_req: 4492 ceph_osdc_put_request(req); 4493 return ret; 4494 } 4495 EXPORT_SYMBOL(ceph_osdc_unwatch); 4496 4497 static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which, 4498 u64 notify_id, u64 cookie, void *payload, 4499 size_t payload_len) 4500 { 4501 struct ceph_osd_req_op *op; 4502 struct ceph_pagelist *pl; 4503 int ret; 4504 4505 op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0); 4506 4507 pl = kmalloc(sizeof(*pl), GFP_NOIO); 4508 if (!pl) 4509 return -ENOMEM; 4510 4511 ceph_pagelist_init(pl); 4512 ret = ceph_pagelist_encode_64(pl, notify_id); 4513 ret |= ceph_pagelist_encode_64(pl, cookie); 4514 if (payload) { 4515 ret |= ceph_pagelist_encode_32(pl, payload_len); 4516 ret |= ceph_pagelist_append(pl, payload, payload_len); 4517 } else { 4518 ret |= ceph_pagelist_encode_32(pl, 0); 4519 } 4520 if (ret) { 4521 ceph_pagelist_release(pl); 4522 return -ENOMEM; 4523 } 4524 4525 ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl); 4526 op->indata_len = pl->length; 4527 return 0; 4528 } 4529 4530 int ceph_osdc_notify_ack(struct ceph_osd_client *osdc, 4531 struct ceph_object_id *oid, 4532 struct ceph_object_locator *oloc, 4533 u64 notify_id, 4534 u64 cookie, 4535 void *payload, 4536 size_t payload_len) 4537 { 4538 struct ceph_osd_request *req; 4539 int ret; 4540 4541 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); 4542 if (!req) 4543 return -ENOMEM; 4544 4545 ceph_oid_copy(&req->r_base_oid, oid); 4546 ceph_oloc_copy(&req->r_base_oloc, oloc); 4547 req->r_flags = CEPH_OSD_FLAG_READ; 4548 4549 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 4550 if (ret) 4551 goto out_put_req; 4552 4553 ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload, 4554 payload_len); 4555 if (ret) 4556 goto out_put_req; 4557 4558 ceph_osdc_start_request(osdc, req, false); 4559 ret = ceph_osdc_wait_request(osdc, req); 4560 4561 out_put_req: 4562 ceph_osdc_put_request(req); 4563 return ret; 4564 } 4565 EXPORT_SYMBOL(ceph_osdc_notify_ack); 4566 4567 static int osd_req_op_notify_init(struct ceph_osd_request *req, int which, 4568 u64 cookie, u32 prot_ver, u32 timeout, 4569 void *payload, size_t payload_len) 4570 { 4571 struct ceph_osd_req_op *op; 4572 struct ceph_pagelist *pl; 4573 int ret; 4574 4575 op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0); 4576 op->notify.cookie = cookie; 4577 4578 pl = kmalloc(sizeof(*pl), GFP_NOIO); 4579 if (!pl) 4580 return -ENOMEM; 4581 4582 ceph_pagelist_init(pl); 4583 ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */ 4584 ret |= ceph_pagelist_encode_32(pl, timeout); 4585 ret |= ceph_pagelist_encode_32(pl, payload_len); 4586 ret |= 
ceph_pagelist_append(pl, payload, payload_len); 4587 if (ret) { 4588 ceph_pagelist_release(pl); 4589 return -ENOMEM; 4590 } 4591 4592 ceph_osd_data_pagelist_init(&op->notify.request_data, pl); 4593 op->indata_len = pl->length; 4594 return 0; 4595 } 4596 4597 /* 4598 * @timeout: in seconds 4599 * 4600 * @preply_{pages,len} are initialized both on success and error. 4601 * The caller is responsible for: 4602 * 4603 * ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len)) 4604 */ 4605 int ceph_osdc_notify(struct ceph_osd_client *osdc, 4606 struct ceph_object_id *oid, 4607 struct ceph_object_locator *oloc, 4608 void *payload, 4609 size_t payload_len, 4610 u32 timeout, 4611 struct page ***preply_pages, 4612 size_t *preply_len) 4613 { 4614 struct ceph_osd_linger_request *lreq; 4615 struct page **pages; 4616 int ret; 4617 4618 WARN_ON(!timeout); 4619 if (preply_pages) { 4620 *preply_pages = NULL; 4621 *preply_len = 0; 4622 } 4623 4624 lreq = linger_alloc(osdc); 4625 if (!lreq) 4626 return -ENOMEM; 4627 4628 lreq->preply_pages = preply_pages; 4629 lreq->preply_len = preply_len; 4630 4631 ceph_oid_copy(&lreq->t.base_oid, oid); 4632 ceph_oloc_copy(&lreq->t.base_oloc, oloc); 4633 lreq->t.flags = CEPH_OSD_FLAG_READ; 4634 4635 lreq->reg_req = alloc_linger_request(lreq); 4636 if (!lreq->reg_req) { 4637 ret = -ENOMEM; 4638 goto out_put_lreq; 4639 } 4640 4641 /* for notify_id */ 4642 pages = ceph_alloc_page_vector(1, GFP_NOIO); 4643 if (IS_ERR(pages)) { 4644 ret = PTR_ERR(pages); 4645 goto out_put_lreq; 4646 } 4647 4648 down_write(&osdc->lock); 4649 linger_register(lreq); /* before osd_req_op_* */ 4650 ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1, 4651 timeout, payload, payload_len); 4652 if (ret) { 4653 linger_unregister(lreq); 4654 up_write(&osdc->lock); 4655 ceph_release_page_vector(pages, 1); 4656 goto out_put_lreq; 4657 } 4658 ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify, 4659 response_data), 4660 pages, PAGE_SIZE, 0, false, true); 4661 linger_submit(lreq); 4662 up_write(&osdc->lock); 4663 4664 ret = linger_reg_commit_wait(lreq); 4665 if (!ret) 4666 ret = linger_notify_finish_wait(lreq); 4667 else 4668 dout("lreq %p failed to initiate notify %d\n", lreq, ret); 4669 4670 linger_cancel(lreq); 4671 out_put_lreq: 4672 linger_put(lreq); 4673 return ret; 4674 } 4675 EXPORT_SYMBOL(ceph_osdc_notify); 4676 4677 /* 4678 * Return the number of milliseconds since the watch was last 4679 * confirmed, or an error. If there is an error, the watch is no 4680 * longer valid, and should be destroyed with ceph_osdc_unwatch(). 
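 *
 * Callers typically poll this; on error they tear the watch down and
 * establish a fresh one with ceph_osdc_watch() (see the illustrative
 * sketch at the end of this file).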
4681 */ 4682 int ceph_osdc_watch_check(struct ceph_osd_client *osdc, 4683 struct ceph_osd_linger_request *lreq) 4684 { 4685 unsigned long stamp, age; 4686 int ret; 4687 4688 down_read(&osdc->lock); 4689 mutex_lock(&lreq->lock); 4690 stamp = lreq->watch_valid_thru; 4691 if (!list_empty(&lreq->pending_lworks)) { 4692 struct linger_work *lwork = 4693 list_first_entry(&lreq->pending_lworks, 4694 struct linger_work, 4695 pending_item); 4696 4697 if (time_before(lwork->queued_stamp, stamp)) 4698 stamp = lwork->queued_stamp; 4699 } 4700 age = jiffies - stamp; 4701 dout("%s lreq %p linger_id %llu age %lu last_error %d\n", __func__, 4702 lreq, lreq->linger_id, age, lreq->last_error); 4703 /* we are truncating to msecs, so return a safe upper bound */ 4704 ret = lreq->last_error ?: 1 + jiffies_to_msecs(age); 4705 4706 mutex_unlock(&lreq->lock); 4707 up_read(&osdc->lock); 4708 return ret; 4709 } 4710 4711 static int decode_watcher(void **p, void *end, struct ceph_watch_item *item) 4712 { 4713 u8 struct_v; 4714 u32 struct_len; 4715 int ret; 4716 4717 ret = ceph_start_decoding(p, end, 2, "watch_item_t", 4718 &struct_v, &struct_len); 4719 if (ret) 4720 return ret; 4721 4722 ceph_decode_copy(p, &item->name, sizeof(item->name)); 4723 item->cookie = ceph_decode_64(p); 4724 *p += 4; /* skip timeout_seconds */ 4725 if (struct_v >= 2) { 4726 ceph_decode_copy(p, &item->addr, sizeof(item->addr)); 4727 ceph_decode_addr(&item->addr); 4728 } 4729 4730 dout("%s %s%llu cookie %llu addr %s\n", __func__, 4731 ENTITY_NAME(item->name), item->cookie, 4732 ceph_pr_addr(&item->addr.in_addr)); 4733 return 0; 4734 } 4735 4736 static int decode_watchers(void **p, void *end, 4737 struct ceph_watch_item **watchers, 4738 u32 *num_watchers) 4739 { 4740 u8 struct_v; 4741 u32 struct_len; 4742 int i; 4743 int ret; 4744 4745 ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t", 4746 &struct_v, &struct_len); 4747 if (ret) 4748 return ret; 4749 4750 *num_watchers = ceph_decode_32(p); 4751 *watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO); 4752 if (!*watchers) 4753 return -ENOMEM; 4754 4755 for (i = 0; i < *num_watchers; i++) { 4756 ret = decode_watcher(p, end, *watchers + i); 4757 if (ret) { 4758 kfree(*watchers); 4759 return ret; 4760 } 4761 } 4762 4763 return 0; 4764 } 4765 4766 /* 4767 * On success, the caller is responsible for: 4768 * 4769 * kfree(watchers); 4770 */ 4771 int ceph_osdc_list_watchers(struct ceph_osd_client *osdc, 4772 struct ceph_object_id *oid, 4773 struct ceph_object_locator *oloc, 4774 struct ceph_watch_item **watchers, 4775 u32 *num_watchers) 4776 { 4777 struct ceph_osd_request *req; 4778 struct page **pages; 4779 int ret; 4780 4781 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); 4782 if (!req) 4783 return -ENOMEM; 4784 4785 ceph_oid_copy(&req->r_base_oid, oid); 4786 ceph_oloc_copy(&req->r_base_oloc, oloc); 4787 req->r_flags = CEPH_OSD_FLAG_READ; 4788 4789 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 4790 if (ret) 4791 goto out_put_req; 4792 4793 pages = ceph_alloc_page_vector(1, GFP_NOIO); 4794 if (IS_ERR(pages)) { 4795 ret = PTR_ERR(pages); 4796 goto out_put_req; 4797 } 4798 4799 osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0); 4800 ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers, 4801 response_data), 4802 pages, PAGE_SIZE, 0, false, true); 4803 4804 ceph_osdc_start_request(osdc, req, false); 4805 ret = ceph_osdc_wait_request(osdc, req); 4806 if (ret >= 0) { 4807 void *p = page_address(pages[0]); 4808 void *const end = p + 
req->r_ops[0].outdata_len; 4809 4810 ret = decode_watchers(&p, end, watchers, num_watchers); 4811 } 4812 4813 out_put_req: 4814 ceph_osdc_put_request(req); 4815 return ret; 4816 } 4817 EXPORT_SYMBOL(ceph_osdc_list_watchers); 4818 4819 /* 4820 * Call all pending notify callbacks - for use after a watch is 4821 * unregistered, to make sure no more callbacks for it will be invoked 4822 */ 4823 void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc) 4824 { 4825 dout("%s osdc %p\n", __func__, osdc); 4826 flush_workqueue(osdc->notify_wq); 4827 } 4828 EXPORT_SYMBOL(ceph_osdc_flush_notifies); 4829 4830 void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc) 4831 { 4832 down_read(&osdc->lock); 4833 maybe_request_map(osdc); 4834 up_read(&osdc->lock); 4835 } 4836 EXPORT_SYMBOL(ceph_osdc_maybe_request_map); 4837 4838 /* 4839 * Execute an OSD class method on an object. 4840 * 4841 * @flags: CEPH_OSD_FLAG_* 4842 * @resp_len: in/out param for reply length 4843 */ 4844 int ceph_osdc_call(struct ceph_osd_client *osdc, 4845 struct ceph_object_id *oid, 4846 struct ceph_object_locator *oloc, 4847 const char *class, const char *method, 4848 unsigned int flags, 4849 struct page *req_page, size_t req_len, 4850 struct page *resp_page, size_t *resp_len) 4851 { 4852 struct ceph_osd_request *req; 4853 int ret; 4854 4855 if (req_len > PAGE_SIZE || (resp_page && *resp_len > PAGE_SIZE)) 4856 return -E2BIG; 4857 4858 req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); 4859 if (!req) 4860 return -ENOMEM; 4861 4862 ceph_oid_copy(&req->r_base_oid, oid); 4863 ceph_oloc_copy(&req->r_base_oloc, oloc); 4864 req->r_flags = flags; 4865 4866 ret = ceph_osdc_alloc_messages(req, GFP_NOIO); 4867 if (ret) 4868 goto out_put_req; 4869 4870 osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method); 4871 if (req_page) 4872 osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len, 4873 0, false, false); 4874 if (resp_page) 4875 osd_req_op_cls_response_data_pages(req, 0, &resp_page, 4876 *resp_len, 0, false, false); 4877 4878 ceph_osdc_start_request(osdc, req, false); 4879 ret = ceph_osdc_wait_request(osdc, req); 4880 if (ret >= 0) { 4881 ret = req->r_ops[0].rval; 4882 if (resp_page) 4883 *resp_len = req->r_ops[0].outdata_len; 4884 } 4885 4886 out_put_req: 4887 ceph_osdc_put_request(req); 4888 return ret; 4889 } 4890 EXPORT_SYMBOL(ceph_osdc_call); 4891 4892 /* 4893 * init, shutdown 4894 */ 4895 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) 4896 { 4897 int err; 4898 4899 dout("init\n"); 4900 osdc->client = client; 4901 init_rwsem(&osdc->lock); 4902 osdc->osds = RB_ROOT; 4903 INIT_LIST_HEAD(&osdc->osd_lru); 4904 spin_lock_init(&osdc->osd_lru_lock); 4905 osd_init(&osdc->homeless_osd); 4906 osdc->homeless_osd.o_osdc = osdc; 4907 osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD; 4908 osdc->last_linger_id = CEPH_LINGER_ID_START; 4909 osdc->linger_requests = RB_ROOT; 4910 osdc->map_checks = RB_ROOT; 4911 osdc->linger_map_checks = RB_ROOT; 4912 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout); 4913 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout); 4914 4915 err = -ENOMEM; 4916 osdc->osdmap = ceph_osdmap_alloc(); 4917 if (!osdc->osdmap) 4918 goto out; 4919 4920 osdc->req_mempool = mempool_create_slab_pool(10, 4921 ceph_osd_request_cache); 4922 if (!osdc->req_mempool) 4923 goto out_map; 4924 4925 err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP, 4926 PAGE_SIZE, 10, true, "osd_op"); 4927 if (err < 0) 4928 goto out_mempool; 4929 err = 
ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY, 4930 PAGE_SIZE, 10, true, "osd_op_reply"); 4931 if (err < 0) 4932 goto out_msgpool; 4933 4934 err = -ENOMEM; 4935 osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify"); 4936 if (!osdc->notify_wq) 4937 goto out_msgpool_reply; 4938 4939 schedule_delayed_work(&osdc->timeout_work, 4940 osdc->client->options->osd_keepalive_timeout); 4941 schedule_delayed_work(&osdc->osds_timeout_work, 4942 round_jiffies_relative(osdc->client->options->osd_idle_ttl)); 4943 4944 return 0; 4945 4946 out_msgpool_reply: 4947 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 4948 out_msgpool: 4949 ceph_msgpool_destroy(&osdc->msgpool_op); 4950 out_mempool: 4951 mempool_destroy(osdc->req_mempool); 4952 out_map: 4953 ceph_osdmap_destroy(osdc->osdmap); 4954 out: 4955 return err; 4956 } 4957 4958 void ceph_osdc_stop(struct ceph_osd_client *osdc) 4959 { 4960 flush_workqueue(osdc->notify_wq); 4961 destroy_workqueue(osdc->notify_wq); 4962 cancel_delayed_work_sync(&osdc->timeout_work); 4963 cancel_delayed_work_sync(&osdc->osds_timeout_work); 4964 4965 down_write(&osdc->lock); 4966 while (!RB_EMPTY_ROOT(&osdc->osds)) { 4967 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds), 4968 struct ceph_osd, o_node); 4969 close_osd(osd); 4970 } 4971 up_write(&osdc->lock); 4972 WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1); 4973 osd_cleanup(&osdc->homeless_osd); 4974 4975 WARN_ON(!list_empty(&osdc->osd_lru)); 4976 WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests)); 4977 WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks)); 4978 WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks)); 4979 WARN_ON(atomic_read(&osdc->num_requests)); 4980 WARN_ON(atomic_read(&osdc->num_homeless)); 4981 4982 ceph_osdmap_destroy(osdc->osdmap); 4983 mempool_destroy(osdc->req_mempool); 4984 ceph_msgpool_destroy(&osdc->msgpool_op); 4985 ceph_msgpool_destroy(&osdc->msgpool_op_reply); 4986 } 4987 4988 /* 4989 * Read some contiguous pages. If we cross a stripe boundary, shorten 4990 * *plen. Return number of bytes read, or error. 
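 *
 * Synchronous helper: allocates a single-op read request, points it at
 * @pages (offset by @page_align) and waits for completion.  On return
 * *plen holds the (possibly shortened) length that was actually issued.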
4991 */ 4992 int ceph_osdc_readpages(struct ceph_osd_client *osdc, 4993 struct ceph_vino vino, struct ceph_file_layout *layout, 4994 u64 off, u64 *plen, 4995 u32 truncate_seq, u64 truncate_size, 4996 struct page **pages, int num_pages, int page_align) 4997 { 4998 struct ceph_osd_request *req; 4999 int rc = 0; 5000 5001 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, 5002 vino.snap, off, *plen); 5003 req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1, 5004 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, 5005 NULL, truncate_seq, truncate_size, 5006 false); 5007 if (IS_ERR(req)) 5008 return PTR_ERR(req); 5009 5010 /* it may be a short read due to an object boundary */ 5011 osd_req_op_extent_osd_data_pages(req, 0, 5012 pages, *plen, page_align, false, false); 5013 5014 dout("readpages final extent is %llu~%llu (%llu bytes align %d)\n", 5015 off, *plen, *plen, page_align); 5016 5017 rc = ceph_osdc_start_request(osdc, req, false); 5018 if (!rc) 5019 rc = ceph_osdc_wait_request(osdc, req); 5020 5021 ceph_osdc_put_request(req); 5022 dout("readpages result %d\n", rc); 5023 return rc; 5024 } 5025 EXPORT_SYMBOL(ceph_osdc_readpages); 5026 5027 /* 5028 * do a synchronous write on N pages 5029 */ 5030 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, 5031 struct ceph_file_layout *layout, 5032 struct ceph_snap_context *snapc, 5033 u64 off, u64 len, 5034 u32 truncate_seq, u64 truncate_size, 5035 struct timespec *mtime, 5036 struct page **pages, int num_pages) 5037 { 5038 struct ceph_osd_request *req; 5039 int rc = 0; 5040 int page_align = off & ~PAGE_MASK; 5041 5042 req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1, 5043 CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, 5044 snapc, truncate_seq, truncate_size, 5045 true); 5046 if (IS_ERR(req)) 5047 return PTR_ERR(req); 5048 5049 /* it may be a short write due to an object boundary */ 5050 osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align, 5051 false, false); 5052 dout("writepages %llu~%llu (%llu bytes)\n", off, len, len); 5053 5054 req->r_mtime = *mtime; 5055 rc = ceph_osdc_start_request(osdc, req, true); 5056 if (!rc) 5057 rc = ceph_osdc_wait_request(osdc, req); 5058 5059 ceph_osdc_put_request(req); 5060 if (rc == 0) 5061 rc = len; 5062 dout("writepages result %d\n", rc); 5063 return rc; 5064 } 5065 EXPORT_SYMBOL(ceph_osdc_writepages); 5066 5067 int ceph_osdc_setup(void) 5068 { 5069 size_t size = sizeof(struct ceph_osd_request) + 5070 CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op); 5071 5072 BUG_ON(ceph_osd_request_cache); 5073 ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size, 5074 0, 0, NULL); 5075 5076 return ceph_osd_request_cache ? 
0 : -ENOMEM; 5077 } 5078 EXPORT_SYMBOL(ceph_osdc_setup); 5079 5080 void ceph_osdc_cleanup(void) 5081 { 5082 BUG_ON(!ceph_osd_request_cache); 5083 kmem_cache_destroy(ceph_osd_request_cache); 5084 ceph_osd_request_cache = NULL; 5085 } 5086 EXPORT_SYMBOL(ceph_osdc_cleanup); 5087 5088 /* 5089 * handle incoming message 5090 */ 5091 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) 5092 { 5093 struct ceph_osd *osd = con->private; 5094 struct ceph_osd_client *osdc = osd->o_osdc; 5095 int type = le16_to_cpu(msg->hdr.type); 5096 5097 switch (type) { 5098 case CEPH_MSG_OSD_MAP: 5099 ceph_osdc_handle_map(osdc, msg); 5100 break; 5101 case CEPH_MSG_OSD_OPREPLY: 5102 handle_reply(osd, msg); 5103 break; 5104 case CEPH_MSG_OSD_BACKOFF: 5105 handle_backoff(osd, msg); 5106 break; 5107 case CEPH_MSG_WATCH_NOTIFY: 5108 handle_watch_notify(osdc, msg); 5109 break; 5110 5111 default: 5112 pr_err("received unknown message type %d %s\n", type, 5113 ceph_msg_type_name(type)); 5114 } 5115 5116 ceph_msg_put(msg); 5117 } 5118 5119 /* 5120 * Lookup and return message for incoming reply. Don't try to do 5121 * anything about a larger than preallocated data portion of the 5122 * message at the moment - for now, just skip the message. 5123 */ 5124 static struct ceph_msg *get_reply(struct ceph_connection *con, 5125 struct ceph_msg_header *hdr, 5126 int *skip) 5127 { 5128 struct ceph_osd *osd = con->private; 5129 struct ceph_osd_client *osdc = osd->o_osdc; 5130 struct ceph_msg *m = NULL; 5131 struct ceph_osd_request *req; 5132 int front_len = le32_to_cpu(hdr->front_len); 5133 int data_len = le32_to_cpu(hdr->data_len); 5134 u64 tid = le64_to_cpu(hdr->tid); 5135 5136 down_read(&osdc->lock); 5137 if (!osd_registered(osd)) { 5138 dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd); 5139 *skip = 1; 5140 goto out_unlock_osdc; 5141 } 5142 WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num)); 5143 5144 mutex_lock(&osd->lock); 5145 req = lookup_request(&osd->o_requests, tid); 5146 if (!req) { 5147 dout("%s osd%d tid %llu unknown, skipping\n", __func__, 5148 osd->o_osd, tid); 5149 *skip = 1; 5150 goto out_unlock_session; 5151 } 5152 5153 ceph_msg_revoke_incoming(req->r_reply); 5154 5155 if (front_len > req->r_reply->front_alloc_len) { 5156 pr_warn("%s osd%d tid %llu front %d > preallocated %d\n", 5157 __func__, osd->o_osd, req->r_tid, front_len, 5158 req->r_reply->front_alloc_len); 5159 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS, 5160 false); 5161 if (!m) 5162 goto out_unlock_session; 5163 ceph_msg_put(req->r_reply); 5164 req->r_reply = m; 5165 } 5166 5167 if (data_len > req->r_reply->data_length) { 5168 pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n", 5169 __func__, osd->o_osd, req->r_tid, data_len, 5170 req->r_reply->data_length); 5171 m = NULL; 5172 *skip = 1; 5173 goto out_unlock_session; 5174 } 5175 5176 m = ceph_msg_get(req->r_reply); 5177 dout("get_reply tid %lld %p\n", tid, m); 5178 5179 out_unlock_session: 5180 mutex_unlock(&osd->lock); 5181 out_unlock_osdc: 5182 up_read(&osdc->lock); 5183 return m; 5184 } 5185 5186 /* 5187 * TODO: switch to a msg-owned pagelist 5188 */ 5189 static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr) 5190 { 5191 struct ceph_msg *m; 5192 int type = le16_to_cpu(hdr->type); 5193 u32 front_len = le32_to_cpu(hdr->front_len); 5194 u32 data_len = le32_to_cpu(hdr->data_len); 5195 5196 m = ceph_msg_new(type, front_len, GFP_NOIO, false); 5197 if (!m) 5198 return NULL; 5199 5200 if (data_len) { 5201 struct page **pages; 
5202 struct ceph_osd_data osd_data; 5203 5204 pages = ceph_alloc_page_vector(calc_pages_for(0, data_len), 5205 GFP_NOIO); 5206 if (IS_ERR(pages)) { 5207 ceph_msg_put(m); 5208 return NULL; 5209 } 5210 5211 ceph_osd_data_pages_init(&osd_data, pages, data_len, 0, false, 5212 false); 5213 ceph_osdc_msg_data_add(m, &osd_data); 5214 } 5215 5216 return m; 5217 } 5218 5219 static struct ceph_msg *alloc_msg(struct ceph_connection *con, 5220 struct ceph_msg_header *hdr, 5221 int *skip) 5222 { 5223 struct ceph_osd *osd = con->private; 5224 int type = le16_to_cpu(hdr->type); 5225 5226 *skip = 0; 5227 switch (type) { 5228 case CEPH_MSG_OSD_MAP: 5229 case CEPH_MSG_OSD_BACKOFF: 5230 case CEPH_MSG_WATCH_NOTIFY: 5231 return alloc_msg_with_page_vector(hdr); 5232 case CEPH_MSG_OSD_OPREPLY: 5233 return get_reply(con, hdr, skip); 5234 default: 5235 pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__, 5236 osd->o_osd, type); 5237 *skip = 1; 5238 return NULL; 5239 } 5240 } 5241 5242 /* 5243 * Wrappers to refcount containing ceph_osd struct 5244 */ 5245 static struct ceph_connection *get_osd_con(struct ceph_connection *con) 5246 { 5247 struct ceph_osd *osd = con->private; 5248 if (get_osd(osd)) 5249 return con; 5250 return NULL; 5251 } 5252 5253 static void put_osd_con(struct ceph_connection *con) 5254 { 5255 struct ceph_osd *osd = con->private; 5256 put_osd(osd); 5257 } 5258 5259 /* 5260 * authentication 5261 */ 5262 /* 5263 * Note: returned pointer is the address of a structure that's 5264 * managed separately. Caller must *not* attempt to free it. 5265 */ 5266 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con, 5267 int *proto, int force_new) 5268 { 5269 struct ceph_osd *o = con->private; 5270 struct ceph_osd_client *osdc = o->o_osdc; 5271 struct ceph_auth_client *ac = osdc->client->monc.auth; 5272 struct ceph_auth_handshake *auth = &o->o_auth; 5273 5274 if (force_new && auth->authorizer) { 5275 ceph_auth_destroy_authorizer(auth->authorizer); 5276 auth->authorizer = NULL; 5277 } 5278 if (!auth->authorizer) { 5279 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 5280 auth); 5281 if (ret) 5282 return ERR_PTR(ret); 5283 } else { 5284 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD, 5285 auth); 5286 if (ret) 5287 return ERR_PTR(ret); 5288 } 5289 *proto = ac->protocol; 5290 5291 return auth; 5292 } 5293 5294 5295 static int verify_authorizer_reply(struct ceph_connection *con) 5296 { 5297 struct ceph_osd *o = con->private; 5298 struct ceph_osd_client *osdc = o->o_osdc; 5299 struct ceph_auth_client *ac = osdc->client->monc.auth; 5300 5301 return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer); 5302 } 5303 5304 static int invalidate_authorizer(struct ceph_connection *con) 5305 { 5306 struct ceph_osd *o = con->private; 5307 struct ceph_osd_client *osdc = o->o_osdc; 5308 struct ceph_auth_client *ac = osdc->client->monc.auth; 5309 5310 ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD); 5311 return ceph_monc_validate_auth(&osdc->client->monc); 5312 } 5313 5314 static void osd_reencode_message(struct ceph_msg *msg) 5315 { 5316 int type = le16_to_cpu(msg->hdr.type); 5317 5318 if (type == CEPH_MSG_OSD_OP) 5319 encode_request_finish(msg); 5320 } 5321 5322 static int osd_sign_message(struct ceph_msg *msg) 5323 { 5324 struct ceph_osd *o = msg->con->private; 5325 struct ceph_auth_handshake *auth = &o->o_auth; 5326 5327 return ceph_auth_sign_message(auth, msg); 5328 } 5329 5330 static int osd_check_message_signature(struct ceph_msg *msg) 5331 { 
5332 	struct ceph_osd *o = msg->con->private;
5333 	struct ceph_auth_handshake *auth = &o->o_auth;
5334 
5335 	return ceph_auth_check_message_signature(auth, msg);
5336 }
5337 
5338 static const struct ceph_connection_operations osd_con_ops = {
5339 	.get = get_osd_con,
5340 	.put = put_osd_con,
5341 	.dispatch = dispatch,
5342 	.get_authorizer = get_authorizer,
5343 	.verify_authorizer_reply = verify_authorizer_reply,
5344 	.invalidate_authorizer = invalidate_authorizer,
5345 	.alloc_msg = alloc_msg,
5346 	.reencode_message = osd_reencode_message,
5347 	.sign_message = osd_sign_message,
5348 	.check_message_signature = osd_check_message_signature,
5349 	.fault = osd_fault,
5350 };
5351 
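
/*
 * Illustrative sketch only -- not part of the original file.  It shows
 * how a hypothetical user of the watch/notify API above might register
 * a watch, acknowledge notifies from the watch callback and tear the
 * watch down again.  The "example_*" names and the context struct are
 * invented for this sketch; error handling is intentionally minimal.
 */

struct example_watch_ctx {
	struct ceph_osd_client *osdc;
	struct ceph_object_id oid;
	struct ceph_object_locator oloc;
	struct ceph_osd_linger_request *handle;
};

/* rados_watchcb2_t: runs from the notify workqueue for each event */
static void example_watch_cb(void *arg, u64 notify_id, u64 cookie,
			     u64 notifier_id, void *payload,
			     size_t payload_len)
{
	struct example_watch_ctx *ctx = arg;

	/* ack so that the notifier's ceph_osdc_notify() can complete */
	ceph_osdc_notify_ack(ctx->osdc, &ctx->oid, &ctx->oloc, notify_id,
			     cookie, NULL, 0);
}

/* rados_watcherrcb_t: the watch is no longer valid once this fires */
static void example_watch_errcb(void *arg, u64 cookie, int err)
{
	pr_warn("example watch error %d on cookie %llu\n", err, cookie);
}

static int example_watch_object(struct example_watch_ctx *ctx,
				struct ceph_osd_client *osdc,
				const char *name, s64 pool)
{
	ctx->osdc = osdc;
	ceph_oid_init(&ctx->oid);
	ceph_oid_printf(&ctx->oid, "%s", name);
	ceph_oloc_init(&ctx->oloc);
	ctx->oloc.pool = pool;

	ctx->handle = ceph_osdc_watch(osdc, &ctx->oid, &ctx->oloc,
				      example_watch_cb, example_watch_errcb,
				      ctx);
	return PTR_ERR_OR_ZERO(ctx->handle);
}

static void example_unwatch_object(struct example_watch_ctx *ctx)
{
	/* times out after mount_timeout, see ceph_osdc_unwatch() above */
	ceph_osdc_unwatch(ctx->osdc, ctx->handle);
	ceph_oloc_destroy(&ctx->oloc);
	ceph_oid_destroy(&ctx->oid);
}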