xref: /openbmc/qemu/util/qht.c (revision 2b74dd918007d91f5fee94ad0034b5e7a30ed777)
1 /*
2  * qht.c - QEMU Hash Table, designed to scale for read-mostly workloads.
3  *
4  * Copyright (C) 2016, Emilio G. Cota <cota@braap.org>
5  *
6  * License: GNU GPL, version 2 or later.
7  *   See the COPYING file in the top-level directory.
8  *
9  * Assumptions:
10  * - NULL cannot be inserted/removed as a pointer value.
11  * - Trying to insert an already-existing hash-pointer pair is OK. However,
12  *   it is not OK to insert into the same hash table different hash-pointer
13  *   pairs that have the same pointer value, but not the hashes.
14  * - Lookups are performed under an RCU read-critical section; removals
15  *   must wait for a grace period to elapse before freeing removed objects.
16  *
17  * Features:
18  * - Reads (i.e. lookups and iterators) can be concurrent with other reads.
19  *   Lookups that are concurrent with writes to the same bucket will retry
20  *   via a seqlock; iterators acquire all bucket locks and therefore can be
21  *   concurrent with lookups and are serialized wrt writers.
22  * - Writes (i.e. insertions/removals) can be concurrent with writes to
23  *   different buckets; writes to the same bucket are serialized through a lock.
24  * - Optional auto-resizing: the hash table resizes up if the load surpasses
25  *   a certain threshold. Resizing is done concurrently with readers; writes
26  *   are serialized with the resize operation.
27  *
28  * The key structure is the bucket, which is cacheline-sized. Buckets
29  * contain a few hash values and pointers; the u32 hash values are stored in
30  * full so that resizing is fast. Having this structure instead of directly
31  * chaining items has two advantages:
32  * - Failed lookups fail fast, and touch a minimum number of cache lines.
33  * - Resizing the hash table with concurrent lookups is easy.
34  *
35  * There are two types of buckets:
36  * 1. "head" buckets are the ones allocated in the array of buckets in qht_map.
37  * 2. all "non-head" buckets (i.e. all others) are members of a chain that
38  *    starts from a head bucket.
39  * Note that the seqlock and spinlock of a head bucket applies to all buckets
40  * chained to it; these two fields are unused in non-head buckets.
41  *
42  * On removals, we move the last valid item in the chain to the position of the
43  * just-removed entry. This makes lookups slightly faster, since the moment an
44  * invalid entry is found, the (failed) lookup is over.
45  *
46  * Resizing is done by taking all bucket spinlocks (so that no other writers can
47  * race with us) and then copying all entries into a new hash map. Then, the
48  * ht->map pointer is set, and the old map is freed once no RCU readers can see
49  * it anymore.
50  *
51  * Writers check for concurrent resizes by comparing ht->map before and after
52  * acquiring their bucket lock. If they don't match, a resize has occurred
53  * while the bucket spinlock was being acquired.
54  *
55  * Related Work:
56  * - Idea of cacheline-sized buckets with full hashes taken from:
57  *   David, Guerraoui & Trigonakis, "Asynchronized Concurrency:
58  *   The Secret to Scaling Concurrent Search Data Structures", ASPLOS'15.
59  * - Why not RCU-based hash tables? They would allow us to get rid of the
60  *   seqlock, but resizing would take forever since RCU read critical
61  *   sections in QEMU take quite a long time.
62  *   More info on relativistic hash tables:
63  *   + Triplett, McKenney & Walpole, "Resizable, Scalable, Concurrent Hash
64  *     Tables via Relativistic Programming", USENIX ATC'11.
65  *   + Corbet, "Relativistic hash tables, part 1: Algorithms", @ lwn.net, 2014.
66  *     https://lwn.net/Articles/612021/
67  */
68 #include "qemu/osdep.h"
69 #include "qemu/qht.h"
70 #include "qemu/atomic.h"
71 #include "qemu/rcu.h"
72 #include "qemu/memalign.h"
73 
74 //#define QHT_DEBUG
75 
/*
 * We want to avoid false sharing of cache lines. Most systems have 64-byte
 * cache lines so we go with it for simplicity.
 *
 * Note that systems with smaller cache lines will be fine (the struct is
 * almost 64-bytes); systems with larger cache lines might suffer from
 * some false sharing.
 */
#define QHT_BUCKET_ALIGN 64

/*
 * Number of hash/pointer slots in each bucket, selected by host word size.
 * Define these to keep sizeof(qht_bucket) within QHT_BUCKET_ALIGN.
 */
#if HOST_LONG_BITS == 32
#define QHT_BUCKET_ENTRIES 6
#else /* 64-bit */
#define QHT_BUCKET_ENTRIES 4
#endif
92 
/* Selects which callback of struct qht_iter is invoked for each entry. */
enum qht_iter_type {
    QHT_ITER_VOID,    /* do nothing; use retvoid */
    QHT_ITER_RM,      /* remove element if retbool returns true */
};
97 
/*
 * Iteration descriptor. @type tells which member of @f is valid:
 * @f.retvoid for QHT_ITER_VOID, @f.retbool for QHT_ITER_RM.
 */
struct qht_iter {
    union {
        qht_iter_func_t retvoid;
        qht_iter_bool_func_t retbool;
    } f;
    enum qht_iter_type type;
};
105 
106 /*
107  * Do _not_ use qemu_mutex_[try]lock directly! Use these macros, otherwise
108  * the profiler (QSP) will deadlock.
109  */
110 static inline void qht_lock(struct qht *ht)
111 {
112     if (ht->mode & QHT_MODE_RAW_MUTEXES) {
113         qemu_mutex_lock__raw(&ht->lock);
114     } else {
115         qemu_mutex_lock(&ht->lock);
116     }
117 }
118 
119 static inline int qht_trylock(struct qht *ht)
120 {
121     if (ht->mode & QHT_MODE_RAW_MUTEXES) {
122         return qemu_mutex_trylock__raw(&(ht)->lock);
123     }
124     return qemu_mutex_trylock(&(ht)->lock);
125 }
126 
/*
 * Release ht->lock.
 * This inline is not really necessary, but it helps keep code consistent.
 */
static inline void qht_unlock(struct qht *ht)
{
    qemu_mutex_unlock(&ht->lock);
}
132 
/*
 * Note: reading partially-updated pointers in @pointers could lead to
 * segfaults. We thus access them with qatomic_read/set; this guarantees
 * that the compiler makes all those accesses atomic. We also need the
 * volatile-like behavior in qatomic_read, since otherwise the compiler
 * might refetch the pointer.
 * qatomic_read's are of course not necessary when the bucket lock is held.
 *
 * If both ht->lock and b->lock are grabbed, ht->lock should always
 * be grabbed first.
 */
struct qht_bucket {
    /* writer lock for the whole chain; used only in head buckets */
    QemuSpin lock;
    /* seqlock for the whole chain; used only in head buckets */
    QemuSeqLock sequence;
    /* full hash of each entry; slot i pairs with pointers[i] */
    uint32_t hashes[QHT_BUCKET_ENTRIES];
    /* entry values; a NULL pointer marks an empty slot */
    void *pointers[QHT_BUCKET_ENTRIES];
    /* next bucket in the chain, or NULL */
    struct qht_bucket *next;
} QEMU_ALIGNED(QHT_BUCKET_ALIGN);

/* build-time check that a bucket does not exceed QHT_BUCKET_ALIGN bytes */
QEMU_BUILD_BUG_ON(sizeof(struct qht_bucket) > QHT_BUCKET_ALIGN);
153 
/*
 * Under TSAN, we use striped locks instead of one lock per bucket chain.
 * This avoids crashing under TSAN, since TSAN aborts the program if more than
 * 64 locks are held (this is a hardcoded limit in TSAN).
 * When resizing a QHT we grab all the buckets' locks, which can easily
 * go over TSAN's limit. By using striped locks, we avoid this problem.
 *
 * Note: this number must be a power of two for easy index computation.
 */
#define QHT_TSAN_BUCKET_LOCKS_BITS 4
#define QHT_TSAN_BUCKET_LOCKS (1 << QHT_TSAN_BUCKET_LOCKS_BITS)

/*
 * One striped lock, aligned to QHT_BUCKET_ALIGN
 * (presumably so that stripes do not share a cache line).
 */
struct qht_tsan_lock {
    QemuSpin lock;
} QEMU_ALIGNED(QHT_BUCKET_ALIGN);
169 
/**
 * struct qht_map - structure to track an array of buckets
 * @rcu: used by RCU. Keep it as the top field in the struct to help valgrind
 *       find the whole struct.
 * @buckets: array of head buckets. It is constant once the map is created.
 * @n_buckets: number of head buckets. It is constant once the map is created.
 * @n_added_buckets: number of added (i.e. "non-head") buckets
 * @n_added_buckets_threshold: threshold to trigger an upward resize once the
 *                             number of added buckets surpasses it.
 * @tsan_bucket_locks: Array of striped locks to be used only under TSAN.
 *                     The field only exists when CONFIG_TSAN is defined.
 *
 * Buckets are tracked in what we call a "map", i.e. this structure.
 */
struct qht_map {
    struct rcu_head rcu;
    struct qht_bucket *buckets;
    size_t n_buckets;
    size_t n_added_buckets;
    size_t n_added_buckets_threshold;
#ifdef CONFIG_TSAN
    struct qht_tsan_lock tsan_bucket_locks[QHT_TSAN_BUCKET_LOCKS];
#endif
};
193 
/* trigger a resize when n_added_buckets > n_buckets / div */
#define QHT_NR_ADDED_BUCKETS_THRESHOLD_DIV 8

/* forward declarations; both functions are defined further down this file */
static void qht_do_resize_reset(struct qht *ht, struct qht_map *new,
                                bool reset);
static void qht_grow_maybe(struct qht *ht);
200 
201 #ifdef QHT_DEBUG
202 
/* QHT_DEBUG build: debug assertions are real assert()s */
#define qht_debug_assert(X) do { assert(X); } while (0)
204 
205 static void qht_bucket_debug__locked(struct qht_bucket *b)
206 {
207     bool seen_empty = false;
208     bool corrupt = false;
209     int i;
210 
211     do {
212         for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
213             if (b->pointers[i] == NULL) {
214                 seen_empty = true;
215                 continue;
216             }
217             if (seen_empty) {
218                 fprintf(stderr, "%s: b: %p, pos: %i, hash: 0x%x, p: %p\n",
219                         __func__, b, i, b->hashes[i], b->pointers[i]);
220                 corrupt = true;
221             }
222         }
223         b = b->next;
224     } while (b);
225     qht_debug_assert(!corrupt);
226 }
227 
228 static void qht_map_debug__all_locked(struct qht_map *map)
229 {
230     int i;
231 
232     for (i = 0; i < map->n_buckets; i++) {
233         qht_bucket_debug__locked(&map->buckets[i]);
234     }
235 }
236 #else
237 
/* non-debug build: X is still evaluated, but its value is discarded */
#define qht_debug_assert(X) do { (void)(X); } while (0)
239 
/* no-op stub used when QHT_DEBUG is not defined */
static inline void qht_bucket_debug__locked(struct qht_bucket *b)
{ }
242 
/* no-op stub used when QHT_DEBUG is not defined */
static inline void qht_map_debug__all_locked(struct qht_map *map)
{ }
245 #endif /* QHT_DEBUG */
246 
247 static inline size_t qht_elems_to_buckets(size_t n_elems)
248 {
249     return pow2ceil(n_elems / QHT_BUCKET_ENTRIES);
250 }
251 
252 /*
253  * When using striped locks (i.e. under TSAN), we have to be careful not
254  * to operate on the same lock twice (e.g. when iterating through all buckets).
255  * We achieve this by operating only on each stripe's first matching lock.
256  */
257 static inline void qht_do_if_first_in_stripe(struct qht_map *map,
258                                              struct qht_bucket *b,
259                                              void (*func)(QemuSpin *spin))
260 {
261 #ifdef CONFIG_TSAN
262     unsigned long bucket_idx = b - map->buckets;
263     bool is_first_in_stripe = (bucket_idx >> QHT_TSAN_BUCKET_LOCKS_BITS) == 0;
264     if (is_first_in_stripe) {
265         unsigned long lock_idx = bucket_idx & (QHT_TSAN_BUCKET_LOCKS - 1);
266         func(&map->tsan_bucket_locks[lock_idx].lock);
267     }
268 #else
269     func(&b->lock);
270 #endif
271 }
272 
273 static inline void qht_bucket_lock_do(struct qht_map *map,
274                                       struct qht_bucket *b,
275                                       void (*func)(QemuSpin *lock))
276 {
277 #ifdef CONFIG_TSAN
278     unsigned long bucket_idx = b - map->buckets;
279     unsigned long lock_idx = bucket_idx & (QHT_TSAN_BUCKET_LOCKS - 1);
280     func(&map->tsan_bucket_locks[lock_idx].lock);
281 #else
282     func(&b->lock);
283 #endif
284 }
285 
/* acquire the spinlock that protects head bucket @b of @map */
static inline void qht_bucket_lock(struct qht_map *map,
                                   struct qht_bucket *b)
{
    qht_bucket_lock_do(map, b, qemu_spin_lock);
}
291 
/* release the spinlock that protects head bucket @b of @map */
static inline void qht_bucket_unlock(struct qht_map *map,
                                     struct qht_bucket *b)
{
    qht_bucket_lock_do(map, b, qemu_spin_unlock);
}
297 
298 static inline void qht_head_init(struct qht_map *map, struct qht_bucket *b)
299 {
300     memset(b, 0, sizeof(*b));
301     qht_do_if_first_in_stripe(map, b, qemu_spin_init);
302     seqlock_init(&b->sequence);
303 }
304 
305 static inline
306 struct qht_bucket *qht_map_to_bucket(const struct qht_map *map, uint32_t hash)
307 {
308     return &map->buckets[hash & (map->n_buckets - 1)];
309 }
310 
311 /* acquire all bucket locks from a map */
312 static void qht_map_lock_buckets(struct qht_map *map)
313 {
314     size_t i;
315 
316     for (i = 0; i < map->n_buckets; i++) {
317         struct qht_bucket *b = &map->buckets[i];
318 
319         qht_do_if_first_in_stripe(map, b, qemu_spin_lock);
320     }
321 }
322 
323 static void qht_map_unlock_buckets(struct qht_map *map)
324 {
325     size_t i;
326 
327     for (i = 0; i < map->n_buckets; i++) {
328         struct qht_bucket *b = &map->buckets[i];
329 
330         qht_do_if_first_in_stripe(map, b, qemu_spin_unlock);
331     }
332 }
333 
334 /*
335  * Call with at least a bucket lock held.
336  * @map should be the value read before acquiring the lock (or locks).
337  */
338 static inline bool qht_map_is_stale__locked(const struct qht *ht,
339                                             const struct qht_map *map)
340 {
341     return map != ht->map;
342 }
343 
344 /*
345  * Grab all bucket locks, and set @pmap after making sure the map isn't stale.
346  *
347  * Pairs with qht_map_unlock_buckets(), hence the pass-by-reference.
348  *
349  * Note: callers cannot have ht->lock held.
350  */
351 static inline
352 void qht_map_lock_buckets__no_stale(struct qht *ht, struct qht_map **pmap)
353 {
354     struct qht_map *map;
355 
356     map = qatomic_rcu_read(&ht->map);
357     qht_map_lock_buckets(map);
358     if (likely(!qht_map_is_stale__locked(ht, map))) {
359         *pmap = map;
360         return;
361     }
362     qht_map_unlock_buckets(map);
363 
364     /* we raced with a resize; acquire ht->lock to see the updated ht->map */
365     qht_lock(ht);
366     map = ht->map;
367     qht_map_lock_buckets(map);
368     qht_unlock(ht);
369     *pmap = map;
370     return;
371 }
372 
373 /*
374  * Get a head bucket and lock it, making sure its parent map is not stale.
375  * @pmap is filled with a pointer to the bucket's parent map.
376  *
377  * Unlock with qht_bucket_unlock.
378  *
379  * Note: callers cannot have ht->lock held.
380  */
381 static inline
382 struct qht_bucket *qht_bucket_lock__no_stale(struct qht *ht, uint32_t hash,
383                                              struct qht_map **pmap)
384 {
385     struct qht_bucket *b;
386     struct qht_map *map;
387 
388     map = qatomic_rcu_read(&ht->map);
389     b = qht_map_to_bucket(map, hash);
390 
391     qht_bucket_lock(map, b);
392     if (likely(!qht_map_is_stale__locked(ht, map))) {
393         *pmap = map;
394         return b;
395     }
396     qht_bucket_unlock(map, b);
397 
398     /* we raced with a resize; acquire ht->lock to see the updated ht->map */
399     qht_lock(ht);
400     map = ht->map;
401     b = qht_map_to_bucket(map, hash);
402     qht_bucket_lock(map, b);
403     qht_unlock(ht);
404     *pmap = map;
405     return b;
406 }
407 
408 static inline bool qht_map_needs_resize(const struct qht_map *map)
409 {
410     return qatomic_read(&map->n_added_buckets) >
411            map->n_added_buckets_threshold;
412 }
413 
414 static inline void qht_chain_destroy(struct qht_map *map,
415                                      struct qht_bucket *head)
416 {
417     struct qht_bucket *curr = head->next;
418     struct qht_bucket *prev;
419 
420     qht_do_if_first_in_stripe(map, head, qemu_spin_destroy);
421     while (curr) {
422         prev = curr;
423         curr = curr->next;
424         qemu_vfree(prev);
425     }
426 }
427 
428 /* pass only an orphan map */
429 static void qht_map_destroy(struct qht_map *map)
430 {
431     size_t i;
432 
433     for (i = 0; i < map->n_buckets; i++) {
434         qht_chain_destroy(map, &map->buckets[i]);
435     }
436     qemu_vfree(map->buckets);
437     g_free(map);
438 }
439 
440 static struct qht_map *qht_map_create(size_t n_buckets)
441 {
442     struct qht_map *map;
443     size_t i;
444 
445     map = g_malloc(sizeof(*map));
446     map->n_buckets = n_buckets;
447 
448     map->n_added_buckets = 0;
449     map->n_added_buckets_threshold = n_buckets /
450         QHT_NR_ADDED_BUCKETS_THRESHOLD_DIV;
451 
452     /* let tiny hash tables to at least add one non-head bucket */
453     if (unlikely(map->n_added_buckets_threshold == 0)) {
454         map->n_added_buckets_threshold = 1;
455     }
456 
457     map->buckets = qemu_memalign(QHT_BUCKET_ALIGN,
458                                  sizeof(*map->buckets) * n_buckets);
459     for (i = 0; i < n_buckets; i++) {
460         qht_head_init(map, &map->buckets[i]);
461     }
462     return map;
463 }
464 
465 void qht_init(struct qht *ht, qht_cmp_func_t cmp, size_t n_elems,
466               unsigned int mode)
467 {
468     struct qht_map *map;
469     size_t n_buckets = qht_elems_to_buckets(n_elems);
470 
471     g_assert(cmp);
472     ht->cmp = cmp;
473     ht->mode = mode;
474     qemu_mutex_init(&ht->lock);
475     map = qht_map_create(n_buckets);
476     qatomic_rcu_set(&ht->map, map);
477 }
478 
479 /* call only when there are no readers/writers left */
480 void qht_destroy(struct qht *ht)
481 {
482     qht_map_destroy(ht->map);
483     memset(ht, 0, sizeof(*ht));
484 }
485 
486 static void qht_bucket_reset__locked(struct qht_bucket *head)
487 {
488     struct qht_bucket *b = head;
489     int i;
490 
491     seqlock_write_begin(&head->sequence);
492     do {
493         for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
494             if (b->pointers[i] == NULL) {
495                 goto done;
496             }
497             qatomic_set(&b->hashes[i], 0);
498             qatomic_set(&b->pointers[i], NULL);
499         }
500         b = b->next;
501     } while (b);
502  done:
503     seqlock_write_end(&head->sequence);
504 }
505 
506 /* call with all bucket locks held */
507 static void qht_map_reset__all_locked(struct qht_map *map)
508 {
509     size_t i;
510 
511     for (i = 0; i < map->n_buckets; i++) {
512         qht_bucket_reset__locked(&map->buckets[i]);
513     }
514     qht_map_debug__all_locked(map);
515 }
516 
/*
 * Remove all entries from @ht, keeping the current number of head buckets.
 * All bucket locks of the (non-stale) current map are held while resetting.
 */
void qht_reset(struct qht *ht)
{
    struct qht_map *map;

    qht_map_lock_buckets__no_stale(ht, &map);
    qht_map_reset__all_locked(map);
    qht_map_unlock_buckets(map);
}
525 
/* resize @ht into @new without resetting; call with ht->lock held */
static inline void qht_do_resize(struct qht *ht, struct qht_map *new)
{
    qht_do_resize_reset(ht, new, false);
}
530 
/*
 * Reset @ht and, if @new is not NULL, switch to @new.
 * Call with ht->lock held.
 */
static inline void qht_do_resize_and_reset(struct qht *ht, struct qht_map *new)
{
    qht_do_resize_reset(ht, new, true);
}
535 
536 bool qht_reset_size(struct qht *ht, size_t n_elems)
537 {
538     struct qht_map *new = NULL;
539     struct qht_map *map;
540     size_t n_buckets;
541 
542     n_buckets = qht_elems_to_buckets(n_elems);
543 
544     qht_lock(ht);
545     map = ht->map;
546     if (n_buckets != map->n_buckets) {
547         new = qht_map_create(n_buckets);
548     }
549     qht_do_resize_and_reset(ht, new);
550     qht_unlock(ht);
551 
552     return !!new;
553 }
554 
/*
 * Walk the chain starting at @head looking for an entry whose stored hash
 * equals @hash and for which func(p, userp) returns true.
 *
 * Returns the matching pointer, or NULL if the chain holds no match.
 * No lock is taken here; the callers in this file wrap the call in a seqlock
 * read section and retry when a concurrent write is detected.
 */
static inline
void *qht_do_lookup(const struct qht_bucket *head, qht_lookup_func_t func,
                    const void *userp, uint32_t hash)
{
    const struct qht_bucket *b = head;
    int i;

    do {
        for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
            if (qatomic_read(&b->hashes[i]) == hash) {
                /* The pointer is dereferenced before seqlock_read_retry,
                 * so (unlike qht_insert__locked) we need to use
                 * qatomic_rcu_read here.
                 */
                void *p = qatomic_rcu_read(&b->pointers[i]);

                /* p can be NULL if the slot is empty or being updated */
                if (likely(p) && likely(func(p, userp))) {
                    return p;
                }
            }
        }
        b = qatomic_rcu_read(&b->next);
    } while (b);

    return NULL;
}
581 
/*
 * Lookup slow path: repeat the lookup on head bucket @b until it completes
 * without the bucket's seqlock reporting a concurrent write.
 * Kept out of line so that the fast path in qht_lookup_custom() has no loop.
 */
static __attribute__((noinline))
void *qht_lookup__slowpath(const struct qht_bucket *b, qht_lookup_func_t func,
                           const void *userp, uint32_t hash)
{
    unsigned int version;
    void *ret;

    do {
        version = seqlock_read_begin(&b->sequence);
        ret = qht_do_lookup(b, func, userp, hash);
    } while (seqlock_read_retry(&b->sequence, version));
    return ret;
}
595 
/*
 * Look up an entry in @ht using @func (instead of ht->cmp) as the match
 * predicate; @userp is passed as @func's second argument.
 *
 * Returns the matching pointer, or NULL if none is found.
 * A single lock-free attempt is made first; only if the seqlock reports a
 * concurrent write to the bucket is the retrying slow path taken.
 */
void *qht_lookup_custom(const struct qht *ht, const void *userp, uint32_t hash,
                        qht_lookup_func_t func)
{
    const struct qht_bucket *b;
    const struct qht_map *map;
    unsigned int version;
    void *ret;

    map = qatomic_rcu_read(&ht->map);
    b = qht_map_to_bucket(map, hash);

    version = seqlock_read_begin(&b->sequence);
    ret = qht_do_lookup(b, func, userp, hash);
    if (likely(!seqlock_read_retry(&b->sequence, version))) {
        return ret;
    }
    /*
     * Removing the do/while from the fastpath gives a 4% perf. increase when
     * running a 100%-lookup microbenchmark.
     */
    return qht_lookup__slowpath(b, func, userp, hash);
}
618 
/* look up an entry in @ht, using the table's own ht->cmp as the predicate */
void *qht_lookup(const struct qht *ht, const void *userp, uint32_t hash)
{
    return qht_lookup_custom(ht, userp, hash, ht->cmp);
}
623 
/*
 * Insert the pair (@hash, @p) into the chain starting at @head.
 *
 * Returns NULL if the pair was inserted. If the chain already holds an entry
 * with the same hash for which ht->cmp() returns true, nothing is inserted
 * and that existing pointer is returned.
 *
 * If the chain has no empty slot, a new bucket is allocated and linked at the
 * end of the chain, and map->n_added_buckets is incremented; if that takes
 * the map over its resize threshold and @needs_resize is not NULL,
 * *@needs_resize is set to true.
 *
 * call with head->lock held
 * @ht is const since it is only used for ht->cmp()
 */
static void *qht_insert__locked(const struct qht *ht, struct qht_map *map,
                                struct qht_bucket *head, void *p, uint32_t hash,
                                bool *needs_resize)
{
    struct qht_bucket *b = head;
    struct qht_bucket *prev = NULL;
    struct qht_bucket *new = NULL;
    int i;

    do {
        for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
            if (b->pointers[i]) {
                if (unlikely(b->hashes[i] == hash &&
                             ht->cmp(b->pointers[i], p))) {
                    return b->pointers[i];
                }
            } else {
                goto found;
            }
        }
        prev = b;
        b = b->next;
    } while (b);

    /* chain is full: prepare a new bucket; it is linked in under the seqlock */
    b = qemu_memalign(QHT_BUCKET_ALIGN, sizeof(*b));
    memset(b, 0, sizeof(*b));
    new = b;
    i = 0;
    qatomic_inc(&map->n_added_buckets);
    if (unlikely(qht_map_needs_resize(map)) && needs_resize) {
        *needs_resize = true;
    }

 found:
    /* found an empty key: acquire the seqlock and write */
    seqlock_write_begin(&head->sequence);
    if (new) {
        /* prev is the last bucket of the chain whenever new is set */
        qatomic_rcu_set(&prev->next, b);
    }
    /* smp_wmb() implicit in seqlock_write_begin.  */
    qatomic_set(&b->hashes[i], hash);
    qatomic_set(&b->pointers[i], p);
    seqlock_write_end(&head->sequence);
    return NULL;
}
673 
674 static __attribute__((noinline)) void qht_grow_maybe(struct qht *ht)
675 {
676     struct qht_map *map;
677 
678     /*
679      * If the lock is taken it probably means there's an ongoing resize,
680      * so bail out.
681      */
682     if (qht_trylock(ht)) {
683         return;
684     }
685     map = ht->map;
686     /* another thread might have just performed the resize we were after */
687     if (qht_map_needs_resize(map)) {
688         struct qht_map *new = qht_map_create(map->n_buckets * 2);
689 
690         qht_do_resize(ht, new);
691     }
692     qht_unlock(ht);
693 }
694 
695 bool qht_insert(struct qht *ht, void *p, uint32_t hash, void **existing)
696 {
697     struct qht_bucket *b;
698     struct qht_map *map;
699     bool needs_resize = false;
700     void *prev;
701 
702     /* NULL pointers are not supported */
703     qht_debug_assert(p);
704 
705     b = qht_bucket_lock__no_stale(ht, hash, &map);
706     prev = qht_insert__locked(ht, map, b, p, hash, &needs_resize);
707     qht_bucket_debug__locked(b);
708     qht_bucket_unlock(map, b);
709 
710     if (unlikely(needs_resize) && ht->mode & QHT_MODE_AUTO_RESIZE) {
711         qht_grow_maybe(ht);
712     }
713     if (likely(prev == NULL)) {
714         return true;
715     }
716     if (existing) {
717         *existing = prev;
718     }
719     return false;
720 }
721 
722 static inline bool qht_entry_is_last(const struct qht_bucket *b, int pos)
723 {
724     if (pos == QHT_BUCKET_ENTRIES - 1) {
725         if (b->next == NULL) {
726             return true;
727         }
728         return b->next->pointers[0] == NULL;
729     }
730     return b->pointers[pos + 1] == NULL;
731 }
732 
/*
 * Overwrite slot @i of @to with the contents of slot @j of @from, then
 * empty the source slot. Both slots must currently be valid and distinct.
 */
static void
qht_entry_move(struct qht_bucket *to, int i, struct qht_bucket *from, int j)
{
    qht_debug_assert(!(to == from && i == j));
    qht_debug_assert(to->pointers[i]);
    qht_debug_assert(from->pointers[j]);

    /* copy the source entry over the destination ... */
    qatomic_set(&to->hashes[i], from->hashes[j]);
    qatomic_set(&to->pointers[i], from->pointers[j]);

    /* ... and only then clear the source */
    qatomic_set(&from->hashes[j], 0);
    qatomic_set(&from->pointers[j], NULL);
}
746 
747 /*
748  * Find the last valid entry in @orig, and swap it with @orig[pos], which has
749  * just been invalidated.
750  */
751 static inline void qht_bucket_remove_entry(struct qht_bucket *orig, int pos)
752 {
753     struct qht_bucket *b = orig;
754     struct qht_bucket *prev = NULL;
755     int i;
756 
757     if (qht_entry_is_last(orig, pos)) {
758         qatomic_set(&orig->hashes[pos], 0);
759         qatomic_set(&orig->pointers[pos], NULL);
760         return;
761     }
762     do {
763         for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
764             if (b->pointers[i]) {
765                 continue;
766             }
767             if (i > 0) {
768                 return qht_entry_move(orig, pos, b, i - 1);
769             }
770             qht_debug_assert(prev);
771             return qht_entry_move(orig, pos, prev, QHT_BUCKET_ENTRIES - 1);
772         }
773         prev = b;
774         b = b->next;
775     } while (b);
776     /* no free entries other than orig[pos], so swap it with the last one */
777     qht_entry_move(orig, pos, prev, QHT_BUCKET_ENTRIES - 1);
778 }
779 
780 /* call with b->lock held */
781 static inline
782 bool qht_remove__locked(struct qht_bucket *head, const void *p, uint32_t hash)
783 {
784     struct qht_bucket *b = head;
785     int i;
786 
787     do {
788         for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
789             void *q = b->pointers[i];
790 
791             if (unlikely(q == NULL)) {
792                 return false;
793             }
794             if (q == p) {
795                 qht_debug_assert(b->hashes[i] == hash);
796                 seqlock_write_begin(&head->sequence);
797                 qht_bucket_remove_entry(b, i);
798                 seqlock_write_end(&head->sequence);
799                 return true;
800             }
801         }
802         b = b->next;
803     } while (b);
804     return false;
805 }
806 
/*
 * Remove the pair (@hash, @p) from @ht.
 * Returns true if @p was found and removed, false otherwise.
 */
bool qht_remove(struct qht *ht, const void *p, uint32_t hash)
{
    struct qht_map *map;
    struct qht_bucket *head;
    bool removed;

    /* NULL pointers are not supported */
    qht_debug_assert(p);

    head = qht_bucket_lock__no_stale(ht, hash, &map);
    removed = qht_remove__locked(head, p, hash);
    qht_bucket_debug__locked(head);
    qht_bucket_unlock(map, head);

    return removed;
}
822 
/*
 * Invoke @iter's callback on every valid entry of the chain starting at
 * @head, passing the entry's pointer, its hash and @userp.
 *
 * For QHT_ITER_VOID the callback's result is ignored. For QHT_ITER_RM, an
 * entry for which the callback returns true is removed inside a seqlock
 * write section, and the same slot is then examined again.
 * Iteration ends at the first empty slot or at the end of the chain.
 */
static inline void qht_bucket_iter(struct qht_bucket *head,
                                   const struct qht_iter *iter, void *userp)
{
    struct qht_bucket *b = head;
    int i;

    do {
        for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
            if (b->pointers[i] == NULL) {
                return;
            }
            switch (iter->type) {
            case QHT_ITER_VOID:
                iter->f.retvoid(b->pointers[i], b->hashes[i], userp);
                break;
            case QHT_ITER_RM:
                if (iter->f.retbool(b->pointers[i], b->hashes[i], userp)) {
                    /* replace i with the last valid element in the bucket */
                    seqlock_write_begin(&head->sequence);
                    qht_bucket_remove_entry(b, i);
                    seqlock_write_end(&head->sequence);
                    qht_bucket_debug__locked(b);
                    /* reevaluate i, since it just got replaced */
                    i--;
                    continue;
                }
                break;
            default:
                g_assert_not_reached();
            }
        }
        b = b->next;
    } while (b);
}
857 
858 /* call with all of the map's locks held */
859 static inline void qht_map_iter__all_locked(struct qht_map *map,
860                                             const struct qht_iter *iter,
861                                             void *userp)
862 {
863     size_t i;
864 
865     for (i = 0; i < map->n_buckets; i++) {
866         qht_bucket_iter(&map->buckets[i], iter, userp);
867     }
868 }
869 
/*
 * Run @iter over all entries of @ht with every bucket lock held.
 *
 * The locks are taken via qht_map_lock_buckets__no_stale() so that the
 * iteration is guaranteed to run on the table's current map. Locking a map
 * read before a concurrent resize would make the iteration (and, for
 * QHT_ITER_RM, the removals) operate on the orphaned old map, leaving the
 * entries in place in the new one.
 *
 * Note: callers cannot have ht->lock held.
 */
static inline void
do_qht_iter(struct qht *ht, const struct qht_iter *iter, void *userp)
{
    struct qht_map *map;

    qht_map_lock_buckets__no_stale(ht, &map);
    qht_map_iter__all_locked(map, iter, userp);
    qht_map_unlock_buckets(map);
}
880 
/*
 * Call @func on every entry of @ht, passing the entry's pointer, its hash
 * and @userp. All bucket locks are held while @func runs.
 */
void qht_iter(struct qht *ht, qht_iter_func_t func, void *userp)
{
    const struct qht_iter iter = {
        .f.retvoid = func,
        .type = QHT_ITER_VOID,
    };

    do_qht_iter(ht, &iter, userp);
}
890 
/*
 * Call @func on every entry of @ht, passing the entry's pointer, its hash
 * and @userp; entries for which @func returns true are removed.
 * All bucket locks are held while @func runs.
 */
void qht_iter_remove(struct qht *ht, qht_iter_bool_func_t func, void *userp)
{
    const struct qht_iter iter = {
        .f.retbool = func,
        .type = QHT_ITER_RM,
    };

    do_qht_iter(ht, &iter, userp);
}
900 
/* argument bundle handed to qht_map_copy() through the iterator's userp */
struct qht_map_copy_data {
    struct qht *ht;         /* table being resized */
    struct qht_map *new;    /* map the entries are copied into */
};
905 
906 static void qht_map_copy(void *p, uint32_t hash, void *userp)
907 {
908     struct qht_map_copy_data *data = userp;
909     struct qht *ht = data->ht;
910     struct qht_map *new = data->new;
911     struct qht_bucket *b = qht_map_to_bucket(new, hash);
912 
913     /* no need to acquire b->lock because no thread has seen this map yet */
914     qht_insert__locked(ht, new, b, p, hash, NULL);
915 }
916 
/*
 * Atomically perform a resize and/or reset.
 * Call with ht->lock held.
 *
 * With all bucket locks of the current map held:
 * - if @reset is true, the current map is emptied;
 * - if @new is not NULL, the entries of the current map are copied into it,
 *   @new is published as ht->map, and the old map is handed to call_rcu()
 *   for destruction.
 * @new, if given, must have a different number of buckets than the current
 * map.
 */
static void qht_do_resize_reset(struct qht *ht, struct qht_map *new, bool reset)
{
    struct qht_map *old;
    const struct qht_iter iter = {
        .f.retvoid = qht_map_copy,
        .type = QHT_ITER_VOID,
    };
    struct qht_map_copy_data data;

    old = ht->map;
    qht_map_lock_buckets(old);

    if (reset) {
        qht_map_reset__all_locked(old);
    }

    if (new == NULL) {
        /* reset-only request: nothing to copy or publish */
        qht_map_unlock_buckets(old);
        return;
    }

    g_assert(new->n_buckets != old->n_buckets);
    data.ht = ht;
    data.new = new;
    qht_map_iter__all_locked(old, &iter, &data);
    qht_map_debug__all_locked(new);

    /* publish the new map before releasing the old map's bucket locks */
    qatomic_rcu_set(&ht->map, new);
    qht_map_unlock_buckets(old);
    call_rcu(old, qht_map_destroy, rcu);
}
952 
953 bool qht_resize(struct qht *ht, size_t n_elems)
954 {
955     size_t n_buckets = qht_elems_to_buckets(n_elems);
956     size_t ret = false;
957 
958     qht_lock(ht);
959     if (n_buckets != ht->map->n_buckets) {
960         struct qht_map *new;
961 
962         new = qht_map_create(n_buckets);
963         qht_do_resize(ht, new);
964         ret = true;
965     }
966     qht_unlock(ht);
967 
968     return ret;
969 }
970 
971 /* pass @stats to qht_statistics_destroy() when done */
972 void qht_statistics_init(const struct qht *ht, struct qht_stats *stats)
973 {
974     const struct qht_map *map;
975     int i;
976 
977     map = qatomic_rcu_read(&ht->map);
978 
979     stats->used_head_buckets = 0;
980     stats->entries = 0;
981     qdist_init(&stats->chain);
982     qdist_init(&stats->occupancy);
983     /* bail out if the qht has not yet been initialized */
984     if (unlikely(map == NULL)) {
985         stats->head_buckets = 0;
986         return;
987     }
988     stats->head_buckets = map->n_buckets;
989 
990     for (i = 0; i < map->n_buckets; i++) {
991         const struct qht_bucket *head = &map->buckets[i];
992         const struct qht_bucket *b;
993         unsigned int version;
994         size_t buckets;
995         size_t entries;
996         int j;
997 
998         do {
999             version = seqlock_read_begin(&head->sequence);
1000             buckets = 0;
1001             entries = 0;
1002             b = head;
1003             do {
1004                 for (j = 0; j < QHT_BUCKET_ENTRIES; j++) {
1005                     if (qatomic_read(&b->pointers[j]) == NULL) {
1006                         break;
1007                     }
1008                     entries++;
1009                 }
1010                 buckets++;
1011                 b = qatomic_rcu_read(&b->next);
1012             } while (b);
1013         } while (seqlock_read_retry(&head->sequence, version));
1014 
1015         if (entries) {
1016             qdist_inc(&stats->chain, buckets);
1017             qdist_inc(&stats->occupancy,
1018                       (double)entries / QHT_BUCKET_ENTRIES / buckets);
1019             stats->used_head_buckets++;
1020             stats->entries += entries;
1021         } else {
1022             qdist_inc(&stats->occupancy, 0);
1023         }
1024     }
1025 }
1026 
/* release the two distributions set up by qht_statistics_init() */
void qht_statistics_destroy(struct qht_stats *stats)
{
    qdist_destroy(&stats->occupancy);
    qdist_destroy(&stats->chain);
}
1032