xref: /openbmc/openbmc/poky/bitbake/lib/hashserv/server.py (revision 73bd93f1d0a338767f36fd1acb54c52ad057db39)
1 # Copyright (C) 2019 Garmin Ltd.
2 #
3 # SPDX-License-Identifier: GPL-2.0-only
4 #
5 
6 from datetime import datetime, timedelta
7 import asyncio
8 import logging
9 import math
10 import time
11 import os
12 import base64
13 import hashlib
14 from . import create_async_client
15 import bb.asyncrpc
16 
17 logger = logging.getLogger("hashserv.server")
18 
19 
20 # This permission only exists to match nothing
21 NONE_PERM = "@none"
22 
23 READ_PERM = "@read"
24 REPORT_PERM = "@report"
25 DB_ADMIN_PERM = "@db-admin"
26 USER_ADMIN_PERM = "@user-admin"
27 ALL_PERM = "@all"
28 
29 ALL_PERMISSIONS = {
30     READ_PERM,
31     REPORT_PERM,
32     DB_ADMIN_PERM,
33     USER_ADMIN_PERM,
34     ALL_PERM,
35 }
36 
37 DEFAULT_ANON_PERMS = (
38     READ_PERM,
39     REPORT_PERM,
40     DB_ADMIN_PERM,
41 )
42 
43 TOKEN_ALGORITHM = "sha256"
44 
45 # 48 bytes of random data will result in 64 characters when base64
46 # encoded. This number also ensures that the base64 encoding won't have any
47 # trailing '=' characters.
48 TOKEN_SIZE = 48
49 
50 SALT_SIZE = 8
51 
52 
53 class Measurement(object):
54     def __init__(self, sample):
55         self.sample = sample
56 
57     def start(self):
58         self.start_time = time.perf_counter()
59 
60     def end(self):
61         self.sample.add(time.perf_counter() - self.start_time)
62 
63     def __enter__(self):
64         self.start()
65         return self
66 
67     def __exit__(self, *args, **kwargs):
68         self.end()
69 
70 
71 class Sample(object):
72     def __init__(self, stats):
73         self.stats = stats
74         self.num_samples = 0
75         self.elapsed = 0
76 
77     def measure(self):
78         return Measurement(self)
79 
80     def __enter__(self):
81         return self
82 
83     def __exit__(self, *args, **kwargs):
84         self.end()
85 
86     def add(self, elapsed):
87         self.num_samples += 1
88         self.elapsed += elapsed
89 
90     def end(self):
91         if self.num_samples:
92             self.stats.add(self.elapsed)
93             self.num_samples = 0
94             self.elapsed = 0
95 
96 
97 class Stats(object):
98     def __init__(self):
99         self.reset()
100 
101     def reset(self):
102         self.num = 0
103         self.total_time = 0
104         self.max_time = 0
105         self.m = 0
106         self.s = 0
107         self.current_elapsed = None
108 
109     def add(self, elapsed):
110         self.num += 1
111         if self.num == 1:
112             self.m = elapsed
113             self.s = 0
114         else:
115             last_m = self.m
116             self.m = last_m + (elapsed - last_m) / self.num
117             self.s = self.s + (elapsed - last_m) * (elapsed - self.m)
118 
119         self.total_time += elapsed
120 
121         if self.max_time < elapsed:
122             self.max_time = elapsed
123 
124     def start_sample(self):
125         return Sample(self)
126 
127     @property
128     def average(self):
129         if self.num == 0:
130             return 0
131         return self.total_time / self.num
132 
133     @property
134     def stdev(self):
135         if self.num <= 1:
136             return 0
137         return math.sqrt(self.s / (self.num - 1))
138 
139     def todict(self):
140         return {
141             k: getattr(self, k)
142             for k in ("num", "total_time", "max_time", "average", "stdev")
143         }
144 
145 
146 token_refresh_semaphore = asyncio.Lock()
147 
148 
149 async def new_token():
150     # Prevent malicious users from using this API to deduce the entropy
151     # pool on the server and thus be able to guess a token. *All* token
152     # refresh requests lock the same global semaphore and then sleep for a
153     # short time. The effectively rate limits the total number of requests
154     # than can be made across all clients to 10/second, which should be enough
155     # since you have to be an authenticated users to make the request in the
156     # first place
157     async with token_refresh_semaphore:
158         await asyncio.sleep(0.1)
159         raw = os.getrandom(TOKEN_SIZE, os.GRND_NONBLOCK)
160 
161     return base64.b64encode(raw, b"._").decode("utf-8")
162 
163 
164 def new_salt():
165     return os.getrandom(SALT_SIZE, os.GRND_NONBLOCK).hex()
166 
167 
168 def hash_token(algo, salt, token):
169     h = hashlib.new(algo)
170     h.update(salt.encode("utf-8"))
171     h.update(token.encode("utf-8"))
172     return ":".join([algo, salt, h.hexdigest()])
173 
174 
175 def permissions(*permissions, allow_anon=True, allow_self_service=False):
176     """
177     Function decorator that can be used to decorate an RPC function call and
178     check that the current users permissions match the require permissions.
179 
180     If allow_anon is True, the user will also be allowed to make the RPC call
181     if the anonymous user permissions match the permissions.
182 
183     If allow_self_service is True, and the "username" property in the request
184     is the currently logged in user, or not specified, the user will also be
185     allowed to make the request. This allows users to access normal privileged
186     API, as long as they are only modifying their own user properties (e.g.
187     users can be allowed to reset their own token without @user-admin
188     permissions, but not the token for any other user.
189     """
190 
191     def wrapper(func):
192         async def wrap(self, request):
193             if allow_self_service and self.user is not None:
194                 username = request.get("username", self.user.username)
195                 if username == self.user.username:
196                     request["username"] = self.user.username
197                     return await func(self, request)
198 
199             if not self.user_has_permissions(*permissions, allow_anon=allow_anon):
200                 if not self.user:
201                     username = "Anonymous user"
202                     user_perms = self.server.anon_perms
203                 else:
204                     username = self.user.username
205                     user_perms = self.user.permissions
206 
207                 self.logger.info(
208                     "User %s with permissions %r denied from calling %s. Missing permissions(s) %r",
209                     username,
210                     ", ".join(user_perms),
211                     func.__name__,
212                     ", ".join(permissions),
213                 )
214                 raise bb.asyncrpc.InvokeError(
215                     f"{username} is not allowed to access permissions(s) {', '.join(permissions)}"
216                 )
217 
218             return await func(self, request)
219 
220         return wrap
221 
222     return wrapper
223 
224 
225 class ServerClient(bb.asyncrpc.AsyncServerConnection):
226     def __init__(self, socket, server):
227         super().__init__(socket, "OEHASHEQUIV", server.logger)
228         self.server = server
229         self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK
230         self.user = None
231 
232         self.handlers.update(
233             {
234                 "get": self.handle_get,
235                 "get-outhash": self.handle_get_outhash,
236                 "get-stream": self.handle_get_stream,
237                 "exists-stream": self.handle_exists_stream,
238                 "get-stats": self.handle_get_stats,
239                 "get-db-usage": self.handle_get_db_usage,
240                 "get-db-query-columns": self.handle_get_db_query_columns,
241                 # Not always read-only, but internally checks if the server is
242                 # read-only
243                 "report": self.handle_report,
244                 "auth": self.handle_auth,
245                 "get-user": self.handle_get_user,
246                 "get-all-users": self.handle_get_all_users,
247                 "become-user": self.handle_become_user,
248             }
249         )
250 
251         if not self.server.read_only:
252             self.handlers.update(
253                 {
254                     "report-equiv": self.handle_equivreport,
255                     "reset-stats": self.handle_reset_stats,
256                     "backfill-wait": self.handle_backfill_wait,
257                     "remove": self.handle_remove,
258                     "gc-mark": self.handle_gc_mark,
259                     "gc-sweep": self.handle_gc_sweep,
260                     "gc-status": self.handle_gc_status,
261                     "clean-unused": self.handle_clean_unused,
262                     "refresh-token": self.handle_refresh_token,
263                     "set-user-perms": self.handle_set_perms,
264                     "new-user": self.handle_new_user,
265                     "delete-user": self.handle_delete_user,
266                 }
267             )
268 
269     def raise_no_user_error(self, username):
270         raise bb.asyncrpc.InvokeError(f"No user named '{username}' exists")
271 
272     def user_has_permissions(self, *permissions, allow_anon=True):
273         permissions = set(permissions)
274         if allow_anon:
275             if ALL_PERM in self.server.anon_perms:
276                 return True
277 
278             if not permissions - self.server.anon_perms:
279                 return True
280 
281         if self.user is None:
282             return False
283 
284         if ALL_PERM in self.user.permissions:
285             return True
286 
287         if not permissions - self.user.permissions:
288             return True
289 
290         return False
291 
292     def validate_proto_version(self):
293         return self.proto_version > (1, 0) and self.proto_version <= (1, 1)
294 
295     async def process_requests(self):
296         async with self.server.db_engine.connect(self.logger) as db:
297             self.db = db
298             if self.server.upstream is not None:
299                 self.upstream_client = await create_async_client(self.server.upstream)
300             else:
301                 self.upstream_client = None
302 
303             try:
304                 await super().process_requests()
305             finally:
306                 if self.upstream_client is not None:
307                     await self.upstream_client.close()
308 
309     async def dispatch_message(self, msg):
310         for k in self.handlers.keys():
311             if k in msg:
312                 self.logger.debug("Handling %s" % k)
313                 if "stream" in k:
314                     return await self.handlers[k](msg[k])
315                 else:
316                     with self.server.request_stats.start_sample() as self.request_sample, self.request_sample.measure():
317                         return await self.handlers[k](msg[k])
318 
319         raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg)
320 
321     @permissions(READ_PERM)
322     async def handle_get(self, request):
323         method = request["method"]
324         taskhash = request["taskhash"]
325         fetch_all = request.get("all", False)
326 
327         return await self.get_unihash(method, taskhash, fetch_all)
328 
329     async def get_unihash(self, method, taskhash, fetch_all=False):
330         d = None
331 
332         if fetch_all:
333             row = await self.db.get_unihash_by_taskhash_full(method, taskhash)
334             if row is not None:
335                 d = {k: row[k] for k in row.keys()}
336             elif self.upstream_client is not None:
337                 d = await self.upstream_client.get_taskhash(method, taskhash, True)
338                 await self.update_unified(d)
339         else:
340             row = await self.db.get_equivalent(method, taskhash)
341 
342             if row is not None:
343                 d = {k: row[k] for k in row.keys()}
344             elif self.upstream_client is not None:
345                 d = await self.upstream_client.get_taskhash(method, taskhash)
346                 await self.db.insert_unihash(d["method"], d["taskhash"], d["unihash"])
347 
348         return d
349 
350     @permissions(READ_PERM)
351     async def handle_get_outhash(self, request):
352         method = request["method"]
353         outhash = request["outhash"]
354         taskhash = request["taskhash"]
355         with_unihash = request.get("with_unihash", True)
356 
357         return await self.get_outhash(method, outhash, taskhash, with_unihash)
358 
359     async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
360         d = None
361         if with_unihash:
362             row = await self.db.get_unihash_by_outhash(method, outhash)
363         else:
364             row = await self.db.get_outhash(method, outhash)
365 
366         if row is not None:
367             d = {k: row[k] for k in row.keys()}
368         elif self.upstream_client is not None:
369             d = await self.upstream_client.get_outhash(method, outhash, taskhash)
370             await self.update_unified(d)
371 
372         return d
373 
374     async def update_unified(self, data):
375         if data is None:
376             return
377 
378         await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"])
379         await self.db.insert_outhash(data)
380 
381     async def _stream_handler(self, handler):
382         await self.socket.send_message("ok")
383 
384         while True:
385             upstream = None
386 
387             l = await self.socket.recv()
388             if not l:
389                 break
390 
391             try:
392                 # This inner loop is very sensitive and must be as fast as
393                 # possible (which is why the request sample is handled manually
394                 # instead of using 'with', and also why logging statements are
395                 # commented out.
396                 self.request_sample = self.server.request_stats.start_sample()
397                 request_measure = self.request_sample.measure()
398                 request_measure.start()
399 
400                 if l == "END":
401                     break
402 
403                 msg = await handler(l)
404                 await self.socket.send(msg)
405             finally:
406                 request_measure.end()
407                 self.request_sample.end()
408 
409         await self.socket.send("ok")
410         return self.NO_RESPONSE
411 
412     @permissions(READ_PERM)
413     async def handle_get_stream(self, request):
414         async def handler(l):
415             (method, taskhash) = l.split()
416             # self.logger.debug('Looking up %s %s' % (method, taskhash))
417             row = await self.db.get_equivalent(method, taskhash)
418 
419             if row is not None:
420                 # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
421                 return row["unihash"]
422 
423             if self.upstream_client is not None:
424                 upstream = await self.upstream_client.get_unihash(method, taskhash)
425                 if upstream:
426                     await self.server.backfill_queue.put((method, taskhash))
427                     return upstream
428 
429             return ""
430 
431         return await self._stream_handler(handler)
432 
433     @permissions(READ_PERM)
434     async def handle_exists_stream(self, request):
435         async def handler(l):
436             if await self.db.unihash_exists(l):
437                 return "true"
438 
439             if self.upstream_client is not None:
440                 if await self.upstream_client.unihash_exists(l):
441                     return "true"
442 
443             return "false"
444 
445         return await self._stream_handler(handler)
446 
447     async def report_readonly(self, data):
448         method = data["method"]
449         outhash = data["outhash"]
450         taskhash = data["taskhash"]
451 
452         info = await self.get_outhash(method, outhash, taskhash)
453         if info:
454             unihash = info["unihash"]
455         else:
456             unihash = data["unihash"]
457 
458         return {
459             "taskhash": taskhash,
460             "method": method,
461             "unihash": unihash,
462         }
463 
464     # Since this can be called either read only or to report, the check to
465     # report is made inside the function
466     @permissions(READ_PERM)
467     async def handle_report(self, data):
468         if self.server.read_only or not self.user_has_permissions(REPORT_PERM):
469             return await self.report_readonly(data)
470 
471         outhash_data = {
472             "method": data["method"],
473             "outhash": data["outhash"],
474             "taskhash": data["taskhash"],
475             "created": datetime.now(),
476         }
477 
478         for k in ("owner", "PN", "PV", "PR", "task", "outhash_siginfo"):
479             if k in data:
480                 outhash_data[k] = data[k]
481 
482         if self.user:
483             outhash_data["owner"] = self.user.username
484 
485         # Insert the new entry, unless it already exists
486         if await self.db.insert_outhash(outhash_data):
487             # If this row is new, check if it is equivalent to another
488             # output hash
489             row = await self.db.get_equivalent_for_outhash(
490                 data["method"], data["outhash"], data["taskhash"]
491             )
492 
493             if row is not None:
494                 # A matching output hash was found. Set our taskhash to the
495                 # same unihash since they are equivalent
496                 unihash = row["unihash"]
497             else:
498                 # No matching output hash was found. This is probably the
499                 # first outhash to be added.
500                 unihash = data["unihash"]
501 
502                 # Query upstream to see if it has a unihash we can use
503                 if self.upstream_client is not None:
504                     upstream_data = await self.upstream_client.get_outhash(
505                         data["method"], data["outhash"], data["taskhash"]
506                     )
507                     if upstream_data is not None:
508                         unihash = upstream_data["unihash"]
509 
510             await self.db.insert_unihash(data["method"], data["taskhash"], unihash)
511 
512         unihash_data = await self.get_unihash(data["method"], data["taskhash"])
513         if unihash_data is not None:
514             unihash = unihash_data["unihash"]
515         else:
516             unihash = data["unihash"]
517 
518         return {
519             "taskhash": data["taskhash"],
520             "method": data["method"],
521             "unihash": unihash,
522         }
523 
524     @permissions(READ_PERM, REPORT_PERM)
525     async def handle_equivreport(self, data):
526         await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"])
527 
528         # Fetch the unihash that will be reported for the taskhash. If the
529         # unihash matches, it means this row was inserted (or the mapping
530         # was already valid)
531         row = await self.db.get_equivalent(data["method"], data["taskhash"])
532 
533         if row["unihash"] == data["unihash"]:
534             self.logger.info(
535                 "Adding taskhash equivalence for %s with unihash %s",
536                 data["taskhash"],
537                 row["unihash"],
538             )
539 
540         return {k: row[k] for k in ("taskhash", "method", "unihash")}
541 
542     @permissions(READ_PERM)
543     async def handle_get_stats(self, request):
544         return {
545             "requests": self.server.request_stats.todict(),
546         }
547 
548     @permissions(DB_ADMIN_PERM)
549     async def handle_reset_stats(self, request):
550         d = {
551             "requests": self.server.request_stats.todict(),
552         }
553 
554         self.server.request_stats.reset()
555         return d
556 
557     @permissions(READ_PERM)
558     async def handle_backfill_wait(self, request):
559         d = {
560             "tasks": self.server.backfill_queue.qsize(),
561         }
562         await self.server.backfill_queue.join()
563         return d
564 
565     @permissions(DB_ADMIN_PERM)
566     async def handle_remove(self, request):
567         condition = request["where"]
568         if not isinstance(condition, dict):
569             raise TypeError("Bad condition type %s" % type(condition))
570 
571         return {"count": await self.db.remove(condition)}
572 
573     @permissions(DB_ADMIN_PERM)
574     async def handle_gc_mark(self, request):
575         condition = request["where"]
576         mark = request["mark"]
577 
578         if not isinstance(condition, dict):
579             raise TypeError("Bad condition type %s" % type(condition))
580 
581         if not isinstance(mark, str):
582             raise TypeError("Bad mark type %s" % type(mark))
583 
584         return {"count": await self.db.gc_mark(mark, condition)}
585 
586     @permissions(DB_ADMIN_PERM)
587     async def handle_gc_sweep(self, request):
588         mark = request["mark"]
589 
590         if not isinstance(mark, str):
591             raise TypeError("Bad mark type %s" % type(mark))
592 
593         current_mark = await self.db.get_current_gc_mark()
594 
595         if not current_mark or mark != current_mark:
596             raise bb.asyncrpc.InvokeError(
597                 f"'{mark}' is not the current mark. Refusing to sweep"
598             )
599 
600         count = await self.db.gc_sweep()
601 
602         return {"count": count}
603 
604     @permissions(DB_ADMIN_PERM)
605     async def handle_gc_status(self, request):
606         (keep_rows, remove_rows, current_mark) = await self.db.gc_status()
607         return {
608             "keep": keep_rows,
609             "remove": remove_rows,
610             "mark": current_mark,
611         }
612 
613     @permissions(DB_ADMIN_PERM)
614     async def handle_clean_unused(self, request):
615         max_age = request["max_age_seconds"]
616         oldest = datetime.now() - timedelta(seconds=-max_age)
617         return {"count": await self.db.clean_unused(oldest)}
618 
619     @permissions(DB_ADMIN_PERM)
620     async def handle_get_db_usage(self, request):
621         return {"usage": await self.db.get_usage()}
622 
623     @permissions(DB_ADMIN_PERM)
624     async def handle_get_db_query_columns(self, request):
625         return {"columns": await self.db.get_query_columns()}
626 
627     # The authentication API is always allowed
628     async def handle_auth(self, request):
629         username = str(request["username"])
630         token = str(request["token"])
631 
632         async def fail_auth():
633             nonlocal username
634             # Rate limit bad login attempts
635             await asyncio.sleep(1)
636             raise bb.asyncrpc.InvokeError(f"Unable to authenticate as {username}")
637 
638         user, db_token = await self.db.lookup_user_token(username)
639 
640         if not user or not db_token:
641             await fail_auth()
642 
643         try:
644             algo, salt, _ = db_token.split(":")
645         except ValueError:
646             await fail_auth()
647 
648         if hash_token(algo, salt, token) != db_token:
649             await fail_auth()
650 
651         self.user = user
652 
653         self.logger.info("Authenticated as %s", username)
654 
655         return {
656             "result": True,
657             "username": self.user.username,
658             "permissions": sorted(list(self.user.permissions)),
659         }
660 
661     @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False)
662     async def handle_refresh_token(self, request):
663         username = str(request["username"])
664 
665         token = await new_token()
666 
667         updated = await self.db.set_user_token(
668             username,
669             hash_token(TOKEN_ALGORITHM, new_salt(), token),
670         )
671         if not updated:
672             self.raise_no_user_error(username)
673 
674         return {"username": username, "token": token}
675 
676     def get_perm_arg(self, arg):
677         if not isinstance(arg, list):
678             raise bb.asyncrpc.InvokeError("Unexpected type for permissions")
679 
680         arg = set(arg)
681         try:
682             arg.remove(NONE_PERM)
683         except KeyError:
684             pass
685 
686         unknown_perms = arg - ALL_PERMISSIONS
687         if unknown_perms:
688             raise bb.asyncrpc.InvokeError(
689                 "Unknown permissions %s" % ", ".join(sorted(list(unknown_perms)))
690             )
691 
692         return sorted(list(arg))
693 
694     def return_perms(self, permissions):
695         if ALL_PERM in permissions:
696             return sorted(list(ALL_PERMISSIONS))
697         return sorted(list(permissions))
698 
699     @permissions(USER_ADMIN_PERM, allow_anon=False)
700     async def handle_set_perms(self, request):
701         username = str(request["username"])
702         permissions = self.get_perm_arg(request["permissions"])
703 
704         if not await self.db.set_user_perms(username, permissions):
705             self.raise_no_user_error(username)
706 
707         return {
708             "username": username,
709             "permissions": self.return_perms(permissions),
710         }
711 
712     @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False)
713     async def handle_get_user(self, request):
714         username = str(request["username"])
715 
716         user = await self.db.lookup_user(username)
717         if user is None:
718             return None
719 
720         return {
721             "username": user.username,
722             "permissions": self.return_perms(user.permissions),
723         }
724 
725     @permissions(USER_ADMIN_PERM, allow_anon=False)
726     async def handle_get_all_users(self, request):
727         users = await self.db.get_all_users()
728         return {
729             "users": [
730                 {
731                     "username": u.username,
732                     "permissions": self.return_perms(u.permissions),
733                 }
734                 for u in users
735             ]
736         }
737 
738     @permissions(USER_ADMIN_PERM, allow_anon=False)
739     async def handle_new_user(self, request):
740         username = str(request["username"])
741         permissions = self.get_perm_arg(request["permissions"])
742 
743         token = await new_token()
744 
745         inserted = await self.db.new_user(
746             username,
747             permissions,
748             hash_token(TOKEN_ALGORITHM, new_salt(), token),
749         )
750         if not inserted:
751             raise bb.asyncrpc.InvokeError(f"Cannot create new user '{username}'")
752 
753         return {
754             "username": username,
755             "permissions": self.return_perms(permissions),
756             "token": token,
757         }
758 
759     @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False)
760     async def handle_delete_user(self, request):
761         username = str(request["username"])
762 
763         if not await self.db.delete_user(username):
764             self.raise_no_user_error(username)
765 
766         return {"username": username}
767 
768     @permissions(USER_ADMIN_PERM, allow_anon=False)
769     async def handle_become_user(self, request):
770         username = str(request["username"])
771 
772         user = await self.db.lookup_user(username)
773         if user is None:
774             raise bb.asyncrpc.InvokeError(f"User {username} doesn't exist")
775 
776         self.user = user
777 
778         self.logger.info("Became user %s", username)
779 
780         return {
781             "username": self.user.username,
782             "permissions": self.return_perms(self.user.permissions),
783         }
784 
785 
786 class Server(bb.asyncrpc.AsyncServer):
787     def __init__(
788         self,
789         db_engine,
790         upstream=None,
791         read_only=False,
792         anon_perms=DEFAULT_ANON_PERMS,
793         admin_username=None,
794         admin_password=None,
795     ):
796         if upstream and read_only:
797             raise bb.asyncrpc.ServerError(
798                 "Read-only hashserv cannot pull from an upstream server"
799             )
800 
801         disallowed_perms = set(anon_perms) - set(
802             [NONE_PERM, READ_PERM, REPORT_PERM, DB_ADMIN_PERM]
803         )
804 
805         if disallowed_perms:
806             raise bb.asyncrpc.ServerError(
807                 f"Permission(s) {' '.join(disallowed_perms)} are not allowed for anonymous users"
808             )
809 
810         super().__init__(logger)
811 
812         self.request_stats = Stats()
813         self.db_engine = db_engine
814         self.upstream = upstream
815         self.read_only = read_only
816         self.backfill_queue = None
817         self.anon_perms = set(anon_perms)
818         self.admin_username = admin_username
819         self.admin_password = admin_password
820 
821         self.logger.info(
822             "Anonymous user permissions are: %s", ", ".join(self.anon_perms)
823         )
824 
825     def accept_client(self, socket):
826         return ServerClient(socket, self)
827 
828     async def create_admin_user(self):
829         admin_permissions = (ALL_PERM,)
830         async with self.db_engine.connect(self.logger) as db:
831             added = await db.new_user(
832                 self.admin_username,
833                 admin_permissions,
834                 hash_token(TOKEN_ALGORITHM, new_salt(), self.admin_password),
835             )
836             if added:
837                 self.logger.info("Created admin user '%s'", self.admin_username)
838             else:
839                 await db.set_user_perms(
840                     self.admin_username,
841                     admin_permissions,
842                 )
843                 await db.set_user_token(
844                     self.admin_username,
845                     hash_token(TOKEN_ALGORITHM, new_salt(), self.admin_password),
846                 )
847                 self.logger.info("Admin user '%s' updated", self.admin_username)
848 
849     async def backfill_worker_task(self):
850         async with await create_async_client(
851             self.upstream
852         ) as client, self.db_engine.connect(self.logger) as db:
853             while True:
854                 item = await self.backfill_queue.get()
855                 if item is None:
856                     self.backfill_queue.task_done()
857                     break
858 
859                 method, taskhash = item
860                 d = await client.get_taskhash(method, taskhash)
861                 if d is not None:
862                     await db.insert_unihash(d["method"], d["taskhash"], d["unihash"])
863                 self.backfill_queue.task_done()
864 
865     def start(self):
866         tasks = super().start()
867         if self.upstream:
868             self.backfill_queue = asyncio.Queue()
869             tasks += [self.backfill_worker_task()]
870 
871         self.loop.run_until_complete(self.db_engine.create())
872 
873         if self.admin_username:
874             self.loop.run_until_complete(self.create_admin_user())
875 
876         return tasks
877 
878     async def stop(self):
879         if self.backfill_queue is not None:
880             await self.backfill_queue.put(None)
881         await super().stop()
882