1# Copyright (C) 2019 Garmin Ltd. 2# 3# SPDX-License-Identifier: GPL-2.0-only 4# 5 6from datetime import datetime, timedelta 7import asyncio 8import logging 9import math 10import time 11import os 12import base64 13import json 14import hashlib 15from . import create_async_client 16import bb.asyncrpc 17 18logger = logging.getLogger("hashserv.server") 19 20 21# This permission only exists to match nothing 22NONE_PERM = "@none" 23 24READ_PERM = "@read" 25REPORT_PERM = "@report" 26DB_ADMIN_PERM = "@db-admin" 27USER_ADMIN_PERM = "@user-admin" 28ALL_PERM = "@all" 29 30ALL_PERMISSIONS = { 31 READ_PERM, 32 REPORT_PERM, 33 DB_ADMIN_PERM, 34 USER_ADMIN_PERM, 35 ALL_PERM, 36} 37 38DEFAULT_ANON_PERMS = ( 39 READ_PERM, 40 REPORT_PERM, 41 DB_ADMIN_PERM, 42) 43 44TOKEN_ALGORITHM = "sha256" 45 46# 48 bytes of random data will result in 64 characters when base64 47# encoded. This number also ensures that the base64 encoding won't have any 48# trailing '=' characters. 49TOKEN_SIZE = 48 50 51SALT_SIZE = 8 52 53 54class Measurement(object): 55 def __init__(self, sample): 56 self.sample = sample 57 58 def start(self): 59 self.start_time = time.perf_counter() 60 61 def end(self): 62 self.sample.add(time.perf_counter() - self.start_time) 63 64 def __enter__(self): 65 self.start() 66 return self 67 68 def __exit__(self, *args, **kwargs): 69 self.end() 70 71 72class Sample(object): 73 def __init__(self, stats): 74 self.stats = stats 75 self.num_samples = 0 76 self.elapsed = 0 77 78 def measure(self): 79 return Measurement(self) 80 81 def __enter__(self): 82 return self 83 84 def __exit__(self, *args, **kwargs): 85 self.end() 86 87 def add(self, elapsed): 88 self.num_samples += 1 89 self.elapsed += elapsed 90 91 def end(self): 92 if self.num_samples: 93 self.stats.add(self.elapsed) 94 self.num_samples = 0 95 self.elapsed = 0 96 97 98class Stats(object): 99 def __init__(self): 100 self.reset() 101 102 def reset(self): 103 self.num = 0 104 self.total_time = 0 105 self.max_time = 0 106 self.m = 0 107 self.s = 0 108 self.current_elapsed = None 109 110 def add(self, elapsed): 111 self.num += 1 112 if self.num == 1: 113 self.m = elapsed 114 self.s = 0 115 else: 116 last_m = self.m 117 self.m = last_m + (elapsed - last_m) / self.num 118 self.s = self.s + (elapsed - last_m) * (elapsed - self.m) 119 120 self.total_time += elapsed 121 122 if self.max_time < elapsed: 123 self.max_time = elapsed 124 125 def start_sample(self): 126 return Sample(self) 127 128 @property 129 def average(self): 130 if self.num == 0: 131 return 0 132 return self.total_time / self.num 133 134 @property 135 def stdev(self): 136 if self.num <= 1: 137 return 0 138 return math.sqrt(self.s / (self.num - 1)) 139 140 def todict(self): 141 return { 142 k: getattr(self, k) 143 for k in ("num", "total_time", "max_time", "average", "stdev") 144 } 145 146 147token_refresh_semaphore = asyncio.Lock() 148 149 150async def new_token(): 151 # Prevent malicious users from using this API to deduce the entropy 152 # pool on the server and thus be able to guess a token. *All* token 153 # refresh requests lock the same global semaphore and then sleep for a 154 # short time. The effectively rate limits the total number of requests 155 # than can be made across all clients to 10/second, which should be enough 156 # since you have to be an authenticated users to make the request in the 157 # first place 158 async with token_refresh_semaphore: 159 await asyncio.sleep(0.1) 160 raw = os.getrandom(TOKEN_SIZE, os.GRND_NONBLOCK) 161 162 return base64.b64encode(raw, b"._").decode("utf-8") 163 164 165def new_salt(): 166 return os.getrandom(SALT_SIZE, os.GRND_NONBLOCK).hex() 167 168 169def hash_token(algo, salt, token): 170 h = hashlib.new(algo) 171 h.update(salt.encode("utf-8")) 172 h.update(token.encode("utf-8")) 173 return ":".join([algo, salt, h.hexdigest()]) 174 175 176def permissions(*permissions, allow_anon=True, allow_self_service=False): 177 """ 178 Function decorator that can be used to decorate an RPC function call and 179 check that the current users permissions match the require permissions. 180 181 If allow_anon is True, the user will also be allowed to make the RPC call 182 if the anonymous user permissions match the permissions. 183 184 If allow_self_service is True, and the "username" property in the request 185 is the currently logged in user, or not specified, the user will also be 186 allowed to make the request. This allows users to access normal privileged 187 API, as long as they are only modifying their own user properties (e.g. 188 users can be allowed to reset their own token without @user-admin 189 permissions, but not the token for any other user. 190 """ 191 192 def wrapper(func): 193 async def wrap(self, request): 194 if allow_self_service and self.user is not None: 195 username = request.get("username", self.user.username) 196 if username == self.user.username: 197 request["username"] = self.user.username 198 return await func(self, request) 199 200 if not self.user_has_permissions(*permissions, allow_anon=allow_anon): 201 if not self.user: 202 username = "Anonymous user" 203 user_perms = self.server.anon_perms 204 else: 205 username = self.user.username 206 user_perms = self.user.permissions 207 208 self.logger.info( 209 "User %s with permissions %r denied from calling %s. Missing permissions(s) %r", 210 username, 211 ", ".join(user_perms), 212 func.__name__, 213 ", ".join(permissions), 214 ) 215 raise bb.asyncrpc.InvokeError( 216 f"{username} is not allowed to access permissions(s) {', '.join(permissions)}" 217 ) 218 219 return await func(self, request) 220 221 return wrap 222 223 return wrapper 224 225 226class ServerClient(bb.asyncrpc.AsyncServerConnection): 227 def __init__(self, socket, server): 228 super().__init__(socket, "OEHASHEQUIV", server.logger) 229 self.server = server 230 self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK 231 self.user = None 232 233 self.handlers.update( 234 { 235 "get": self.handle_get, 236 "get-outhash": self.handle_get_outhash, 237 "get-stream": self.handle_get_stream, 238 "exists-stream": self.handle_exists_stream, 239 "get-stats": self.handle_get_stats, 240 "get-db-usage": self.handle_get_db_usage, 241 "get-db-query-columns": self.handle_get_db_query_columns, 242 # Not always read-only, but internally checks if the server is 243 # read-only 244 "report": self.handle_report, 245 "auth": self.handle_auth, 246 "get-user": self.handle_get_user, 247 "get-all-users": self.handle_get_all_users, 248 "become-user": self.handle_become_user, 249 } 250 ) 251 252 if not self.server.read_only: 253 self.handlers.update( 254 { 255 "report-equiv": self.handle_equivreport, 256 "reset-stats": self.handle_reset_stats, 257 "backfill-wait": self.handle_backfill_wait, 258 "remove": self.handle_remove, 259 "gc-mark": self.handle_gc_mark, 260 "gc-mark-stream": self.handle_gc_mark_stream, 261 "gc-sweep": self.handle_gc_sweep, 262 "gc-status": self.handle_gc_status, 263 "clean-unused": self.handle_clean_unused, 264 "refresh-token": self.handle_refresh_token, 265 "set-user-perms": self.handle_set_perms, 266 "new-user": self.handle_new_user, 267 "delete-user": self.handle_delete_user, 268 } 269 ) 270 271 def raise_no_user_error(self, username): 272 raise bb.asyncrpc.InvokeError(f"No user named '{username}' exists") 273 274 def user_has_permissions(self, *permissions, allow_anon=True): 275 permissions = set(permissions) 276 if allow_anon: 277 if ALL_PERM in self.server.anon_perms: 278 return True 279 280 if not permissions - self.server.anon_perms: 281 return True 282 283 if self.user is None: 284 return False 285 286 if ALL_PERM in self.user.permissions: 287 return True 288 289 if not permissions - self.user.permissions: 290 return True 291 292 return False 293 294 def validate_proto_version(self): 295 return self.proto_version > (1, 0) and self.proto_version <= (1, 1) 296 297 async def process_requests(self): 298 async with self.server.db_engine.connect(self.logger) as db: 299 self.db = db 300 if self.server.upstream is not None: 301 self.upstream_client = await create_async_client(self.server.upstream) 302 else: 303 self.upstream_client = None 304 305 try: 306 await super().process_requests() 307 finally: 308 if self.upstream_client is not None: 309 await self.upstream_client.close() 310 311 async def dispatch_message(self, msg): 312 for k in self.handlers.keys(): 313 if k in msg: 314 self.logger.debug("Handling %s" % k) 315 if "stream" in k: 316 return await self.handlers[k](msg[k]) 317 else: 318 with self.server.request_stats.start_sample() as self.request_sample, self.request_sample.measure(): 319 return await self.handlers[k](msg[k]) 320 321 raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) 322 323 @permissions(READ_PERM) 324 async def handle_get(self, request): 325 method = request["method"] 326 taskhash = request["taskhash"] 327 fetch_all = request.get("all", False) 328 329 return await self.get_unihash(method, taskhash, fetch_all) 330 331 async def get_unihash(self, method, taskhash, fetch_all=False): 332 d = None 333 334 if fetch_all: 335 row = await self.db.get_unihash_by_taskhash_full(method, taskhash) 336 if row is not None: 337 d = {k: row[k] for k in row.keys()} 338 elif self.upstream_client is not None: 339 d = await self.upstream_client.get_taskhash(method, taskhash, True) 340 await self.update_unified(d) 341 else: 342 row = await self.db.get_equivalent(method, taskhash) 343 344 if row is not None: 345 d = {k: row[k] for k in row.keys()} 346 elif self.upstream_client is not None: 347 d = await self.upstream_client.get_taskhash(method, taskhash) 348 await self.db.insert_unihash(d["method"], d["taskhash"], d["unihash"]) 349 350 return d 351 352 @permissions(READ_PERM) 353 async def handle_get_outhash(self, request): 354 method = request["method"] 355 outhash = request["outhash"] 356 taskhash = request["taskhash"] 357 with_unihash = request.get("with_unihash", True) 358 359 return await self.get_outhash(method, outhash, taskhash, with_unihash) 360 361 async def get_outhash(self, method, outhash, taskhash, with_unihash=True): 362 d = None 363 if with_unihash: 364 row = await self.db.get_unihash_by_outhash(method, outhash) 365 else: 366 row = await self.db.get_outhash(method, outhash) 367 368 if row is not None: 369 d = {k: row[k] for k in row.keys()} 370 elif self.upstream_client is not None: 371 d = await self.upstream_client.get_outhash(method, outhash, taskhash) 372 await self.update_unified(d) 373 374 return d 375 376 async def update_unified(self, data): 377 if data is None: 378 return 379 380 await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) 381 await self.db.insert_outhash(data) 382 383 async def _stream_handler(self, handler): 384 await self.socket.send_message("ok") 385 386 while True: 387 upstream = None 388 389 l = await self.socket.recv() 390 if not l: 391 break 392 393 try: 394 # This inner loop is very sensitive and must be as fast as 395 # possible (which is why the request sample is handled manually 396 # instead of using 'with', and also why logging statements are 397 # commented out. 398 self.request_sample = self.server.request_stats.start_sample() 399 request_measure = self.request_sample.measure() 400 request_measure.start() 401 402 if l == "END": 403 break 404 405 msg = await handler(l) 406 await self.socket.send(msg) 407 finally: 408 request_measure.end() 409 self.request_sample.end() 410 411 await self.socket.send("ok") 412 return self.NO_RESPONSE 413 414 @permissions(READ_PERM) 415 async def handle_get_stream(self, request): 416 async def handler(l): 417 (method, taskhash) = l.split() 418 # self.logger.debug('Looking up %s %s' % (method, taskhash)) 419 row = await self.db.get_equivalent(method, taskhash) 420 421 if row is not None: 422 # self.logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) 423 return row["unihash"] 424 425 if self.upstream_client is not None: 426 upstream = await self.upstream_client.get_unihash(method, taskhash) 427 if upstream: 428 await self.server.backfill_queue.put((method, taskhash)) 429 return upstream 430 431 return "" 432 433 return await self._stream_handler(handler) 434 435 @permissions(READ_PERM) 436 async def handle_exists_stream(self, request): 437 async def handler(l): 438 if await self.db.unihash_exists(l): 439 return "true" 440 441 if self.upstream_client is not None: 442 if await self.upstream_client.unihash_exists(l): 443 return "true" 444 445 return "false" 446 447 return await self._stream_handler(handler) 448 449 async def report_readonly(self, data): 450 method = data["method"] 451 outhash = data["outhash"] 452 taskhash = data["taskhash"] 453 454 info = await self.get_outhash(method, outhash, taskhash) 455 if info: 456 unihash = info["unihash"] 457 else: 458 unihash = data["unihash"] 459 460 return { 461 "taskhash": taskhash, 462 "method": method, 463 "unihash": unihash, 464 } 465 466 # Since this can be called either read only or to report, the check to 467 # report is made inside the function 468 @permissions(READ_PERM) 469 async def handle_report(self, data): 470 if self.server.read_only or not self.user_has_permissions(REPORT_PERM): 471 return await self.report_readonly(data) 472 473 outhash_data = { 474 "method": data["method"], 475 "outhash": data["outhash"], 476 "taskhash": data["taskhash"], 477 "created": datetime.now(), 478 } 479 480 for k in ("owner", "PN", "PV", "PR", "task", "outhash_siginfo"): 481 if k in data: 482 outhash_data[k] = data[k] 483 484 if self.user: 485 outhash_data["owner"] = self.user.username 486 487 # Insert the new entry, unless it already exists 488 if await self.db.insert_outhash(outhash_data): 489 # If this row is new, check if it is equivalent to another 490 # output hash 491 row = await self.db.get_equivalent_for_outhash( 492 data["method"], data["outhash"], data["taskhash"] 493 ) 494 495 if row is not None: 496 # A matching output hash was found. Set our taskhash to the 497 # same unihash since they are equivalent 498 unihash = row["unihash"] 499 else: 500 # No matching output hash was found. This is probably the 501 # first outhash to be added. 502 unihash = data["unihash"] 503 504 # Query upstream to see if it has a unihash we can use 505 if self.upstream_client is not None: 506 upstream_data = await self.upstream_client.get_outhash( 507 data["method"], data["outhash"], data["taskhash"] 508 ) 509 if upstream_data is not None: 510 unihash = upstream_data["unihash"] 511 512 await self.db.insert_unihash(data["method"], data["taskhash"], unihash) 513 514 unihash_data = await self.get_unihash(data["method"], data["taskhash"]) 515 if unihash_data is not None: 516 unihash = unihash_data["unihash"] 517 else: 518 unihash = data["unihash"] 519 520 return { 521 "taskhash": data["taskhash"], 522 "method": data["method"], 523 "unihash": unihash, 524 } 525 526 @permissions(READ_PERM, REPORT_PERM) 527 async def handle_equivreport(self, data): 528 await self.db.insert_unihash(data["method"], data["taskhash"], data["unihash"]) 529 530 # Fetch the unihash that will be reported for the taskhash. If the 531 # unihash matches, it means this row was inserted (or the mapping 532 # was already valid) 533 row = await self.db.get_equivalent(data["method"], data["taskhash"]) 534 535 if row["unihash"] == data["unihash"]: 536 self.logger.info( 537 "Adding taskhash equivalence for %s with unihash %s", 538 data["taskhash"], 539 row["unihash"], 540 ) 541 542 return {k: row[k] for k in ("taskhash", "method", "unihash")} 543 544 @permissions(READ_PERM) 545 async def handle_get_stats(self, request): 546 return { 547 "requests": self.server.request_stats.todict(), 548 } 549 550 @permissions(DB_ADMIN_PERM) 551 async def handle_reset_stats(self, request): 552 d = { 553 "requests": self.server.request_stats.todict(), 554 } 555 556 self.server.request_stats.reset() 557 return d 558 559 @permissions(READ_PERM) 560 async def handle_backfill_wait(self, request): 561 d = { 562 "tasks": self.server.backfill_queue.qsize(), 563 } 564 await self.server.backfill_queue.join() 565 return d 566 567 @permissions(DB_ADMIN_PERM) 568 async def handle_remove(self, request): 569 condition = request["where"] 570 if not isinstance(condition, dict): 571 raise TypeError("Bad condition type %s" % type(condition)) 572 573 return {"count": await self.db.remove(condition)} 574 575 @permissions(DB_ADMIN_PERM) 576 async def handle_gc_mark(self, request): 577 condition = request["where"] 578 mark = request["mark"] 579 580 if not isinstance(condition, dict): 581 raise TypeError("Bad condition type %s" % type(condition)) 582 583 if not isinstance(mark, str): 584 raise TypeError("Bad mark type %s" % type(mark)) 585 586 return {"count": await self.db.gc_mark(mark, condition)} 587 588 @permissions(DB_ADMIN_PERM) 589 async def handle_gc_mark_stream(self, request): 590 async def handler(line): 591 try: 592 decoded_line = json.loads(line) 593 except json.JSONDecodeError as exc: 594 raise bb.asyncrpc.InvokeError( 595 "Could not decode JSONL input '%s'" % line 596 ) from exc 597 598 try: 599 mark = decoded_line["mark"] 600 condition = decoded_line["where"] 601 if not isinstance(mark, str): 602 raise TypeError("Bad mark type %s" % type(mark)) 603 604 if not isinstance(condition, dict): 605 raise TypeError("Bad condition type %s" % type(condition)) 606 except KeyError as exc: 607 raise bb.asyncrpc.InvokeError( 608 "Input line is missing key '%s' " % exc 609 ) from exc 610 611 return json.dumps({"count": await self.db.gc_mark(mark, condition)}) 612 613 return await self._stream_handler(handler) 614 615 @permissions(DB_ADMIN_PERM) 616 async def handle_gc_sweep(self, request): 617 mark = request["mark"] 618 619 if not isinstance(mark, str): 620 raise TypeError("Bad mark type %s" % type(mark)) 621 622 current_mark = await self.db.get_current_gc_mark() 623 624 if not current_mark or mark != current_mark: 625 raise bb.asyncrpc.InvokeError( 626 f"'{mark}' is not the current mark. Refusing to sweep" 627 ) 628 629 count = await self.db.gc_sweep() 630 631 return {"count": count} 632 633 @permissions(DB_ADMIN_PERM) 634 async def handle_gc_status(self, request): 635 (keep_rows, remove_rows, current_mark) = await self.db.gc_status() 636 return { 637 "keep": keep_rows, 638 "remove": remove_rows, 639 "mark": current_mark, 640 } 641 642 @permissions(DB_ADMIN_PERM) 643 async def handle_clean_unused(self, request): 644 max_age = request["max_age_seconds"] 645 oldest = datetime.now() - timedelta(seconds=-max_age) 646 return {"count": await self.db.clean_unused(oldest)} 647 648 @permissions(DB_ADMIN_PERM) 649 async def handle_get_db_usage(self, request): 650 return {"usage": await self.db.get_usage()} 651 652 @permissions(DB_ADMIN_PERM) 653 async def handle_get_db_query_columns(self, request): 654 return {"columns": await self.db.get_query_columns()} 655 656 # The authentication API is always allowed 657 async def handle_auth(self, request): 658 username = str(request["username"]) 659 token = str(request["token"]) 660 661 async def fail_auth(): 662 nonlocal username 663 # Rate limit bad login attempts 664 await asyncio.sleep(1) 665 raise bb.asyncrpc.InvokeError(f"Unable to authenticate as {username}") 666 667 user, db_token = await self.db.lookup_user_token(username) 668 669 if not user or not db_token: 670 await fail_auth() 671 672 try: 673 algo, salt, _ = db_token.split(":") 674 except ValueError: 675 await fail_auth() 676 677 if hash_token(algo, salt, token) != db_token: 678 await fail_auth() 679 680 self.user = user 681 682 self.logger.info("Authenticated as %s", username) 683 684 return { 685 "result": True, 686 "username": self.user.username, 687 "permissions": sorted(list(self.user.permissions)), 688 } 689 690 @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False) 691 async def handle_refresh_token(self, request): 692 username = str(request["username"]) 693 694 token = await new_token() 695 696 updated = await self.db.set_user_token( 697 username, 698 hash_token(TOKEN_ALGORITHM, new_salt(), token), 699 ) 700 if not updated: 701 self.raise_no_user_error(username) 702 703 return {"username": username, "token": token} 704 705 def get_perm_arg(self, arg): 706 if not isinstance(arg, list): 707 raise bb.asyncrpc.InvokeError("Unexpected type for permissions") 708 709 arg = set(arg) 710 try: 711 arg.remove(NONE_PERM) 712 except KeyError: 713 pass 714 715 unknown_perms = arg - ALL_PERMISSIONS 716 if unknown_perms: 717 raise bb.asyncrpc.InvokeError( 718 "Unknown permissions %s" % ", ".join(sorted(list(unknown_perms))) 719 ) 720 721 return sorted(list(arg)) 722 723 def return_perms(self, permissions): 724 if ALL_PERM in permissions: 725 return sorted(list(ALL_PERMISSIONS)) 726 return sorted(list(permissions)) 727 728 @permissions(USER_ADMIN_PERM, allow_anon=False) 729 async def handle_set_perms(self, request): 730 username = str(request["username"]) 731 permissions = self.get_perm_arg(request["permissions"]) 732 733 if not await self.db.set_user_perms(username, permissions): 734 self.raise_no_user_error(username) 735 736 return { 737 "username": username, 738 "permissions": self.return_perms(permissions), 739 } 740 741 @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False) 742 async def handle_get_user(self, request): 743 username = str(request["username"]) 744 745 user = await self.db.lookup_user(username) 746 if user is None: 747 return None 748 749 return { 750 "username": user.username, 751 "permissions": self.return_perms(user.permissions), 752 } 753 754 @permissions(USER_ADMIN_PERM, allow_anon=False) 755 async def handle_get_all_users(self, request): 756 users = await self.db.get_all_users() 757 return { 758 "users": [ 759 { 760 "username": u.username, 761 "permissions": self.return_perms(u.permissions), 762 } 763 for u in users 764 ] 765 } 766 767 @permissions(USER_ADMIN_PERM, allow_anon=False) 768 async def handle_new_user(self, request): 769 username = str(request["username"]) 770 permissions = self.get_perm_arg(request["permissions"]) 771 772 token = await new_token() 773 774 inserted = await self.db.new_user( 775 username, 776 permissions, 777 hash_token(TOKEN_ALGORITHM, new_salt(), token), 778 ) 779 if not inserted: 780 raise bb.asyncrpc.InvokeError(f"Cannot create new user '{username}'") 781 782 return { 783 "username": username, 784 "permissions": self.return_perms(permissions), 785 "token": token, 786 } 787 788 @permissions(USER_ADMIN_PERM, allow_self_service=True, allow_anon=False) 789 async def handle_delete_user(self, request): 790 username = str(request["username"]) 791 792 if not await self.db.delete_user(username): 793 self.raise_no_user_error(username) 794 795 return {"username": username} 796 797 @permissions(USER_ADMIN_PERM, allow_anon=False) 798 async def handle_become_user(self, request): 799 username = str(request["username"]) 800 801 user = await self.db.lookup_user(username) 802 if user is None: 803 raise bb.asyncrpc.InvokeError(f"User {username} doesn't exist") 804 805 self.user = user 806 807 self.logger.info("Became user %s", username) 808 809 return { 810 "username": self.user.username, 811 "permissions": self.return_perms(self.user.permissions), 812 } 813 814 815class Server(bb.asyncrpc.AsyncServer): 816 def __init__( 817 self, 818 db_engine, 819 upstream=None, 820 read_only=False, 821 anon_perms=DEFAULT_ANON_PERMS, 822 admin_username=None, 823 admin_password=None, 824 ): 825 if upstream and read_only: 826 raise bb.asyncrpc.ServerError( 827 "Read-only hashserv cannot pull from an upstream server" 828 ) 829 830 disallowed_perms = set(anon_perms) - set( 831 [NONE_PERM, READ_PERM, REPORT_PERM, DB_ADMIN_PERM] 832 ) 833 834 if disallowed_perms: 835 raise bb.asyncrpc.ServerError( 836 f"Permission(s) {' '.join(disallowed_perms)} are not allowed for anonymous users" 837 ) 838 839 super().__init__(logger) 840 841 self.request_stats = Stats() 842 self.db_engine = db_engine 843 self.upstream = upstream 844 self.read_only = read_only 845 self.backfill_queue = None 846 self.anon_perms = set(anon_perms) 847 self.admin_username = admin_username 848 self.admin_password = admin_password 849 850 self.logger.info( 851 "Anonymous user permissions are: %s", ", ".join(self.anon_perms) 852 ) 853 854 def accept_client(self, socket): 855 return ServerClient(socket, self) 856 857 async def create_admin_user(self): 858 admin_permissions = (ALL_PERM,) 859 async with self.db_engine.connect(self.logger) as db: 860 added = await db.new_user( 861 self.admin_username, 862 admin_permissions, 863 hash_token(TOKEN_ALGORITHM, new_salt(), self.admin_password), 864 ) 865 if added: 866 self.logger.info("Created admin user '%s'", self.admin_username) 867 else: 868 await db.set_user_perms( 869 self.admin_username, 870 admin_permissions, 871 ) 872 await db.set_user_token( 873 self.admin_username, 874 hash_token(TOKEN_ALGORITHM, new_salt(), self.admin_password), 875 ) 876 self.logger.info("Admin user '%s' updated", self.admin_username) 877 878 async def backfill_worker_task(self): 879 async with await create_async_client( 880 self.upstream 881 ) as client, self.db_engine.connect(self.logger) as db: 882 while True: 883 item = await self.backfill_queue.get() 884 if item is None: 885 self.backfill_queue.task_done() 886 break 887 888 method, taskhash = item 889 d = await client.get_taskhash(method, taskhash) 890 if d is not None: 891 await db.insert_unihash(d["method"], d["taskhash"], d["unihash"]) 892 self.backfill_queue.task_done() 893 894 def start(self): 895 tasks = super().start() 896 if self.upstream: 897 self.backfill_queue = asyncio.Queue() 898 tasks += [self.backfill_worker_task()] 899 900 self.loop.run_until_complete(self.db_engine.create()) 901 902 if self.admin_username: 903 self.loop.run_until_complete(self.create_admin_user()) 904 905 return tasks 906 907 async def stop(self): 908 if self.backfill_queue is not None: 909 await self.backfill_queue.put(None) 910 await super().stop() 911