xref: /openbmc/openbmc/poky/bitbake/lib/hashserv/server.py (revision c9537f57ab488bf5d90132917b0184e2527970a5)
1# Copyright (C) 2019 Garmin Ltd.
2#
3# SPDX-License-Identifier: GPL-2.0-only
4#
5
6from datetime import datetime, timedelta
7import asyncio
8import logging
9import math
10import time
11import os
12import base64
13import json
14import hashlib
15from . import create_async_client
16import bb.asyncrpc
17
# Module-level logger; also handed to the base class by Server.__init__()
logger = logging.getLogger("hashserv.server")


# This permission only exists to match nothing
NONE_PERM = "@none"

# Permission names checked by the permissions() decorator on the RPC handlers.
# ALL_PERM is treated as a wildcard by ServerClient.user_has_permissions().
READ_PERM = "@read"
REPORT_PERM = "@report"
DB_ADMIN_PERM = "@db-admin"
USER_ADMIN_PERM = "@user-admin"
ALL_PERM = "@all"

# Every permission name accepted when assigning permissions to a user
# (NONE_PERM is not a member)
ALL_PERMISSIONS = {
    READ_PERM,
    REPORT_PERM,
    DB_ADMIN_PERM,
    USER_ADMIN_PERM,
    ALL_PERM,
}

# Default value of the Server "anon_perms" argument, i.e. what a client that
# has not authenticated is allowed to do
DEFAULT_ANON_PERMS = (
    READ_PERM,
    REPORT_PERM,
    DB_ADMIN_PERM,
)

# hashlib algorithm name used when hashing newly generated tokens
TOKEN_ALGORITHM = "sha256"

# 48 bytes of random data will result in 64 characters when base64
# encoded. This number also ensures that the base64 encoding won't have any
# trailing '=' characters.
TOKEN_SIZE = 48

# Number of random bytes in a salt; new_salt() returns them hex encoded
SALT_SIZE = 8
52
53
class Measurement(object):
    """
    Times a single interval and reports its duration to a sample.

    Can be driven manually with start() and end(), or used as a context
    manager that times the body of the 'with' block.
    """

    def __init__(self, sample):
        # Object whose add() method receives the elapsed time in seconds
        self.sample = sample

    def start(self):
        """Remember the moment the interval began"""
        self.start_time = time.perf_counter()

    def end(self):
        """Report the time elapsed since start() to the sample"""
        finished = time.perf_counter()
        self.sample.add(finished - self.start_time)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args, **kwargs):
        # The interval is reported even if the body raised
        self.end()
70
71
class Sample(object):
    """
    Accumulates the time of one or more measurements and reports the total
    to a stats object as a single data point when the sample ends.
    """

    def __init__(self, stats):
        # Object whose add() method receives the accumulated time
        self.stats = stats
        self.elapsed = 0
        self.num_samples = 0

    def measure(self):
        """Create a Measurement that adds its duration to this sample"""
        return Measurement(self)

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.end()

    def add(self, elapsed):
        """Add the elapsed time of one measurement to the running total"""
        self.elapsed += elapsed
        self.num_samples += 1

    def end(self):
        """
        Report the accumulated time to the stats object and start over.

        Nothing is reported if no measurement was added since the last
        report.
        """
        if not self.num_samples:
            return

        self.stats.add(self.elapsed)
        self.elapsed = 0
        self.num_samples = 0
96
97
class Stats(object):
    """
    Running statistics (count, total, maximum, average and standard
    deviation) over a series of elapsed times.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Discard everything recorded so far"""
        self.num = 0
        self.total_time = 0
        self.max_time = 0
        # Running mean and running sum of squared differences from the mean;
        # both are updated incrementally by add()
        self.m = 0
        self.s = 0
        self.current_elapsed = None

    def add(self, elapsed):
        """Record one elapsed time"""
        self.num += 1
        if self.num > 1:
            previous_mean = self.m
            delta = elapsed - previous_mean
            self.m = previous_mean + delta / self.num
            self.s = self.s + delta * (elapsed - self.m)
        else:
            # First data point: the mean is the value itself
            self.m = elapsed
            self.s = 0

        self.total_time += elapsed

        if elapsed > self.max_time:
            self.max_time = elapsed

    def start_sample(self):
        """Create a Sample that reports into these statistics"""
        return Sample(self)

    @property
    def average(self):
        """Mean of the recorded times, or 0 when nothing was recorded"""
        return 0 if self.num == 0 else self.total_time / self.num

    @property
    def stdev(self):
        """Sample standard deviation, or 0 with fewer than two data points"""
        return 0 if self.num <= 1 else math.sqrt(self.s / (self.num - 1))

    def todict(self):
        """Return the public statistics as a plain dictionary"""
        return {
            "num": self.num,
            "total_time": self.total_time,
            "max_time": self.max_time,
            "average": self.average,
            "stdev": self.stdev,
        }
145
146
# Single lock shared by every new_token() call. new_token() sleeps while
# holding it, which serializes token generation across all clients.
token_refresh_semaphore = asyncio.Lock()
148
149
async def new_token():
    """
    Generate a new random token and return it as text.

    TOKEN_SIZE random bytes are base64 encoded using '.' and '_' in place of
    the standard '+' and '/' characters.
    """
    # Prevent malicious users from using this API to deduce the entropy
    # pool on the server and thus be able to guess a token. *All* token
    # refresh requests lock the same global semaphore and then sleep for a
    # short time. This effectively rate limits the total number of requests
    # that can be made across all clients to 10/second, which should be
    # enough since you have to be an authenticated user to make the request
    # in the first place
    async with token_refresh_semaphore:
        await asyncio.sleep(0.1)
        entropy = os.getrandom(TOKEN_SIZE, os.GRND_NONBLOCK)

    encoded = base64.b64encode(entropy, b"._")
    return encoded.decode("utf-8")
163
164
def new_salt():
    """Return SALT_SIZE fresh random bytes as a hexadecimal string"""
    salt_bytes = os.getrandom(SALT_SIZE, os.GRND_NONBLOCK)
    return salt_bytes.hex()
167
168
def hash_token(algo, salt, token):
    """
    Compute the stored form of a token.

    The digest is taken with the hashlib algorithm named by algo over the
    UTF-8 encoded salt followed by the UTF-8 encoded token. The result is the
    string "<algo>:<salt>:<hex digest>".
    """
    digest = hashlib.new(algo)
    for part in (salt, token):
        digest.update(part.encode("utf-8"))

    return f"{algo}:{salt}:{digest.hexdigest()}"
174
175
def permissions(*permissions, allow_anon=True, allow_self_service=False):
    """
    Function decorator that can be used to decorate an RPC function call and
    check that the current user's permissions match the required permissions.

    If allow_anon is True, the user will also be allowed to make the RPC call
    if the anonymous user permissions match the permissions.

    If allow_self_service is True, and the "username" property in the request
    is the currently logged in user, or not specified, the user will also be
    allowed to make the request. This allows users to access normally
    privileged API, as long as they are only modifying their own user
    properties (e.g. users can be allowed to reset their own token without
    @user-admin permissions, but not the token for any other user).

    The decorated handler is an "async def handler(self, request)" method.
    When the call is not allowed, the denial is logged and
    bb.asyncrpc.InvokeError is raised instead of calling the handler.
    """
    # Imported here so the decorator does not depend on a module-level import
    import functools

    def wrapper(func):
        # functools.wraps keeps the handler's own name and docstring on the
        # wrapped function instead of replacing them with those of "wrap"
        @functools.wraps(func)
        async def wrap(self, request):
            if allow_self_service and self.user is not None:
                username = request.get("username", self.user.username)
                if username == self.user.username:
                    # Make sure the handler always finds a username, even if
                    # the request left it out
                    request["username"] = self.user.username
                    return await func(self, request)

            if not self.user_has_permissions(*permissions, allow_anon=allow_anon):
                if not self.user:
                    username = "Anonymous user"
                    user_perms = self.server.anon_perms
                else:
                    username = self.user.username
                    user_perms = self.user.permissions

                self.logger.info(
                    "User %s with permissions %r denied from calling %s. Missing permissions(s) %r",
                    username,
                    ", ".join(user_perms),
                    func.__name__,
                    ", ".join(permissions),
                )
                raise bb.asyncrpc.InvokeError(
                    f"{username} is not allowed to access permissions(s) {', '.join(permissions)}"
                )

            return await func(self, request)

        return wrap

    return wrapper
224
225
226class ServerClient(bb.asyncrpc.AsyncServerConnection):
227    def __init__(self, socket, server):
228        super().__init__(socket, "OEHASHEQUIV", server.logger)
229        self.server = server
230        self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK
231        self.user = None
232
233        self.handlers.update(
234            {
235                "get": self.handle_get,
236                "get-outhash": self.handle_get_outhash,
237                "get-stream": self.handle_get_stream,
238                "exists-stream": self.handle_exists_stream,
239                "get-stats": self.handle_get_stats,
240                "get-db-usage": self.handle_get_db_usage,
241                "get-db-query-columns": self.handle_get_db_query_columns,
242                # Not always read-only, but internally checks if the server is
243                # read-only
244                "report": self.handle_report,
245                "auth": self.handle_auth,
246                "get-user": self.handle_get_user,
247                "get-all-users": self.handle_get_all_users,
248                "become-user": self.handle_become_user,
249            }
250        )
251
252        if not self.server.read_only:
253            self.handlers.update(
254                {
255                    "report-equiv": self.handle_equivreport,
256                    "reset-stats": self.handle_reset_stats,
257                    "backfill-wait": self.handle_backfill_wait,
258                    "remove": self.handle_remove,
259                    "gc-mark": self.handle_gc_mark,
260                    "gc-mark-stream": self.handle_gc_mark_stream,
261                    "gc-sweep": self.handle_gc_sweep,
262                    "gc-status": self.handle_gc_status,
263                    "clean-unused": self.handle_clean_unused,
264                    "refresh-token": self.handle_refresh_token,
265                    "set-user-perms": self.handle_set_perms,
266                    "new-user": self.handle_new_user,
267                    "delete-user": self.handle_delete_user,
268                }
269            )
270
271    def raise_no_user_error(self, username):
272        raise bb.asyncrpc.InvokeError(f"No user named '{username}' exists")
273
274    def user_has_permissions(self, *permissions, allow_anon=True):
275        permissions = set(permissions)
276        if allow_anon:
277            if ALL_PERM in self.server.anon_perms:
278                return True
279
280            if not permissions - self.server.anon_perms:
281                return True
282
283        if self.user is None:
284            return False
285
286        if ALL_PERM in self.user.permissions:
287            return True
288
289        if not permissions - self.user.permissions:
290            return True
291
292        return False
293
294    def validate_proto_version(self):
295        return self.proto_version > (1, 0) and self.proto_version <= (1, 1)
296
297    async def process_requests(self):
298        async with self.server.db_engine.connect(self.logger) as db:
299            self.db = db
300            if self.server.upstream is not None:
301                self.upstream_client = await create_async_client(self.server.upstream)
302            else:
303                self.upstream_client = None
304
305            try:
306                await super().process_requests()
307            finally:
308                if self.upstream_client is not None:
309                    await self.upstream_client.close()
310
311    async def dispatch_message(self, msg):
312        for k in self.handlers.keys():
313            if k in msg:
314                self.logger.debug("Handling %s" % k)
315                if "stream" in k:
316                    return await self.handlers[k](msg[k])
317                else:
318                    with self.server.request_stats.start_sample() as self.request_sample, self.request_sample.measure():
319                        return await self.handlers[k](msg[k])
320
321        raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg)
322
323    @permissions(READ_PERM)
324    async def handle_get(self, request):
325        method = request["method"]
326        taskhash = request["taskhash"]
327        fetch_all = request.get("all", False)
328
329        return await self.get_unihash(method, taskhash, fetch_all)
330
331    async def get_unihash(self, method, taskhash, fetch_all=False):
332        d = None
333
334        if fetch_all:
335            row = await self.db.get_unihash_by_taskhash_full(method, taskhash)
336            if row is not None:
337                d = {k: row[k] for k in row.keys()}
338            elif self.upstream_client is not None:
339                d = await self.upstream_client.get_taskhash(method, taskhash, True)
340                await self.update_unified(d)
341        else:
342            row = await self.db.get_equivalent(method, taskhash)
343
344            if row is not None:
345                d = {k: row[k] for k in row.keys()}
346            elif self.upstream_client is not None:
347                d = await self.upstream_client.get_taskhash(method, taskhash)
348                await self.db.insert_unihash(d["method"], d["taskhash"], d["unihash"])
349
350        return d
351
352    @permissions(READ_PERM)
353    async def handle_get_outhash(self, request):
354        method = request["method"]
355        outhash = request["outhash"]
356        taskhash = request["taskhash"]
357        with_unihash = request.get("with_unihash", True)
358
359        return await self.get_outhash(method, outhash, taskhash, with_unihash)
360
361    async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
362        d = None
363        if with_unihash:
364            row = await self.db.get_unihash_by_outhash(method, outhash)
365        else:
366            row = await self.db.get_outhash(method, outhash)
367
368        if row is not None:
369            d = {k: row[k] for k in row.keys()}
370        elif self.upstream_client is not None:
371            d = await self.upstream_client.get_outhash(method, outhash, taskhash)
372            await self.update_unified(d)
373
374        return d
375
376    async def update_unified(self, data):
377        if data is None:
378            return
379
380        await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"])
381        await self.db.insert_outhash(data)
382
    async def _stream_handler(self, handler):
        """
        Run a streaming session on the connection.

        After acknowledging with "ok", lines are received one at a time. Each
        line is passed to the handler coroutine and the value it returns is
        sent back. The session ends when an empty/false value or the line
        "END" is received, after which a final "ok" is sent.

        Every received line is timed as its own request sample.

        Returns self.NO_RESPONSE.
        """
        await self.socket.send_message("ok")

        while True:
            upstream = None

            l = await self.socket.recv()
            if not l:
                break

            try:
                # This inner loop is very sensitive and must be as fast as
                # possible (which is why the request sample is handled manually
                # instead of using 'with', and also why logging statements are
                # commented out).
                self.request_sample = self.server.request_stats.start_sample()
                request_measure = self.request_sample.measure()
                request_measure.start()

                if l == "END":
                    break

                msg = await handler(l)
                await self.socket.send(msg)
            finally:
                # Runs for the "END" line and on errors as well, so the
                # sample is always closed
                request_measure.end()
                self.request_sample.end()

        await self.socket.send("ok")
        return self.NO_RESPONSE
413
414    @permissions(READ_PERM)
415    async def handle_get_stream(self, request):
416        async def handler(l):
417            (method, taskhash) = l.split()
418            # self.logger.debug('Looking up %s %s' % (method, taskhash))
419            row = await self.db.get_equivalent(method, taskhash)
420
421            if row is not None:
422                # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
423                return row["unihash"]
424
425            if self.upstream_client is not None:
426                upstream = await self.upstream_client.get_unihash(method, taskhash)
427                if upstream:
428                    await self.server.backfill_queue.put((method, taskhash))
429                    return upstream
430
431            return ""
432
433        return await self._stream_handler(handler)
434
435    @permissions(READ_PERM)
436    async def handle_exists_stream(self, request):
437        async def handler(l):
438            if await self.db.unihash_exists(l):
439                return "true"
440
441            if self.upstream_client is not None:
442                if await self.upstream_client.unihash_exists(l):
443                    return "true"
444
445            return "false"
446
447        return await self._stream_handler(handler)
448
449    async def report_readonly(self, data):
450        method = data["method"]
451        outhash = data["outhash"]
452        taskhash = data["taskhash"]
453
454        info = await self.get_outhash(method, outhash, taskhash)
455        if info:
456            unihash = info["unihash"]
457        else:
458            unihash = data["unihash"]
459
460        return {
461            "taskhash": taskhash,
462            "method": method,
463            "unihash": unihash,
464        }
465
466    # Since this can be called either read only or to report, the check to
467    # report is made inside the function
468    @permissions(READ_PERM)
469    async def handle_report(self, data):
470        if self.server.read_only or not self.user_has_permissions(REPORT_PERM):
471            return await self.report_readonly(data)
472
473        outhash_data = {
474            "method": data["method"],
475            "outhash": data["outhash"],
476            "taskhash": data["taskhash"],
477            "created": datetime.now(),
478        }
479
480        for k in ("owner", "PN", "PV", "PR", "task", "outhash_siginfo"):
481            if k in data:
482                outhash_data[k] = data[k]
483
484        if self.user:
485            outhash_data["owner"] = self.user.username
486
487        # Insert the new entry, unless it already exists
488        if await self.db.insert_outhash(outhash_data):
489            # If this row is new, check if it is equivalent to another
490            # output hash
491            row = await self.db.get_equivalent_for_outhash(
492                data["method"], data["outhash"], data["taskhash"]
493            )
494
495            if row is not None:
496                # A matching output hash was found. Set our taskhash to the
497                # same unihash since they are equivalent
498                unihash = row["unihash"]
499            else:
500                # No matching output hash was found. This is probably the
501                # first outhash to be added.
502                unihash = data["unihash"]
503
504                # Query upstream to see if it has a unihash we can use
505                if self.upstream_client is not None:
506                    upstream_data = await self.upstream_client.get_outhash(
507                        data["method"], data["outhash"], data["taskhash"]
508                    )
509                    if upstream_data is not None:
510                        unihash = upstream_data["unihash"]
511
512            await self.db.insert_unihash(data["method"], data["taskhash"], unihash)
513
514        unihash_data = await self.get_unihash(data["method"], data["taskhash"])
515        if unihash_data is not None:
516            unihash = unihash_data["unihash"]
517        else:
518            unihash = data["unihash"]
519
520        return {
521            "taskhash": data["taskhash"],
522            "method": data["method"],
523            "unihash": unihash,
524        }
525
526    @permissions(READ_PERM, REPORT_PERM)
527    async def handle_equivreport(self, data):
528        await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"])
529
530        # Fetch the unihash that will be reported for the taskhash. If the
531        # unihash matches, it means this row was inserted (or the mapping
532        # was already valid)
533        row = await self.db.get_equivalent(data["method"], data["taskhash"])
534
535        if row["unihash"] == data["unihash"]:
536            self.logger.info(
537                "Adding taskhash equivalence for %s with unihash %s",
538                data["taskhash"],
539                row["unihash"],
540            )
541
542        return {k: row[k] for k in ("taskhash", "method", "unihash")}
543
544    @permissions(READ_PERM)
545    async def handle_get_stats(self, request):
546        return {
547            "requests": self.server.request_stats.todict(),
548        }
549
550    @permissions(DB_ADMIN_PERM)
551    async def handle_reset_stats(self, request):
552        d = {
553            "requests": self.server.request_stats.todict(),
554        }
555
556        self.server.request_stats.reset()
557        return d
558
559    @permissions(READ_PERM)
560    async def handle_backfill_wait(self, request):
561        d = {
562            "tasks": self.server.backfill_queue.qsize(),
563        }
564        await self.server.backfill_queue.join()
565        return d
566
567    @permissions(DB_ADMIN_PERM)
568    async def handle_remove(self, request):
569        condition = request["where"]
570        if not isinstance(condition, dict):
571            raise TypeError("Bad condition type %s" % type(condition))
572
573        return {"count": await self.db.remove(condition)}
574
575    @permissions(DB_ADMIN_PERM)
576    async def handle_gc_mark(self, request):
577        condition = request["where"]
578        mark = request["mark"]
579
580        if not isinstance(condition, dict):
581            raise TypeError("Bad condition type %s" % type(condition))
582
583        if not isinstance(mark, str):
584            raise TypeError("Bad mark type %s" % type(mark))
585
586        return {"count": await self.db.gc_mark(mark, condition)}
587
588    @permissions(DB_ADMIN_PERM)
589    async def handle_gc_mark_stream(self, request):
590        async def handler(line):
591            try:
592                decoded_line = json.loads(line)
593            except json.JSONDecodeError as exc:
594                raise bb.asyncrpc.InvokeError(
595                    "Could not decode JSONL input '%s'" % line
596                ) from exc
597
598            try:
599                mark = decoded_line["mark"]
600                condition = decoded_line["where"]
601                if not isinstance(mark, str):
602                    raise TypeError("Bad mark type %s" % type(mark))
603
604                if not isinstance(condition, dict):
605                    raise TypeError("Bad condition type %s" % type(condition))
606            except KeyError as exc:
607                raise bb.asyncrpc.InvokeError(
608                    "Input line is missing key '%s' " % exc
609                ) from exc
610
611            return json.dumps({"count": await self.db.gc_mark(mark, condition)})
612
613        return await self._stream_handler(handler)
614
615    @permissions(DB_ADMIN_PERM)
616    async def handle_gc_sweep(self, request):
617        mark = request["mark"]
618
619        if not isinstance(mark, str):
620            raise TypeError("Bad mark type %s" % type(mark))
621
622        current_mark = await self.db.get_current_gc_mark()
623
624        if not current_mark or mark != current_mark:
625            raise bb.asyncrpc.InvokeError(
626                f"'{mark}' is not the current mark. Refusing to sweep"
627            )
628
629        count = await self.db.gc_sweep()
630
631        return {"count": count}
632
633    @permissions(DB_ADMIN_PERM)
634    async def handle_gc_status(self, request):
635        (keep_rows, remove_rows, current_mark) = await self.db.gc_status()
636        return {
637            "keep": keep_rows,
638            "remove": remove_rows,
639            "mark": current_mark,
640        }
641
642    @permissions(DB_ADMIN_PERM)
643    async def handle_clean_unused(self, request):
644        max_age = request["max_age_seconds"]
645        oldest = datetime.now() - timedelta(seconds=-max_age)
646        return {"count": await self.db.clean_unused(oldest)}
647
648    @permissions(DB_ADMIN_PERM)
649    async def handle_get_db_usage(self, request):
650        return {"usage": await self.db.get_usage()}
651
652    @permissions(DB_ADMIN_PERM)
653    async def handle_get_db_query_columns(self, request):
654        return {"columns": await self.db.get_query_columns()}
655
656    # The authentication API is always allowed
657    async def handle_auth(self, request):
658        username = str(request["username"])
659        token = str(request["token"])
660
661        async def fail_auth():
662            nonlocal username
663            # Rate limit bad login attempts
664            await asyncio.sleep(1)
665            raise bb.asyncrpc.InvokeError(f"Unable to authenticate as {username}")
666
667        user, db_token = await self.db.lookup_user_token(username)
668
669        if not user or not db_token:
670            await fail_auth()
671
672        try:
673            algo, salt, _ = db_token.split(":")
674        except ValueError:
675            await fail_auth()
676
677        if hash_token(algo, salt, token) != db_token:
678            await fail_auth()
679
680        self.user = user
681
682        self.logger.info("Authenticated as %s", username)
683
684        return {
685            "result": True,
686            "username": self.user.username,
687            "permissions": sorted(list(self.user.permissions)),
688        }
689
690    @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False)
691    async def handle_refresh_token(self, request):
692        username = str(request["username"])
693
694        token = await new_token()
695
696        updated = await self.db.set_user_token(
697            username,
698            hash_token(TOKEN_ALGORITHM, new_salt(), token),
699        )
700        if not updated:
701            self.raise_no_user_error(username)
702
703        return {"username": username, "token": token}
704
705    def get_perm_arg(self, arg):
706        if not isinstance(arg, list):
707            raise bb.asyncrpc.InvokeError("Unexpected type for permissions")
708
709        arg = set(arg)
710        try:
711            arg.remove(NONE_PERM)
712        except KeyError:
713            pass
714
715        unknown_perms = arg - ALL_PERMISSIONS
716        if unknown_perms:
717            raise bb.asyncrpc.InvokeError(
718                "Unknown permissions %s" % ", ".join(sorted(list(unknown_perms)))
719            )
720
721        return sorted(list(arg))
722
723    def return_perms(self, permissions):
724        if ALL_PERM in permissions:
725            return sorted(list(ALL_PERMISSIONS))
726        return sorted(list(permissions))
727
728    @permissions(USER_ADMIN_PERM, allow_anon=False)
729    async def handle_set_perms(self, request):
730        username = str(request["username"])
731        permissions = self.get_perm_arg(request["permissions"])
732
733        if not await self.db.set_user_perms(username, permissions):
734            self.raise_no_user_error(username)
735
736        return {
737            "username": username,
738            "permissions": self.return_perms(permissions),
739        }
740
741    @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False)
742    async def handle_get_user(self, request):
743        username = str(request["username"])
744
745        user = await self.db.lookup_user(username)
746        if user is None:
747            return None
748
749        return {
750            "username": user.username,
751            "permissions": self.return_perms(user.permissions),
752        }
753
754    @permissions(USER_ADMIN_PERM, allow_anon=False)
755    async def handle_get_all_users(self, request):
756        users = await self.db.get_all_users()
757        return {
758            "users": [
759                {
760                    "username": u.username,
761                    "permissions": self.return_perms(u.permissions),
762                }
763                for u in users
764            ]
765        }
766
767    @permissions(USER_ADMIN_PERM, allow_anon=False)
768    async def handle_new_user(self, request):
769        username = str(request["username"])
770        permissions = self.get_perm_arg(request["permissions"])
771
772        token = await new_token()
773
774        inserted = await self.db.new_user(
775            username,
776            permissions,
777            hash_token(TOKEN_ALGORITHM, new_salt(), token),
778        )
779        if not inserted:
780            raise bb.asyncrpc.InvokeError(f"Cannot create new user '{username}'")
781
782        return {
783            "username": username,
784            "permissions": self.return_perms(permissions),
785            "token": token,
786        }
787
788    @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False)
789    async def handle_delete_user(self, request):
790        username = str(request["username"])
791
792        if not await self.db.delete_user(username):
793            self.raise_no_user_error(username)
794
795        return {"username": username}
796
797    @permissions(USER_ADMIN_PERM, allow_anon=False)
798    async def handle_become_user(self, request):
799        username = str(request["username"])
800
801        user = await self.db.lookup_user(username)
802        if user is None:
803            raise bb.asyncrpc.InvokeError(f"User {username} doesn't exist")
804
805        self.user = user
806
807        self.logger.info("Became user %s", username)
808
809        return {
810            "username": self.user.username,
811            "permissions": self.return_perms(self.user.permissions),
812        }
813
814
815class Server(bb.asyncrpc.AsyncServer):
816    def __init__(
817        self,
818        db_engine,
819        upstream=None,
820        read_only=False,
821        anon_perms=DEFAULT_ANON_PERMS,
822        admin_username=None,
823        admin_password=None,
824    ):
825        if upstream and read_only:
826            raise bb.asyncrpc.ServerError(
827                "Read-only hashserv cannot pull from an upstream server"
828            )
829
830        disallowed_perms = set(anon_perms) - set(
831            [NONE_PERM, READ_PERM, REPORT_PERM, DB_ADMIN_PERM]
832        )
833
834        if disallowed_perms:
835            raise bb.asyncrpc.ServerError(
836                f"Permission(s) {' '.join(disallowed_perms)} are not allowed for anonymous users"
837            )
838
839        super().__init__(logger)
840
841        self.request_stats = Stats()
842        self.db_engine = db_engine
843        self.upstream = upstream
844        self.read_only = read_only
845        self.backfill_queue = None
846        self.anon_perms = set(anon_perms)
847        self.admin_username = admin_username
848        self.admin_password = admin_password
849
850        self.logger.info(
851            "Anonymous user permissions are: %s", ", ".join(self.anon_perms)
852        )
853
854    def accept_client(self, socket):
855        return ServerClient(socket, self)
856
857    async def create_admin_user(self):
858        admin_permissions = (ALL_PERM,)
859        async with self.db_engine.connect(self.logger) as db:
860            added = await db.new_user(
861                self.admin_username,
862                admin_permissions,
863                hash_token(TOKEN_ALGORITHM, new_salt(), self.admin_password),
864            )
865            if added:
866                self.logger.info("Created admin user '%s'", self.admin_username)
867            else:
868                await db.set_user_perms(
869                    self.admin_username,
870                    admin_permissions,
871                )
872                await db.set_user_token(
873                    self.admin_username,
874                    hash_token(TOKEN_ALGORITHM, new_salt(), self.admin_password),
875                )
876                self.logger.info("Admin user '%s' updated", self.admin_username)
877
878    async def backfill_worker_task(self):
879        async with await create_async_client(
880            self.upstream
881        ) as client, self.db_engine.connect(self.logger) as db:
882            while True:
883                item = await self.backfill_queue.get()
884                if item is None:
885                    self.backfill_queue.task_done()
886                    break
887
888                method, taskhash = item
889                d = await client.get_taskhash(method, taskhash)
890                if d is not None:
891                    await db.insert_unihash(d["method"], d["taskhash"], d["unihash"])
892                self.backfill_queue.task_done()
893
894    def start(self):
895        tasks = super().start()
896        if self.upstream:
897            self.backfill_queue = asyncio.Queue()
898            tasks += [self.backfill_worker_task()]
899
900        self.loop.run_until_complete(self.db_engine.create())
901
902        if self.admin_username:
903            self.loop.run_until_complete(self.create_admin_user())
904
905        return tasks
906
907    async def stop(self):
908        if self.backfill_queue is not None:
909            await self.backfill_queue.put(None)
910        await super().stop()
911