#! /usr/bin/env python3
#
# Copyright (C) 2023 Garmin Ltd.
#
# SPDX-License-Identifier: GPL-2.0-only
#
from datetime import datetime, timezone
import sqlite3
import logging
from contextlib import closing
from . import User

logger = logging.getLogger("hashserv.sqlite")

# Every table definition below is a tuple of (column name, SQL type, flags)
# entries. _make_table() turns the entries into a CREATE TABLE statement and
# combines all columns flagged "UNIQUE" into one composite UNIQUE constraint.
UNIHASH_TABLE_DEFINITION = (
    ("method", "TEXT NOT NULL", "UNIQUE"),
    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
    ("unihash", "TEXT NOT NULL", ""),
    ("gc_mark", "TEXT NOT NULL", ""),
)

UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION)

OUTHASH_TABLE_DEFINITION = (
    ("method", "TEXT NOT NULL", "UNIQUE"),
    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
    ("outhash", "TEXT NOT NULL", "UNIQUE"),
    ("created", "DATETIME", ""),
    # Optional fields
    ("owner", "TEXT", ""),
    ("PN", "TEXT", ""),
    ("PV", "TEXT", ""),
    ("PR", "TEXT", ""),
    ("task", "TEXT", ""),
    ("outhash_siginfo", "TEXT", ""),
)

OUTHASH_TABLE_COLUMNS = tuple(name for name, _, _ in OUTHASH_TABLE_DEFINITION)

USERS_TABLE_DEFINITION = (
    ("username", "TEXT NOT NULL", "UNIQUE"),
    ("token", "TEXT NOT NULL", ""),
    ("permissions", "TEXT NOT NULL", ""),
)

USERS_TABLE_COLUMNS = tuple(name for name, _, _ in USERS_TABLE_DEFINITION)


CONFIG_TABLE_DEFINITION = (
    ("name", "TEXT NOT NULL", "UNIQUE"),
    ("value", "TEXT", ""),
)

CONFIG_TABLE_COLUMNS = tuple(name for name, _, _ in CONFIG_TABLE_DEFINITION)


def adapt_datetime_iso(val):
    """Adapt datetime.datetime to UTC ISO 8601 date."""
    # astimezone() treats a naive datetime as local time before converting.
    return val.astimezone(timezone.utc).isoformat()


sqlite3.register_adapter(datetime, adapt_datetime_iso)


def convert_datetime(val):
    """Convert ISO 8601 datetime to datetime.datetime object."""
    return datetime.fromisoformat(val.decode())


# NOTE(review): sqlite3 only applies registered converters on connections
# opened with detect_types; the connections in this module are opened without
# it, so "DATETIME" columns presumably come back as stored text -- confirm
# against the callers before relying on either behavior.
sqlite3.register_converter("DATETIME", convert_datetime)


def _make_table(cursor, name, definition):
    """Create table ``name`` from ``definition`` if it does not exist yet.

    The table always gets an autoincrementing integer ``id`` primary key,
    followed by one column per (column name, SQL type, flags) entry. All
    columns whose flags contain "UNIQUE" form a single composite UNIQUE
    constraint.
    """
    # Every column fragment ends in a comma because the UNIQUE(...) clause
    # always follows the column list.
    fields = " ".join("%s %s," % (column, typ) for column, typ, _ in definition)
    unique = ", ".join(column for column, _, flags in definition if "UNIQUE" in flags)

    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS {name} (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            {fields}
            UNIQUE({unique})
            )
        """.format(
            name=name,
            fields=fields,
            unique=unique,
        )
    )


def map_user(row):
    """Build a User from a users-table row; return None when ``row`` is None.

    The stored permissions string is split on whitespace into a set.
    """
    if row is None:
        return None
    return User(
        username=row["username"],
        permissions=set(row["permissions"].split()),
    )


def _make_condition_statement(columns, condition):
    """Build the parameters and WHERE clause for a column filter.

    Only keys of ``condition`` that are listed in ``columns`` and whose value
    is not None are used, so arbitrary keys never reach the SQL text.

    Returns a ``(where, clause)`` tuple: ``where`` is the dict of named
    parameters and ``clause`` the matching "col=:col AND ..." string. Both are
    empty when nothing in ``condition`` applies.
    """
    where = {
        c: condition[c]
        for c in columns
        if c in condition and condition[c] is not None
    }

    return where, " AND ".join("%s=:%s" % (k, k) for k in where)


def _get_sqlite_version(cursor):
    """Return the SQLite library version as a tuple, e.g. ``(3, 40, 1)``.

    Components that are not plain integers are kept as strings.
    """
    cursor.execute("SELECT sqlite_version()")

    version = []
    for v in cursor.fetchone()[0].split("."):
        try:
            version.append(int(v))
        except ValueError:
            version.append(v)

    return tuple(version)


def _schema_table_name(version):
    """Return the schema table name to query for the given SQLite version.

    ``sqlite_schema`` is used for versions >= 3.33, ``sqlite_master``
    otherwise.
    """
    if version >= (3, 33):
        return "sqlite_schema"

    return "sqlite_master"


class DatabaseEngine(object):
    """Creates the SQLite schema and hands out Database connections."""

    def __init__(self, dbname, sync):
        """Remember the database path and whether synchronous writes are on."""
        self.dbname = dbname
        self.logger = logger
        self.sync = sync

    async def create(self):
        """Create or upgrade the tables and indexes in the database file.

        Old indexes and the ``tasks_v2`` table are dropped, and any rows in a
        ``unihashes_v2`` table are migrated into ``unihashes_v3`` with an
        empty ``gc_mark``.
        """
        # This connection is only needed for setup; close it explicitly
        # instead of leaving the handle to the garbage collector.
        with closing(sqlite3.connect(self.dbname)) as db:
            db.row_factory = sqlite3.Row

            with closing(db.cursor()) as cursor:
                _make_table(cursor, "unihashes_v3", UNIHASH_TABLE_DEFINITION)
                _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION)
                _make_table(cursor, "users", USERS_TABLE_DEFINITION)
                _make_table(cursor, "config", CONFIG_TABLE_DEFINITION)

                cursor.execute("PRAGMA journal_mode = WAL")
                cursor.execute(
                    "PRAGMA synchronous = %s" % ("NORMAL" if self.sync else "OFF")
                )

                # Drop old indexes
                cursor.execute("DROP INDEX IF EXISTS taskhash_lookup")
                cursor.execute("DROP INDEX IF EXISTS outhash_lookup")
                cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v2")
                cursor.execute("DROP INDEX IF EXISTS outhash_lookup_v2")
                cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v3")

                # TODO: Upgrade from tasks_v2?
                cursor.execute("DROP TABLE IF EXISTS tasks_v2")

                # Create new indexes
                cursor.execute(
                    "CREATE INDEX IF NOT EXISTS taskhash_lookup_v4 ON unihashes_v3 (method, taskhash)"
                )
                cursor.execute(
                    "CREATE INDEX IF NOT EXISTS unihash_lookup_v1 ON unihashes_v3 (unihash)"
                )
                cursor.execute(
                    "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)"
                )
                cursor.execute(
                    "CREATE INDEX IF NOT EXISTS config_lookup ON config (name)"
                )

                sqlite_version = _get_sqlite_version(cursor)

                cursor.execute(
                    f"""
                    SELECT name FROM {_schema_table_name(sqlite_version)} WHERE type = 'table' AND name = 'unihashes_v2'
                    """
                )
                if cursor.fetchone():
                    self.logger.info("Upgrading Unihashes V2 -> V3...")
                    cursor.execute(
                        """
                        INSERT INTO unihashes_v3 (id, method, unihash, taskhash, gc_mark)
                        SELECT id, method, unihash, taskhash, '' FROM unihashes_v2
                        """
                    )
                    cursor.execute("DROP TABLE unihashes_v2")
                    db.commit()
                    self.logger.info("Upgrade complete")

    def connect(self, logger):
        """Return a new Database connection to this engine's database file."""
        return Database(logger, self.dbname, self.sync)


class Database(object):
    """A single SQLite connection exposing the hash server's queries.

    Usable as an async context manager; the connection is closed on exit.
    """

    def __init__(self, logger, dbname, sync):
        """Open the connection, apply the pragmas and record the SQLite version."""
        self.dbname = dbname
        self.logger = logger

        self.db = sqlite3.connect(self.dbname)
        self.db.row_factory = sqlite3.Row

        with closing(self.db.cursor()) as cursor:
            cursor.execute("PRAGMA journal_mode = WAL")
            cursor.execute(
                "PRAGMA synchronous = %s" % ("NORMAL" if sync else "OFF")
            )

            self.sqlite_version = _get_sqlite_version(cursor)

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.close()

    async def _set_config(self, cursor, name, value):
        """Insert or overwrite config entry ``name``. Does not commit."""
        # Re-using the existing row id (NULL when the name is new) makes
        # INSERT OR REPLACE update the entry in place.
        cursor.execute(
            """
            INSERT OR REPLACE INTO config (id, name, value) VALUES
            ((SELECT id FROM config WHERE name=:name), :name, :value)
            """,
            {
                "name": name,
                "value": value,
            },
        )

    async def _get_config(self, cursor, name):
        """Return the value of config entry ``name``, or None if it is absent."""
        cursor.execute(
            "SELECT value FROM config WHERE name=:name",
            {
                "name": name,
            },
        )
        row = cursor.fetchone()
        if row is None:
            return None
        return row["value"]

    async def close(self):
        """Close the underlying SQLite connection."""
        self.db.close()

    async def get_unihash_by_taskhash_full(self, method, taskhash):
        """Return the oldest outhash row for (method, taskhash) joined with
        its unihash row, or None when there is no match."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                SELECT *, unihashes_v3.unihash AS unihash FROM outhashes_v2
                INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash
                WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash
                ORDER BY outhashes_v2.created ASC
                LIMIT 1
                """,
                {
                    "method": method,
                    "taskhash": taskhash,
                },
            )
            return cursor.fetchone()

    async def get_unihash_by_outhash(self, method, outhash):
        """Return the oldest outhash row for (method, outhash) joined with
        its unihash row, or None when there is no match."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                SELECT *, unihashes_v3.unihash AS unihash FROM outhashes_v2
                INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash
                WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
                ORDER BY outhashes_v2.created ASC
                LIMIT 1
                """,
                {
                    "method": method,
                    "outhash": outhash,
                },
            )
            return cursor.fetchone()

    async def unihash_exists(self, unihash):
        """Return True if any unihashes_v3 row has the given unihash."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                SELECT * FROM unihashes_v3 WHERE unihash=:unihash
                LIMIT 1
                """,
                {
                    "unihash": unihash,
                },
            )
            return cursor.fetchone() is not None

    async def get_outhash(self, method, outhash):
        """Return the oldest outhashes_v2 row for (method, outhash), or None."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                SELECT * FROM outhashes_v2
                WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
                ORDER BY outhashes_v2.created ASC
                LIMIT 1
                """,
                {
                    "method": method,
                    "outhash": outhash,
                },
            )
            return cursor.fetchone()

    async def get_equivalent_for_outhash(self, method, outhash, taskhash):
        """Return the (taskhash, unihash) row of the oldest entry with the
        same method and outhash but a different taskhash, or None."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                SELECT outhashes_v2.taskhash AS taskhash, unihashes_v3.unihash AS unihash FROM outhashes_v2
                INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash
                -- Select any matching output hash except the one we just inserted
                WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash
                -- Pick the oldest hash
                ORDER BY outhashes_v2.created ASC
                LIMIT 1
                """,
                {
                    "method": method,
                    "outhash": outhash,
                    "taskhash": taskhash,
                },
            )
            return cursor.fetchone()

    async def get_equivalent(self, method, taskhash):
        """Return the (taskhash, method, unihash) row for (method, taskhash),
        or None when there is no such unihash entry."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                "SELECT taskhash, method, unihash FROM unihashes_v3 WHERE method=:method AND taskhash=:taskhash",
                {
                    "method": method,
                    "taskhash": taskhash,
                },
            )
            return cursor.fetchone()

    async def remove(self, condition):
        """Delete rows matching ``condition`` from both hash tables.

        ``condition`` maps column names to required values; for each table
        only the keys that are columns of that table are applied, and a table
        is left untouched when none of the keys apply to it.

        Returns the total number of rows deleted.
        """

        def do_remove(columns, table_name, cursor):
            where, clause = _make_condition_statement(columns, condition)
            # An empty filter must never turn into an unconditional DELETE.
            if where:
                query = f"DELETE FROM {table_name} WHERE {clause}"
                cursor.execute(query, where)
                return cursor.rowcount

            return 0

        count = 0
        with closing(self.db.cursor()) as cursor:
            count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor)
            count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v3", cursor)
            self.db.commit()

        return count

    async def get_current_gc_mark(self):
        """Return the "gc-mark" config value, or None if it is not set."""
        with closing(self.db.cursor()) as cursor:
            return await self._get_config(cursor, "gc-mark")

    async def gc_status(self):
        """Return ``(keep_rows, remove_rows, current_mark)``.

        ``keep_rows`` counts the unihashes whose gc_mark equals the current
        "gc-mark" config value (an unset mark is treated as ''),
        ``remove_rows`` counts those whose gc_mark differs, and
        ``current_mark`` is the raw config value (None when unset).
        """
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                SELECT COUNT() FROM unihashes_v3 WHERE
                    gc_mark=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '')
                """
            )
            keep_rows = cursor.fetchone()[0]

            cursor.execute(
                """
                SELECT COUNT() FROM unihashes_v3 WHERE
                    gc_mark!=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '')
                """
            )
            remove_rows = cursor.fetchone()[0]

            current_mark = await self._get_config(cursor, "gc-mark")

            return (keep_rows, remove_rows, current_mark)

    async def gc_mark(self, mark, condition):
        """Set the current "gc-mark" to ``mark`` and stamp matching unihashes.

        Rows of unihashes_v3 matching ``condition`` get their gc_mark set to
        the new mark. When ``condition`` names no usable column, only the
        config value is updated.

        Returns the number of rows that were marked.
        """
        with closing(self.db.cursor()) as cursor:
            await self._set_config(cursor, "gc-mark", mark)

            where, clause = _make_condition_statement(UNIHASH_TABLE_COLUMNS, condition)

            new_rows = 0
            if where:
                cursor.execute(
                    f"""
                    UPDATE unihashes_v3 SET
                        gc_mark=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '')
                    WHERE {clause}
                    """,
                    where,
                )
                new_rows = cursor.rowcount

            self.db.commit()
            return new_rows

    async def gc_sweep(self):
        """Delete unihashes not carrying the current mark, then clear the mark.

        Returns the number of rows deleted.
        """
        with closing(self.db.cursor()) as cursor:
            # NOTE: COALESCE is not used in this query so that if the current
            # mark is NULL, nothing will happen
            cursor.execute(
                """
                DELETE FROM unihashes_v3 WHERE
                    gc_mark!=(SELECT value FROM config WHERE name='gc-mark')
                """
            )
            count = cursor.rowcount
            await self._set_config(cursor, "gc-mark", None)

            self.db.commit()
            return count

    async def clean_unused(self, oldest):
        """Delete outhashes created before ``oldest`` that have no unihash
        row with the same method and taskhash.

        Returns the number of rows deleted.
        """
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                DELETE FROM outhashes_v2 WHERE created<:oldest AND NOT EXISTS (
                    SELECT unihashes_v3.id FROM unihashes_v3 WHERE unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash LIMIT 1
                )
                """,
                {
                    "oldest": oldest,
                },
            )
            self.db.commit()
            return cursor.rowcount

    async def insert_unihash(self, method, taskhash, unihash):
        """Insert a unihash entry stamped with the current gc mark.

        An existing (method, taskhash) entry is left untouched.

        Returns True if a new row was inserted, False if it was ignored.
        """
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                INSERT OR IGNORE INTO unihashes_v3 (method, taskhash, unihash, gc_mark) VALUES
                    (
                    :method,
                    :taskhash,
                    :unihash,
                    COALESCE((SELECT value FROM config WHERE name='gc-mark'), '')
                    )
                """,
                {
                    "method": method,
                    "taskhash": taskhash,
                    "unihash": unihash,
                },
            )
            self.db.commit()
            # rowcount is 0 when the conflict clause ignored the insert.
            # Comparing lastrowid on a fresh cursor cannot detect that: it
            # starts as None and afterwards holds the connection-wide last
            # insert rowid.
            return cursor.rowcount != 0

    async def insert_outhash(self, data):
        """Insert an outhash entry built from the known columns in ``data``.

        Keys of ``data`` that are not outhashes_v2 columns are dropped. An
        existing (method, taskhash, outhash) entry is left untouched.

        Returns True if a new row was inserted, False if it was ignored.
        """
        data = {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS}
        keys = sorted(data.keys())
        query = "INSERT OR IGNORE INTO outhashes_v2 ({fields}) VALUES({values})".format(
            fields=", ".join(keys),
            values=", ".join(":" + k for k in keys),
        )
        with closing(self.db.cursor()) as cursor:
            cursor.execute(query, data)
            self.db.commit()
            # See insert_unihash(): rowcount, not lastrowid, tells whether
            # the row was actually inserted.
            return cursor.rowcount != 0

    def _get_user(self, username):
        """Return the (username, permissions, token) row for ``username``,
        or None if there is no such user."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                SELECT username, permissions, token FROM users WHERE username=:username
                """,
                {
                    "username": username,
                },
            )
            return cursor.fetchone()

    async def lookup_user_token(self, username):
        """Return ``(User, token)`` for ``username``, or ``(None, None)``."""
        row = self._get_user(username)
        if row is None:
            return None, None
        return map_user(row), row["token"]

    async def lookup_user(self, username):
        """Return the User for ``username``, or None if it does not exist."""
        return map_user(self._get_user(username))

    async def set_user_token(self, username, token):
        """Replace the user's token; return True if a user row was updated."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                UPDATE users SET token=:token WHERE username=:username
                """,
                {
                    "username": username,
                    "token": token,
                },
            )
            self.db.commit()
            return cursor.rowcount != 0

    async def set_user_perms(self, username, permissions):
        """Replace the user's permissions (stored space-separated); return
        True if a user row was updated."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                UPDATE users SET permissions=:permissions WHERE username=:username
                """,
                {
                    "username": username,
                    "permissions": " ".join(permissions),
                },
            )
            self.db.commit()
            return cursor.rowcount != 0

    async def get_all_users(self):
        """Return a list with a User for every row of the users table."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute("SELECT username, permissions FROM users")
            return [map_user(r) for r in cursor.fetchall()]

    async def new_user(self, username, permissions, token):
        """Create a user; return False if the insert violates a constraint
        (e.g. the username is already taken), True otherwise."""
        with closing(self.db.cursor()) as cursor:
            try:
                cursor.execute(
                    """
                    INSERT INTO users (username, token, permissions) VALUES (:username, :token, :permissions)
                    """,
                    {
                        "username": username,
                        "token": token,
                        "permissions": " ".join(permissions),
                    },
                )
                self.db.commit()
                return True
            except sqlite3.IntegrityError:
                # sqlite3 implicitly opens a transaction before the INSERT;
                # end it so the failed attempt does not leave a transaction
                # pending on this connection.
                self.db.rollback()
                return False

    async def delete_user(self, username):
        """Delete the user; return True if a user row was removed."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                DELETE FROM users WHERE username=:username
                """,
                {
                    "username": username,
                },
            )
            self.db.commit()
            return cursor.rowcount != 0

    async def get_usage(self):
        """Return ``{table name: {"rows": row count}}`` for every table whose
        name does not start with ``sqlite_``."""
        usage = {}
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                f"""
                SELECT name FROM {_schema_table_name(self.sqlite_version)} WHERE type = 'table' AND name NOT LIKE 'sqlite_%'
                """
            )
            # fetchall() materializes the table list first, so the cursor can
            # be reused for the per-table counts inside the loop.
            for row in cursor.fetchall():
                cursor.execute(
                    """
                    SELECT COUNT() FROM %s
                    """
                    % row["name"],
                )
                usage[row["name"]] = {
                    "rows": cursor.fetchone()[0],
                }
        return usage

    async def get_query_columns(self):
        """Return the names of all TEXT columns of the unihash and outhash
        tables as a list without duplicates (order unspecified)."""
        columns = set()
        for name, typ, _ in UNIHASH_TABLE_DEFINITION + OUTHASH_TABLE_DEFINITION:
            if typ.startswith("TEXT"):
                columns.add(name)
        return list(columns)