xref: /openbmc/linux/fs/dlm/ast.c (revision bbaf1ff06af49e856501024abbe161d96c1f0d66)
1  // SPDX-License-Identifier: GPL-2.0-only
2  /******************************************************************************
3  *******************************************************************************
4  **
5  **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
6  **  Copyright (C) 2004-2010 Red Hat, Inc.  All rights reserved.
7  **
8  **
9  *******************************************************************************
10  ******************************************************************************/
11  
12  #include <trace/events/dlm.h>
13  
14  #include "dlm_internal.h"
15  #include "memory.h"
16  #include "lock.h"
17  #include "user.h"
18  #include "ast.h"
19  
20  void dlm_release_callback(struct kref *ref)
21  {
22  	struct dlm_callback *cb = container_of(ref, struct dlm_callback, ref);
23  
24  	dlm_free_cb(cb);
25  }
26  
27  void dlm_callback_set_last_ptr(struct dlm_callback **from,
28  			       struct dlm_callback *to)
29  {
30  	if (*from)
31  		kref_put(&(*from)->ref, dlm_release_callback);
32  
33  	if (to)
34  		kref_get(&to->ref);
35  
36  	*from = to;
37  }
38  
39  void dlm_purge_lkb_callbacks(struct dlm_lkb *lkb)
40  {
41  	struct dlm_callback *cb, *safe;
42  
43  	list_for_each_entry_safe(cb, safe, &lkb->lkb_callbacks, list) {
44  		list_del(&cb->list);
45  		kref_put(&cb->ref, dlm_release_callback);
46  	}
47  
48  	clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
49  
50  	/* invalidate */
51  	dlm_callback_set_last_ptr(&lkb->lkb_last_cast, NULL);
52  	dlm_callback_set_last_ptr(&lkb->lkb_last_cb, NULL);
53  	lkb->lkb_last_bast_mode = -1;
54  }
55  
56  int dlm_enqueue_lkb_callback(struct dlm_lkb *lkb, uint32_t flags, int mode,
57  			     int status, uint32_t sbflags)
58  {
59  	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
60  	int rv = DLM_ENQUEUE_CALLBACK_SUCCESS;
61  	struct dlm_callback *cb;
62  	int prev_mode;
63  
64  	if (flags & DLM_CB_BAST) {
65  		/* if cb is a bast, it should be skipped if the blocking mode is
66  		 * compatible with the last granted mode
67  		 */
68  		if (lkb->lkb_last_cast) {
69  			if (dlm_modes_compat(mode, lkb->lkb_last_cast->mode)) {
70  				log_debug(ls, "skip %x bast mode %d for cast mode %d",
71  					  lkb->lkb_id, mode,
72  					  lkb->lkb_last_cast->mode);
73  				goto out;
74  			}
75  		}
76  
77  		/*
78  		 * Suppress some redundant basts here, do more on removal.
79  		 * Don't even add a bast if the callback just before it
80  		 * is a bast for the same mode or a more restrictive mode.
81  		 * (the addional > PR check is needed for PR/CW inversion)
82  		 */
83  		if (lkb->lkb_last_cb && lkb->lkb_last_cb->flags & DLM_CB_BAST) {
84  			prev_mode = lkb->lkb_last_cb->mode;
85  
86  			if ((prev_mode == mode) ||
87  			    (prev_mode > mode && prev_mode > DLM_LOCK_PR)) {
88  				log_debug(ls, "skip %x add bast mode %d for bast mode %d",
89  					  lkb->lkb_id, mode, prev_mode);
90  				goto out;
91  			}
92  		}
93  	}
94  
95  	cb = dlm_allocate_cb();
96  	if (!cb) {
97  		rv = DLM_ENQUEUE_CALLBACK_FAILURE;
98  		goto out;
99  	}
100  
101  	cb->flags = flags;
102  	cb->mode = mode;
103  	cb->sb_status = status;
104  	cb->sb_flags = (sbflags & 0x000000FF);
105  	kref_init(&cb->ref);
106  	if (!test_and_set_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags))
107  		rv = DLM_ENQUEUE_CALLBACK_NEED_SCHED;
108  
109  	list_add_tail(&cb->list, &lkb->lkb_callbacks);
110  
111  	if (flags & DLM_CB_CAST)
112  		dlm_callback_set_last_ptr(&lkb->lkb_last_cast, cb);
113  
114  	dlm_callback_set_last_ptr(&lkb->lkb_last_cb, cb);
115  
116   out:
117  	return rv;
118  }
119  
120  int dlm_dequeue_lkb_callback(struct dlm_lkb *lkb, struct dlm_callback **cb)
121  {
122  	/* oldest undelivered cb is callbacks first entry */
123  	*cb = list_first_entry_or_null(&lkb->lkb_callbacks,
124  				       struct dlm_callback, list);
125  	if (!*cb)
126  		return DLM_DEQUEUE_CALLBACK_EMPTY;
127  
128  	/* remove it from callbacks so shift others down */
129  	list_del(&(*cb)->list);
130  	if (list_empty(&lkb->lkb_callbacks))
131  		return DLM_DEQUEUE_CALLBACK_LAST;
132  
133  	return DLM_DEQUEUE_CALLBACK_SUCCESS;
134  }
135  
136  void dlm_add_cb(struct dlm_lkb *lkb, uint32_t flags, int mode, int status,
137  		uint32_t sbflags)
138  {
139  	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
140  	int rv;
141  
142  	if (test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
143  		dlm_user_add_ast(lkb, flags, mode, status, sbflags);
144  		return;
145  	}
146  
147  	spin_lock(&lkb->lkb_cb_lock);
148  	rv = dlm_enqueue_lkb_callback(lkb, flags, mode, status, sbflags);
149  	switch (rv) {
150  	case DLM_ENQUEUE_CALLBACK_NEED_SCHED:
151  		kref_get(&lkb->lkb_ref);
152  
153  		spin_lock(&ls->ls_cb_lock);
154  		if (test_bit(LSFL_CB_DELAY, &ls->ls_flags)) {
155  			list_add(&lkb->lkb_cb_list, &ls->ls_cb_delay);
156  		} else {
157  			queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
158  		}
159  		spin_unlock(&ls->ls_cb_lock);
160  		break;
161  	case DLM_ENQUEUE_CALLBACK_FAILURE:
162  		WARN_ON_ONCE(1);
163  		break;
164  	case DLM_ENQUEUE_CALLBACK_SUCCESS:
165  		break;
166  	default:
167  		WARN_ON_ONCE(1);
168  		break;
169  	}
170  	spin_unlock(&lkb->lkb_cb_lock);
171  }
172  
173  void dlm_callback_work(struct work_struct *work)
174  {
175  	struct dlm_lkb *lkb = container_of(work, struct dlm_lkb, lkb_cb_work);
176  	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
177  	void (*castfn) (void *astparam);
178  	void (*bastfn) (void *astparam, int mode);
179  	struct dlm_callback *cb;
180  	int rv;
181  
182  	spin_lock(&lkb->lkb_cb_lock);
183  	rv = dlm_dequeue_lkb_callback(lkb, &cb);
184  	spin_unlock(&lkb->lkb_cb_lock);
185  
186  	if (WARN_ON_ONCE(rv == DLM_DEQUEUE_CALLBACK_EMPTY))
187  		goto out;
188  
189  	for (;;) {
190  		castfn = lkb->lkb_astfn;
191  		bastfn = lkb->lkb_bastfn;
192  
193  		if (cb->flags & DLM_CB_BAST) {
194  			trace_dlm_bast(ls, lkb, cb->mode);
195  			lkb->lkb_last_bast_time = ktime_get();
196  			lkb->lkb_last_bast_mode = cb->mode;
197  			bastfn(lkb->lkb_astparam, cb->mode);
198  		} else if (cb->flags & DLM_CB_CAST) {
199  			lkb->lkb_lksb->sb_status = cb->sb_status;
200  			lkb->lkb_lksb->sb_flags = cb->sb_flags;
201  			trace_dlm_ast(ls, lkb);
202  			lkb->lkb_last_cast_time = ktime_get();
203  			castfn(lkb->lkb_astparam);
204  		}
205  
206  		kref_put(&cb->ref, dlm_release_callback);
207  
208  		spin_lock(&lkb->lkb_cb_lock);
209  		rv = dlm_dequeue_lkb_callback(lkb, &cb);
210  		if (rv == DLM_DEQUEUE_CALLBACK_EMPTY) {
211  			clear_bit(DLM_IFL_CB_PENDING_BIT, &lkb->lkb_iflags);
212  			spin_unlock(&lkb->lkb_cb_lock);
213  			break;
214  		}
215  		spin_unlock(&lkb->lkb_cb_lock);
216  	}
217  
218  out:
219  	/* undo kref_get from dlm_add_callback, may cause lkb to be freed */
220  	dlm_put_lkb(lkb);
221  }
222  
223  int dlm_callback_start(struct dlm_ls *ls)
224  {
225  	ls->ls_callback_wq = alloc_workqueue("dlm_callback",
226  					     WQ_HIGHPRI | WQ_MEM_RECLAIM, 0);
227  	if (!ls->ls_callback_wq) {
228  		log_print("can't start dlm_callback workqueue");
229  		return -ENOMEM;
230  	}
231  	return 0;
232  }
233  
234  void dlm_callback_stop(struct dlm_ls *ls)
235  {
236  	if (ls->ls_callback_wq)
237  		destroy_workqueue(ls->ls_callback_wq);
238  }
239  
240  void dlm_callback_suspend(struct dlm_ls *ls)
241  {
242  	if (ls->ls_callback_wq) {
243  		spin_lock(&ls->ls_cb_lock);
244  		set_bit(LSFL_CB_DELAY, &ls->ls_flags);
245  		spin_unlock(&ls->ls_cb_lock);
246  
247  		flush_workqueue(ls->ls_callback_wq);
248  	}
249  }
250  
251  #define MAX_CB_QUEUE 25
252  
253  void dlm_callback_resume(struct dlm_ls *ls)
254  {
255  	struct dlm_lkb *lkb, *safe;
256  	int count = 0, sum = 0;
257  	bool empty;
258  
259  	if (!ls->ls_callback_wq)
260  		return;
261  
262  more:
263  	spin_lock(&ls->ls_cb_lock);
264  	list_for_each_entry_safe(lkb, safe, &ls->ls_cb_delay, lkb_cb_list) {
265  		list_del_init(&lkb->lkb_cb_list);
266  		queue_work(ls->ls_callback_wq, &lkb->lkb_cb_work);
267  		count++;
268  		if (count == MAX_CB_QUEUE)
269  			break;
270  	}
271  	empty = list_empty(&ls->ls_cb_delay);
272  	if (empty)
273  		clear_bit(LSFL_CB_DELAY, &ls->ls_flags);
274  	spin_unlock(&ls->ls_cb_lock);
275  
276  	sum += count;
277  	if (!empty) {
278  		count = 0;
279  		cond_resched();
280  		goto more;
281  	}
282  
283  	if (sum)
284  		log_rinfo(ls, "%s %d", __func__, sum);
285  }
286  
287