1 /* -*- mode: c; c-basic-offset: 8; -*- 2 * vim: noexpandtab sw=8 ts=8 sts=0: 3 * 4 * stack_o2cb.c 5 * 6 * Code which interfaces ocfs2 with the o2cb stack. 7 * 8 * Copyright (C) 2007 Oracle. All rights reserved. 9 * 10 * This program is free software; you can redistribute it and/or 11 * modify it under the terms of the GNU General Public 12 * License as published by the Free Software Foundation, version 2. 13 * 14 * This program is distributed in the hope that it will be useful, 15 * but WITHOUT ANY WARRANTY; without even the implied warranty of 16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 17 * General Public License for more details. 18 */ 19 20 #include <linux/kernel.h> 21 #include <linux/crc32.h> 22 #include <linux/slab.h> 23 #include <linux/module.h> 24 25 /* Needed for AOP_TRUNCATED_PAGE in mlog_errno() */ 26 #include <linux/fs.h> 27 28 #include "cluster/masklog.h" 29 #include "cluster/nodemanager.h" 30 #include "cluster/heartbeat.h" 31 #include "cluster/tcp.h" 32 33 #include "stackglue.h" 34 35 struct o2dlm_private { 36 struct dlm_eviction_cb op_eviction_cb; 37 }; 38 39 static struct ocfs2_stack_plugin o2cb_stack; 40 41 /* These should be identical */ 42 #if (DLM_LOCK_IV != LKM_IVMODE) 43 # error Lock modes do not match 44 #endif 45 #if (DLM_LOCK_NL != LKM_NLMODE) 46 # error Lock modes do not match 47 #endif 48 #if (DLM_LOCK_CR != LKM_CRMODE) 49 # error Lock modes do not match 50 #endif 51 #if (DLM_LOCK_CW != LKM_CWMODE) 52 # error Lock modes do not match 53 #endif 54 #if (DLM_LOCK_PR != LKM_PRMODE) 55 # error Lock modes do not match 56 #endif 57 #if (DLM_LOCK_PW != LKM_PWMODE) 58 # error Lock modes do not match 59 #endif 60 #if (DLM_LOCK_EX != LKM_EXMODE) 61 # error Lock modes do not match 62 #endif 63 static inline int mode_to_o2dlm(int mode) 64 { 65 BUG_ON(mode > LKM_MAXMODE); 66 67 return mode; 68 } 69 70 #define map_flag(_generic, _o2dlm) \ 71 if (flags & (_generic)) { \ 72 flags &= ~(_generic); \ 73 o2dlm_flags |= (_o2dlm); \ 74 } 75 static int flags_to_o2dlm(u32 flags) 76 { 77 int o2dlm_flags = 0; 78 79 map_flag(DLM_LKF_NOQUEUE, LKM_NOQUEUE); 80 map_flag(DLM_LKF_CANCEL, LKM_CANCEL); 81 map_flag(DLM_LKF_CONVERT, LKM_CONVERT); 82 map_flag(DLM_LKF_VALBLK, LKM_VALBLK); 83 map_flag(DLM_LKF_IVVALBLK, LKM_INVVALBLK); 84 map_flag(DLM_LKF_ORPHAN, LKM_ORPHAN); 85 map_flag(DLM_LKF_FORCEUNLOCK, LKM_FORCE); 86 map_flag(DLM_LKF_TIMEOUT, LKM_TIMEOUT); 87 map_flag(DLM_LKF_LOCAL, LKM_LOCAL); 88 89 /* map_flag() should have cleared every flag passed in */ 90 BUG_ON(flags != 0); 91 92 return o2dlm_flags; 93 } 94 #undef map_flag 95 96 /* 97 * Map an o2dlm status to standard errno values. 98 * 99 * o2dlm only uses a handful of these, and returns even fewer to the 100 * caller. Still, we try to assign sane values to each error. 101 * 102 * The following value pairs have special meanings to dlmglue, thus 103 * the right hand side needs to stay unique - never duplicate the 104 * mapping elsewhere in the table! 105 * 106 * DLM_NORMAL: 0 107 * DLM_NOTQUEUED: -EAGAIN 108 * DLM_CANCELGRANT: -EBUSY 109 * DLM_CANCEL: -DLM_ECANCEL 110 */ 111 /* Keep in sync with dlmapi.h */ 112 static int status_map[] = { 113 [DLM_NORMAL] = 0, /* Success */ 114 [DLM_GRANTED] = -EINVAL, 115 [DLM_DENIED] = -EACCES, 116 [DLM_DENIED_NOLOCKS] = -EACCES, 117 [DLM_WORKING] = -EACCES, 118 [DLM_BLOCKED] = -EINVAL, 119 [DLM_BLOCKED_ORPHAN] = -EINVAL, 120 [DLM_DENIED_GRACE_PERIOD] = -EACCES, 121 [DLM_SYSERR] = -ENOMEM, /* It is what it is */ 122 [DLM_NOSUPPORT] = -EPROTO, 123 [DLM_CANCELGRANT] = -EBUSY, /* Cancel after grant */ 124 [DLM_IVLOCKID] = -EINVAL, 125 [DLM_SYNC] = -EINVAL, 126 [DLM_BADTYPE] = -EINVAL, 127 [DLM_BADRESOURCE] = -EINVAL, 128 [DLM_MAXHANDLES] = -ENOMEM, 129 [DLM_NOCLINFO] = -EINVAL, 130 [DLM_NOLOCKMGR] = -EINVAL, 131 [DLM_NOPURGED] = -EINVAL, 132 [DLM_BADARGS] = -EINVAL, 133 [DLM_VOID] = -EINVAL, 134 [DLM_NOTQUEUED] = -EAGAIN, /* Trylock failed */ 135 [DLM_IVBUFLEN] = -EINVAL, 136 [DLM_CVTUNGRANT] = -EPERM, 137 [DLM_BADPARAM] = -EINVAL, 138 [DLM_VALNOTVALID] = -EINVAL, 139 [DLM_REJECTED] = -EPERM, 140 [DLM_ABORT] = -EINVAL, 141 [DLM_CANCEL] = -DLM_ECANCEL, /* Successful cancel */ 142 [DLM_IVRESHANDLE] = -EINVAL, 143 [DLM_DEADLOCK] = -EDEADLK, 144 [DLM_DENIED_NOASTS] = -EINVAL, 145 [DLM_FORWARD] = -EINVAL, 146 [DLM_TIMEOUT] = -ETIMEDOUT, 147 [DLM_IVGROUPID] = -EINVAL, 148 [DLM_VERS_CONFLICT] = -EOPNOTSUPP, 149 [DLM_BAD_DEVICE_PATH] = -ENOENT, 150 [DLM_NO_DEVICE_PERMISSION] = -EPERM, 151 [DLM_NO_CONTROL_DEVICE] = -ENOENT, 152 [DLM_RECOVERING] = -ENOTCONN, 153 [DLM_MIGRATING] = -ERESTART, 154 [DLM_MAXSTATS] = -EINVAL, 155 }; 156 157 static int dlm_status_to_errno(enum dlm_status status) 158 { 159 BUG_ON(status < 0 || status >= ARRAY_SIZE(status_map)); 160 161 return status_map[status]; 162 } 163 164 static void o2dlm_lock_ast_wrapper(void *astarg) 165 { 166 struct ocfs2_dlm_lksb *lksb = astarg; 167 168 lksb->lksb_conn->cc_proto->lp_lock_ast(lksb); 169 } 170 171 static void o2dlm_blocking_ast_wrapper(void *astarg, int level) 172 { 173 struct ocfs2_dlm_lksb *lksb = astarg; 174 175 lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level); 176 } 177 178 static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status) 179 { 180 struct ocfs2_dlm_lksb *lksb = astarg; 181 int error = dlm_status_to_errno(status); 182 183 /* 184 * In o2dlm, you can get both the lock_ast() for the lock being 185 * granted and the unlock_ast() for the CANCEL failing. A 186 * successful cancel sends DLM_NORMAL here. If the 187 * lock grant happened before the cancel arrived, you get 188 * DLM_CANCELGRANT. 189 * 190 * There's no need for the double-ast. If we see DLM_CANCELGRANT, 191 * we just ignore it. We expect the lock_ast() to handle the 192 * granted lock. 193 */ 194 if (status == DLM_CANCELGRANT) 195 return; 196 197 lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, error); 198 } 199 200 static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn, 201 int mode, 202 struct ocfs2_dlm_lksb *lksb, 203 u32 flags, 204 void *name, 205 unsigned int namelen) 206 { 207 enum dlm_status status; 208 int o2dlm_mode = mode_to_o2dlm(mode); 209 int o2dlm_flags = flags_to_o2dlm(flags); 210 int ret; 211 212 status = dlmlock(conn->cc_lockspace, o2dlm_mode, &lksb->lksb_o2dlm, 213 o2dlm_flags, name, namelen, 214 o2dlm_lock_ast_wrapper, lksb, 215 o2dlm_blocking_ast_wrapper); 216 ret = dlm_status_to_errno(status); 217 return ret; 218 } 219 220 static int o2cb_dlm_unlock(struct ocfs2_cluster_connection *conn, 221 struct ocfs2_dlm_lksb *lksb, 222 u32 flags) 223 { 224 enum dlm_status status; 225 int o2dlm_flags = flags_to_o2dlm(flags); 226 int ret; 227 228 status = dlmunlock(conn->cc_lockspace, &lksb->lksb_o2dlm, 229 o2dlm_flags, o2dlm_unlock_ast_wrapper, lksb); 230 ret = dlm_status_to_errno(status); 231 return ret; 232 } 233 234 static int o2cb_dlm_lock_status(struct ocfs2_dlm_lksb *lksb) 235 { 236 return dlm_status_to_errno(lksb->lksb_o2dlm.status); 237 } 238 239 /* 240 * o2dlm aways has a "valid" LVB. If the dlm loses track of the LVB 241 * contents, it will zero out the LVB. Thus the caller can always trust 242 * the contents. 243 */ 244 static int o2cb_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb) 245 { 246 return 1; 247 } 248 249 static void *o2cb_dlm_lvb(struct ocfs2_dlm_lksb *lksb) 250 { 251 return (void *)(lksb->lksb_o2dlm.lvb); 252 } 253 254 static void o2cb_dump_lksb(struct ocfs2_dlm_lksb *lksb) 255 { 256 dlm_print_one_lock(lksb->lksb_o2dlm.lockid); 257 } 258 259 /* 260 * Check if this node is heartbeating and is connected to all other 261 * heartbeating nodes. 262 */ 263 static int o2cb_cluster_check(void) 264 { 265 u8 node_num; 266 int i; 267 unsigned long hbmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; 268 unsigned long netmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; 269 270 node_num = o2nm_this_node(); 271 if (node_num == O2NM_MAX_NODES) { 272 printk(KERN_ERR "o2cb: This node has not been configured.\n"); 273 return -EINVAL; 274 } 275 276 /* 277 * o2dlm expects o2net sockets to be created. If not, then 278 * dlm_join_domain() fails with a stack of errors which are both cryptic 279 * and incomplete. The idea here is to detect upfront whether we have 280 * managed to connect to all nodes or not. If not, then list the nodes 281 * to allow the user to check the configuration (incorrect IP, firewall, 282 * etc.) Yes, this is racy. But its not the end of the world. 283 */ 284 #define O2CB_MAP_STABILIZE_COUNT 60 285 for (i = 0; i < O2CB_MAP_STABILIZE_COUNT; ++i) { 286 o2hb_fill_node_map(hbmap, sizeof(hbmap)); 287 if (!test_bit(node_num, hbmap)) { 288 printk(KERN_ERR "o2cb: %s heartbeat has not been " 289 "started.\n", (o2hb_global_heartbeat_active() ? 290 "Global" : "Local")); 291 return -EINVAL; 292 } 293 o2net_fill_node_map(netmap, sizeof(netmap)); 294 /* Force set the current node to allow easy compare */ 295 set_bit(node_num, netmap); 296 if (!memcmp(hbmap, netmap, sizeof(hbmap))) 297 return 0; 298 if (i < O2CB_MAP_STABILIZE_COUNT - 1) 299 msleep(1000); 300 } 301 302 printk(KERN_ERR "o2cb: This node could not connect to nodes:"); 303 i = -1; 304 while ((i = find_next_bit(hbmap, O2NM_MAX_NODES, 305 i + 1)) < O2NM_MAX_NODES) { 306 if (!test_bit(i, netmap)) 307 printk(" %u", i); 308 } 309 printk(".\n"); 310 311 return -ENOTCONN; 312 } 313 314 /* 315 * Called from the dlm when it's about to evict a node. This is how the 316 * classic stack signals node death. 317 */ 318 static void o2dlm_eviction_cb(int node_num, void *data) 319 { 320 struct ocfs2_cluster_connection *conn = data; 321 322 printk(KERN_NOTICE "o2cb: o2dlm has evicted node %d from domain %.*s\n", 323 node_num, conn->cc_namelen, conn->cc_name); 324 325 conn->cc_recovery_handler(node_num, conn->cc_recovery_data); 326 } 327 328 static int o2cb_cluster_connect(struct ocfs2_cluster_connection *conn) 329 { 330 int rc = 0; 331 u32 dlm_key; 332 struct dlm_ctxt *dlm; 333 struct o2dlm_private *priv; 334 struct dlm_protocol_version fs_version; 335 336 BUG_ON(conn == NULL); 337 BUG_ON(conn->cc_proto == NULL); 338 339 /* Ensure cluster stack is up and all nodes are connected */ 340 rc = o2cb_cluster_check(); 341 if (rc) { 342 printk(KERN_ERR "o2cb: Cluster check failed. Fix errors " 343 "before retrying.\n"); 344 goto out; 345 } 346 347 priv = kzalloc(sizeof(struct o2dlm_private), GFP_KERNEL); 348 if (!priv) { 349 rc = -ENOMEM; 350 goto out_free; 351 } 352 353 /* This just fills the structure in. It is safe to pass conn. */ 354 dlm_setup_eviction_cb(&priv->op_eviction_cb, o2dlm_eviction_cb, 355 conn); 356 357 conn->cc_private = priv; 358 359 /* used by the dlm code to make message headers unique, each 360 * node in this domain must agree on this. */ 361 dlm_key = crc32_le(0, conn->cc_name, conn->cc_namelen); 362 fs_version.pv_major = conn->cc_version.pv_major; 363 fs_version.pv_minor = conn->cc_version.pv_minor; 364 365 dlm = dlm_register_domain(conn->cc_name, dlm_key, &fs_version); 366 if (IS_ERR(dlm)) { 367 rc = PTR_ERR(dlm); 368 mlog_errno(rc); 369 goto out_free; 370 } 371 372 conn->cc_version.pv_major = fs_version.pv_major; 373 conn->cc_version.pv_minor = fs_version.pv_minor; 374 conn->cc_lockspace = dlm; 375 376 dlm_register_eviction_cb(dlm, &priv->op_eviction_cb); 377 378 out_free: 379 if (rc) 380 kfree(conn->cc_private); 381 382 out: 383 return rc; 384 } 385 386 static int o2cb_cluster_disconnect(struct ocfs2_cluster_connection *conn) 387 { 388 struct dlm_ctxt *dlm = conn->cc_lockspace; 389 struct o2dlm_private *priv = conn->cc_private; 390 391 dlm_unregister_eviction_cb(&priv->op_eviction_cb); 392 conn->cc_private = NULL; 393 kfree(priv); 394 395 dlm_unregister_domain(dlm); 396 conn->cc_lockspace = NULL; 397 398 return 0; 399 } 400 401 static int o2cb_cluster_this_node(struct ocfs2_cluster_connection *conn, 402 unsigned int *node) 403 { 404 int node_num; 405 406 node_num = o2nm_this_node(); 407 if (node_num == O2NM_INVALID_NODE_NUM) 408 return -ENOENT; 409 410 if (node_num >= O2NM_MAX_NODES) 411 return -EOVERFLOW; 412 413 *node = node_num; 414 return 0; 415 } 416 417 static struct ocfs2_stack_operations o2cb_stack_ops = { 418 .connect = o2cb_cluster_connect, 419 .disconnect = o2cb_cluster_disconnect, 420 .this_node = o2cb_cluster_this_node, 421 .dlm_lock = o2cb_dlm_lock, 422 .dlm_unlock = o2cb_dlm_unlock, 423 .lock_status = o2cb_dlm_lock_status, 424 .lvb_valid = o2cb_dlm_lvb_valid, 425 .lock_lvb = o2cb_dlm_lvb, 426 .dump_lksb = o2cb_dump_lksb, 427 }; 428 429 static struct ocfs2_stack_plugin o2cb_stack = { 430 .sp_name = "o2cb", 431 .sp_ops = &o2cb_stack_ops, 432 .sp_owner = THIS_MODULE, 433 }; 434 435 static int __init o2cb_stack_init(void) 436 { 437 return ocfs2_stack_glue_register(&o2cb_stack); 438 } 439 440 static void __exit o2cb_stack_exit(void) 441 { 442 ocfs2_stack_glue_unregister(&o2cb_stack); 443 } 444 445 MODULE_AUTHOR("Oracle"); 446 MODULE_DESCRIPTION("ocfs2 driver for the classic o2cb stack"); 447 MODULE_LICENSE("GPL"); 448 module_init(o2cb_stack_init); 449 module_exit(o2cb_stack_exit); 450