1# Copyright (C) 2019 Garmin Ltd.
2#
3# SPDX-License-Identifier: GPL-2.0-only
4#
5
import asyncio
import base64
from datetime import datetime, timedelta
import functools
import hashlib
import hmac
import logging
import math
import os
import time

from . import create_async_client
import bb.asyncrpc
16
logger = logging.getLogger("hashserv.server")


# Placeholder permission that is never granted; it only exists to match
# nothing
NONE_PERM = "@none"

READ_PERM = "@read"
REPORT_PERM = "@report"
DB_ADMIN_PERM = "@db-admin"
USER_ADMIN_PERM = "@user-admin"
ALL_PERM = "@all"

# Every real permission (NONE_PERM is deliberately not part of this set)
ALL_PERMISSIONS = {READ_PERM, REPORT_PERM, DB_ADMIN_PERM, USER_ADMIN_PERM, ALL_PERM}

# Permissions given to anonymous users when the server is not told otherwise
DEFAULT_ANON_PERMS = (READ_PERM, REPORT_PERM, DB_ADMIN_PERM)

# hashlib algorithm name used when hashing newly generated tokens
TOKEN_ALGORITHM = "sha256"

# Number of random bytes in a token. 48 bytes become exactly 64 characters
# once base64 encoded, and because 48 is a multiple of 3 the encoding never
# needs trailing '=' padding.
TOKEN_SIZE = 48

# Number of random bytes in a salt (hex encoded before use)
SALT_SIZE = 8
51
52
class Measurement:
    """
    Times one interval and adds the elapsed time to a sample.

    Can be driven manually with start() and end(), or used as a context
    manager wrapped around the code being timed.
    """

    def __init__(self, sample):
        # Object that receives the elapsed time through its add() method
        self.sample = sample

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args, **kwargs):
        self.end()

    def start(self):
        """Remember the time at which the interval begins."""
        self.start_time = time.perf_counter()

    def end(self):
        """Add the time that passed since start() to the sample."""
        finished = time.perf_counter()
        self.sample.add(finished - self.start_time)
69
70
class Sample:
    """
    Collects the elapsed time of one or more measurements and reports their
    sum to a stats object as a single value.

    When used as a context manager, the collected time is reported on exit.
    """

    def __init__(self, stats):
        self.stats = stats
        self.num_samples = 0
        self.elapsed = 0

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.end()

    def measure(self):
        """Return a new Measurement that adds its time to this sample."""
        return Measurement(self)

    def add(self, elapsed):
        """Accumulate the elapsed time of one measurement."""
        self.num_samples += 1
        self.elapsed += elapsed

    def end(self):
        """
        Report the accumulated time to the stats object and start over.

        Nothing is reported if no measurement was added since the last
        report.
        """
        if not self.num_samples:
            return

        self.stats.add(self.elapsed)
        self.elapsed = 0
        self.num_samples = 0
95
96
class Stats:
    """
    Running statistics over a series of elapsed times.

    The mean and the sum of squared differences from the mean are updated
    incrementally (Welford's method), so the individual values never need to
    be stored.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Forget everything recorded so far."""
        self.num = 0
        self.total_time = 0
        self.max_time = 0
        # Running mean, and running sum of squared differences from the mean
        self.m = 0
        self.s = 0
        self.current_elapsed = None

    def add(self, elapsed):
        """Record one elapsed time."""
        self.num += 1
        if self.num != 1:
            previous_mean = self.m
            self.m = previous_mean + (elapsed - previous_mean) / self.num
            self.s = self.s + (elapsed - previous_mean) * (elapsed - self.m)
        else:
            # The first value is the mean, and has no spread
            self.m = elapsed
            self.s = 0

        self.total_time += elapsed
        self.max_time = max(self.max_time, elapsed)

    def start_sample(self):
        """Return a new Sample that reports to these statistics."""
        return Sample(self)

    @property
    def average(self):
        """Mean of the recorded times, or 0 when nothing was recorded."""
        return self.total_time / self.num if self.num != 0 else 0

    @property
    def stdev(self):
        """
        Sample standard deviation of the recorded times, or 0 when fewer
        than two values were recorded.
        """
        if self.num > 1:
            return math.sqrt(self.s / (self.num - 1))
        return 0

    def todict(self):
        """Return the statistics as a plain dictionary."""
        fields = ("num", "total_time", "max_time", "average", "stdev")
        return {name: getattr(self, name) for name in fields}
144
145
# Shared by every token request, see new_token()
token_refresh_semaphore = asyncio.Lock()


async def new_token():
    """
    Generate a new random token and return it as a string.

    The token is TOKEN_SIZE random bytes, base64 encoded using '.' and '_'
    as the two non-alphanumeric characters.
    """
    # Prevent malicious users from using this API to deduce the entropy
    # pool on the server and thus be able to guess a token. *All* token
    # refresh requests lock the same global semaphore and then sleep for a
    # short time. This effectively rate limits the total number of requests
    # that can be made across all clients to 10/second, which should be
    # enough since you have to be an authenticated user to make the request
    # in the first place
    async with token_refresh_semaphore:
        await asyncio.sleep(0.1)
        entropy = os.getrandom(TOKEN_SIZE, os.GRND_NONBLOCK)

    encoded = base64.b64encode(entropy, altchars=b"._")
    return str(encoded, "utf-8")
162
163
def new_salt():
    """Return SALT_SIZE random bytes as a hexadecimal string."""
    salt_bytes = os.getrandom(SALT_SIZE, os.GRND_NONBLOCK)
    return salt_bytes.hex()
166
167
def hash_token(algo, salt, token):
    """
    Hash a token with a salt and return the result in stored form.

    The salt followed by the token (both UTF-8 encoded) is hashed with the
    hashlib algorithm named algo. The returned string has the form
    "<algo>:<salt>:<hex digest>".
    """
    hasher = hashlib.new(algo)
    for part in (salt, token):
        hasher.update(part.encode("utf-8"))
    return f"{algo}:{salt}:{hasher.hexdigest()}"
173
174
def permissions(*permissions, allow_anon=True, allow_self_service=False):
    """
    Function decorator that can be used to decorate an RPC function call and
    check that the current user's permissions match the required permissions.

    If allow_anon is True, the user will also be allowed to make the RPC call
    if the anonymous user permissions match the permissions.

    If allow_self_service is True, and the "username" property in the request
    is the currently logged in user, or not specified, the user will also be
    allowed to make the request. This allows users to access normally
    privileged API, as long as they are only modifying their own user
    properties (e.g. users can be allowed to reset their own token without
    @user-admin permissions, but not the token for any other user).

    The decorated coroutine raises bb.asyncrpc.InvokeError if the call is not
    allowed.
    """

    def wrapper(func):
        # Preserve the name and docstring of the decorated handler
        @functools.wraps(func)
        async def wrap(self, request):
            if allow_self_service and self.user is not None:
                username = request.get("username", self.user.username)
                if username == self.user.username:
                    # Make sure the handler always finds a username, even if
                    # the request did not specify one
                    request["username"] = self.user.username
                    return await func(self, request)

            if not self.user_has_permissions(*permissions, allow_anon=allow_anon):
                if not self.user:
                    username = "Anonymous user"
                    user_perms = self.server.anon_perms
                else:
                    username = self.user.username
                    user_perms = self.user.permissions

                self.logger.info(
                    "User %s with permissions %r denied from calling %s. Missing permissions(s) %r",
                    username,
                    ", ".join(user_perms),
                    func.__name__,
                    ", ".join(permissions),
                )
                raise bb.asyncrpc.InvokeError(
                    f"{username} is not allowed to access permissions(s) {', '.join(permissions)}"
                )

            return await func(self, request)

        return wrap

    return wrapper
223
224
class ServerClient(bb.asyncrpc.AsyncServerConnection):
    """
    Server side handler for a single hash equivalence client connection.

    Incoming messages are routed by dispatch_message() to the handle_*()
    coroutines registered in self.handlers. self.user is the user the
    connection is currently acting as, or None while it is anonymous.
    """

    def __init__(self, socket, server):
        super().__init__(socket, "OEHASHEQUIV", server.logger)
        self.server = server
        self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK
        # Set by handle_auth() and handle_become_user(); None means anonymous
        self.user = None

        self.handlers.update(
            {
                "get": self.handle_get,
                "get-outhash": self.handle_get_outhash,
                "get-stream": self.handle_get_stream,
                "exists-stream": self.handle_exists_stream,
                "get-stats": self.handle_get_stats,
                "get-db-usage": self.handle_get_db_usage,
                "get-db-query-columns": self.handle_get_db_query_columns,
                # Not always read-only, but internally checks if the server is
                # read-only
                "report": self.handle_report,
                "auth": self.handle_auth,
                "get-user": self.handle_get_user,
                "get-all-users": self.handle_get_all_users,
                "become-user": self.handle_become_user,
            }
        )

        # These commands are only offered when the server is not read-only
        if not self.server.read_only:
            self.handlers.update(
                {
                    "report-equiv": self.handle_equivreport,
                    "reset-stats": self.handle_reset_stats,
                    "backfill-wait": self.handle_backfill_wait,
                    "remove": self.handle_remove,
                    "gc-mark": self.handle_gc_mark,
                    "gc-sweep": self.handle_gc_sweep,
                    "gc-status": self.handle_gc_status,
                    "clean-unused": self.handle_clean_unused,
                    "refresh-token": self.handle_refresh_token,
                    "set-user-perms": self.handle_set_perms,
                    "new-user": self.handle_new_user,
                    "delete-user": self.handle_delete_user,
                }
            )

    def raise_no_user_error(self, username):
        """Raise the InvokeError that reports that username does not exist."""
        raise bb.asyncrpc.InvokeError(f"No user named '{username}' exists")

    def user_has_permissions(self, *permissions, allow_anon=True):
        """
        Return True if this connection holds all of the given permissions.

        If allow_anon is True, the anonymous permissions of the server are
        checked first and are sufficient on their own. Otherwise, or if they
        do not match, the permissions of the current user (if any) are
        checked. ALL_PERM stands for every permission.
        """
        permissions = set(permissions)
        if allow_anon:
            if ALL_PERM in self.server.anon_perms:
                return True

            if not permissions - self.server.anon_perms:
                return True

        if self.user is None:
            return False

        if ALL_PERM in self.user.permissions:
            return True

        if not permissions - self.user.permissions:
            return True

        return False

    def validate_proto_version(self):
        """
        Return True if the protocol version of the client is supported, i.e.
        newer than 1.0 and no newer than 1.1.
        """
        return (1, 0) < self.proto_version <= (1, 1)

    async def process_requests(self):
        """
        Serve the requests of this connection.

        A database connection, and a client for the upstream server if one is
        configured, are opened for the lifetime of the connection.
        """
        async with self.server.db_engine.connect(self.logger) as db:
            self.db = db
            if self.server.upstream is not None:
                self.upstream_client = await create_async_client(self.server.upstream)
            else:
                self.upstream_client = None

            try:
                await super().process_requests()
            finally:
                # Always release the upstream connection, even on error
                if self.upstream_client is not None:
                    await self.upstream_client.close()

    async def dispatch_message(self, msg):
        """
        Call the handler for the first registered command found in msg and
        return its result.

        Raises bb.asyncrpc.ClientError if msg contains no known command.
        """
        for k in self.handlers:
            if k in msg:
                self.logger.debug("Handling %s", k)
                if "stream" in k:
                    # Stream handlers record a sample per streamed request
                    # themselves (see _stream_handler())
                    return await self.handlers[k](msg[k])
                else:
                    with self.server.request_stats.start_sample() as self.request_sample, self.request_sample.measure():
                        return await self.handlers[k](msg[k])

        raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg)

    @permissions(READ_PERM)
    async def handle_get(self, request):
        """
        Handle "get": look up the unihash for the "method" and "taskhash" of
        the request. The optional "all" flag is passed to get_unihash() as
        fetch_all.
        """
        method = request["method"]
        taskhash = request["taskhash"]
        fetch_all = request.get("all", False)

        return await self.get_unihash(method, taskhash, fetch_all)

    async def get_unihash(self, method, taskhash, fetch_all=False):
        """
        Return the unihash data for a taskhash as a dictionary, or None if it
        is not known.

        The local database is queried first (the full row when fetch_all is
        True). On a miss, the upstream server is queried if there is one, and
        whatever it returns is stored in the local database.
        """
        d = None

        if fetch_all:
            row = await self.db.get_unihash_by_taskhash_full(method, taskhash)
            if row is not None:
                d = {k: row[k] for k in row.keys()}
            elif self.upstream_client is not None:
                d = await self.upstream_client.get_taskhash(method, taskhash, True)
                await self.update_unified(d)
        else:
            row = await self.db.get_equivalent(method, taskhash)

            if row is not None:
                d = {k: row[k] for k in row.keys()}
            elif self.upstream_client is not None:
                d = await self.upstream_client.get_taskhash(method, taskhash)
                # The upstream server may not know the taskhash either, in
                # which case there is nothing to store
                if d is not None:
                    await self.db.insert_unihash(
                        d["method"], d["taskhash"], d["unihash"]
                    )

        return d

    @permissions(READ_PERM)
    async def handle_get_outhash(self, request):
        """
        Handle "get-outhash": look up the "outhash" of the request for the
        given "method". "with_unihash" defaults to True.
        """
        method = request["method"]
        outhash = request["outhash"]
        taskhash = request["taskhash"]
        with_unihash = request.get("with_unihash", True)

        return await self.get_outhash(method, outhash, taskhash, with_unihash)

    async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
        """
        Return the data for an output hash as a dictionary, or None if it is
        not known.

        with_unihash selects which database query is used. On a local miss,
        the upstream server is queried if there is one, and whatever it
        returns is stored in the local database.
        """
        d = None
        if with_unihash:
            row = await self.db.get_unihash_by_outhash(method, outhash)
        else:
            row = await self.db.get_outhash(method, outhash)

        if row is not None:
            d = {k: row[k] for k in row.keys()}
        elif self.upstream_client is not None:
            d = await self.upstream_client.get_outhash(method, outhash, taskhash)
            await self.update_unified(d)

        return d

    async def update_unified(self, data):
        """
        Store data (a dictionary with at least "method", "taskhash" and
        "unihash") as both a unihash and an outhash entry. Does nothing if
        data is None.
        """
        if data is None:
            return

        await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"])
        await self.db.insert_outhash(data)

    async def _stream_handler(self, handler):
        """
        Run a streaming session with the client.

        After acknowledging with "ok", every received line is passed to the
        handler coroutine and its return value is sent back. The session ends
        when an empty line or "END" is received, at which point a final "ok"
        is sent. Returns self.NO_RESPONSE.
        """
        await self.socket.send_message("ok")

        while True:
            l = await self.socket.recv()
            if not l:
                break

            try:
                # This inner loop is very sensitive and must be as fast as
                # possible (which is why the request sample is handled
                # manually instead of using 'with', and also why logging
                # statements are commented out).
                self.request_sample = self.server.request_stats.start_sample()
                request_measure = self.request_sample.measure()
                request_measure.start()

                if l == "END":
                    break

                msg = await handler(l)
                await self.socket.send(msg)
            finally:
                request_measure.end()
                self.request_sample.end()

        await self.socket.send("ok")
        return self.NO_RESPONSE

    @permissions(READ_PERM)
    async def handle_get_stream(self, request):
        """
        Handle "get-stream": stream unihash lookups. Each line sent by the
        client is "<method> <taskhash>"; the reply is the unihash, or an empty
        string if it is not known.
        """

        async def handler(l):
            (method, taskhash) = l.split()
            # self.logger.debug('Looking up %s %s' % (method, taskhash))
            row = await self.db.get_equivalent(method, taskhash)

            if row is not None:
                # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
                return row["unihash"]

            if self.upstream_client is not None:
                upstream = await self.upstream_client.get_unihash(method, taskhash)
                if upstream:
                    # Answer right away, and let the backfill worker store
                    # the entry in the local database later
                    await self.server.backfill_queue.put((method, taskhash))
                    return upstream

            return ""

        return await self._stream_handler(handler)

    @permissions(READ_PERM)
    async def handle_exists_stream(self, request):
        """
        Handle "exists-stream": stream unihash existence checks. Each line
        sent by the client is a unihash; the reply is "true" if it exists
        locally or upstream, "false" otherwise.
        """

        async def handler(l):
            if await self.db.unihash_exists(l):
                return "true"

            if self.upstream_client is not None:
                if await self.upstream_client.unihash_exists(l):
                    return "true"

            return "false"

        return await self._stream_handler(handler)

    async def report_readonly(self, data):
        """
        Answer a report without recording it. The unihash returned is the one
        already known for the outhash, or else the one the client reported.
        """
        method = data["method"]
        outhash = data["outhash"]
        taskhash = data["taskhash"]

        info = await self.get_outhash(method, outhash, taskhash)
        if info:
            unihash = info["unihash"]
        else:
            unihash = data["unihash"]

        return {
            "taskhash": taskhash,
            "method": method,
            "unihash": unihash,
        }

    # Since this can be called either read only or to report, the check to
    # report is made inside the function
    @permissions(READ_PERM)
    async def handle_report(self, data):
        """
        Handle "report": record the output hash of a task and return the
        unihash that should be used for its taskhash.

        If the server is read-only, or the caller does not have REPORT_PERM,
        nothing is recorded and the answer comes from report_readonly().
        """
        if self.server.read_only or not self.user_has_permissions(REPORT_PERM):
            return await self.report_readonly(data)

        outhash_data = {
            "method": data["method"],
            "outhash": data["outhash"],
            "taskhash": data["taskhash"],
            "created": datetime.now(),
        }

        for k in ("owner", "PN", "PV", "PR", "task", "outhash_siginfo"):
            if k in data:
                outhash_data[k] = data[k]

        # An authenticated user always takes precedence over the owner
        # claimed in the request
        if self.user:
            outhash_data["owner"] = self.user.username

        # Insert the new entry, unless it already exists
        if await self.db.insert_outhash(outhash_data):
            # If this row is new, check if it is equivalent to another
            # output hash
            row = await self.db.get_equivalent_for_outhash(
                data["method"], data["outhash"], data["taskhash"]
            )

            if row is not None:
                # A matching output hash was found. Set our taskhash to the
                # same unihash since they are equivalent
                unihash = row["unihash"]
            else:
                # No matching output hash was found. This is probably the
                # first outhash to be added.
                unihash = data["unihash"]

                # Query upstream to see if it has a unihash we can use
                if self.upstream_client is not None:
                    upstream_data = await self.upstream_client.get_outhash(
                        data["method"], data["outhash"], data["taskhash"]
                    )
                    if upstream_data is not None:
                        unihash = upstream_data["unihash"]

            await self.db.insert_unihash(data["method"], data["taskhash"], unihash)

        unihash_data = await self.get_unihash(data["method"], data["taskhash"])
        if unihash_data is not None:
            unihash = unihash_data["unihash"]
        else:
            unihash = data["unihash"]

        return {
            "taskhash": data["taskhash"],
            "method": data["method"],
            "unihash": unihash,
        }

    @permissions(READ_PERM, REPORT_PERM)
    async def handle_equivreport(self, data):
        """
        Handle "report-equiv": record the reported taskhash to unihash
        mapping, and return the mapping that is in effect afterwards.
        """
        await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"])

        # Fetch the unihash that will be reported for the taskhash. If the
        # unihash matches, it means this row was inserted (or the mapping
        # was already valid)
        row = await self.db.get_equivalent(data["method"], data["taskhash"])

        if row["unihash"] == data["unihash"]:
            self.logger.info(
                "Adding taskhash equivalence for %s with unihash %s",
                data["taskhash"],
                row["unihash"],
            )

        return {k: row[k] for k in ("taskhash", "method", "unihash")}

    @permissions(READ_PERM)
    async def handle_get_stats(self, request):
        """Handle "get-stats": return the request statistics of the server."""
        return {
            "requests": self.server.request_stats.todict(),
        }

    @permissions(DB_ADMIN_PERM)
    async def handle_reset_stats(self, request):
        """
        Handle "reset-stats": reset the request statistics of the server, and
        return the values they had before the reset.
        """
        d = {
            "requests": self.server.request_stats.todict(),
        }

        self.server.request_stats.reset()
        return d

    @permissions(READ_PERM)
    async def handle_backfill_wait(self, request):
        """
        Handle "backfill-wait": wait until the backfill queue has been
        processed, and return the number of tasks that were queued when the
        request was made.
        """
        queue = self.server.backfill_queue
        # The queue only exists on servers with an upstream; without one
        # there is never anything to wait for
        if queue is None:
            return {"tasks": 0}

        d = {
            "tasks": queue.qsize(),
        }
        await queue.join()
        return d

    @permissions(DB_ADMIN_PERM)
    async def handle_remove(self, request):
        """
        Handle "remove": remove the database entries matching the "where"
        condition and return how many were removed.

        Raises TypeError if the condition is not a dictionary.
        """
        condition = request["where"]
        if not isinstance(condition, dict):
            raise TypeError("Bad condition type %s" % type(condition))

        return {"count": await self.db.remove(condition)}

    @permissions(DB_ADMIN_PERM)
    async def handle_gc_mark(self, request):
        """
        Handle "gc-mark": pass the "mark" and the "where" condition of the
        request to the database and return the resulting count.

        Raises TypeError if the condition is not a dictionary or the mark is
        not a string.
        """
        condition = request["where"]
        mark = request["mark"]

        if not isinstance(condition, dict):
            raise TypeError("Bad condition type %s" % type(condition))

        if not isinstance(mark, str):
            raise TypeError("Bad mark type %s" % type(mark))

        return {"count": await self.db.gc_mark(mark, condition)}

    @permissions(DB_ADMIN_PERM)
    async def handle_gc_sweep(self, request):
        """
        Handle "gc-sweep": run the garbage collection sweep and return the
        resulting count.

        The sweep is refused with an InvokeError unless "mark" is the current
        mark of the database. Raises TypeError if the mark is not a string.
        """
        mark = request["mark"]

        if not isinstance(mark, str):
            raise TypeError("Bad mark type %s" % type(mark))

        current_mark = await self.db.get_current_gc_mark()

        if not current_mark or mark != current_mark:
            raise bb.asyncrpc.InvokeError(
                f"'{mark}' is not the current mark. Refusing to sweep"
            )

        count = await self.db.gc_sweep()

        return {"count": count}

    @permissions(DB_ADMIN_PERM)
    async def handle_gc_status(self, request):
        """Handle "gc-status": return the garbage collection status."""
        (keep_rows, remove_rows, current_mark) = await self.db.gc_status()
        return {
            "keep": keep_rows,
            "remove": remove_rows,
            "mark": current_mark,
        }

    @permissions(DB_ADMIN_PERM)
    async def handle_clean_unused(self, request):
        """
        Handle "clean-unused": ask the database to clean unused entries,
        using the time "max_age_seconds" before now as the cutoff, and return
        the resulting count.
        """
        max_age = request["max_age_seconds"]
        # The cutoff lies max_age seconds in the past
        oldest = datetime.now() - timedelta(seconds=max_age)
        return {"count": await self.db.clean_unused(oldest)}

    @permissions(DB_ADMIN_PERM)
    async def handle_get_db_usage(self, request):
        """Handle "get-db-usage": return the usage reported by the database."""
        return {"usage": await self.db.get_usage()}

    @permissions(DB_ADMIN_PERM)
    async def handle_get_db_query_columns(self, request):
        """
        Handle "get-db-query-columns": return the query columns reported by
        the database.
        """
        return {"columns": await self.db.get_query_columns()}

    # The authentication API is always allowed
    async def handle_auth(self, request):
        """
        Handle "auth": authenticate the connection as "username" using
        "token".

        On success the connection acts as that user from then on. On failure
        an InvokeError is raised after a delay.
        """
        username = str(request["username"])
        token = str(request["token"])

        async def fail_auth():
            # Rate limit bad login attempts
            await asyncio.sleep(1)
            raise bb.asyncrpc.InvokeError(f"Unable to authenticate as {username}")

        user, db_token = await self.db.lookup_user_token(username)

        if not user or not db_token:
            await fail_auth()

        try:
            algo, salt, _ = db_token.split(":")
        except ValueError:
            await fail_auth()

        # Compare in constant time, so that the time taken does not reveal
        # how much of the stored hash was matched
        expected = hash_token(algo, salt, token)
        if not hmac.compare_digest(
            expected.encode("utf-8"), db_token.encode("utf-8")
        ):
            await fail_auth()

        self.user = user

        self.logger.info("Authenticated as %s", username)

        return {
            "result": True,
            "username": self.user.username,
            "permissions": sorted(self.user.permissions),
        }

    @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False)
    async def handle_refresh_token(self, request):
        """
        Handle "refresh-token": replace the token of "username" with a newly
        generated one and return it. Users may refresh their own token.

        Raises InvokeError if the user does not exist.
        """
        username = str(request["username"])

        token = await new_token()

        # Only the salted hash is stored; the token itself is returned to
        # the client
        updated = await self.db.set_user_token(
            username,
            hash_token(TOKEN_ALGORITHM, new_salt(), token),
        )
        if not updated:
            self.raise_no_user_error(username)

        return {"username": username, "token": token}

    def get_perm_arg(self, arg):
        """
        Validate a list of permissions received from a client and return it
        sorted, without duplicates and without NONE_PERM.

        Raises InvokeError if arg is not a list or contains unknown
        permissions.
        """
        if not isinstance(arg, list):
            raise bb.asyncrpc.InvokeError("Unexpected type for permissions")

        perms = set(arg)
        perms.discard(NONE_PERM)

        unknown_perms = perms - ALL_PERMISSIONS
        if unknown_perms:
            raise bb.asyncrpc.InvokeError(
                "Unknown permissions %s" % ", ".join(sorted(unknown_perms))
            )

        return sorted(perms)

    def return_perms(self, permissions):
        """
        Return permissions as a sorted list for sending to a client, with
        ALL_PERM expanded to the complete list of permissions.
        """
        if ALL_PERM in permissions:
            return sorted(ALL_PERMISSIONS)
        return sorted(permissions)

    @permissions(USER_ADMIN_PERM, allow_anon=False)
    async def handle_set_perms(self, request):
        """
        Handle "set-user-perms": set the permissions of "username".

        Raises InvokeError if the permissions are invalid or the user does
        not exist.
        """
        username = str(request["username"])
        perms = self.get_perm_arg(request["permissions"])

        if not await self.db.set_user_perms(username, perms):
            self.raise_no_user_error(username)

        return {
            "username": username,
            "permissions": self.return_perms(perms),
        }

    @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False)
    async def handle_get_user(self, request):
        """
        Handle "get-user": return the name and permissions of "username", or
        None if there is no such user. Users may query themselves.
        """
        username = str(request["username"])

        user = await self.db.lookup_user(username)
        if user is None:
            return None

        return {
            "username": user.username,
            "permissions": self.return_perms(user.permissions),
        }

    @permissions(USER_ADMIN_PERM, allow_anon=False)
    async def handle_get_all_users(self, request):
        """Handle "get-all-users": return name and permissions of all users."""
        users = await self.db.get_all_users()
        return {
            "users": [
                {
                    "username": u.username,
                    "permissions": self.return_perms(u.permissions),
                }
                for u in users
            ]
        }

    @permissions(USER_ADMIN_PERM, allow_anon=False)
    async def handle_new_user(self, request):
        """
        Handle "new-user": create the user "username" with the given
        permissions and return its newly generated token.

        Raises InvokeError if the permissions are invalid or the user could
        not be created.
        """
        username = str(request["username"])
        perms = self.get_perm_arg(request["permissions"])

        token = await new_token()

        inserted = await self.db.new_user(
            username,
            perms,
            hash_token(TOKEN_ALGORITHM, new_salt(), token),
        )
        if not inserted:
            raise bb.asyncrpc.InvokeError(f"Cannot create new user '{username}'")

        return {
            "username": username,
            "permissions": self.return_perms(perms),
            "token": token,
        }

    @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False)
    async def handle_delete_user(self, request):
        """
        Handle "delete-user": delete the user "username". Users may delete
        themselves.

        Raises InvokeError if the user does not exist.
        """
        username = str(request["username"])

        if not await self.db.delete_user(username):
            self.raise_no_user_error(username)

        return {"username": username}

    @permissions(USER_ADMIN_PERM, allow_anon=False)
    async def handle_become_user(self, request):
        """
        Handle "become-user": make the connection act as "username" from now
        on, without needing the token of that user.

        Raises InvokeError if the user does not exist.
        """
        username = str(request["username"])

        user = await self.db.lookup_user(username)
        if user is None:
            raise bb.asyncrpc.InvokeError(f"User {username} doesn't exist")

        self.user = user

        self.logger.info("Became user %s", username)

        return {
            "username": self.user.username,
            "permissions": self.return_perms(self.user.permissions),
        }
784
785
class Server(bb.asyncrpc.AsyncServer):
    """
    Hash equivalence server.

    Holds the state shared by all client connections: the database engine,
    the request statistics, the anonymous permissions and, when an upstream
    server is configured, the queue of entries to backfill from it.
    """

    def __init__(
        self,
        db_engine,
        upstream=None,
        read_only=False,
        anon_perms=DEFAULT_ANON_PERMS,
        admin_username=None,
        admin_password=None,
    ):
        if upstream and read_only:
            raise bb.asyncrpc.ServerError(
                "Read-only hashserv cannot pull from an upstream server"
            )

        # Anonymous users may never be given user administration rights (or
        # ALL_PERM, which would include them)
        anon_allowed = {NONE_PERM, READ_PERM, REPORT_PERM, DB_ADMIN_PERM}
        disallowed_perms = set(anon_perms) - anon_allowed
        if disallowed_perms:
            raise bb.asyncrpc.ServerError(
                f"Permission(s) {' '.join(disallowed_perms)} are not allowed for anonymous users"
            )

        super().__init__(logger)

        self.db_engine = db_engine
        self.upstream = upstream
        self.read_only = read_only
        self.anon_perms = set(anon_perms)
        self.admin_username = admin_username
        self.admin_password = admin_password
        self.request_stats = Stats()
        # Created in start(), and only if there is an upstream server
        self.backfill_queue = None

        self.logger.info(
            "Anonymous user permissions are: %s", ", ".join(self.anon_perms)
        )

    def accept_client(self, socket):
        """Return the connection handler for a newly accepted socket."""
        return ServerClient(socket, self)

    async def create_admin_user(self):
        """
        Make sure the configured admin user exists, has ALL_PERM, and has the
        configured admin password as its token.
        """
        admin_permissions = (ALL_PERM,)
        async with self.db_engine.connect(self.logger) as db:
            created = await db.new_user(
                self.admin_username,
                admin_permissions,
                hash_token(TOKEN_ALGORITHM, new_salt(), self.admin_password),
            )
            if created:
                self.logger.info("Created admin user '%s'", self.admin_username)
                return

            # The user already exists, so bring it in line with the
            # configuration instead
            await db.set_user_perms(self.admin_username, admin_permissions)
            await db.set_user_token(
                self.admin_username,
                hash_token(TOKEN_ALGORITHM, new_salt(), self.admin_password),
            )
            self.logger.info("Admin user '%s' updated", self.admin_username)

    async def backfill_worker_task(self):
        """
        Copy entries from the upstream server into the local database.

        Takes (method, taskhash) pairs from the backfill queue until None is
        received, which makes the worker exit.
        """
        async with await create_async_client(
            self.upstream
        ) as client, self.db_engine.connect(self.logger) as db:
            queue = self.backfill_queue
            while True:
                item = await queue.get()
                if item is None:
                    # Stop request; see stop()
                    queue.task_done()
                    return

                method, taskhash = item
                found = await client.get_taskhash(method, taskhash)
                if found is not None:
                    await db.insert_unihash(
                        found["method"], found["taskhash"], found["unihash"]
                    )
                queue.task_done()

    def start(self):
        """
        Start the server and return the list of tasks to run.

        The database is created, and the admin user set up if one is
        configured, before returning.
        """
        tasks = super().start()
        if self.upstream:
            self.backfill_queue = asyncio.Queue()
            tasks.append(self.backfill_worker_task())

        self.loop.run_until_complete(self.db_engine.create())

        if self.admin_username:
            self.loop.run_until_complete(self.create_admin_user())

        return tasks

    async def stop(self):
        """Stop the server, telling the backfill worker (if any) to exit."""
        if self.backfill_queue is not None:
            await self.backfill_queue.put(None)
        await super().stop()
882