1*08902b01SBrad Bishop# Copyright (C) 2018-2019 Garmin Ltd.
219323693SBrad Bishop#
3c342db35SBrad Bishop# SPDX-License-Identifier: GPL-2.0-only
419323693SBrad Bishop#
519323693SBrad Bishop
619323693SBrad Bishopfrom http.server import BaseHTTPRequestHandler, HTTPServer
719323693SBrad Bishopimport contextlib
819323693SBrad Bishopimport urllib.parse
919323693SBrad Bishopimport sqlite3
1019323693SBrad Bishopimport json
1119323693SBrad Bishopimport traceback
1219323693SBrad Bishopimport logging
13*08902b01SBrad Bishopimport socketserver
14*08902b01SBrad Bishopimport queue
15*08902b01SBrad Bishopimport threading
16*08902b01SBrad Bishopimport signal
17*08902b01SBrad Bishopimport socket
18*08902b01SBrad Bishopimport struct
1919323693SBrad Bishopfrom datetime import datetime
2019323693SBrad Bishop
2119323693SBrad Bishoplogger = logging.getLogger('hashserv')
2219323693SBrad Bishop
2319323693SBrad Bishopclass HashEquivalenceServer(BaseHTTPRequestHandler):
2419323693SBrad Bishop    def log_message(self, f, *args):
2519323693SBrad Bishop        logger.debug(f, *args)
2619323693SBrad Bishop
27*08902b01SBrad Bishop    def opendb(self):
28*08902b01SBrad Bishop        self.db = sqlite3.connect(self.dbname)
29*08902b01SBrad Bishop        self.db.row_factory = sqlite3.Row
30*08902b01SBrad Bishop        self.db.execute("PRAGMA synchronous = OFF;")
31*08902b01SBrad Bishop        self.db.execute("PRAGMA journal_mode = MEMORY;")
32*08902b01SBrad Bishop
3319323693SBrad Bishop    def do_GET(self):
3419323693SBrad Bishop        try:
35*08902b01SBrad Bishop            if not self.db:
36*08902b01SBrad Bishop                self.opendb()
37*08902b01SBrad Bishop
3819323693SBrad Bishop            p = urllib.parse.urlparse(self.path)
3919323693SBrad Bishop
4019323693SBrad Bishop            if p.path != self.prefix + '/v1/equivalent':
4119323693SBrad Bishop                self.send_error(404)
4219323693SBrad Bishop                return
4319323693SBrad Bishop
4419323693SBrad Bishop            query = urllib.parse.parse_qs(p.query, strict_parsing=True)
4519323693SBrad Bishop            method = query['method'][0]
4619323693SBrad Bishop            taskhash = query['taskhash'][0]
4719323693SBrad Bishop
4819323693SBrad Bishop            d = None
4919323693SBrad Bishop            with contextlib.closing(self.db.cursor()) as cursor:
50*08902b01SBrad Bishop                cursor.execute('SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1',
5119323693SBrad Bishop                        {'method': method, 'taskhash': taskhash})
5219323693SBrad Bishop
5319323693SBrad Bishop                row = cursor.fetchone()
5419323693SBrad Bishop
5519323693SBrad Bishop                if row is not None:
5619323693SBrad Bishop                    logger.debug('Found equivalent task %s', row['taskhash'])
5719323693SBrad Bishop                    d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
5819323693SBrad Bishop
5919323693SBrad Bishop            self.send_response(200)
6019323693SBrad Bishop            self.send_header('Content-Type', 'application/json; charset=utf-8')
6119323693SBrad Bishop            self.end_headers()
6219323693SBrad Bishop            self.wfile.write(json.dumps(d).encode('utf-8'))
6319323693SBrad Bishop        except:
6419323693SBrad Bishop            logger.exception('Error in GET')
6519323693SBrad Bishop            self.send_error(400, explain=traceback.format_exc())
6619323693SBrad Bishop            return
6719323693SBrad Bishop
6819323693SBrad Bishop    def do_POST(self):
6919323693SBrad Bishop        try:
70*08902b01SBrad Bishop            if not self.db:
71*08902b01SBrad Bishop                self.opendb()
72*08902b01SBrad Bishop
7319323693SBrad Bishop            p = urllib.parse.urlparse(self.path)
7419323693SBrad Bishop
7519323693SBrad Bishop            if p.path != self.prefix + '/v1/equivalent':
7619323693SBrad Bishop                self.send_error(404)
7719323693SBrad Bishop                return
7819323693SBrad Bishop
7919323693SBrad Bishop            length = int(self.headers['content-length'])
8019323693SBrad Bishop            data = json.loads(self.rfile.read(length).decode('utf-8'))
8119323693SBrad Bishop
8219323693SBrad Bishop            with contextlib.closing(self.db.cursor()) as cursor:
8319323693SBrad Bishop                cursor.execute('''
84*08902b01SBrad Bishop                    -- Find tasks with a matching outhash (that is, tasks that
85*08902b01SBrad Bishop                    -- are equivalent)
86*08902b01SBrad Bishop                    SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND outhash=:outhash
87*08902b01SBrad Bishop
88*08902b01SBrad Bishop                    -- If there is an exact match on the taskhash, return it.
89*08902b01SBrad Bishop                    -- Otherwise return the oldest matching outhash of any
90*08902b01SBrad Bishop                    -- taskhash
9119323693SBrad Bishop                    ORDER BY CASE WHEN taskhash=:taskhash THEN 1 ELSE 2 END,
9219323693SBrad Bishop                        created ASC
93*08902b01SBrad Bishop
94*08902b01SBrad Bishop                    -- Only return one row
9519323693SBrad Bishop                    LIMIT 1
9619323693SBrad Bishop                    ''', {k: data[k] for k in ('method', 'outhash', 'taskhash')})
9719323693SBrad Bishop
9819323693SBrad Bishop                row = cursor.fetchone()
9919323693SBrad Bishop
100*08902b01SBrad Bishop                # If no matching outhash was found, or one *was* found but it
101*08902b01SBrad Bishop                # wasn't an exact match on the taskhash, a new entry for this
102*08902b01SBrad Bishop                # taskhash should be added
10319323693SBrad Bishop                if row is None or row['taskhash'] != data['taskhash']:
104*08902b01SBrad Bishop                    # If a row matching the outhash was found, the unihash for
105*08902b01SBrad Bishop                    # the new taskhash should be the same as that one.
106*08902b01SBrad Bishop                    # Otherwise the caller provided unihash is used.
10719323693SBrad Bishop                    unihash = data['unihash']
10819323693SBrad Bishop                    if row is not None:
10919323693SBrad Bishop                        unihash = row['unihash']
11019323693SBrad Bishop
11119323693SBrad Bishop                    insert_data = {
11219323693SBrad Bishop                            'method': data['method'],
11319323693SBrad Bishop                            'outhash': data['outhash'],
11419323693SBrad Bishop                            'taskhash': data['taskhash'],
11519323693SBrad Bishop                            'unihash': unihash,
11619323693SBrad Bishop                            'created': datetime.now()
11719323693SBrad Bishop                            }
11819323693SBrad Bishop
11919323693SBrad Bishop                    for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
12019323693SBrad Bishop                        if k in data:
12119323693SBrad Bishop                            insert_data[k] = data[k]
12219323693SBrad Bishop
123*08902b01SBrad Bishop                    cursor.execute('''INSERT INTO tasks_v2 (%s) VALUES (%s)''' % (
12419323693SBrad Bishop                            ', '.join(sorted(insert_data.keys())),
12519323693SBrad Bishop                            ', '.join(':' + k for k in sorted(insert_data.keys()))),
12619323693SBrad Bishop                        insert_data)
12719323693SBrad Bishop
12819323693SBrad Bishop                    logger.info('Adding taskhash %s with unihash %s', data['taskhash'], unihash)
12919323693SBrad Bishop
13019323693SBrad Bishop                    self.db.commit()
131*08902b01SBrad Bishop                    d = {'taskhash': data['taskhash'], 'method': data['method'], 'unihash': unihash}
132*08902b01SBrad Bishop                else:
13319323693SBrad Bishop                    d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
13419323693SBrad Bishop
13519323693SBrad Bishop                self.send_response(200)
13619323693SBrad Bishop                self.send_header('Content-Type', 'application/json; charset=utf-8')
13719323693SBrad Bishop                self.end_headers()
13819323693SBrad Bishop                self.wfile.write(json.dumps(d).encode('utf-8'))
13919323693SBrad Bishop        except:
14019323693SBrad Bishop            logger.exception('Error in POST')
14119323693SBrad Bishop            self.send_error(400, explain=traceback.format_exc())
14219323693SBrad Bishop            return
14319323693SBrad Bishop
144*08902b01SBrad Bishopclass ThreadedHTTPServer(HTTPServer):
145*08902b01SBrad Bishop    quit = False
146*08902b01SBrad Bishop
147*08902b01SBrad Bishop    def serve_forever(self):
148*08902b01SBrad Bishop        self.requestqueue = queue.Queue()
149*08902b01SBrad Bishop        self.handlerthread = threading.Thread(target=self.process_request_thread)
150*08902b01SBrad Bishop        self.handlerthread.daemon = False
151*08902b01SBrad Bishop
152*08902b01SBrad Bishop        self.handlerthread.start()
153*08902b01SBrad Bishop
154*08902b01SBrad Bishop        signal.signal(signal.SIGTERM, self.sigterm_exception)
155*08902b01SBrad Bishop        super().serve_forever()
156*08902b01SBrad Bishop        os._exit(0)
157*08902b01SBrad Bishop
158*08902b01SBrad Bishop    def sigterm_exception(self, signum, stackframe):
159*08902b01SBrad Bishop        self.server_close()
160*08902b01SBrad Bishop        os._exit(0)
161*08902b01SBrad Bishop
162*08902b01SBrad Bishop    def server_bind(self):
163*08902b01SBrad Bishop        HTTPServer.server_bind(self)
164*08902b01SBrad Bishop        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 0))
165*08902b01SBrad Bishop
166*08902b01SBrad Bishop    def process_request_thread(self):
167*08902b01SBrad Bishop        while not self.quit:
168*08902b01SBrad Bishop            try:
169*08902b01SBrad Bishop                (request, client_address) = self.requestqueue.get(True)
170*08902b01SBrad Bishop            except queue.Empty:
171*08902b01SBrad Bishop                continue
172*08902b01SBrad Bishop            if request is None:
173*08902b01SBrad Bishop                continue
174*08902b01SBrad Bishop            try:
175*08902b01SBrad Bishop                self.finish_request(request, client_address)
176*08902b01SBrad Bishop            except Exception:
177*08902b01SBrad Bishop                self.handle_error(request, client_address)
178*08902b01SBrad Bishop            finally:
179*08902b01SBrad Bishop                self.shutdown_request(request)
180*08902b01SBrad Bishop        os._exit(0)
181*08902b01SBrad Bishop
182*08902b01SBrad Bishop    def process_request(self, request, client_address):
183*08902b01SBrad Bishop        self.requestqueue.put((request, client_address))
184*08902b01SBrad Bishop
185*08902b01SBrad Bishop    def server_close(self):
186*08902b01SBrad Bishop        super().server_close()
187*08902b01SBrad Bishop        self.quit = True
188*08902b01SBrad Bishop        self.requestqueue.put((None, None))
189*08902b01SBrad Bishop        self.handlerthread.join()
190*08902b01SBrad Bishop
191*08902b01SBrad Bishopdef create_server(addr, dbname, prefix=''):
19219323693SBrad Bishop    class Handler(HashEquivalenceServer):
19319323693SBrad Bishop        pass
19419323693SBrad Bishop
195*08902b01SBrad Bishop    db = sqlite3.connect(dbname)
19619323693SBrad Bishop    db.row_factory = sqlite3.Row
19719323693SBrad Bishop
198*08902b01SBrad Bishop    Handler.prefix = prefix
199*08902b01SBrad Bishop    Handler.db = None
200*08902b01SBrad Bishop    Handler.dbname = dbname
201*08902b01SBrad Bishop
20219323693SBrad Bishop    with contextlib.closing(db.cursor()) as cursor:
20319323693SBrad Bishop        cursor.execute('''
204*08902b01SBrad Bishop            CREATE TABLE IF NOT EXISTS tasks_v2 (
20519323693SBrad Bishop                id INTEGER PRIMARY KEY AUTOINCREMENT,
20619323693SBrad Bishop                method TEXT NOT NULL,
20719323693SBrad Bishop                outhash TEXT NOT NULL,
20819323693SBrad Bishop                taskhash TEXT NOT NULL,
20919323693SBrad Bishop                unihash TEXT NOT NULL,
21019323693SBrad Bishop                created DATETIME,
21119323693SBrad Bishop
21219323693SBrad Bishop                -- Optional fields
21319323693SBrad Bishop                owner TEXT,
21419323693SBrad Bishop                PN TEXT,
21519323693SBrad Bishop                PV TEXT,
21619323693SBrad Bishop                PR TEXT,
21719323693SBrad Bishop                task TEXT,
218*08902b01SBrad Bishop                outhash_siginfo TEXT,
219*08902b01SBrad Bishop
220*08902b01SBrad Bishop                UNIQUE(method, outhash, taskhash)
22119323693SBrad Bishop                )
22219323693SBrad Bishop            ''')
223*08902b01SBrad Bishop        cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup ON tasks_v2 (method, taskhash)')
224*08902b01SBrad Bishop        cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup ON tasks_v2 (method, outhash)')
22519323693SBrad Bishop
226*08902b01SBrad Bishop    ret = ThreadedHTTPServer(addr, Handler)
227*08902b01SBrad Bishop
228*08902b01SBrad Bishop    logger.info('Starting server on %s\n', ret.server_port)
229*08902b01SBrad Bishop
230*08902b01SBrad Bishop    return ret
231