xref: /openbmc/openbmc/poky/bitbake/lib/hashserv/sqlite.py (revision c9537f57ab488bf5d90132917b0184e2527970a5)
1#! /usr/bin/env python3
2#
3# Copyright (C) 2023 Garmin Ltd.
4#
5# SPDX-License-Identifier: GPL-2.0-only
6#
7from datetime import datetime, timezone
8import sqlite3
9import logging
10from contextlib import closing
11from . import User
12
logger = logging.getLogger("hashserv.sqlite")

# Table layouts.  Each entry is a (column name, SQL type, flags) tuple.  Every
# column whose flags contain "UNIQUE" becomes part of the table's single
# composite UNIQUE(...) constraint when the table is created.  The matching
# *_COLUMNS tuples list just the column names, in definition order.

UNIHASH_TABLE_DEFINITION = (
    ("method", "TEXT NOT NULL", "UNIQUE"),
    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
    ("unihash", "TEXT NOT NULL", ""),
    ("gc_mark", "TEXT NOT NULL", ""),
)
UNIHASH_TABLE_COLUMNS = tuple(entry[0] for entry in UNIHASH_TABLE_DEFINITION)

OUTHASH_TABLE_DEFINITION = (
    ("method", "TEXT NOT NULL", "UNIQUE"),
    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
    ("outhash", "TEXT NOT NULL", "UNIQUE"),
    ("created", "DATETIME", ""),
    # Optional fields (nullable)
    ("owner", "TEXT", ""),
    ("PN", "TEXT", ""),
    ("PV", "TEXT", ""),
    ("PR", "TEXT", ""),
    ("task", "TEXT", ""),
    ("outhash_siginfo", "TEXT", ""),
)
OUTHASH_TABLE_COLUMNS = tuple(entry[0] for entry in OUTHASH_TABLE_DEFINITION)

USERS_TABLE_DEFINITION = (
    ("username", "TEXT NOT NULL", "UNIQUE"),
    ("token", "TEXT NOT NULL", ""),
    ("permissions", "TEXT NOT NULL", ""),
)
USERS_TABLE_COLUMNS = tuple(entry[0] for entry in USERS_TABLE_DEFINITION)

CONFIG_TABLE_DEFINITION = (
    ("name", "TEXT NOT NULL", "UNIQUE"),
    ("value", "TEXT", ""),
)
CONFIG_TABLE_COLUMNS = tuple(entry[0] for entry in CONFIG_TABLE_DEFINITION)
55
56
def adapt_datetime_iso(val):
    """Adapt a ``datetime`` into an ISO 8601 string normalised to UTC.

    Aware values are converted to UTC.  Naive values are interpreted as local
    time by ``astimezone()`` before conversion.  The result always carries an
    explicit ``+00:00`` offset.
    """
    as_utc = val.astimezone(timezone.utc)
    return as_utc.isoformat()


# Every datetime bound as a query parameter is stored in this UTC ISO form.
sqlite3.register_adapter(datetime, adapt_datetime_iso)
63
64
def convert_datetime(val):
    """Parse an ISO 8601 timestamp read back from SQLite into a ``datetime``.

    *val* is the raw column value as ``bytes``.  It is decoded with the
    default (UTF-8) codec before parsing.  Strings without an offset produce
    naive datetimes.

    NOTE(review): sqlite3 only applies registered converters on connections
    opened with ``detect_types`` (e.g. ``sqlite3.PARSE_DECLTYPES``).  The
    ``sqlite3.connect()`` calls in this module do not pass it, so confirm
    whether this converter is actually exercised.
    """
    iso_text = val.decode()
    return datetime.fromisoformat(iso_text)


# Registered for columns whose declared type is DATETIME (outhashes_v2.created).
sqlite3.register_converter("DATETIME", convert_datetime)
71
72
73def _make_table(cursor, name, definition):
74    cursor.execute(
75        """
76        CREATE TABLE IF NOT EXISTS {name} (
77            id INTEGER PRIMARY KEY AUTOINCREMENT,
78            {fields}
79            UNIQUE({unique})
80            )
81        """.format(
82            name=name,
83            fields=" ".join("%s %s," % (name, typ) for name, typ, _ in definition),
84            unique=", ".join(
85                name for name, _, flags in definition if "UNIQUE" in flags
86            ),
87        )
88    )
89
90
def map_user(row):
    """Build a ``User`` from a ``users`` row, or return None when *row* is None.

    The stored ``permissions`` string is whitespace-separated and becomes a
    set.  The ``token`` column, if present in *row*, is not copied onto the
    returned ``User``.
    """
    if row is not None:
        perms = row["permissions"].split()
        return User(username=row["username"], permissions=set(perms))
    return None
98
99
100def _make_condition_statement(columns, condition):
101    where = {}
102    for c in columns:
103        if c in condition and condition[c] is not None:
104            where[c] = condition[c]
105
106    return where, " AND ".join("%s=:%s" % (k, k) for k in where.keys())
107
108
109def _get_sqlite_version(cursor):
110    cursor.execute("SELECT sqlite_version()")
111
112    version = []
113    for v in cursor.fetchone()[0].split("."):
114        try:
115            version.append(int(v))
116        except ValueError:
117            version.append(v)
118
119    return tuple(version)
120
121
122def _schema_table_name(version):
123    if version >= (3, 33):
124        return "sqlite_schema"
125
126    return "sqlite_master"
127
128
class DatabaseEngine(object):
    """Schema manager and connection factory for the SQLite hash database."""

    def __init__(self, dbname, sync):
        # dbname: database path handed to sqlite3.connect().
        # sync: True -> "PRAGMA synchronous = NORMAL", False -> "OFF".
        self.dbname = dbname
        self.logger = logger
        self.sync = sync

    async def create(self):
        """Create missing tables/indexes and migrate legacy tables.

        The method is safe to run against an existing database:

        - Tables and indexes are created only if they do not already exist.
        - Obsolete indexes and the ``tasks_v2`` table are dropped.
        - Rows from ``unihashes_v2``, if that table exists, are copied into
          ``unihashes_v3`` before ``unihashes_v2`` is dropped.

        The connection used for this work is closed before returning.
        """
        db = sqlite3.connect(self.dbname)
        db.row_factory = sqlite3.Row

        # A sqlite3.Connection used as a context manager only commits or rolls
        # back; closing() is what actually releases the connection.  If the
        # migration raises, the connection is closed without committing.
        with closing(db), closing(db.cursor()) as cursor:
            _make_table(cursor, "unihashes_v3", UNIHASH_TABLE_DEFINITION)
            _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION)
            _make_table(cursor, "users", USERS_TABLE_DEFINITION)
            _make_table(cursor, "config", CONFIG_TABLE_DEFINITION)

            cursor.execute("PRAGMA journal_mode = WAL")
            cursor.execute(
                "PRAGMA synchronous = %s" % ("NORMAL" if self.sync else "OFF")
            )

            # Drop old indexes
            cursor.execute("DROP INDEX IF EXISTS taskhash_lookup")
            cursor.execute("DROP INDEX IF EXISTS outhash_lookup")
            cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v2")
            cursor.execute("DROP INDEX IF EXISTS outhash_lookup_v2")
            cursor.execute("DROP INDEX IF EXISTS taskhash_lookup_v3")

            # TODO: Upgrade from tasks_v2?
            cursor.execute("DROP TABLE IF EXISTS tasks_v2")

            # Create new indexes
            cursor.execute(
                "CREATE INDEX IF NOT EXISTS taskhash_lookup_v4 ON unihashes_v3 (method, taskhash)"
            )
            cursor.execute(
                "CREATE INDEX IF NOT EXISTS unihash_lookup_v1 ON unihashes_v3 (unihash)"
            )
            cursor.execute(
                "CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)"
            )
            cursor.execute("CREATE INDEX IF NOT EXISTS config_lookup ON config (name)")

            sqlite_version = _get_sqlite_version(cursor)

            # Migrate a legacy unihashes_v2 table, if one is present.  Its rows
            # get an empty gc_mark.
            cursor.execute(
                f"""
                SELECT name FROM {_schema_table_name(sqlite_version)} WHERE type = 'table' AND name = 'unihashes_v2'
                """
            )
            if cursor.fetchone():
                self.logger.info("Upgrading Unihashes V2 -> V3...")
                cursor.execute(
                    """
                    INSERT INTO unihashes_v3 (id, method, unihash, taskhash, gc_mark)
                    SELECT id, method, unihash, taskhash, '' FROM unihashes_v2
                    """
                )
                cursor.execute("DROP TABLE unihashes_v2")
                db.commit()
                self.logger.info("Upgrade complete")

    def connect(self, logger):
        """Return a new ``Database`` connection that logs through *logger*."""
        return Database(logger, self.dbname, self.sync)
193
194
class Database(object):
    """A single SQLite connection to the hash-equivalence database.

    The public methods are ``async`` but perform their ``sqlite3`` work
    synchronously on ``self.db``.  Methods that modify data commit before
    returning.
    """

    def __init__(self, logger, dbname, sync):
        # logger: logger for this connection (stored as self.logger).
        # dbname: database path handed to sqlite3.connect().
        # sync: True -> "PRAGMA synchronous = NORMAL", False -> "OFF".
        self.dbname = dbname
        self.logger = logger

        self.db = sqlite3.connect(self.dbname)
        self.db.row_factory = sqlite3.Row

        with closing(self.db.cursor()) as cursor:
            cursor.execute("PRAGMA journal_mode = WAL")
            cursor.execute(
                "PRAGMA synchronous = %s" % ("NORMAL" if sync else "OFF")
            )

            # Cached for get_usage(), which needs the schema table name.
            self.sqlite_version = _get_sqlite_version(cursor)

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.close()

    async def _set_config(self, cursor, name, value):
        """Upsert config entry *name* = *value* on *cursor*.

        The caller is responsible for committing.  An existing row's id is
        reused so the entry keeps its identity.
        """
        cursor.execute(
            """
            INSERT OR REPLACE INTO config (id, name, value) VALUES
            ((SELECT id FROM config WHERE name=:name), :name, :value)
            """,
            {
                "name": name,
                "value": value,
            },
        )

    async def _get_config(self, cursor, name):
        """Return the value stored for config entry *name*, or None if absent."""
        cursor.execute(
            "SELECT value FROM config WHERE name=:name",
            {
                "name": name,
            },
        )
        row = cursor.fetchone()
        if row is None:
            return None
        return row["value"]

    async def close(self):
        self.db.close()

    async def get_unihash_by_taskhash_full(self, method, taskhash):
        """Return the oldest outhash row for (method, taskhash) joined with its
        unihash, or None if there is no match."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                SELECT *, unihashes_v3.unihash AS unihash FROM outhashes_v2
                INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash
                WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash
                ORDER BY outhashes_v2.created ASC
                LIMIT 1
                """,
                {
                    "method": method,
                    "taskhash": taskhash,
                },
            )
            return cursor.fetchone()

    async def get_unihash_by_outhash(self, method, outhash):
        """Return the oldest outhash row for (method, outhash) joined with its
        unihash, or None if there is no match."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                SELECT *, unihashes_v3.unihash AS unihash FROM outhashes_v2
                INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash
                WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
                ORDER BY outhashes_v2.created ASC
                LIMIT 1
                """,
                {
                    "method": method,
                    "outhash": outhash,
                },
            )
            return cursor.fetchone()

    async def unihash_exists(self, unihash):
        """Return True if any unihashes_v3 row has this *unihash*."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                SELECT * FROM unihashes_v3 WHERE unihash=:unihash
                LIMIT 1
                """,
                {
                    "unihash": unihash,
                },
            )
            return cursor.fetchone() is not None

    async def get_outhash(self, method, outhash):
        """Return the oldest outhashes_v2 row for (method, outhash), or None."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                SELECT * FROM outhashes_v2
                WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
                ORDER BY outhashes_v2.created ASC
                LIMIT 1
                """,
                {
                    "method": method,
                    "outhash": outhash,
                },
            )
            return cursor.fetchone()

    async def get_equivalent_for_outhash(self, method, outhash, taskhash):
        """Return (taskhash, unihash) of the oldest other task with the same
        (method, outhash), excluding *taskhash* itself, or None."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                SELECT outhashes_v2.taskhash AS taskhash, unihashes_v3.unihash AS unihash FROM outhashes_v2
                INNER JOIN unihashes_v3 ON unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash
                -- Select any matching output hash except the one we just inserted
                WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash
                -- Pick the oldest hash
                ORDER BY outhashes_v2.created ASC
                LIMIT 1
                """,
                {
                    "method": method,
                    "outhash": outhash,
                    "taskhash": taskhash,
                },
            )
            return cursor.fetchone()

    async def get_equivalent(self, method, taskhash):
        """Return the (taskhash, method, unihash) row for (method, taskhash), or None."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                "SELECT taskhash, method, unihash FROM unihashes_v3 WHERE method=:method AND taskhash=:taskhash",
                {
                    "method": method,
                    "taskhash": taskhash,
                },
            )
            return cursor.fetchone()

    async def remove(self, condition):
        """Delete rows matching *condition* from outhashes_v2 and unihashes_v3.

        Each table is filtered only on the keys of *condition* that name one
        of its columns and have a non-None value.  A table for which no key
        applies is left untouched.  Returns the total number of deleted rows.
        """

        def do_remove(columns, table_name, cursor):
            where, clause = _make_condition_statement(columns, condition)
            if where:
                query = f"DELETE FROM {table_name} WHERE {clause}"
                cursor.execute(query, where)
                return cursor.rowcount

            return 0

        count = 0
        with closing(self.db.cursor()) as cursor:
            count += do_remove(OUTHASH_TABLE_COLUMNS, "outhashes_v2", cursor)
            count += do_remove(UNIHASH_TABLE_COLUMNS, "unihashes_v3", cursor)
            self.db.commit()

        return count

    async def get_current_gc_mark(self):
        """Return the current gc-mark config value, or None if unset."""
        with closing(self.db.cursor()) as cursor:
            return await self._get_config(cursor, "gc-mark")

    async def gc_status(self):
        """Return (keep_rows, remove_rows, current_mark).

        ``keep_rows`` counts unihash rows whose gc_mark equals the current
        mark (or '' when no mark is set); ``remove_rows`` counts the rest.
        """
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                SELECT COUNT() FROM unihashes_v3 WHERE
                    gc_mark=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '')
                """
            )
            keep_rows = cursor.fetchone()[0]

            cursor.execute(
                """
                SELECT COUNT() FROM unihashes_v3 WHERE
                    gc_mark!=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '')
                """
            )
            remove_rows = cursor.fetchone()[0]

            current_mark = await self._get_config(cursor, "gc-mark")

            return (keep_rows, remove_rows, current_mark)

    async def gc_mark(self, mark, condition):
        """Set the current gc-mark to *mark* and stamp it on matching rows.

        Only unihash rows matching *condition* are stamped.  If *condition*
        selects no usable columns, only the mark itself is updated.  Returns
        the number of rows stamped.
        """
        with closing(self.db.cursor()) as cursor:
            await self._set_config(cursor, "gc-mark", mark)

            where, clause = _make_condition_statement(UNIHASH_TABLE_COLUMNS, condition)

            new_rows = 0
            if where:
                cursor.execute(
                    f"""
                    UPDATE unihashes_v3 SET
                        gc_mark=COALESCE((SELECT value FROM config WHERE name='gc-mark'), '')
                    WHERE {clause}
                    """,
                    where,
                )
                new_rows = cursor.rowcount

            self.db.commit()
            return new_rows

    async def gc_sweep(self):
        """Delete unihash rows whose gc_mark differs from the current mark.

        After deleting, the mark is cleared.  Returns the number of rows
        deleted.
        """
        with closing(self.db.cursor()) as cursor:
            # NOTE: COALESCE is not used in this query so that if the current
            # mark is NULL, nothing will happen
            cursor.execute(
                """
                DELETE FROM unihashes_v3 WHERE
                    gc_mark!=(SELECT value FROM config WHERE name='gc-mark')
                """
            )
            count = cursor.rowcount
            await self._set_config(cursor, "gc-mark", None)

            self.db.commit()
            return count

    async def clean_unused(self, oldest):
        """Delete outhash rows older than *oldest* that have no unihash row.

        A row is considered unused when no unihashes_v3 row has the same
        (method, taskhash).  Returns the number of rows deleted.
        *oldest* is presumably a datetime, stored via the module's ISO
        adapter; confirm against callers.
        """
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                DELETE FROM outhashes_v2 WHERE created<:oldest AND NOT EXISTS (
                    SELECT unihashes_v3.id FROM unihashes_v3 WHERE unihashes_v3.method=outhashes_v2.method AND unihashes_v3.taskhash=outhashes_v2.taskhash LIMIT 1
                )
                """,
                {
                    "oldest": oldest,
                },
            )
            self.db.commit()
            return cursor.rowcount

    async def insert_unihash(self, method, taskhash, unihash):
        """Insert a (method, taskhash) -> unihash mapping unless it is ignored.

        The new row inherits the current gc-mark ('' when none is set).
        Returns True if a row was inserted.  Returns False if ``OR IGNORE``
        suppressed the insert, e.g. because (method, taskhash) already exists.
        """
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                INSERT OR IGNORE INTO unihashes_v3 (method, taskhash, unihash, gc_mark) VALUES
                    (
                    :method,
                    :taskhash,
                    :unihash,
                    COALESCE((SELECT value FROM config WHERE name='gc-mark'), '')
                    )
                """,
                {
                    "method": method,
                    "taskhash": taskhash,
                    "unihash": unihash,
                },
            )
            # rowcount is 0 when the insert was ignored.  Comparing lastrowid
            # to a fresh cursor's initial None is unreliable: sqlite3 reports
            # the connection's last insert rowid even if nothing was inserted.
            inserted = cursor.rowcount != 0
            self.db.commit()
            return inserted

    async def insert_outhash(self, data):
        """Insert an outhashes_v2 row built from *data* unless it is ignored.

        Keys of *data* that are not outhashes_v2 columns are dropped.  That
        filtering also keeps only known column names out of the SQL text.
        Returns True if a row was inserted and False if the insert was
        ignored.
        """
        data = {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS}
        keys = sorted(data.keys())
        query = "INSERT OR IGNORE INTO outhashes_v2 ({fields}) VALUES({values})".format(
            fields=", ".join(keys),
            values=", ".join(":" + k for k in keys),
        )
        with closing(self.db.cursor()) as cursor:
            cursor.execute(query, data)
            # See insert_unihash(): rowcount, not lastrowid, tells whether a
            # row was actually inserted.
            inserted = cursor.rowcount != 0
            self.db.commit()
            return inserted

    def _get_user(self, username):
        """Return the (username, permissions, token) row for *username*, or None."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                SELECT username, permissions, token FROM users WHERE username=:username
                """,
                {
                    "username": username,
                },
            )
            return cursor.fetchone()

    async def lookup_user_token(self, username):
        """Return ``(User, token)`` for *username*, or ``(None, None)`` if unknown."""
        row = self._get_user(username)
        if row is None:
            return None, None
        return map_user(row), row["token"]

    async def lookup_user(self, username):
        """Return the ``User`` for *username*, or None if unknown."""
        return map_user(self._get_user(username))

    async def set_user_token(self, username, token):
        """Replace *username*'s token; return True if the user existed."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                UPDATE users SET token=:token WHERE username=:username
                """,
                {
                    "username": username,
                    "token": token,
                },
            )
            self.db.commit()
            return cursor.rowcount != 0

    async def set_user_perms(self, username, permissions):
        """Replace *username*'s permissions; return True if the user existed.

        Permissions are stored space-separated.
        """
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                UPDATE users SET permissions=:permissions WHERE username=:username
                """,
                {
                    "username": username,
                    "permissions": " ".join(permissions),
                },
            )
            self.db.commit()
            return cursor.rowcount != 0

    async def get_all_users(self):
        """Return a ``User`` for every row in users (tokens are not read)."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute("SELECT username, permissions FROM users")
            return [map_user(r) for r in cursor.fetchall()]

    async def new_user(self, username, permissions, token):
        """Create a user; return False if the insert violates a constraint.

        A duplicate username is the typical violation.
        """
        with closing(self.db.cursor()) as cursor:
            try:
                cursor.execute(
                    """
                    INSERT INTO users (username, token, permissions) VALUES (:username, :token, :permissions)
                    """,
                    {
                        "username": username,
                        "token": token,
                        "permissions": " ".join(permissions),
                    },
                )
                self.db.commit()
                return True
            except sqlite3.IntegrityError:
                # sqlite3 opened a transaction implicitly before the INSERT.
                # Leaving it active after the failure may keep the write lock
                # held until this connection next commits, so end it now.
                self.db.rollback()
                return False

    async def delete_user(self, username):
        """Delete *username*; return True if a row was removed."""
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                """
                DELETE FROM users WHERE username=:username
                """,
                {
                    "username": username,
                },
            )
            self.db.commit()
            return cursor.rowcount != 0

    async def get_usage(self):
        """Return ``{table_name: {"rows": row_count}}`` for all non-internal tables."""
        usage = {}
        with closing(self.db.cursor()) as cursor:
            cursor.execute(
                f"""
                SELECT name FROM {_schema_table_name(self.sqlite_version)} WHERE type = 'table' AND name NOT LIKE 'sqlite_%'
                """
            )
            # fetchall() materialises the table list, so the cursor can be
            # reused for the per-table counts.  Table names come from the
            # schema catalog; identifiers cannot be bound as parameters.
            for row in cursor.fetchall():
                cursor.execute(
                    """
                    SELECT COUNT() FROM %s
                    """
                    % row["name"],
                )
                usage[row["name"]] = {
                    "rows": cursor.fetchone()[0],
                }
        return usage

    async def get_query_columns(self):
        """Return the names of all TEXT-typed unihash/outhash columns.

        Names are de-duplicated and kept in table-definition order, so the
        result is the same on every run.
        """
        return list(
            dict.fromkeys(
                name
                for name, typ, _ in UNIHASH_TABLE_DEFINITION + OUTHASH_TABLE_DEFINITION
                if typ.startswith("TEXT")
            )
        )
580