--- qht.c (834b9273d5cdab68180dc8c84d641aaa4344b057)
+++ qht.c (d73415a315471ac0b127ed3fad45c8ec5d711de1)
 /*
  * qht.c - QEMU Hash Table, designed to scale for read-mostly workloads.
  *
  * Copyright (C) 2016, Emilio G. Cota <cota@braap.org>
  *
  * License: GNU GPL, version 2 or later.
  *   See the COPYING file in the top-level directory.
  *
--- 117 unchanged lines hidden ---
 /* this inline is not really necessary, but it helps keep code consistent */
 static inline void qht_unlock(struct qht *ht)
 {
     qemu_mutex_unlock(&ht->lock);
 }

 /*
  * Note: reading partially-updated pointers in @pointers could lead to
- * segfaults. We thus access them with atomic_read/set; this guarantees
+ * segfaults. We thus access them with qatomic_read/set; this guarantees
  * that the compiler makes all those accesses atomic. We also need the
- * volatile-like behavior in atomic_read, since otherwise the compiler
+ * volatile-like behavior in qatomic_read, since otherwise the compiler
  * might refetch the pointer.
- * atomic_read's are of course not necessary when the bucket lock is held.
+ * qatomic_read's are of course not necessary when the bucket lock is held.
  *
  * If both ht->lock and b->lock are grabbed, ht->lock should always
  * be grabbed first.
  */
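The comment above is the access contract for the bucket arrays declared just below: unlocked readers must see single, untorn loads. A minimal sketch of the hazard it guards against, with made-up names and C11 atomics standing in for the qatomic wrappers:

    #include <stdatomic.h>

    struct node { int val; };
    struct node *_Atomic shared;    /* written concurrently by another thread */

    int val_or_zero(void)
    {
        /* With a plain pointer, the compiler may refetch 'shared' between
         * the NULL check and the dereference; a relaxed atomic load is a
         * single stable load, which is what qatomic_read() guarantees. */
        struct node *p = atomic_load_explicit(&shared, memory_order_relaxed);

        return p ? p->val : 0;
    }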
 struct qht_bucket {
     QemuSpin lock;
     QemuSeqLock sequence;
     uint32_t hashes[QHT_BUCKET_ENTRIES];
--- 134 unchanged lines hidden ---
  *
  * Note: callers cannot have ht->lock held.
  */
 static inline
 void qht_map_lock_buckets__no_stale(struct qht *ht, struct qht_map **pmap)
 {
     struct qht_map *map;

-    map = atomic_rcu_read(&ht->map);
+    map = qatomic_rcu_read(&ht->map);
     qht_map_lock_buckets(map);
     if (likely(!qht_map_is_stale__locked(ht, map))) {
         *pmap = map;
         return;
     }
     qht_map_unlock_buckets(map);

     /* we raced with a resize; acquire ht->lock to see the updated ht->map */
--- 15 unchanged lines hidden ---
  */
 static inline
 struct qht_bucket *qht_bucket_lock__no_stale(struct qht *ht, uint32_t hash,
                                              struct qht_map **pmap)
 {
     struct qht_bucket *b;
     struct qht_map *map;

-    map = atomic_rcu_read(&ht->map);
+    map = qatomic_rcu_read(&ht->map);
     b = qht_map_to_bucket(map, hash);

     qemu_spin_lock(&b->lock);
     if (likely(!qht_map_is_stale__locked(ht, map))) {
         *pmap = map;
         return b;
     }
     qemu_spin_unlock(&b->lock);
--- 5 unchanged lines hidden ---
     qemu_spin_lock(&b->lock);
     qht_unlock(ht);
     *pmap = map;
     return b;
 }
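Both __no_stale helpers above share one discipline: guess ht->map without locks, take the lock the guess implies, then re-validate, falling back to ht->lock when a resize won the race. A condensed, self-contained sketch of that pattern (names made up; not qht's actual code):

    #include <pthread.h>
    #include <stdatomic.h>

    struct map { pthread_mutex_t lock; };

    struct table {
        pthread_mutex_t lock;        /* serializes map replacement */
        struct map *_Atomic map;     /* replaced wholesale on resize */
    };

    static struct map *table_lock_map(struct table *t)
    {
        struct map *m = atomic_load(&t->map);   /* lock-free guess */

        pthread_mutex_lock(&m->lock);
        if (m == atomic_load(&t->map)) {        /* re-validate under the lock */
            return m;                           /* fast path */
        }
        pthread_mutex_unlock(&m->lock);         /* raced with a resize */

        pthread_mutex_lock(&t->lock);           /* t->lock keeps t->map stable */
        m = atomic_load(&t->map);
        pthread_mutex_lock(&m->lock);
        pthread_mutex_unlock(&t->lock);
        return m;
    }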

 static inline bool qht_map_needs_resize(const struct qht_map *map)
 {
-    return atomic_read(&map->n_added_buckets) > map->n_added_buckets_threshold;
+    return qatomic_read(&map->n_added_buckets) >
+           map->n_added_buckets_threshold;
 }

 static inline void qht_chain_destroy(const struct qht_bucket *head)
 {
     struct qht_bucket *curr = head->next;
     struct qht_bucket *prev;

     qemu_spin_destroy(&head->lock);
--- 47 unchanged lines hidden ---
     struct qht_map *map;
     size_t n_buckets = qht_elems_to_buckets(n_elems);

     g_assert(cmp);
     ht->cmp = cmp;
     ht->mode = mode;
     qemu_mutex_init(&ht->lock);
     map = qht_map_create(n_buckets);
-    atomic_rcu_set(&ht->map, map);
+    qatomic_rcu_set(&ht->map, map);
 }
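For reference, a minimal usage sketch of the initialization path changed above; the callback and size hint are invented, the API is as declared in include/qemu/qht.h:

    #include "qemu/osdep.h"
    #include "qemu/qht.h"

    static bool int_cmp(const void *a, const void *b)
    {
        return *(const int *)a == *(const int *)b;
    }

    static struct qht ht;

    static void my_setup(void)
    {
        /* hint of 1024 elements; the table resizes itself as it fills */
        qht_init(&ht, int_cmp, 1024, QHT_MODE_AUTO_RESIZE);
    }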

 /* call only when there are no readers/writers left */
 void qht_destroy(struct qht *ht)
 {
     qht_map_destroy(ht->map);
     memset(ht, 0, sizeof(*ht));
 }
--- 4 unchanged lines hidden ---
     int i;

     seqlock_write_begin(&head->sequence);
     do {
         for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
             if (b->pointers[i] == NULL) {
                 goto done;
             }
-            atomic_set(&b->hashes[i], 0);
-            atomic_set(&b->pointers[i], NULL);
+            qatomic_set(&b->hashes[i], 0);
+            qatomic_set(&b->pointers[i], NULL);
         }
         b = b->next;
     } while (b);
  done:
     seqlock_write_end(&head->sequence);
 }
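The reset above clears entries between seqlock_write_begin() and seqlock_write_end(), so lock-free readers retry instead of observing a half-cleared bucket. Roughly, a seqlock works as below (an illustrative C11 sketch; QEMU's seqlock.h differs in detail):

    #include <stdatomic.h>

    static _Atomic unsigned seq;    /* even: stable; odd: write in progress */
    static _Atomic int data;        /* relaxed accesses, like qatomic_set above */

    static void write_data(int v)   /* caller holds the write-side lock */
    {
        unsigned s = atomic_load_explicit(&seq, memory_order_relaxed);

        atomic_store_explicit(&seq, s + 1, memory_order_relaxed);  /* odd */
        atomic_thread_fence(memory_order_release);
        atomic_store_explicit(&data, v, memory_order_relaxed);
        atomic_store_explicit(&seq, s + 2, memory_order_release);  /* even */
    }

    static int read_data(void)
    {
        unsigned s1, s2;
        int v;

        do {
            s1 = atomic_load_explicit(&seq, memory_order_acquire);
            v = atomic_load_explicit(&data, memory_order_relaxed);
            atomic_thread_fence(memory_order_acquire);
            s2 = atomic_load_explicit(&seq, memory_order_relaxed);
        } while ((s1 & 1) || s1 != s2);

        return v;
    }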

 /* call with all bucket locks held */
--- 49 unchanged lines hidden ---
 void *qht_do_lookup(const struct qht_bucket *head, qht_lookup_func_t func,
                     const void *userp, uint32_t hash)
 {
     const struct qht_bucket *b = head;
     int i;

     do {
         for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
-            if (atomic_read(&b->hashes[i]) == hash) {
+            if (qatomic_read(&b->hashes[i]) == hash) {
                 /* The pointer is dereferenced before seqlock_read_retry,
                  * so (unlike qht_insert__locked) we need to use
-                 * atomic_rcu_read here.
+                 * qatomic_rcu_read here.
                  */
-                void *p = atomic_rcu_read(&b->pointers[i]);
+                void *p = qatomic_rcu_read(&b->pointers[i]);

                 if (likely(p) && likely(func(p, userp))) {
                     return p;
                 }
             }
         }
-        b = atomic_rcu_read(&b->next);
+        b = qatomic_rcu_read(&b->next);
     } while (b);

     return NULL;
 }
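qht_do_lookup() dereferences the entry before its caller re-checks the seqlock, which is why the loads above use qatomic_rcu_read() rather than plain qatomic_read(): the pointer load must also order the reads made through the pointer (RCU dependency ordering). A rough C11 analogue with made-up names:

    #include <stdatomic.h>

    struct obj { int payload; };
    struct obj *_Atomic slot;       /* published by a concurrent writer */

    int peek(void)
    {
        /* acquire (standing in for consume): reads through @p see at
         * least what the writer published before setting @slot; a
         * relaxed load would make only the pointer itself safe. */
        struct obj *p = atomic_load_explicit(&slot, memory_order_acquire);

        return p ? p->payload : -1;
    }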

 static __attribute__((noinline))
 void *qht_lookup__slowpath(const struct qht_bucket *b, qht_lookup_func_t func,
                            const void *userp, uint32_t hash)
--- 11 unchanged lines hidden ---
 void *qht_lookup_custom(const struct qht *ht, const void *userp, uint32_t hash,
                         qht_lookup_func_t func)
 {
     const struct qht_bucket *b;
     const struct qht_map *map;
     unsigned int version;
     void *ret;

-    map = atomic_rcu_read(&ht->map);
+    map = qatomic_rcu_read(&ht->map);
     b = qht_map_to_bucket(map, hash);

     version = seqlock_read_begin(&b->sequence);
     ret = qht_do_lookup(b, func, userp, hash);
     if (likely(!seqlock_read_retry(&b->sequence, version))) {
         return ret;
     }
     /*
--- 35 unchanged lines hidden ---
         prev = b;
         b = b->next;
     } while (b);

     b = qemu_memalign(QHT_BUCKET_ALIGN, sizeof(*b));
     memset(b, 0, sizeof(*b));
     new = b;
     i = 0;
-    atomic_inc(&map->n_added_buckets);
+    qatomic_inc(&map->n_added_buckets);
     if (unlikely(qht_map_needs_resize(map)) && needs_resize) {
         *needs_resize = true;
     }

  found:
     /* found an empty key: acquire the seqlock and write */
     seqlock_write_begin(&head->sequence);
     if (new) {
-        atomic_rcu_set(&prev->next, b);
+        qatomic_rcu_set(&prev->next, b);
     }
     /* smp_wmb() implicit in seqlock_write_begin. */
-    atomic_set(&b->hashes[i], hash);
-    atomic_set(&b->pointers[i], p);
+    qatomic_set(&b->hashes[i], hash);
+    qatomic_set(&b->pointers[i], p);
     seqlock_write_end(&head->sequence);
     return NULL;
 }
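The insertion path above links a freshly zeroed bucket into the chain with qatomic_rcu_set() (a store-release), so a lock-free reader that follows the new ->next link also sees the zeroed contents; the entry itself is then filled in under the seqlock. The underlying publish-after-initialize pattern, as a self-contained sketch with made-up names:

    #include <stdatomic.h>
    #include <stdlib.h>

    struct bucket {
        void *pointers[4];
        struct bucket *_Atomic next;
    };

    static void append_bucket(struct bucket *prev)
    {
        struct bucket *b = calloc(1, sizeof(*b));  /* initialize while private */

        if (!b) {
            abort();
        }
        /* release: the zeroed contents become visible before the link does */
        atomic_store_explicit(&prev->next, b, memory_order_release);
    }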

 static __attribute__((noinline)) void qht_grow_maybe(struct qht *ht)
 {
     struct qht_map *map;

--- 54 unchanged lines hidden ---

 static void
 qht_entry_move(struct qht_bucket *to, int i, struct qht_bucket *from, int j)
 {
     qht_debug_assert(!(to == from && i == j));
     qht_debug_assert(to->pointers[i]);
     qht_debug_assert(from->pointers[j]);

-    atomic_set(&to->hashes[i], from->hashes[j]);
-    atomic_set(&to->pointers[i], from->pointers[j]);
+    qatomic_set(&to->hashes[i], from->hashes[j]);
+    qatomic_set(&to->pointers[i], from->pointers[j]);

-    atomic_set(&from->hashes[j], 0);
-    atomic_set(&from->pointers[j], NULL);
+    qatomic_set(&from->hashes[j], 0);
+    qatomic_set(&from->pointers[j], NULL);
 }

 /*
  * Find the last valid entry in @orig, and swap it with @orig[pos], which has
  * just been invalidated.
  */
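A minimal single-array sketch of this compaction, leaving out the qatomic_set()/seqlock machinery the real code wraps around it (valid entries stay packed at the front, so scans can stop at the first NULL):

    static void remove_at(void *p[], int n, int pos)
    {
        int last;

        /* find the last valid entry; pos itself may be that entry */
        for (last = n - 1; last > pos && p[last] == NULL; last--) {
            continue;
        }
        p[pos] = p[last];   /* move the last valid entry into the hole */
        p[last] = NULL;
    }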
 static inline void qht_bucket_remove_entry(struct qht_bucket *orig, int pos)
 {
     struct qht_bucket *b = orig;
     struct qht_bucket *prev = NULL;
     int i;

     if (qht_entry_is_last(orig, pos)) {
         orig->hashes[pos] = 0;
-        atomic_set(&orig->pointers[pos], NULL);
+        qatomic_set(&orig->pointers[pos], NULL);
         return;
     }
     do {
         for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
             if (b->pointers[i]) {
                 continue;
             }
             if (i > 0) {
--- 99 unchanged lines hidden ---
     }
 }

 static inline void
 do_qht_iter(struct qht *ht, const struct qht_iter *iter, void *userp)
 {
     struct qht_map *map;

-    map = atomic_rcu_read(&ht->map);
+    map = qatomic_rcu_read(&ht->map);
     qht_map_lock_buckets(map);
     qht_map_iter__all_locked(map, iter, userp);
     qht_map_unlock_buckets(map);
 }

 void qht_iter(struct qht *ht, qht_iter_func_t func, void *userp)
 {
     const struct qht_iter iter = {
--- 56 unchanged lines hidden ---
     }

     g_assert(new->n_buckets != old->n_buckets);
     data.ht = ht;
     data.new = new;
     qht_map_iter__all_locked(old, &iter, &data);
     qht_map_debug__all_locked(new);

-    atomic_rcu_set(&ht->map, new);
+    qatomic_rcu_set(&ht->map, new);
     qht_map_unlock_buckets(old);
     call_rcu(old, qht_map_destroy, rcu);
 }
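The swap above can retire a map while lookups are still traversing it; that is safe because readers run inside RCU read-side critical sections, so call_rcu() defers qht_map_destroy() until they are done. A sketch of the reader side this relies on (assuming, as QEMU's qht users do, that lookups happen under rcu_read_lock(); @func is whatever comparison callback the table's user defines):

    #include "qemu/osdep.h"
    #include "qemu/rcu.h"
    #include "qemu/qht.h"

    static void *lookup_during_resize(struct qht *ht, const void *userp,
                                      uint32_t hash, qht_lookup_func_t func)
    {
        void *ret;

        rcu_read_lock();
        /* the map observed in here may be the pre-resize one; it cannot
         * be destroyed until we leave the critical section */
        ret = qht_lookup_custom(ht, userp, hash, func);
        rcu_read_unlock();
        return ret;
    }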

 bool qht_resize(struct qht *ht, size_t n_elems)
 {
     size_t n_buckets = qht_elems_to_buckets(n_elems);
     size_t ret = false;
--- 12 unchanged lines hidden ---
 }

 /* pass @stats to qht_statistics_destroy() when done */
 void qht_statistics_init(const struct qht *ht, struct qht_stats *stats)
 {
     const struct qht_map *map;
     int i;

-    map = atomic_rcu_read(&ht->map);
+    map = qatomic_rcu_read(&ht->map);

     stats->used_head_buckets = 0;
     stats->entries = 0;
     qdist_init(&stats->chain);
     qdist_init(&stats->occupancy);
     /* bail out if the qht has not yet been initialized */
     if (unlikely(map == NULL)) {
         stats->head_buckets = 0;
--- 11 unchanged lines hidden ---

     do {
         version = seqlock_read_begin(&head->sequence);
         buckets = 0;
         entries = 0;
         b = head;
         do {
             for (j = 0; j < QHT_BUCKET_ENTRIES; j++) {
-                if (atomic_read(&b->pointers[j]) == NULL) {
+                if (qatomic_read(&b->pointers[j]) == NULL) {
                     break;
                 }
                 entries++;
             }
             buckets++;
-            b = atomic_rcu_read(&b->next);
+            b = qatomic_rcu_read(&b->next);
         } while (b);
     } while (seqlock_read_retry(&head->sequence, version));

     if (entries) {
         qdist_inc(&stats->chain, buckets);
         qdist_inc(&stats->occupancy,
                   (double)entries / QHT_BUCKET_ENTRIES / buckets);
         stats->used_head_buckets++;
--- 12 unchanged lines hidden ---
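Finally, a usage sketch for the statistics API these last hunks touch; per the comment above qht_statistics_init(), the stats must be released with qht_statistics_destroy(), which frees the two qdists:

    #include "qemu/osdep.h"
    #include "qemu/qht.h"

    static void report(struct qht *ht)
    {
        struct qht_stats stats;

        qht_statistics_init(ht, &stats);
        printf("entries=%zu used head buckets=%zu/%zu\n",
               stats.entries, stats.used_head_buckets, stats.head_buckets);
        qht_statistics_destroy(&stats);
    }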