/* fs/gfs2/lops.c (revision feaa7bba) */
/*
 * Copyright (C) Sistina Software, Inc.  1997-2003 All rights reserved.
 * Copyright (C) 2004-2006 Red Hat, Inc.  All rights reserved.
 *
 * This copyrighted material is made available to anyone wishing to use,
 * modify, copy, or redistribute it subject to the terms and conditions
 * of the GNU General Public License v.2.
 */

#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/completion.h>
#include <linux/buffer_head.h>
#include <linux/gfs2_ondisk.h>

#include "gfs2.h"
#include "lm_interface.h"
#include "incore.h"
#include "glock.h"
#include "log.h"
#include "lops.h"
#include "meta_io.h"
#include "recovery.h"
#include "rgrp.h"
#include "trans.h"
#include "util.h"

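/*
 * Each lo_* hook below is reached through small dispatch helpers; a
 * sketch in the style of the lops.h inlines of this era (see that
 * header for the authoritative versions):
 *
 *	static inline void lops_add(struct gfs2_sbd *sdp,
 *				    struct gfs2_log_element *le)
 *	{
 *		if (le->le_ops->lo_add)
 *			le->le_ops->lo_add(sdp, le);
 *	}
 *
 * An object thus opts in to a given phase of the log life cycle simply
 * by filling in that member of its gfs2_log_operations table.
 */
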
static void glock_lo_add(struct gfs2_sbd *sdp, struct gfs2_log_element *le)
{
	struct gfs2_glock *gl;
	struct gfs2_trans *tr = current->journal_info;

	tr->tr_touched = 1;

	if (!list_empty(&le->le_list))
		return;

	gl = container_of(le, struct gfs2_glock, gl_le);
	if (gfs2_assert_withdraw(sdp, gfs2_glock_is_held_excl(gl)))
		return;
	gfs2_glock_hold(gl);
	set_bit(GLF_DIRTY, &gl->gl_flags);

	gfs2_log_lock(sdp);
	sdp->sd_log_num_gl++;
	list_add(&le->le_list, &sdp->sd_log_le_gl);
	gfs2_log_unlock(sdp);
}

static void glock_lo_after_commit(struct gfs2_sbd *sdp, struct gfs2_ail *ai)
{
	struct list_head *head = &sdp->sd_log_le_gl;
	struct gfs2_glock *gl;

	while (!list_empty(head)) {
		gl = list_entry(head->next, struct gfs2_glock, gl_le.le_list);
		list_del_init(&gl->gl_le.le_list);
		sdp->sd_log_num_gl--;

		gfs2_assert_withdraw(sdp, gfs2_glock_is_held_excl(gl));
		gfs2_glock_put(gl);
	}
	gfs2_assert_warn(sdp, !sdp->sd_log_num_gl);
}

static void buf_lo_add(struct gfs2_sbd *sdp, struct gfs2_log_element *le)
{
	struct gfs2_bufdata *bd = container_of(le, struct gfs2_bufdata, bd_le);
	struct gfs2_trans *tr;

	if (!list_empty(&bd->bd_list_tr))
		return;

	tr = current->journal_info;
	tr->tr_touched = 1;
	tr->tr_num_buf++;
	list_add(&bd->bd_list_tr, &tr->tr_list_buf);

	if (!list_empty(&le->le_list))
		return;

	gfs2_trans_add_gl(bd->bd_gl);

	gfs2_meta_check(sdp, bd->bd_bh);
	gfs2_pin(sdp, bd->bd_bh);

	gfs2_log_lock(sdp);
	sdp->sd_log_num_buf++;
	list_add(&le->le_list, &sdp->sd_log_le_buf);
	gfs2_log_unlock(sdp);

	tr->tr_num_buf_new++;
}

static void buf_lo_incore_commit(struct gfs2_sbd *sdp, struct gfs2_trans *tr)
{
	struct list_head *head = &tr->tr_list_buf;
	struct gfs2_bufdata *bd;

	while (!list_empty(head)) {
		bd = list_entry(head->next, struct gfs2_bufdata, bd_list_tr);
		list_del_init(&bd->bd_list_tr);
		tr->tr_num_buf--;
	}
	gfs2_assert_warn(sdp, !tr->tr_num_buf);
}

static void buf_lo_before_commit(struct gfs2_sbd *sdp)
{
	struct buffer_head *bh;
	struct gfs2_log_descriptor *ld;
	struct gfs2_bufdata *bd1 = NULL, *bd2;
	unsigned int total = sdp->sd_log_num_buf;
	unsigned int offset = sizeof(struct gfs2_log_descriptor);
	unsigned int limit;
	unsigned int num;
	unsigned int n;
	__be64 *ptr;

	offset += (sizeof(__be64) - 1);
	offset &= ~(sizeof(__be64) - 1);
	limit = (sdp->sd_sb.sb_bsize - offset)/sizeof(__be64);
	/* for 4k blocks, limit = 503 */

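	/*
	 * Worked example, assuming the on-disk sizes of this format
	 * (24-byte gfs2_meta_header, hence a 72-byte log descriptor,
	 * already 8-byte aligned): with 4k blocks,
	 *
	 *	limit = (4096 - 72) / sizeof(__be64) = 503
	 *
	 * block numbers fit after each descriptor, as the comment
	 * above notes.
	 */
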
	bd1 = bd2 = list_prepare_entry(bd1, &sdp->sd_log_le_buf, bd_le.le_list);
	while (total) {
		num = total;
		if (total > limit)
			num = limit;
		bh = gfs2_log_get_buf(sdp);
		sdp->sd_log_num_hdrs++;
		ld = (struct gfs2_log_descriptor *)bh->b_data;
		ptr = (__be64 *)(bh->b_data + offset);
		ld->ld_header.mh_magic = cpu_to_be32(GFS2_MAGIC);
		ld->ld_header.mh_type = cpu_to_be32(GFS2_METATYPE_LD);
		ld->ld_header.mh_format = cpu_to_be32(GFS2_FORMAT_LD);
		ld->ld_type = cpu_to_be32(GFS2_LOG_DESC_METADATA);
		ld->ld_length = cpu_to_be32(num + 1);
		ld->ld_data1 = cpu_to_be32(num);
		ld->ld_data2 = cpu_to_be32(0);
		memset(ld->ld_reserved, 0, sizeof(ld->ld_reserved));

		n = 0;
		list_for_each_entry_continue(bd1, &sdp->sd_log_le_buf,
					     bd_le.le_list) {
			*ptr++ = cpu_to_be64(bd1->bd_bh->b_blocknr);
			if (++n >= num)
				break;
		}

		set_buffer_dirty(bh);
		ll_rw_block(WRITE, 1, &bh);

		n = 0;
		list_for_each_entry_continue(bd2, &sdp->sd_log_le_buf,
					     bd_le.le_list) {
			bh = gfs2_log_fake_buf(sdp, bd2->bd_bh);
			set_buffer_dirty(bh);
			ll_rw_block(WRITE, 1, &bh);
			if (++n >= num)
				break;
		}

		total -= num;
	}
}

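/*
 * Each chunk written by buf_lo_before_commit() occupies ld_length ==
 * num + 1 consecutive journal blocks:
 *
 *	[ LD descriptor | blkno[0] ... blkno[num-1] ] [ data ] ... [ data ]
 *
 * i.e. one descriptor listing the real block numbers, followed by num
 * copies of the pinned metadata buffers written via gfs2_log_fake_buf().
 */
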
static void buf_lo_after_commit(struct gfs2_sbd *sdp, struct gfs2_ail *ai)
{
	struct list_head *head = &sdp->sd_log_le_buf;
	struct gfs2_bufdata *bd;

	while (!list_empty(head)) {
		bd = list_entry(head->next, struct gfs2_bufdata, bd_le.le_list);
		list_del_init(&bd->bd_le.le_list);
		sdp->sd_log_num_buf--;

		gfs2_unpin(sdp, bd->bd_bh, ai);
	}
	gfs2_assert_warn(sdp, !sdp->sd_log_num_buf);
}

static void buf_lo_before_scan(struct gfs2_jdesc *jd,
			       struct gfs2_log_header *head, int pass)
{
	struct gfs2_sbd *sdp = GFS2_SB(jd->jd_inode);

	if (pass != 0)
		return;

	sdp->sd_found_blocks = 0;
	sdp->sd_replayed_blocks = 0;
}

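/*
 * Journal recovery makes two passes over the log: pass 0 collects
 * revokes (see revoke_lo_scan_elements() below) and pass 1 replays the
 * metadata and data descriptors. Hence this hook resets its counters
 * only when pass == 0, while buf_lo_scan_elements() acts only when
 * pass == 1.
 */
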
static int buf_lo_scan_elements(struct gfs2_jdesc *jd, unsigned int start,
				struct gfs2_log_descriptor *ld, __be64 *ptr,
				int pass)
{
	struct gfs2_inode *ip = GFS2_I(jd->jd_inode);
	struct gfs2_sbd *sdp = GFS2_SB(jd->jd_inode);
	struct gfs2_glock *gl = ip->i_gl;
	unsigned int blks = be32_to_cpu(ld->ld_data1);
	struct buffer_head *bh_log, *bh_ip;
	uint64_t blkno;
	int error = 0;

	if (pass != 1 || be32_to_cpu(ld->ld_type) != GFS2_LOG_DESC_METADATA)
		return 0;

	gfs2_replay_incr_blk(sdp, &start);

	for (; blks; gfs2_replay_incr_blk(sdp, &start), blks--) {
		blkno = be64_to_cpu(*ptr++);

		sdp->sd_found_blocks++;

		if (gfs2_revoke_check(sdp, blkno, start))
			continue;

		error = gfs2_replay_read_block(jd, start, &bh_log);
		if (error)
			return error;

		bh_ip = gfs2_meta_new(gl, blkno);
		memcpy(bh_ip->b_data, bh_log->b_data, bh_log->b_size);

		if (gfs2_meta_check(sdp, bh_ip))
			error = -EIO;
		else
			mark_buffer_dirty(bh_ip);

		brelse(bh_log);
		brelse(bh_ip);

		if (error)
			break;

		sdp->sd_replayed_blocks++;
	}

	return error;
}

static void buf_lo_after_scan(struct gfs2_jdesc *jd, int error, int pass)
{
	struct gfs2_inode *ip = GFS2_I(jd->jd_inode);
	struct gfs2_sbd *sdp = GFS2_SB(jd->jd_inode);

	if (error) {
		gfs2_meta_sync(ip->i_gl, DIO_START | DIO_WAIT);
		return;
	}
	if (pass != 1)
		return;

	gfs2_meta_sync(ip->i_gl, DIO_START | DIO_WAIT);

	fs_info(sdp, "jid=%u: Replayed %u of %u blocks\n",
	        jd->jd_jid, sdp->sd_replayed_blocks, sdp->sd_found_blocks);
}

static void revoke_lo_add(struct gfs2_sbd *sdp, struct gfs2_log_element *le)
{
	struct gfs2_trans *tr;

	tr = current->journal_info;
	tr->tr_touched = 1;
	tr->tr_num_revoke++;

	gfs2_log_lock(sdp);
	sdp->sd_log_num_revoke++;
	list_add(&le->le_list, &sdp->sd_log_le_revoke);
	gfs2_log_unlock(sdp);
}

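/*
 * A revoke records that a block journaled earlier in the log was
 * subsequently freed, so replaying the stale copy could clobber newer
 * contents. During recovery, gfs2_revoke_check() consults the revokes
 * collected on pass 0 before either *_lo_scan_elements() hook replays
 * a block.
 */
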
static void revoke_lo_before_commit(struct gfs2_sbd *sdp)
{
	struct gfs2_log_descriptor *ld;
	struct gfs2_meta_header *mh;
	struct buffer_head *bh;
	unsigned int offset;
	struct list_head *head = &sdp->sd_log_le_revoke;
	struct gfs2_revoke *rv;

	if (!sdp->sd_log_num_revoke)
		return;

	bh = gfs2_log_get_buf(sdp);
	ld = (struct gfs2_log_descriptor *)bh->b_data;
	ld->ld_header.mh_magic = cpu_to_be32(GFS2_MAGIC);
	ld->ld_header.mh_type = cpu_to_be32(GFS2_METATYPE_LD);
	ld->ld_header.mh_format = cpu_to_be32(GFS2_FORMAT_LD);
	ld->ld_type = cpu_to_be32(GFS2_LOG_DESC_REVOKE);
	ld->ld_length = cpu_to_be32(gfs2_struct2blk(sdp, sdp->sd_log_num_revoke,
						    sizeof(uint64_t)));
	ld->ld_data1 = cpu_to_be32(sdp->sd_log_num_revoke);
	ld->ld_data2 = cpu_to_be32(0);
	memset(ld->ld_reserved, 0, sizeof(ld->ld_reserved));
	offset = sizeof(struct gfs2_log_descriptor);

	while (!list_empty(head)) {
		rv = list_entry(head->next, struct gfs2_revoke, rv_le.le_list);
		list_del_init(&rv->rv_le.le_list);
		sdp->sd_log_num_revoke--;

		if (offset + sizeof(uint64_t) > sdp->sd_sb.sb_bsize) {
			set_buffer_dirty(bh);
			ll_rw_block(WRITE, 1, &bh);

			bh = gfs2_log_get_buf(sdp);
			mh = (struct gfs2_meta_header *)bh->b_data;
			mh->mh_magic = cpu_to_be32(GFS2_MAGIC);
			mh->mh_type = cpu_to_be32(GFS2_METATYPE_LB);
			mh->mh_format = cpu_to_be32(GFS2_FORMAT_LB);
			offset = sizeof(struct gfs2_meta_header);
		}

		*(__be64 *)(bh->b_data + offset) = cpu_to_be64(rv->rv_blkno);
		kfree(rv);

		offset += sizeof(uint64_t);
	}
	gfs2_assert_withdraw(sdp, !sdp->sd_log_num_revoke);

	set_buffer_dirty(bh);
	ll_rw_block(WRITE, 1, &bh);
}

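/*
 * On-disk layout produced above: the first block carries a full log
 * descriptor, each continuation block only a GFS2_METATYPE_LB meta
 * header, with __be64 block numbers packed behind them. Assuming the
 * same 72- and 24-byte header sizes as before, a 4k block holds
 *
 *	(4096 - 72) / 8 = 503 revokes in the first block, and
 *	(4096 - 24) / 8 = 509 in each continuation block.
 */
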
static void revoke_lo_before_scan(struct gfs2_jdesc *jd,
				  struct gfs2_log_header *head, int pass)
{
	struct gfs2_sbd *sdp = GFS2_SB(jd->jd_inode);

	if (pass != 0)
		return;

	sdp->sd_found_revokes = 0;
	sdp->sd_replay_tail = head->lh_tail;
}

static int revoke_lo_scan_elements(struct gfs2_jdesc *jd, unsigned int start,
				   struct gfs2_log_descriptor *ld, __be64 *ptr,
				   int pass)
{
	struct gfs2_sbd *sdp = GFS2_SB(jd->jd_inode);
	unsigned int blks = be32_to_cpu(ld->ld_length);
	unsigned int revokes = be32_to_cpu(ld->ld_data1);
	struct buffer_head *bh;
	unsigned int offset;
	uint64_t blkno;
	int first = 1;
	int error;

	if (pass != 0 || be32_to_cpu(ld->ld_type) != GFS2_LOG_DESC_REVOKE)
		return 0;

	offset = sizeof(struct gfs2_log_descriptor);

	for (; blks; gfs2_replay_incr_blk(sdp, &start), blks--) {
		error = gfs2_replay_read_block(jd, start, &bh);
		if (error)
			return error;

		if (!first)
			gfs2_metatype_check(sdp, bh, GFS2_METATYPE_LB);

		while (offset + sizeof(uint64_t) <= sdp->sd_sb.sb_bsize) {
			blkno = be64_to_cpu(*(__be64 *)(bh->b_data + offset));

			error = gfs2_revoke_add(sdp, blkno, start);
			if (error < 0)
				return error;
			else if (error)
				sdp->sd_found_revokes++;

			if (!--revokes)
				break;
			offset += sizeof(uint64_t);
		}

		brelse(bh);
		offset = sizeof(struct gfs2_meta_header);
		first = 0;
	}

	return 0;
}

static void revoke_lo_after_scan(struct gfs2_jdesc *jd, int error, int pass)
{
	struct gfs2_sbd *sdp = GFS2_SB(jd->jd_inode);

	if (error) {
		gfs2_revoke_clean(sdp);
		return;
	}
	if (pass != 1)
		return;

	fs_info(sdp, "jid=%u: Found %u revoke tags\n",
	        jd->jd_jid, sdp->sd_found_revokes);

	gfs2_revoke_clean(sdp);
}

static void rg_lo_add(struct gfs2_sbd *sdp, struct gfs2_log_element *le)
{
	struct gfs2_rgrpd *rgd;
	struct gfs2_trans *tr = current->journal_info;

	tr->tr_touched = 1;

	if (!list_empty(&le->le_list))
		return;

	rgd = container_of(le, struct gfs2_rgrpd, rd_le);
	gfs2_rgrp_bh_hold(rgd);

	gfs2_log_lock(sdp);
	sdp->sd_log_num_rg++;
	list_add(&le->le_list, &sdp->sd_log_le_rg);
	gfs2_log_unlock(sdp);
}

static void rg_lo_after_commit(struct gfs2_sbd *sdp, struct gfs2_ail *ai)
{
	struct list_head *head = &sdp->sd_log_le_rg;
	struct gfs2_rgrpd *rgd;

	while (!list_empty(head)) {
		rgd = list_entry(head->next, struct gfs2_rgrpd, rd_le.le_list);
		list_del_init(&rgd->rd_le.le_list);
		sdp->sd_log_num_rg--;

		gfs2_rgrp_repolish_clones(rgd);
		gfs2_rgrp_bh_put(rgd);
	}
	gfs2_assert_warn(sdp, !sdp->sd_log_num_rg);
}

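/*
 * The repolish step above resyncs the in-core "clone" copies of the
 * resource group bitmaps (see rgrp.c): block searches are made against
 * the clones, so blocks freed within a transaction are not handed out
 * again before that transaction has committed.
 */
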
/**
 * databuf_lo_add - Add a databuf to the transaction.
 *
 * This is used in two distinct cases:
 * i) In ordered write mode
 *    We put the data buffer on a list so that we can ensure that it's
 *    synced to disk at the right time
 * ii) In journaled data mode
 *    We need to journal the data block in the same way as metadata in
 *    the functions above. The difference is that here we have a tag
 *    which is two __be64's being the block number (as per metadata)
 *    and a flag which says whether the data block needs escaping or
 *    not. This means we need a new log entry for each 251 or so data
 *    blocks, which isn't an enormous overhead but twice as much as
 *    for normal metadata blocks.
 */
static void databuf_lo_add(struct gfs2_sbd *sdp, struct gfs2_log_element *le)
{
	struct gfs2_bufdata *bd = container_of(le, struct gfs2_bufdata, bd_le);
	struct gfs2_trans *tr = current->journal_info;
	struct address_space *mapping = bd->bd_bh->b_page->mapping;
	struct gfs2_inode *ip = GFS2_I(mapping->host);

	tr->tr_touched = 1;
	if (list_empty(&bd->bd_list_tr) &&
	    (ip->i_di.di_flags & GFS2_DIF_JDATA)) {
		tr->tr_num_buf++;
		gfs2_trans_add_gl(bd->bd_gl);
		list_add(&bd->bd_list_tr, &tr->tr_list_buf);
		gfs2_pin(sdp, bd->bd_bh);
		tr->tr_num_buf_new++;
	}
	gfs2_log_lock(sdp);
	if (list_empty(&le->le_list)) {
		if (ip->i_di.di_flags & GFS2_DIF_JDATA)
			sdp->sd_log_num_jdata++;
		sdp->sd_log_num_databuf++;
		list_add(&le->le_list, &sdp->sd_log_le_databuf);
	}
	gfs2_log_unlock(sdp);
}

static int gfs2_check_magic(struct buffer_head *bh)
{
	struct page *page = bh->b_page;
	void *kaddr;
	__be32 *ptr;
	int rv = 0;

	kaddr = kmap_atomic(page, KM_USER0);
	ptr = kaddr + bh_offset(bh);
	if (*ptr == cpu_to_be32(GFS2_MAGIC))
		rv = 1;
	kunmap_atomic(kaddr, KM_USER0);

	return rv;
}

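/*
 * Why "escaping" exists: recovery recognises headers and descriptors
 * in the journal by their leading GFS2_MAGIC value, so a raw data
 * block that happens to begin with that value could be misread. Such
 * a block is written to the log with its first four bytes zeroed and
 * the escaped flag set in its tag; databuf_lo_scan_elements() below
 * restores the magic number on replay.
 */
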
/**
 * databuf_lo_before_commit - Scan the data buffers, writing as we go
 *
 * Here we scan through the lists of buffers and make the assumption
 * that any buffer that's been pinned is being journaled, and that
 * any unpinned buffer is an ordered write data buffer and therefore
 * will be written back rather than journaled.
 */
static void databuf_lo_before_commit(struct gfs2_sbd *sdp)
{
	LIST_HEAD(started);
	struct gfs2_bufdata *bd1 = NULL, *bd2, *bdt;
	struct buffer_head *bh = NULL;
	unsigned int offset = sizeof(struct gfs2_log_descriptor);
	struct gfs2_log_descriptor *ld;
	unsigned int limit;
	unsigned int total_dbuf = sdp->sd_log_num_databuf;
	unsigned int total_jdata = sdp->sd_log_num_jdata;
	unsigned int num, n;
	__be64 *ptr = NULL;

	offset += (2*sizeof(__be64) - 1);
	offset &= ~(2*sizeof(__be64) - 1);
	limit = (sdp->sd_sb.sb_bsize - offset)/(2*sizeof(__be64));

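	/*
	 * Each jdata tag is two __be64s (block number plus escape
	 * flag) and offset rounds the 72-byte descriptor up to 80,
	 * so with 4k blocks
	 *
	 *	limit = (4096 - 80) / (2 * sizeof(__be64)) = 251
	 *
	 * data blocks per descriptor, the "251 or so" mentioned in
	 * the comment above databuf_lo_add().
	 */
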
	/*
	 * Start writing ordered buffers, write journaled buffers
	 * into the log along with a header
	 */
	gfs2_log_lock(sdp);
	bd2 = bd1 = list_prepare_entry(bd1, &sdp->sd_log_le_databuf,
				       bd_le.le_list);
	while (total_dbuf) {
		num = total_jdata;
		if (num > limit)
			num = limit;
		n = 0;
		list_for_each_entry_safe_continue(bd1, bdt,
						  &sdp->sd_log_le_databuf,
						  bd_le.le_list) {
			/* An ordered write buffer */
			if (bd1->bd_bh && !buffer_pinned(bd1->bd_bh)) {
				list_move(&bd1->bd_le.le_list, &started);
				if (bd1 == bd2) {
					bd2 = NULL;
					bd2 = list_prepare_entry(bd2,
							&sdp->sd_log_le_databuf,
							bd_le.le_list);
				}
				total_dbuf--;
				if (bd1->bd_bh) {
					get_bh(bd1->bd_bh);
					if (buffer_dirty(bd1->bd_bh)) {
						gfs2_log_unlock(sdp);
						wait_on_buffer(bd1->bd_bh);
						ll_rw_block(WRITE, 1,
							    &bd1->bd_bh);
						gfs2_log_lock(sdp);
					}
					brelse(bd1->bd_bh);
					continue;
				}
				continue;
			} else if (bd1->bd_bh) { /* A journaled buffer */
				int magic;
				gfs2_log_unlock(sdp);
				if (!bh) {
					bh = gfs2_log_get_buf(sdp);
					sdp->sd_log_num_hdrs++;
					ld = (struct gfs2_log_descriptor *)
					     bh->b_data;
					ptr = (__be64 *)(bh->b_data + offset);
					ld->ld_header.mh_magic =
						cpu_to_be32(GFS2_MAGIC);
					ld->ld_header.mh_type =
						cpu_to_be32(GFS2_METATYPE_LD);
					ld->ld_header.mh_format =
						cpu_to_be32(GFS2_FORMAT_LD);
					ld->ld_type =
						cpu_to_be32(GFS2_LOG_DESC_JDATA);
					ld->ld_length = cpu_to_be32(num + 1);
					ld->ld_data1 = cpu_to_be32(num);
					ld->ld_data2 = cpu_to_be32(0);
					memset(ld->ld_reserved, 0,
					       sizeof(ld->ld_reserved));
				}
				magic = gfs2_check_magic(bd1->bd_bh);
				*ptr++ = cpu_to_be64(bd1->bd_bh->b_blocknr);
				*ptr++ = cpu_to_be64((__u64)magic);
				clear_buffer_escaped(bd1->bd_bh);
				if (unlikely(magic != 0))
					set_buffer_escaped(bd1->bd_bh);
				gfs2_log_lock(sdp);
				if (++n >= num)
					break;
			}
		}
		gfs2_log_unlock(sdp);
		if (bh) {
			set_buffer_dirty(bh);
			ll_rw_block(WRITE, 1, &bh);
			bh = NULL;
		}
		n = 0;
		gfs2_log_lock(sdp);
		list_for_each_entry_continue(bd2, &sdp->sd_log_le_databuf,
					     bd_le.le_list) {
			if (!bd2->bd_bh)
				continue;
			/* copy buffer if it needs escaping */
			gfs2_log_unlock(sdp);
			if (unlikely(buffer_escaped(bd2->bd_bh))) {
				void *kaddr;
				struct page *page = bd2->bd_bh->b_page;
				bh = gfs2_log_get_buf(sdp);
				kaddr = kmap_atomic(page, KM_USER0);
				memcpy(bh->b_data,
				       kaddr + bh_offset(bd2->bd_bh),
				       sdp->sd_sb.sb_bsize);
				kunmap_atomic(kaddr, KM_USER0);
				*(__be32 *)bh->b_data = 0;
			} else {
				bh = gfs2_log_fake_buf(sdp, bd2->bd_bh);
			}
			set_buffer_dirty(bh);
			ll_rw_block(WRITE, 1, &bh);
			gfs2_log_lock(sdp);
			if (++n >= num)
				break;
		}
		bh = NULL;
		total_dbuf -= num;
		total_jdata -= num;
	}
	gfs2_log_unlock(sdp);

	/* Wait on all ordered buffers */
	while (!list_empty(&started)) {
		gfs2_log_lock(sdp);
		bd1 = list_entry(started.next, struct gfs2_bufdata,
				 bd_le.le_list);
		list_del(&bd1->bd_le.le_list);
		sdp->sd_log_num_databuf--;

		bh = bd1->bd_bh;
		if (bh) {
			bh->b_private = NULL;
			gfs2_log_unlock(sdp);
			wait_on_buffer(bh);
			brelse(bh);
		} else
			gfs2_log_unlock(sdp);

		kfree(bd1);
	}

	/* We've removed all the ordered write bufs here, so only jdata left */
	gfs2_assert_warn(sdp, sdp->sd_log_num_databuf == sdp->sd_log_num_jdata);
}

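/*
 * In outline, databuf_lo_before_commit() above proceeds in three
 * phases:
 *
 * 1. Ordered buffers are moved to the local "started" list and their
 *    writeback is started; each journaled (jdata) buffer gets a tag of
 *    block number plus escape flag written into a descriptor block.
 * 2. The jdata blocks themselves are copied into the log, either via
 *    gfs2_log_fake_buf() or, when escaped, as a copy with the magic
 *    number zeroed.
 * 3. The ordered buffers are waited on and their bufdata structures
 *    freed, leaving only jdata entries on sd_log_le_databuf.
 */
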
static int databuf_lo_scan_elements(struct gfs2_jdesc *jd, unsigned int start,
				    struct gfs2_log_descriptor *ld,
				    __be64 *ptr, int pass)
{
	struct gfs2_inode *ip = GFS2_I(jd->jd_inode);
	struct gfs2_sbd *sdp = GFS2_SB(jd->jd_inode);
	struct gfs2_glock *gl = ip->i_gl;
	unsigned int blks = be32_to_cpu(ld->ld_data1);
	struct buffer_head *bh_log, *bh_ip;
	uint64_t blkno;
	uint64_t esc;
	int error = 0;

	if (pass != 1 || be32_to_cpu(ld->ld_type) != GFS2_LOG_DESC_JDATA)
		return 0;

	gfs2_replay_incr_blk(sdp, &start);
	for (; blks; gfs2_replay_incr_blk(sdp, &start), blks--) {
		blkno = be64_to_cpu(*ptr++);
		esc = be64_to_cpu(*ptr++);

		sdp->sd_found_blocks++;

		if (gfs2_revoke_check(sdp, blkno, start))
			continue;

		error = gfs2_replay_read_block(jd, start, &bh_log);
		if (error)
			return error;

		bh_ip = gfs2_meta_new(gl, blkno);
		memcpy(bh_ip->b_data, bh_log->b_data, bh_log->b_size);

		/* Unescape */
		if (esc) {
			__be32 *eptr = (__be32 *)bh_ip->b_data;
			*eptr = cpu_to_be32(GFS2_MAGIC);
		}
		mark_buffer_dirty(bh_ip);

		brelse(bh_log);
		brelse(bh_ip);
		if (error)
			break;

		sdp->sd_replayed_blocks++;
	}

	return error;
}

/* FIXME: sort out accounting for log blocks etc. */

static void databuf_lo_after_scan(struct gfs2_jdesc *jd, int error, int pass)
{
	struct gfs2_inode *ip = GFS2_I(jd->jd_inode);
	struct gfs2_sbd *sdp = GFS2_SB(jd->jd_inode);

	if (error) {
		gfs2_meta_sync(ip->i_gl, DIO_START | DIO_WAIT);
		return;
	}
	if (pass != 1)
		return;

	/* data sync? */
	gfs2_meta_sync(ip->i_gl, DIO_START | DIO_WAIT);

	fs_info(sdp, "jid=%u: Replayed %u of %u data blocks\n",
		jd->jd_jid, sdp->sd_replayed_blocks, sdp->sd_found_blocks);
}

static void databuf_lo_after_commit(struct gfs2_sbd *sdp, struct gfs2_ail *ai)
{
	struct list_head *head = &sdp->sd_log_le_databuf;
	struct gfs2_bufdata *bd;

	while (!list_empty(head)) {
		bd = list_entry(head->next, struct gfs2_bufdata, bd_le.le_list);
		list_del(&bd->bd_le.le_list);
		sdp->sd_log_num_databuf--;
		sdp->sd_log_num_jdata--;
		gfs2_unpin(sdp, bd->bd_bh, ai);
	}
	gfs2_assert_warn(sdp, !sdp->sd_log_num_databuf);
	gfs2_assert_warn(sdp, !sdp->sd_log_num_jdata);
}

const struct gfs2_log_operations gfs2_glock_lops = {
	.lo_add = glock_lo_add,
	.lo_after_commit = glock_lo_after_commit,
	.lo_name = "glock"
};

const struct gfs2_log_operations gfs2_buf_lops = {
	.lo_add = buf_lo_add,
	.lo_incore_commit = buf_lo_incore_commit,
	.lo_before_commit = buf_lo_before_commit,
	.lo_after_commit = buf_lo_after_commit,
	.lo_before_scan = buf_lo_before_scan,
	.lo_scan_elements = buf_lo_scan_elements,
	.lo_after_scan = buf_lo_after_scan,
	.lo_name = "buf"
};

const struct gfs2_log_operations gfs2_revoke_lops = {
	.lo_add = revoke_lo_add,
	.lo_before_commit = revoke_lo_before_commit,
	.lo_before_scan = revoke_lo_before_scan,
	.lo_scan_elements = revoke_lo_scan_elements,
	.lo_after_scan = revoke_lo_after_scan,
	.lo_name = "revoke"
};

const struct gfs2_log_operations gfs2_rg_lops = {
	.lo_add = rg_lo_add,
	.lo_after_commit = rg_lo_after_commit,
	.lo_name = "rg"
};

const struct gfs2_log_operations gfs2_databuf_lops = {
	.lo_add = databuf_lo_add,
	.lo_incore_commit = buf_lo_incore_commit,
	.lo_before_commit = databuf_lo_before_commit,
	.lo_after_commit = databuf_lo_after_commit,
	.lo_scan_elements = databuf_lo_scan_elements,
	.lo_after_scan = databuf_lo_after_scan,
	.lo_name = "databuf"
};

const struct gfs2_log_operations *gfs2_log_ops[] = {
	&gfs2_glock_lops,
	&gfs2_buf_lops,
	&gfs2_revoke_lops,
	&gfs2_rg_lops,
	&gfs2_databuf_lops,
	NULL
};
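
/*
 * The NULL-terminated gfs2_log_ops[] array lets the log and recovery
 * code drive every element type generically; a sketch in the style of
 * the lops.h helpers of this era (see that header for the
 * authoritative versions):
 *
 *	static inline void lops_before_commit(struct gfs2_sbd *sdp)
 *	{
 *		int x;
 *		for (x = 0; gfs2_log_ops[x]; x++)
 *			if (gfs2_log_ops[x]->lo_before_commit)
 *				gfs2_log_ops[x]->lo_before_commit(sdp);
 *	}
 */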