# Copyright (C) 2019 Garmin Ltd.
#
# SPDX-License-Identifier: GPL-2.0-only
#

from datetime import datetime, timedelta
import asyncio
import logging
import math
import time
import os
import base64
import hashlib
import hmac
import functools
from . import create_async_client
import bb.asyncrpc

logger = logging.getLogger("hashserv.server")


# This permission only exists to match nothing
NONE_PERM = "@none"

READ_PERM = "@read"
REPORT_PERM = "@report"
DB_ADMIN_PERM = "@db-admin"
USER_ADMIN_PERM = "@user-admin"
ALL_PERM = "@all"

ALL_PERMISSIONS = {
    READ_PERM,
    REPORT_PERM,
    DB_ADMIN_PERM,
    USER_ADMIN_PERM,
    ALL_PERM,
}

DEFAULT_ANON_PERMS = (
    READ_PERM,
    REPORT_PERM,
    DB_ADMIN_PERM,
)

TOKEN_ALGORITHM = "sha256"

# 48 bytes of random data will result in 64 characters when base64
# encoded. This number also ensures that the base64 encoding won't have any
# trailing '=' characters.
TOKEN_SIZE = 48

SALT_SIZE = 8


class Measurement(object):
    """
    Times a single interval and records it into the owning Sample.

    Can be driven manually with start()/end(), or used as a context manager.
    """

    def __init__(self, sample):
        self.sample = sample

    def start(self):
        """Record the starting timestamp of the interval."""
        self.start_time = time.perf_counter()

    def end(self):
        """Add the time elapsed since start() to the owning Sample."""
        self.sample.add(time.perf_counter() - self.start_time)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args, **kwargs):
        self.end()


class Sample(object):
    """
    Accumulates one or more Measurements and reports their summed elapsed
    time to a Stats object as a single data point when ended.
    """

    def __init__(self, stats):
        self.stats = stats
        self.num_samples = 0
        self.elapsed = 0

    def measure(self):
        """Return a new Measurement that reports into this sample."""
        return Measurement(self)

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.end()

    def add(self, elapsed):
        """Accumulate the elapsed time of one measurement."""
        self.num_samples += 1
        self.elapsed += elapsed

    def end(self):
        """
        Flush the accumulated time to the Stats object (if anything was
        measured) and reset so the sample can be reused.
        """
        if self.num_samples:
            self.stats.add(self.elapsed)
            self.num_samples = 0
            self.elapsed = 0


class Stats(object):
    """
    Running statistics (count, total, max, average, standard deviation) over
    a stream of elapsed times.

    The mean ("m") and the sum of squared deviations ("s") are updated
    incrementally (Welford's online algorithm), so individual data points do
    not need to be stored.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Discard all recorded data."""
        self.num = 0
        self.total_time = 0
        self.max_time = 0
        self.m = 0
        self.s = 0
        self.current_elapsed = None

    def add(self, elapsed):
        """Record one data point."""
        self.num += 1
        if self.num == 1:
            self.m = elapsed
            self.s = 0
        else:
            last_m = self.m
            self.m = last_m + (elapsed - last_m) / self.num
            self.s = self.s + (elapsed - last_m) * (elapsed - self.m)

        self.total_time += elapsed

        if self.max_time < elapsed:
            self.max_time = elapsed

    def start_sample(self):
        """Return a new Sample that reports into these statistics."""
        return Sample(self)

    @property
    def average(self):
        """Mean of all recorded data points (0 when there are none)."""
        if self.num == 0:
            return 0
        return self.total_time / self.num

    @property
    def stdev(self):
        """Sample standard deviation (0 with fewer than two data points)."""
        if self.num <= 1:
            return 0
        return math.sqrt(self.s / (self.num - 1))

    def todict(self):
        """Return the statistics as a plain dictionary."""
        return {
            k: getattr(self, k)
            for k in ("num", "total_time", "max_time", "average", "stdev")
        }


token_refresh_semaphore = asyncio.Lock()


async def new_token():
    """
    Generate a new random token string.

    The token is TOKEN_SIZE random bytes, base64 encoded using "." and "_" as
    the two alternate characters.
    """
    # Prevent malicious users from using this API to deduce the entropy
    # pool on the server and thus be able to guess a token. *All* token
    # refresh requests lock the same global semaphore and then sleep for a
    # short time. This effectively rate limits the total number of requests
    # that can be made across all clients to 10/second, which should be enough
    # since you have to be an authenticated user to make the request in the
    # first place
    async with token_refresh_semaphore:
        await asyncio.sleep(0.1)
        raw = os.getrandom(TOKEN_SIZE, os.GRND_NONBLOCK)

    return base64.b64encode(raw, b"._").decode("utf-8")


def new_salt():
    """Return SALT_SIZE random bytes as a hexadecimal string."""
    return os.getrandom(SALT_SIZE, os.GRND_NONBLOCK).hex()


def hash_token(algo, salt, token):
    """
    Hash a token with the given algorithm and salt.

    Returns a string of the form "<algo>:<salt>:<hexdigest>", where the
    digest is computed over the UTF-8 encoded salt followed by the UTF-8
    encoded token.

    Raises ValueError if hashlib does not support algo.
    """
    h = hashlib.new(algo)
    h.update(salt.encode("utf-8"))
    h.update(token.encode("utf-8"))
    return ":".join([algo, salt, h.hexdigest()])


def permissions(*permissions, allow_anon=True, allow_self_service=False):
    """
    Function decorator that can be used to decorate an RPC function call and
    check that the current user's permissions match the required permissions.

    If allow_anon is True, the user will also be allowed to make the RPC call
    if the anonymous user permissions match the permissions.

    If allow_self_service is True, and the "username" property in the request
    is the currently logged in user, or not specified, the user will also be
    allowed to make the request. This allows users to access normally
    privileged API, as long as they are only modifying their own user
    properties (e.g. users can be allowed to reset their own token without
    @user-admin permissions, but not the token for any other user).
    """

    def wrapper(func):
        # Preserve the handler's name and docstring on the wrapped function
        @functools.wraps(func)
        async def wrap(self, request):
            if allow_self_service and self.user is not None:
                username = request.get("username", self.user.username)
                if username == self.user.username:
                    # Make the (possibly implicit) username explicit for the
                    # handler
                    request["username"] = self.user.username
                    return await func(self, request)

            if not self.user_has_permissions(*permissions, allow_anon=allow_anon):
                if not self.user:
                    username = "Anonymous user"
                    user_perms = self.server.anon_perms
                else:
                    username = self.user.username
                    user_perms = self.user.permissions

                self.logger.info(
                    "User %s with permissions %r denied from calling %s. Missing permissions(s) %r",
                    username,
                    ", ".join(user_perms),
                    func.__name__,
                    ", ".join(permissions),
                )
                raise bb.asyncrpc.InvokeError(
                    f"{username} is not allowed to access permissions(s) {', '.join(permissions)}"
                )

            return await func(self, request)

        return wrap

    return wrapper


class ServerClient(bb.asyncrpc.AsyncServerConnection):
    """
    Server side of a single client connection.

    Maps RPC message names to handler methods and tracks which user (if any)
    the connection is authenticated as.
    """

    def __init__(self, socket, server):
        super().__init__(socket, "OEHASHEQUIV", server.logger)
        self.server = server
        self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK
        # None until the client authenticates (or becomes another user)
        self.user = None

        self.handlers.update(
            {
                "get": self.handle_get,
                "get-outhash": self.handle_get_outhash,
                "get-stream": self.handle_get_stream,
                "exists-stream": self.handle_exists_stream,
                "get-stats": self.handle_get_stats,
                "get-db-usage": self.handle_get_db_usage,
                "get-db-query-columns": self.handle_get_db_query_columns,
                # Not always read-only, but internally checks if the server is
                # read-only
                "report": self.handle_report,
                "auth": self.handle_auth,
                "get-user": self.handle_get_user,
                "get-all-users": self.handle_get_all_users,
                "become-user": self.handle_become_user,
            }
        )

        if not self.server.read_only:
            self.handlers.update(
                {
                    "report-equiv": self.handle_equivreport,
                    "reset-stats": self.handle_reset_stats,
                    "backfill-wait": self.handle_backfill_wait,
                    "remove": self.handle_remove,
                    "gc-mark": self.handle_gc_mark,
                    "gc-sweep": self.handle_gc_sweep,
                    "gc-status": self.handle_gc_status,
                    "clean-unused": self.handle_clean_unused,
                    "refresh-token": self.handle_refresh_token,
                    "set-user-perms": self.handle_set_perms,
                    "new-user": self.handle_new_user,
                    "delete-user": self.handle_delete_user,
                }
            )

    def raise_no_user_error(self, username):
        """Raise the InvokeError reported when a named user does not exist."""
        raise bb.asyncrpc.InvokeError(f"No user named '{username}' exists")

    def user_has_permissions(self, *permissions, allow_anon=True):
        """
        Return True if the connection may use all of the given permissions.

        When allow_anon is True the anonymous permissions of the server are
        considered first; otherwise (or if they are insufficient) the
        permissions of the authenticated user are checked. ALL_PERM grants
        everything.
        """
        permissions = set(permissions)
        if allow_anon:
            if ALL_PERM in self.server.anon_perms:
                return True

            if not permissions - self.server.anon_perms:
                return True

        if self.user is None:
            return False

        if ALL_PERM in self.user.permissions:
            return True

        if not permissions - self.user.permissions:
            return True

        return False

    def validate_proto_version(self):
        """Accept protocol versions newer than 1.0, up to and including 1.1."""
        return (1, 0) < self.proto_version <= (1, 1)

    async def process_requests(self):
        """
        Process requests with a database connection (and an upstream client,
        if the server has an upstream) held open for the whole session.
        """
        async with self.server.db_engine.connect(self.logger) as db:
            self.db = db
            if self.server.upstream is not None:
                self.upstream_client = await create_async_client(self.server.upstream)
            else:
                self.upstream_client = None

            try:
                await super().process_requests()
            finally:
                if self.upstream_client is not None:
                    await self.upstream_client.close()

    async def dispatch_message(self, msg):
        """
        Invoke the handler for the first known command found in msg.

        Stream commands do their own per-request timing, so only non-stream
        commands are timed here.

        Raises bb.asyncrpc.ClientError if msg contains no known command.
        """
        for k in self.handlers:
            if k in msg:
                self.logger.debug("Handling %s", k)
                if "stream" in k:
                    return await self.handlers[k](msg[k])
                else:
                    with self.server.request_stats.start_sample() as self.request_sample, self.request_sample.measure():
                        return await self.handlers[k](msg[k])

        raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg)

    @permissions(READ_PERM)
    async def handle_get(self, request):
        method = request["method"]
        taskhash = request["taskhash"]
        fetch_all = request.get("all", False)

        return await self.get_unihash(method, taskhash, fetch_all)

    async def get_unihash(self, method, taskhash, fetch_all=False):
        """
        Look up the unihash data for a method and taskhash.

        The local database is queried first. On a miss the upstream server
        (if any) is queried, and whatever it returns is stored locally.

        Returns a dictionary of the row data, or None if nothing was found.
        """
        d = None

        if fetch_all:
            row = await self.db.get_unihash_by_taskhash_full(method, taskhash)
            if row is not None:
                d = {k: row[k] for k in row.keys()}
            elif self.upstream_client is not None:
                d = await self.upstream_client.get_taskhash(method, taskhash, True)
                # update_unified() ignores None
                await self.update_unified(d)
        else:
            row = await self.db.get_equivalent(method, taskhash)

            if row is not None:
                d = {k: row[k] for k in row.keys()}
            elif self.upstream_client is not None:
                d = await self.upstream_client.get_taskhash(method, taskhash)
                # Upstream may not know the taskhash either; only cache an
                # actual result
                if d is not None:
                    await self.db.insert_unihash(
                        d["method"], d["taskhash"], d["unihash"]
                    )

        return d

    @permissions(READ_PERM)
    async def handle_get_outhash(self, request):
        method = request["method"]
        outhash = request["outhash"]
        taskhash = request["taskhash"]
        with_unihash = request.get("with_unihash", True)

        return await self.get_outhash(method, outhash, taskhash, with_unihash)

    async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
        """
        Look up the data for a method and outhash.

        The local database is queried first. On a miss the upstream server
        (if any) is queried, and whatever it returns is stored locally.

        Returns a dictionary of the row data, or None if nothing was found.
        """
        d = None
        if with_unihash:
            row = await self.db.get_unihash_by_outhash(method, outhash)
        else:
            row = await self.db.get_outhash(method, outhash)

        if row is not None:
            d = {k: row[k] for k in row.keys()}
        elif self.upstream_client is not None:
            d = await self.upstream_client.get_outhash(method, outhash, taskhash)
            await self.update_unified(d)

        return d

    async def update_unified(self, data):
        """Store data in the local database. Does nothing if data is None."""
        if data is None:
            return

        await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"])
        await self.db.insert_outhash(data)

    async def _stream_handler(self, handler):
        """
        Run a streaming session.

        After acknowledging with "ok", each received line is passed to
        handler and the result is sent back, until the client sends "END" or
        an empty message is received.
        """
        await self.socket.send_message("ok")

        while True:
            line = await self.socket.recv()
            if not line:
                break

            # This inner loop is very sensitive and must be as fast as
            # possible (which is why the request sample is handled manually
            # instead of using 'with', and also why logging statements are
            # commented out.
            self.request_sample = self.server.request_stats.start_sample()
            request_measure = self.request_sample.measure()
            request_measure.start()
            try:
                if line == "END":
                    break

                msg = await handler(line)
                await self.socket.send(msg)
            finally:
                request_measure.end()
                self.request_sample.end()

        await self.socket.send("ok")
        return self.NO_RESPONSE

    @permissions(READ_PERM)
    async def handle_get_stream(self, request):
        async def handler(line):
            (method, taskhash) = line.split()
            # self.logger.debug('Looking up %s %s' % (method, taskhash))
            row = await self.db.get_equivalent(method, taskhash)

            if row is not None:
                # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
                return row["unihash"]

            if self.upstream_client is not None:
                upstream = await self.upstream_client.get_unihash(method, taskhash)
                if upstream:
                    # Let the backfill worker store the result locally so
                    # this loop stays fast
                    await self.server.backfill_queue.put((method, taskhash))
                    return upstream

            return ""

        return await self._stream_handler(handler)

    @permissions(READ_PERM)
    async def handle_exists_stream(self, request):
        async def handler(line):
            if await self.db.unihash_exists(line):
                return "true"

            if self.upstream_client is not None:
                if await self.upstream_client.unihash_exists(line):
                    return "true"

            return "false"

        return await self._stream_handler(handler)

    async def report_readonly(self, data):
        """
        Answer a report without modifying the database.

        Returns the unihash of a known matching outhash if there is one,
        otherwise the unihash the client supplied.
        """
        method = data["method"]
        outhash = data["outhash"]
        taskhash = data["taskhash"]

        info = await self.get_outhash(method, outhash, taskhash)
        if info:
            unihash = info["unihash"]
        else:
            unihash = data["unihash"]

        return {
            "taskhash": taskhash,
            "method": method,
            "unihash": unihash,
        }

    # Since this can be called either read only or to report, the check to
    # report is made inside the function
    @permissions(READ_PERM)
    async def handle_report(self, data):
        if self.server.read_only or not self.user_has_permissions(REPORT_PERM):
            return await self.report_readonly(data)

        outhash_data = {
            "method": data["method"],
            "outhash": data["outhash"],
            "taskhash": data["taskhash"],
            "created": datetime.now(),
        }

        for k in ("owner", "PN", "PV", "PR", "task", "outhash_siginfo"):
            if k in data:
                outhash_data[k] = data[k]

        # An authenticated user always overrides any client supplied owner
        if self.user:
            outhash_data["owner"] = self.user.username

        # Insert the new entry, unless it already exists
        if await self.db.insert_outhash(outhash_data):
            # If this row is new, check if it is equivalent to another
            # output hash
            row = await self.db.get_equivalent_for_outhash(
                data["method"], data["outhash"], data["taskhash"]
            )

            if row is not None:
                # A matching output hash was found. Set our taskhash to the
                # same unihash since they are equivalent
                unihash = row["unihash"]
            else:
                # No matching output hash was found. This is probably the
                # first outhash to be added.
                unihash = data["unihash"]

                # Query upstream to see if it has a unihash we can use
                if self.upstream_client is not None:
                    upstream_data = await self.upstream_client.get_outhash(
                        data["method"], data["outhash"], data["taskhash"]
                    )
                    if upstream_data is not None:
                        unihash = upstream_data["unihash"]

            await self.db.insert_unihash(data["method"], data["taskhash"], unihash)

        unihash_data = await self.get_unihash(data["method"], data["taskhash"])
        if unihash_data is not None:
            unihash = unihash_data["unihash"]
        else:
            unihash = data["unihash"]

        return {
            "taskhash": data["taskhash"],
            "method": data["method"],
            "unihash": unihash,
        }

    @permissions(READ_PERM, REPORT_PERM)
    async def handle_equivreport(self, data):
        await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"])

        # Fetch the unihash that will be reported for the taskhash. If the
        # unihash matches, it means this row was inserted (or the mapping
        # was already valid)
        row = await self.db.get_equivalent(data["method"], data["taskhash"])

        if row["unihash"] == data["unihash"]:
            self.logger.info(
                "Adding taskhash equivalence for %s with unihash %s",
                data["taskhash"],
                row["unihash"],
            )

        return {k: row[k] for k in ("taskhash", "method", "unihash")}

    @permissions(READ_PERM)
    async def handle_get_stats(self, request):
        return {
            "requests": self.server.request_stats.todict(),
        }

    @permissions(DB_ADMIN_PERM)
    async def handle_reset_stats(self, request):
        # Report the statistics as they were before the reset
        d = {
            "requests": self.server.request_stats.todict(),
        }

        self.server.request_stats.reset()
        return d

    @permissions(READ_PERM)
    async def handle_backfill_wait(self, request):
        queue = self.server.backfill_queue
        if queue is None:
            # The backfill queue only exists when the server has an
            # upstream, so there is nothing to wait for
            return {"tasks": 0}

        # Report how many tasks were pending when the wait started
        d = {
            "tasks": queue.qsize(),
        }
        await queue.join()
        return d

    @permissions(DB_ADMIN_PERM)
    async def handle_remove(self, request):
        condition = request["where"]
        if not isinstance(condition, dict):
            raise TypeError("Bad condition type %s" % type(condition))

        return {"count": await self.db.remove(condition)}

    @permissions(DB_ADMIN_PERM)
    async def handle_gc_mark(self, request):
        condition = request["where"]
        mark = request["mark"]

        if not isinstance(condition, dict):
            raise TypeError("Bad condition type %s" % type(condition))

        if not isinstance(mark, str):
            raise TypeError("Bad mark type %s" % type(mark))

        return {"count": await self.db.gc_mark(mark, condition)}

    @permissions(DB_ADMIN_PERM)
    async def handle_gc_sweep(self, request):
        mark = request["mark"]

        if not isinstance(mark, str):
            raise TypeError("Bad mark type %s" % type(mark))

        current_mark = await self.db.get_current_gc_mark()

        # Only sweep when the client names the mark that is currently active
        if not current_mark or mark != current_mark:
            raise bb.asyncrpc.InvokeError(
                f"'{mark}' is not the current mark. Refusing to sweep"
            )

        count = await self.db.gc_sweep()

        return {"count": count}

    @permissions(DB_ADMIN_PERM)
    async def handle_gc_status(self, request):
        (keep_rows, remove_rows, current_mark) = await self.db.gc_status()
        return {
            "keep": keep_rows,
            "remove": remove_rows,
            "mark": current_mark,
        }

    @permissions(DB_ADMIN_PERM)
    async def handle_clean_unused(self, request):
        max_age = request["max_age_seconds"]
        # The cutoff is max_age seconds in the past. (Negating max_age here
        # would place the cutoff in the future and make the age limit
        # meaningless.)
        oldest = datetime.now() - timedelta(seconds=max_age)
        return {"count": await self.db.clean_unused(oldest)}

    @permissions(DB_ADMIN_PERM)
    async def handle_get_db_usage(self, request):
        return {"usage": await self.db.get_usage()}

    @permissions(DB_ADMIN_PERM)
    async def handle_get_db_query_columns(self, request):
        return {"columns": await self.db.get_query_columns()}

    # The authentication API is always allowed
    async def handle_auth(self, request):
        username = str(request["username"])
        token = str(request["token"])

        async def fail_auth():
            # Rate limit bad login attempts
            await asyncio.sleep(1)
            raise bb.asyncrpc.InvokeError(f"Unable to authenticate as {username}")

        user, db_token = await self.db.lookup_user_token(username)

        if not user or not db_token:
            await fail_auth()

        try:
            algo, salt, _ = db_token.split(":")
            candidate = hash_token(algo, salt, token)
            # Constant time comparison, so that response timing does not
            # reveal how much of the hash matched
            authenticated = hmac.compare_digest(
                candidate.encode("utf-8"), db_token.encode("utf-8")
            )
        except ValueError:
            # Malformed stored token, unsupported hash algorithm, or a token
            # that cannot be encoded
            authenticated = False

        if not authenticated:
            await fail_auth()

        self.user = user

        self.logger.info("Authenticated as %s", username)

        return {
            "result": True,
            "username": self.user.username,
            "permissions": sorted(self.user.permissions),
        }

    @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False)
    async def handle_refresh_token(self, request):
        username = str(request["username"])

        token = await new_token()

        # Only the salted hash is stored; the token itself is returned to the
        # client
        updated = await self.db.set_user_token(
            username,
            hash_token(TOKEN_ALGORITHM, new_salt(), token),
        )
        if not updated:
            self.raise_no_user_error(username)

        return {"username": username, "token": token}

    def get_perm_arg(self, arg):
        """
        Validate a permissions argument received from a client.

        Returns the permissions as a sorted list with NONE_PERM removed.

        Raises bb.asyncrpc.InvokeError if arg is not a list or contains
        unknown permissions.
        """
        if not isinstance(arg, list):
            raise bb.asyncrpc.InvokeError("Unexpected type for permissions")

        arg = set(arg)
        arg.discard(NONE_PERM)

        unknown_perms = arg - ALL_PERMISSIONS
        if unknown_perms:
            raise bb.asyncrpc.InvokeError(
                "Unknown permissions %s" % ", ".join(sorted(unknown_perms))
            )

        return sorted(arg)

    def return_perms(self, permissions):
        """
        Return permissions as a sorted list for reporting to a client,
        expanding ALL_PERM into the complete list of permissions.
        """
        if ALL_PERM in permissions:
            return sorted(ALL_PERMISSIONS)
        return sorted(permissions)

    @permissions(USER_ADMIN_PERM, allow_anon=False)
    async def handle_set_perms(self, request):
        username = str(request["username"])
        perms = self.get_perm_arg(request["permissions"])

        if not await self.db.set_user_perms(username, perms):
            self.raise_no_user_error(username)

        return {
            "username": username,
            "permissions": self.return_perms(perms),
        }

    @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False)
    async def handle_get_user(self, request):
        username = str(request["username"])

        user = await self.db.lookup_user(username)
        if user is None:
            return None

        return {
            "username": user.username,
            "permissions": self.return_perms(user.permissions),
        }

    @permissions(USER_ADMIN_PERM, allow_anon=False)
    async def handle_get_all_users(self, request):
        users = await self.db.get_all_users()
        return {
            "users": [
                {
                    "username": u.username,
                    "permissions": self.return_perms(u.permissions),
                }
                for u in users
            ]
        }

    @permissions(USER_ADMIN_PERM, allow_anon=False)
    async def handle_new_user(self, request):
        username = str(request["username"])
        perms = self.get_perm_arg(request["permissions"])

        token = await new_token()

        inserted = await self.db.new_user(
            username,
            perms,
            hash_token(TOKEN_ALGORITHM, new_salt(), token),
        )
        if not inserted:
            raise bb.asyncrpc.InvokeError(f"Cannot create new user '{username}'")

        return {
            "username": username,
            "permissions": self.return_perms(perms),
            "token": token,
        }

    @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False)
    async def handle_delete_user(self, request):
        username = str(request["username"])

        if not await self.db.delete_user(username):
            self.raise_no_user_error(username)

        return {"username": username}

    @permissions(USER_ADMIN_PERM, allow_anon=False)
    async def handle_become_user(self, request):
        username = str(request["username"])

        user = await self.db.lookup_user(username)
        if user is None:
            raise bb.asyncrpc.InvokeError(f"User {username} doesn't exist")

        self.user = user

        self.logger.info("Became user %s", username)

        return {
            "username": self.user.username,
            "permissions": self.return_perms(self.user.permissions),
        }


class Server(bb.asyncrpc.AsyncServer):
    """
    Hash equivalence server.

    Raises bb.asyncrpc.ServerError on construction if a read-only server is
    given an upstream, or if the anonymous permissions include anything other
    than NONE_PERM, READ_PERM, REPORT_PERM or DB_ADMIN_PERM.
    """

    def __init__(
        self,
        db_engine,
        upstream=None,
        read_only=False,
        anon_perms=DEFAULT_ANON_PERMS,
        admin_username=None,
        admin_password=None,
    ):
        if upstream and read_only:
            raise bb.asyncrpc.ServerError(
                "Read-only hashserv cannot pull from an upstream server"
            )

        disallowed_perms = set(anon_perms) - {
            NONE_PERM,
            READ_PERM,
            REPORT_PERM,
            DB_ADMIN_PERM,
        }

        if disallowed_perms:
            raise bb.asyncrpc.ServerError(
                f"Permission(s) {' '.join(disallowed_perms)} are not allowed for anonymous users"
            )

        super().__init__(logger)

        self.request_stats = Stats()
        self.db_engine = db_engine
        self.upstream = upstream
        self.read_only = read_only
        # Created in start(), and only when an upstream is configured
        self.backfill_queue = None
        self.anon_perms = set(anon_perms)
        self.admin_username = admin_username
        self.admin_password = admin_password

        self.logger.info(
            "Anonymous user permissions are: %s", ", ".join(self.anon_perms)
        )

    def accept_client(self, socket):
        return ServerClient(socket, self)

    async def create_admin_user(self):
        """
        Create the admin user with ALL_PERM, or if it already exists, reset
        its permissions and token to the configured values.
        """
        admin_permissions = (ALL_PERM,)
        async with self.db_engine.connect(self.logger) as db:
            added = await db.new_user(
                self.admin_username,
                admin_permissions,
                hash_token(TOKEN_ALGORITHM, new_salt(), self.admin_password),
            )
            if added:
                self.logger.info("Created admin user '%s'", self.admin_username)
            else:
                await db.set_user_perms(
                    self.admin_username,
                    admin_permissions,
                )
                await db.set_user_token(
                    self.admin_username,
                    hash_token(TOKEN_ALGORITHM, new_salt(), self.admin_password),
                )
                self.logger.info("Admin user '%s' updated", self.admin_username)

    async def backfill_worker_task(self):
        """
        Copy taskhashes queued by stream lookups from the upstream server
        into the local database, until a None item is received.
        """
        async with await create_async_client(
            self.upstream
        ) as client, self.db_engine.connect(self.logger) as db:
            while True:
                item = await self.backfill_queue.get()
                if item is None:
                    self.backfill_queue.task_done()
                    break

                method, taskhash = item
                d = await client.get_taskhash(method, taskhash)
                if d is not None:
                    await db.insert_unihash(d["method"], d["taskhash"], d["unihash"])
                self.backfill_queue.task_done()

    def start(self):
        tasks = super().start()
        if self.upstream:
            self.backfill_queue = asyncio.Queue()
            tasks += [self.backfill_worker_task()]

        self.loop.run_until_complete(self.db_engine.create())

        if self.admin_username:
            self.loop.run_until_complete(self.create_admin_user())

        return tasks

    async def stop(self):
        # A None item tells the backfill worker to exit
        if self.backfill_queue is not None:
            await self.backfill_queue.put(None)
        await super().stop()