1 /*
2  * qht.c - QEMU Hash Table, designed to scale for read-mostly workloads.
3  *
4  * Copyright (C) 2016, Emilio G. Cota <cota@braap.org>
5  *
6  * License: GNU GPL, version 2 or later.
7  *   See the COPYING file in the top-level directory.
8  *
9  * Assumptions:
10  * - NULL cannot be inserted/removed as a pointer value.
11  * - Trying to insert an already-existing hash-pointer pair is OK. However,
12  *   it is not OK to insert into the same hash table different hash-pointer
13  *   pairs that have the same pointer value but different hashes.
14  * - Lookups are performed under an RCU read-critical section; removals
15  *   must wait for a grace period to elapse before freeing removed objects.
16  *
17  * Features:
18  * - Reads (i.e. lookups and iterators) can be concurrent with other reads.
19  *   Lookups that are concurrent with writes to the same bucket will retry
20  *   via a seqlock; iterators acquire all bucket locks and therefore can be
21  *   concurrent with lookups and are serialized wrt writers.
22  * - Writes (i.e. insertions/removals) can be concurrent with writes to
23  *   different buckets; writes to the same bucket are serialized through a lock.
24  * - Optional auto-resizing: the hash table resizes up if the load surpasses
25  *   a certain threshold. Resizing is done concurrently with readers; writes
26  *   are serialized with the resize operation.
27  *
28  * The key structure is the bucket, which is cacheline-sized. Buckets
29  * contain a few hash values and pointers; the u32 hash values are stored in
30  * full so that resizing is fast. Having this structure instead of directly
31  * chaining items has two advantages:
32  * - Failed lookups fail fast, and touch a minimum number of cache lines.
33  * - Resizing the hash table with concurrent lookups is easy.
34  *
35  * There are two types of buckets:
36  * 1. "head" buckets are the ones allocated in the array of buckets in qht_map.
37  * 2. all "non-head" buckets (i.e. all others) are members of a chain that
38  *    starts from a head bucket.
39  * Note that the seqlock and spinlock of a head bucket apply to all buckets
40  * chained to it; these two fields are unused in non-head buckets.
41  *
42  * On removals, we move the last valid item in the chain to the position of the
43  * just-removed entry. This makes lookups slightly faster, since the moment an
44  * invalid entry is found, the (failed) lookup is over.
45  *
46  * Resizing is done by taking all bucket spinlocks (so that no other writers can
47  * race with us) and then copying all entries into a new hash map. Then, the
48  * ht->map pointer is set, and the old map is freed once no RCU readers can see
49  * it anymore.
50  *
51  * Writers check for concurrent resizes by comparing ht->map before and after
52  * acquiring their bucket lock. If they don't match, a resize has occured
53  * while the bucket spinlock was being acquired.
54  *
55  * Related Work:
56  * - Idea of cacheline-sized buckets with full hashes taken from:
57  *   David, Guerraoui & Trigonakis, "Asynchronized Concurrency:
58  *   The Secret to Scaling Concurrent Search Data Structures", ASPLOS'15.
59  * - Why not RCU-based hash tables? They would allow us to get rid of the
60  *   seqlock, but resizing would take forever since RCU read critical
61  *   sections in QEMU take quite a long time.
62  *   More info on relativistic hash tables:
63  *   + Triplett, McKenney & Walpole, "Resizable, Scalable, Concurrent Hash
64  *     Tables via Relativistic Programming", USENIX ATC'11.
65  *   + Corbet, "Relativistic hash tables, part 1: Algorithms", @ lwn.net, 2014.
66  *     https://lwn.net/Articles/612021/
67  */
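/*
 * Usage sketch (illustrative only, not part of the original file). It assumes
 * a caller-defined struct my_entry and uses the key itself as a stand-in for
 * a real 32-bit hash function; error handling and RCU-based reclamation of
 * removed entries are elided.
 *
 *   struct my_entry {
 *       uint32_t key;
 *       int payload;
 *   };
 *
 *   static bool my_entry_cmp(const void *a, const void *b)
 *   {
 *       const struct my_entry *ea = a;
 *       const struct my_entry *eb = b;
 *
 *       return ea->key == eb->key;
 *   }
 *
 *   static void example(void)
 *   {
 *       struct qht ht;
 *       struct my_entry *e = g_new0(struct my_entry, 1);
 *       struct my_entry templ = { .key = 42 };
 *       struct my_entry *found;
 *       void *existing;
 *       uint32_t hash;
 *
 *       e->key = 42;
 *       hash = e->key;  // stand-in for a real hash function
 *
 *       qht_init(&ht, my_entry_cmp, 1 << 10, QHT_MODE_AUTO_RESIZE);
 *
 *       if (!qht_insert(&ht, e, hash, &existing)) {
 *           // an entry that compares equal was already present
 *           g_free(e);
 *           e = existing;
 *       }
 *
 *       // lookups must run inside an RCU read-side critical section
 *       rcu_read_lock();
 *       found = qht_lookup(&ht, &templ, hash);
 *       if (found) {
 *           found->payload++;
 *       }
 *       rcu_read_unlock();
 *
 *       // after removal, defer freeing @e until an RCU grace period elapses
 *       qht_remove(&ht, e, hash);
 *
 *       qht_destroy(&ht);
 *   }
 */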
68 #include "qemu/osdep.h"
69 #include "qemu/qht.h"
70 #include "qemu/atomic.h"
71 #include "qemu/rcu.h"
72 
73 //#define QHT_DEBUG
74 
75 /*
76  * We want to avoid false sharing of cache lines. Most systems have 64-byte
77  * cache lines so we go with it for simplicity.
78  *
79  * Note that systems with smaller cache lines will be fine (the struct is
80  * almost 64 bytes); systems with larger cache lines might suffer from
81  * some false sharing.
82  */
83 #define QHT_BUCKET_ALIGN 64
84 
85 /* define these to keep sizeof(qht_bucket) within QHT_BUCKET_ALIGN */
86 #if HOST_LONG_BITS == 32
87 #define QHT_BUCKET_ENTRIES 6
88 #else /* 64-bit */
89 #define QHT_BUCKET_ENTRIES 4
90 #endif
91 
92 /*
93  * Do _not_ use qemu_mutex_[try]lock directly! Use these wrappers, otherwise
94  * the profiler (QSP) will deadlock.
95  */
96 static inline void qht_lock(struct qht *ht)
97 {
98     if (ht->mode & QHT_MODE_RAW_MUTEXES) {
99         qemu_mutex_lock__raw(&ht->lock);
100     } else {
101         qemu_mutex_lock(&ht->lock);
102     }
103 }
104 
105 static inline int qht_trylock(struct qht *ht)
106 {
107     if (ht->mode & QHT_MODE_RAW_MUTEXES) {
108         return qemu_mutex_trylock__raw(&ht->lock);
109     }
110     return qemu_mutex_trylock(&ht->lock);
111 }
112 
113 /* this inline is not really necessary, but it helps keep code consistent */
114 static inline void qht_unlock(struct qht *ht)
115 {
116     qemu_mutex_unlock(&ht->lock);
117 }
118 
119 /*
120  * Note: reading partially-updated pointers in @pointers could lead to
121  * segfaults. We thus access them with atomic_read/set; this guarantees
122  * that the compiler makes all those accesses atomic. We also need the
123  * volatile-like behavior in atomic_read, since otherwise the compiler
124  * might refetch the pointer.
125  * atomic_read() is of course not necessary when the bucket lock is held.
126  *
127  * If both ht->lock and b->lock are grabbed, ht->lock should always
128  * be grabbed first.
129  */
130 struct qht_bucket {
131     QemuSpin lock;
132     QemuSeqLock sequence;
133     uint32_t hashes[QHT_BUCKET_ENTRIES];
134     void *pointers[QHT_BUCKET_ENTRIES];
135     struct qht_bucket *next;
136 } QEMU_ALIGNED(QHT_BUCKET_ALIGN);
137 
138 QEMU_BUILD_BUG_ON(sizeof(struct qht_bucket) > QHT_BUCKET_ALIGN);
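/*
 * Illustrative size accounting (assuming 4-byte QemuSpin and QemuSeqLock):
 *   64-bit hosts: 4 + 4 + 4 * 4 (hashes) + 4 * 8 (pointers) + 8 (next) = 64
 *   32-bit hosts: 4 + 4 + 6 * 4 (hashes) + 6 * 4 (pointers) + 4 (next) = 60
 * Both fit in QHT_BUCKET_ALIGN, which the build-time assertion above checks.
 */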
139 
140 /**
141  * struct qht_map - structure to track an array of buckets
142  * @rcu: used by RCU. Keep it as the top field in the struct to help valgrind
143  *       find the whole struct.
144  * @buckets: array of head buckets. It is constant once the map is created.
145  * @n_buckets: number of head buckets. It is constant once the map is created.
146  * @n_added_buckets: number of added (i.e. "non-head") buckets
147  * @n_added_buckets_threshold: threshold to trigger an upward resize once the
148  *                             number of added buckets surpasses it.
149  *
150  * Buckets are tracked in what we call a "map", i.e. this structure.
151  */
152 struct qht_map {
153     struct rcu_head rcu;
154     struct qht_bucket *buckets;
155     size_t n_buckets;
156     size_t n_added_buckets;
157     size_t n_added_buckets_threshold;
158 };
159 
160 /* trigger a resize when n_added_buckets > n_buckets / div */
161 #define QHT_NR_ADDED_BUCKETS_THRESHOLD_DIV 8
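/*
 * For example, a map with 1024 head buckets triggers a grow (when the table
 * was created with QHT_MODE_AUTO_RESIZE) once more than 1024 / 8 = 128
 * non-head buckets have been chained to it.
 */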
162 
163 static void qht_do_resize_reset(struct qht *ht, struct qht_map *new,
164                                 bool reset);
165 static void qht_grow_maybe(struct qht *ht);
166 
167 #ifdef QHT_DEBUG
168 
169 #define qht_debug_assert(X) do { assert(X); } while (0)
170 
171 static void qht_bucket_debug__locked(struct qht_bucket *b)
172 {
173     bool seen_empty = false;
174     bool corrupt = false;
175     int i;
176 
177     do {
178         for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
179             if (b->pointers[i] == NULL) {
180                 seen_empty = true;
181                 continue;
182             }
183             if (seen_empty) {
184                 fprintf(stderr, "%s: b: %p, pos: %i, hash: 0x%x, p: %p\n",
185                         __func__, b, i, b->hashes[i], b->pointers[i]);
186                 corrupt = true;
187             }
188         }
189         b = b->next;
190     } while (b);
191     qht_debug_assert(!corrupt);
192 }
193 
194 static void qht_map_debug__all_locked(struct qht_map *map)
195 {
196     int i;
197 
198     for (i = 0; i < map->n_buckets; i++) {
199         qht_bucket_debug__locked(&map->buckets[i]);
200     }
201 }
202 #else
203 
204 #define qht_debug_assert(X) do { (void)(X); } while (0)
205 
206 static inline void qht_bucket_debug__locked(struct qht_bucket *b)
207 { }
208 
209 static inline void qht_map_debug__all_locked(struct qht_map *map)
210 { }
211 #endif /* QHT_DEBUG */
212 
213 static inline size_t qht_elems_to_buckets(size_t n_elems)
214 {
215     return pow2ceil(n_elems / QHT_BUCKET_ENTRIES);
216 }
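/*
 * For instance, on a 64-bit host (QHT_BUCKET_ENTRIES == 4) a request for
 * 1000 elements yields pow2ceil(250) == 256 head buckets; the power-of-two
 * bucket count is what lets qht_map_to_bucket() mask instead of divide.
 */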
217 
218 static inline void qht_head_init(struct qht_bucket *b)
219 {
220     memset(b, 0, sizeof(*b));
221     qemu_spin_init(&b->lock);
222     seqlock_init(&b->sequence);
223 }
224 
225 static inline
226 struct qht_bucket *qht_map_to_bucket(struct qht_map *map, uint32_t hash)
227 {
228     return &map->buckets[hash & (map->n_buckets - 1)];
229 }
230 
231 /* acquire all bucket locks from a map */
232 static void qht_map_lock_buckets(struct qht_map *map)
233 {
234     size_t i;
235 
236     for (i = 0; i < map->n_buckets; i++) {
237         struct qht_bucket *b = &map->buckets[i];
238 
239         qemu_spin_lock(&b->lock);
240     }
241 }
242 
243 static void qht_map_unlock_buckets(struct qht_map *map)
244 {
245     size_t i;
246 
247     for (i = 0; i < map->n_buckets; i++) {
248         struct qht_bucket *b = &map->buckets[i];
249 
250         qemu_spin_unlock(&b->lock);
251     }
252 }
253 
254 /*
255  * Call with at least a bucket lock held.
256  * @map should be the value read before acquiring the lock (or locks).
257  */
258 static inline bool qht_map_is_stale__locked(struct qht *ht, struct qht_map *map)
259 {
260     return map != ht->map;
261 }
262 
263 /*
264  * Grab all bucket locks, and set @pmap after making sure the map isn't stale.
265  *
266  * Pairs with qht_map_unlock_buckets(), hence the pass-by-reference.
267  *
268  * Note: callers cannot have ht->lock held.
269  */
270 static inline
271 void qht_map_lock_buckets__no_stale(struct qht *ht, struct qht_map **pmap)
272 {
273     struct qht_map *map;
274 
275     map = atomic_rcu_read(&ht->map);
276     qht_map_lock_buckets(map);
277     if (likely(!qht_map_is_stale__locked(ht, map))) {
278         *pmap = map;
279         return;
280     }
281     qht_map_unlock_buckets(map);
282 
283     /* we raced with a resize; acquire ht->lock to see the updated ht->map */
284     qht_lock(ht);
285     map = ht->map;
286     qht_map_lock_buckets(map);
287     qht_unlock(ht);
288     *pmap = map;
289     return;
290 }
291 
292 /*
293  * Get a head bucket and lock it, making sure its parent map is not stale.
294  * @pmap is filled with a pointer to the bucket's parent map.
295  *
296  * Unlock with qemu_spin_unlock(&b->lock).
297  *
298  * Note: callers cannot have ht->lock held.
299  */
300 static inline
301 struct qht_bucket *qht_bucket_lock__no_stale(struct qht *ht, uint32_t hash,
302                                              struct qht_map **pmap)
303 {
304     struct qht_bucket *b;
305     struct qht_map *map;
306 
307     map = atomic_rcu_read(&ht->map);
308     b = qht_map_to_bucket(map, hash);
309 
310     qemu_spin_lock(&b->lock);
311     if (likely(!qht_map_is_stale__locked(ht, map))) {
312         *pmap = map;
313         return b;
314     }
315     qemu_spin_unlock(&b->lock);
316 
317     /* we raced with a resize; acquire ht->lock to see the updated ht->map */
318     qht_lock(ht);
319     map = ht->map;
320     b = qht_map_to_bucket(map, hash);
321     qemu_spin_lock(&b->lock);
322     qht_unlock(ht);
323     *pmap = map;
324     return b;
325 }
326 
327 static inline bool qht_map_needs_resize(struct qht_map *map)
328 {
329     return atomic_read(&map->n_added_buckets) > map->n_added_buckets_threshold;
330 }
331 
332 static inline void qht_chain_destroy(struct qht_bucket *head)
333 {
334     struct qht_bucket *curr = head->next;
335     struct qht_bucket *prev;
336 
337     while (curr) {
338         prev = curr;
339         curr = curr->next;
340         qemu_vfree(prev);
341     }
342 }
343 
344 /* pass only an orphan map */
345 static void qht_map_destroy(struct qht_map *map)
346 {
347     size_t i;
348 
349     for (i = 0; i < map->n_buckets; i++) {
350         qht_chain_destroy(&map->buckets[i]);
351     }
352     qemu_vfree(map->buckets);
353     g_free(map);
354 }
355 
356 static struct qht_map *qht_map_create(size_t n_buckets)
357 {
358     struct qht_map *map;
359     size_t i;
360 
361     map = g_malloc(sizeof(*map));
362     map->n_buckets = n_buckets;
363 
364     map->n_added_buckets = 0;
365     map->n_added_buckets_threshold = n_buckets /
366         QHT_NR_ADDED_BUCKETS_THRESHOLD_DIV;
367 
368     /* let tiny hash tables add at least one non-head bucket */
369     if (unlikely(map->n_added_buckets_threshold == 0)) {
370         map->n_added_buckets_threshold = 1;
371     }
372 
373     map->buckets = qemu_memalign(QHT_BUCKET_ALIGN,
374                                  sizeof(*map->buckets) * n_buckets);
375     for (i = 0; i < n_buckets; i++) {
376         qht_head_init(&map->buckets[i]);
377     }
378     return map;
379 }
380 
381 void qht_init(struct qht *ht, qht_cmp_func_t cmp, size_t n_elems,
382               unsigned int mode)
383 {
384     struct qht_map *map;
385     size_t n_buckets = qht_elems_to_buckets(n_elems);
386 
387     g_assert(cmp);
388     ht->cmp = cmp;
389     ht->mode = mode;
390     qemu_mutex_init(&ht->lock);
391     map = qht_map_create(n_buckets);
392     atomic_rcu_set(&ht->map, map);
393 }
394 
395 /* call only when there are no readers/writers left */
396 void qht_destroy(struct qht *ht)
397 {
398     qht_map_destroy(ht->map);
399     memset(ht, 0, sizeof(*ht));
400 }
401 
402 static void qht_bucket_reset__locked(struct qht_bucket *head)
403 {
404     struct qht_bucket *b = head;
405     int i;
406 
407     seqlock_write_begin(&head->sequence);
408     do {
409         for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
410             if (b->pointers[i] == NULL) {
411                 goto done;
412             }
413             atomic_set(&b->hashes[i], 0);
414             atomic_set(&b->pointers[i], NULL);
415         }
416         b = b->next;
417     } while (b);
418  done:
419     seqlock_write_end(&head->sequence);
420 }
421 
422 /* call with all bucket locks held */
423 static void qht_map_reset__all_locked(struct qht_map *map)
424 {
425     size_t i;
426 
427     for (i = 0; i < map->n_buckets; i++) {
428         qht_bucket_reset__locked(&map->buckets[i]);
429     }
430     qht_map_debug__all_locked(map);
431 }
432 
433 void qht_reset(struct qht *ht)
434 {
435     struct qht_map *map;
436 
437     qht_map_lock_buckets__no_stale(ht, &map);
438     qht_map_reset__all_locked(map);
439     qht_map_unlock_buckets(map);
440 }
441 
442 static inline void qht_do_resize(struct qht *ht, struct qht_map *new)
443 {
444     qht_do_resize_reset(ht, new, false);
445 }
446 
447 static inline void qht_do_resize_and_reset(struct qht *ht, struct qht_map *new)
448 {
449     qht_do_resize_reset(ht, new, true);
450 }
451 
452 bool qht_reset_size(struct qht *ht, size_t n_elems)
453 {
454     struct qht_map *new = NULL;
455     struct qht_map *map;
456     size_t n_buckets;
457 
458     n_buckets = qht_elems_to_buckets(n_elems);
459 
460     qht_lock(ht);
461     map = ht->map;
462     if (n_buckets != map->n_buckets) {
463         new = qht_map_create(n_buckets);
464     }
465     qht_do_resize_and_reset(ht, new);
466     qht_unlock(ht);
467 
468     return !!new;
469 }
470 
471 static inline
472 void *qht_do_lookup(struct qht_bucket *head, qht_lookup_func_t func,
473                     const void *userp, uint32_t hash)
474 {
475     struct qht_bucket *b = head;
476     int i;
477 
478     do {
479         for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
480             if (atomic_read(&b->hashes[i]) == hash) {
481                 /* The pointer is dereferenced before seqlock_read_retry,
482                  * so (unlike qht_insert__locked) we need to use
483                  * atomic_rcu_read here.
484                  */
485                 void *p = atomic_rcu_read(&b->pointers[i]);
486 
487                 if (likely(p) && likely(func(p, userp))) {
488                     return p;
489                 }
490             }
491         }
492         b = atomic_rcu_read(&b->next);
493     } while (b);
494 
495     return NULL;
496 }
497 
498 static __attribute__((noinline))
499 void *qht_lookup__slowpath(struct qht_bucket *b, qht_lookup_func_t func,
500                            const void *userp, uint32_t hash)
501 {
502     unsigned int version;
503     void *ret;
504 
505     do {
506         version = seqlock_read_begin(&b->sequence);
507         ret = qht_do_lookup(b, func, userp, hash);
508     } while (seqlock_read_retry(&b->sequence, version));
509     return ret;
510 }
511 
512 void *qht_lookup_custom(struct qht *ht, const void *userp, uint32_t hash,
513                         qht_lookup_func_t func)
514 {
515     struct qht_bucket *b;
516     struct qht_map *map;
517     unsigned int version;
518     void *ret;
519 
520     map = atomic_rcu_read(&ht->map);
521     b = qht_map_to_bucket(map, hash);
522 
523     version = seqlock_read_begin(&b->sequence);
524     ret = qht_do_lookup(b, func, userp, hash);
525     if (likely(!seqlock_read_retry(&b->sequence, version))) {
526         return ret;
527     }
528     /*
529      * Removing the do/while from the fastpath gives a 4% perf. increase when
530      * running a 100%-lookup microbenchmark.
531      */
532     return qht_lookup__slowpath(b, func, userp, hash);
533 }
534 
535 void *qht_lookup(struct qht *ht, const void *userp, uint32_t hash)
536 {
537     return qht_lookup_custom(ht, userp, hash, ht->cmp);
538 }
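/*
 * Lookup sketch with a custom comparison function (illustrative; struct
 * my_entry, its key field and the hash are caller-side assumptions):
 *
 *   static bool my_entry_lookup(const void *obj, const void *userp)
 *   {
 *       const struct my_entry *e = obj;
 *       const uint32_t *key = userp;
 *
 *       return e->key == *key;
 *   }
 *
 *   uint32_t key = 42;
 *   uint32_t hash = key;  // stand-in for a real hash function
 *   struct my_entry *e;
 *
 *   rcu_read_lock();
 *   e = qht_lookup_custom(ht, &key, hash, my_entry_lookup);
 *   rcu_read_unlock();
 *
 * Unlike qht_lookup(), this does not require building a template object that
 * ht->cmp can compare against; @userp can be whatever the function expects.
 */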
539 
540 /* call with head->lock held */
541 static void *qht_insert__locked(struct qht *ht, struct qht_map *map,
542                                 struct qht_bucket *head, void *p, uint32_t hash,
543                                 bool *needs_resize)
544 {
545     struct qht_bucket *b = head;
546     struct qht_bucket *prev = NULL;
547     struct qht_bucket *new = NULL;
548     int i;
549 
550     do {
551         for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
552             if (b->pointers[i]) {
553                 if (unlikely(b->hashes[i] == hash &&
554                              ht->cmp(b->pointers[i], p))) {
555                     return b->pointers[i];
556                 }
557             } else {
558                 goto found;
559             }
560         }
561         prev = b;
562         b = b->next;
563     } while (b);
564 
565     b = qemu_memalign(QHT_BUCKET_ALIGN, sizeof(*b));
566     memset(b, 0, sizeof(*b));
567     new = b;
568     i = 0;
569     atomic_inc(&map->n_added_buckets);
570     if (unlikely(qht_map_needs_resize(map)) && needs_resize) {
571         *needs_resize = true;
572     }
573 
574  found:
575     /* found an empty entry: acquire the seqlock and write */
576     seqlock_write_begin(&head->sequence);
577     if (new) {
578         atomic_rcu_set(&prev->next, b);
579     }
580     /* smp_wmb() implicit in seqlock_write_begin.  */
581     atomic_set(&b->hashes[i], hash);
582     atomic_set(&b->pointers[i], p);
583     seqlock_write_end(&head->sequence);
584     return NULL;
585 }
586 
587 static __attribute__((noinline)) void qht_grow_maybe(struct qht *ht)
588 {
589     struct qht_map *map;
590 
591     /*
592      * If the lock is taken, it probably means there's an ongoing resize,
593      * so bail out.
594      */
595     if (qht_trylock(ht)) {
596         return;
597     }
598     map = ht->map;
599     /* another thread might have just performed the resize we were after */
600     if (qht_map_needs_resize(map)) {
601         struct qht_map *new = qht_map_create(map->n_buckets * 2);
602 
603         qht_do_resize(ht, new);
604     }
605     qht_unlock(ht);
606 }
607 
608 bool qht_insert(struct qht *ht, void *p, uint32_t hash, void **existing)
609 {
610     struct qht_bucket *b;
611     struct qht_map *map;
612     bool needs_resize = false;
613     void *prev;
614 
615     /* NULL pointers are not supported */
616     qht_debug_assert(p);
617 
618     b = qht_bucket_lock__no_stale(ht, hash, &map);
619     prev = qht_insert__locked(ht, map, b, p, hash, &needs_resize);
620     qht_bucket_debug__locked(b);
621     qemu_spin_unlock(&b->lock);
622 
623     if (unlikely(needs_resize) && ht->mode & QHT_MODE_AUTO_RESIZE) {
624         qht_grow_maybe(ht);
625     }
626     if (likely(prev == NULL)) {
627         return true;
628     }
629     if (existing) {
630         *existing = prev;
631     }
632     return false;
633 }
634 
635 static inline bool qht_entry_is_last(struct qht_bucket *b, int pos)
636 {
637     if (pos == QHT_BUCKET_ENTRIES - 1) {
638         if (b->next == NULL) {
639             return true;
640         }
641         return b->next->pointers[0] == NULL;
642     }
643     return b->pointers[pos + 1] == NULL;
644 }
645 
646 static void
647 qht_entry_move(struct qht_bucket *to, int i, struct qht_bucket *from, int j)
648 {
649     qht_debug_assert(!(to == from && i == j));
650     qht_debug_assert(to->pointers[i]);
651     qht_debug_assert(from->pointers[j]);
652 
653     atomic_set(&to->hashes[i], from->hashes[j]);
654     atomic_set(&to->pointers[i], from->pointers[j]);
655 
656     atomic_set(&from->hashes[j], 0);
657     atomic_set(&from->pointers[j], NULL);
658 }
659 
660 /*
661  * Find the last valid entry in @head, and swap it with @orig[pos], which has
662  * just been invalidated.
663  */
664 static inline void qht_bucket_remove_entry(struct qht_bucket *orig, int pos)
665 {
666     struct qht_bucket *b = orig;
667     struct qht_bucket *prev = NULL;
668     int i;
669 
670     if (qht_entry_is_last(orig, pos)) {
671         orig->hashes[pos] = 0;
672         atomic_set(&orig->pointers[pos], NULL);
673         return;
674     }
675     do {
676         for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
677             if (b->pointers[i]) {
678                 continue;
679             }
680             if (i > 0) {
681                 return qht_entry_move(orig, pos, b, i - 1);
682             }
683             qht_debug_assert(prev);
684             return qht_entry_move(orig, pos, prev, QHT_BUCKET_ENTRIES - 1);
685         }
686         prev = b;
687         b = b->next;
688     } while (b);
689     /* the chain is completely full, so move its last entry into orig[pos] */
690     qht_entry_move(orig, pos, prev, QHT_BUCKET_ENTRIES - 1);
691 }
692 
693 /* call with b->lock held */
694 static inline
695 bool qht_remove__locked(struct qht_map *map, struct qht_bucket *head,
696                         const void *p, uint32_t hash)
697 {
698     struct qht_bucket *b = head;
699     int i;
700 
701     do {
702         for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
703             void *q = b->pointers[i];
704 
705             if (unlikely(q == NULL)) {
706                 return false;
707             }
708             if (q == p) {
709                 qht_debug_assert(b->hashes[i] == hash);
710                 seqlock_write_begin(&head->sequence);
711                 qht_bucket_remove_entry(b, i);
712                 seqlock_write_end(&head->sequence);
713                 return true;
714             }
715         }
716         b = b->next;
717     } while (b);
718     return false;
719 }
720 
721 bool qht_remove(struct qht *ht, const void *p, uint32_t hash)
722 {
723     struct qht_bucket *b;
724     struct qht_map *map;
725     bool ret;
726 
727     /* NULL pointers are not supported */
728     qht_debug_assert(p);
729 
730     b = qht_bucket_lock__no_stale(ht, hash, &map);
731     ret = qht_remove__locked(map, b, p, hash);
732     qht_bucket_debug__locked(b);
733     qemu_spin_unlock(&b->lock);
734     return ret;
735 }
736 
737 static inline void qht_bucket_iter(struct qht *ht, struct qht_bucket *b,
738                                    qht_iter_func_t func, void *userp)
739 {
740     int i;
741 
742     do {
743         for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
744             if (b->pointers[i] == NULL) {
745                 return;
746             }
747             func(ht, b->pointers[i], b->hashes[i], userp);
748         }
749         b = b->next;
750     } while (b);
751 }
752 
753 /* call with all of the map's locks held */
754 static inline void qht_map_iter__all_locked(struct qht *ht, struct qht_map *map,
755                                             qht_iter_func_t func, void *userp)
756 {
757     size_t i;
758 
759     for (i = 0; i < map->n_buckets; i++) {
760         qht_bucket_iter(ht, &map->buckets[i], func, userp);
761     }
762 }
763 
764 void qht_iter(struct qht *ht, qht_iter_func_t func, void *userp)
765 {
766     struct qht_map *map;
767 
768     map = atomic_rcu_read(&ht->map);
769     qht_map_lock_buckets(map);
770     /* Note: ht here is merely for carrying ht->mode; ht->map won't be read */
771     qht_map_iter__all_locked(ht, map, func, userp);
772     qht_map_unlock_buckets(map);
773 }
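/*
 * Iteration sketch (illustrative): counting every entry in the table.
 *
 *   static void count_entry(struct qht *ht, void *p, uint32_t hash,
 *                           void *userp)
 *   {
 *       size_t *count = userp;
 *
 *       (*count)++;
 *   }
 *
 *   size_t count = 0;
 *   qht_iter(ht, count_entry, &count);
 *
 * Since the iterator holds every bucket lock for the whole walk, @func must
 * not insert into or remove from the same table (it would self-deadlock).
 */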
774 
775 static void qht_map_copy(struct qht *ht, void *p, uint32_t hash, void *userp)
776 {
777     struct qht_map *new = userp;
778     struct qht_bucket *b = qht_map_to_bucket(new, hash);
779 
780     /* no need to acquire b->lock because no thread has seen this map yet */
781     qht_insert__locked(ht, new, b, p, hash, NULL);
782 }
783 
784 /*
785  * Atomically perform a resize and/or reset.
786  * Call with ht->lock held.
787  */
788 static void qht_do_resize_reset(struct qht *ht, struct qht_map *new, bool reset)
789 {
790     struct qht_map *old;
791 
792     old = ht->map;
793     qht_map_lock_buckets(old);
794 
795     if (reset) {
796         qht_map_reset__all_locked(old);
797     }
798 
799     if (new == NULL) {
800         qht_map_unlock_buckets(old);
801         return;
802     }
803 
804     g_assert(new->n_buckets != old->n_buckets);
805     qht_map_iter__all_locked(ht, old, qht_map_copy, new);
806     qht_map_debug__all_locked(new);
807 
808     atomic_rcu_set(&ht->map, new);
809     qht_map_unlock_buckets(old);
810     call_rcu(old, qht_map_destroy, rcu);
811 }
812 
813 bool qht_resize(struct qht *ht, size_t n_elems)
814 {
815     size_t n_buckets = qht_elems_to_buckets(n_elems);
816     bool ret = false;
817 
818     qht_lock(ht);
819     if (n_buckets != ht->map->n_buckets) {
820         struct qht_map *new;
821 
822         new = qht_map_create(n_buckets);
823         qht_do_resize(ht, new);
824         ret = true;
825     }
826     qht_unlock(ht);
827 
828     return ret;
829 }
830 
831 /* pass @stats to qht_statistics_destroy() when done */
832 void qht_statistics_init(struct qht *ht, struct qht_stats *stats)
833 {
834     struct qht_map *map;
835     int i;
836 
837     map = atomic_rcu_read(&ht->map);
838 
839     stats->used_head_buckets = 0;
840     stats->entries = 0;
841     qdist_init(&stats->chain);
842     qdist_init(&stats->occupancy);
843     /* bail out if the qht has not yet been initialized */
844     if (unlikely(map == NULL)) {
845         stats->head_buckets = 0;
846         return;
847     }
848     stats->head_buckets = map->n_buckets;
849 
850     for (i = 0; i < map->n_buckets; i++) {
851         struct qht_bucket *head = &map->buckets[i];
852         struct qht_bucket *b;
853         unsigned int version;
854         size_t buckets;
855         size_t entries;
856         int j;
857 
858         do {
859             version = seqlock_read_begin(&head->sequence);
860             buckets = 0;
861             entries = 0;
862             b = head;
863             do {
864                 for (j = 0; j < QHT_BUCKET_ENTRIES; j++) {
865                     if (atomic_read(&b->pointers[j]) == NULL) {
866                         break;
867                     }
868                     entries++;
869                 }
870                 buckets++;
871                 b = atomic_rcu_read(&b->next);
872             } while (b);
873         } while (seqlock_read_retry(&head->sequence, version));
874 
875         if (entries) {
876             qdist_inc(&stats->chain, buckets);
877             qdist_inc(&stats->occupancy,
878                       (double)entries / QHT_BUCKET_ENTRIES / buckets);
879             stats->used_head_buckets++;
880             stats->entries += entries;
881         } else {
882             qdist_inc(&stats->occupancy, 0);
883         }
884     }
885 }
886 
887 void qht_statistics_destroy(struct qht_stats *stats)
888 {
889     qdist_destroy(&stats->occupancy);
890     qdist_destroy(&stats->chain);
891 }
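/*
 * Statistics sketch (illustrative):
 *
 *   struct qht_stats stats;
 *
 *   qht_statistics_init(ht, &stats);
 *   printf("entries: %zu, used head buckets: %zu/%zu\n",
 *          stats.entries, stats.used_head_buckets, stats.head_buckets);
 *   qht_statistics_destroy(&stats);
 *
 * The chain and occupancy qdists can be summarized further with the qdist
 * API (e.g. qdist_avg()), as QEMU's "info jit" monitor output does for the
 * TB hash table.
 */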
892