# Copyright (C) 2018-2019 Garmin Ltd.
#
# SPDX-License-Identifier: GPL-2.0-only
#
"""HTTP server that records and looks up task hash equivalences in SQLite.

A single endpoint, ``<prefix>/v1/equivalent``, is served:

* ``GET``  looks up the oldest row for a ``method``/``taskhash`` pair.
* ``POST`` reports a ``method``/``outhash``/``taskhash``/``unihash`` tuple
  and returns the unihash that should be used for that taskhash.
"""

from http.server import BaseHTTPRequestHandler, HTTPServer
import contextlib
import urllib.parse
import sqlite3
import json
import traceback
import logging
import os
import socketserver
import queue
import threading
import signal
import socket
import struct
from datetime import datetime

logger = logging.getLogger('hashserv')


class HashEquivalenceServer(BaseHTTPRequestHandler):
    """Request handler for the ``/v1/equivalent`` endpoint.

    The class attributes ``prefix``, ``db`` and ``dbname`` are expected to be
    assigned on a subclass before use (``create_server`` does this).
    """

    def log_message(self, f, *args):
        """Route the base class' access/error log lines to the module logger."""
        logger.debug(f, *args)

    def opendb(self):
        """Open ``self.dbname`` and store the connection on this handler.

        Rows are returned as ``sqlite3.Row`` so columns can be read by name.
        """
        # NOTE(review): the connection is stored on the handler instance, so
        # it is only reused if the same instance serves several requests.
        self.db = sqlite3.connect(self.dbname)
        self.db.row_factory = sqlite3.Row
        self.db.execute("PRAGMA synchronous = OFF;")
        self.db.execute("PRAGMA journal_mode = MEMORY;")

    def do_GET(self):
        """Look up the unihash recorded for a ``method``/``taskhash`` pair.

        Responds with a JSON object holding ``taskhash``, ``method`` and
        ``unihash`` of the oldest matching row, or JSON ``null`` when there is
        no match. Unknown paths get a 404; any other failure (for example a
        missing query parameter) gets a 400 carrying the traceback.
        """
        try:
            if self.db is None:
                self.opendb()

            p = urllib.parse.urlparse(self.path)

            if p.path != self.prefix + '/v1/equivalent':
                self.send_error(404)
                return

            query = urllib.parse.parse_qs(p.query, strict_parsing=True)
            method = query['method'][0]
            taskhash = query['taskhash'][0]

            d = None
            with contextlib.closing(self.db.cursor()) as cursor:
                cursor.execute('SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1',
                               {'method': method, 'taskhash': taskhash})

                row = cursor.fetchone()

                if row is not None:
                    logger.debug('Found equivalent task %s', row['taskhash'])
                    d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}

            self.send_response(200)
            self.send_header('Content-Type', 'application/json; charset=utf-8')
            self.end_headers()
            self.wfile.write(json.dumps(d).encode('utf-8'))
        except Exception:
            # Report the failure to the client, but let SystemExit and
            # KeyboardInterrupt propagate instead of turning them into a 400.
            logger.exception('Error in GET')
            self.send_error(400, explain=traceback.format_exc())
            return

    def do_POST(self):
        """Record a task's output hash and return the unihash to use for it.

        The request body is a JSON object with the required keys ``method``,
        ``outhash``, ``taskhash`` and ``unihash``; the optional keys
        ``owner``, ``PN``, ``PV``, ``PR``, ``task`` and ``outhash_siginfo``
        are stored when present. Responds with a JSON object holding
        ``taskhash``, ``method`` and ``unihash``. Unknown paths get a 404;
        any other failure gets a 400 carrying the traceback.
        """
        try:
            if self.db is None:
                self.opendb()

            p = urllib.parse.urlparse(self.path)

            if p.path != self.prefix + '/v1/equivalent':
                self.send_error(404)
                return

            length = int(self.headers['content-length'])
            data = json.loads(self.rfile.read(length).decode('utf-8'))

            with contextlib.closing(self.db.cursor()) as cursor:
                cursor.execute('''
                    -- Find tasks with a matching outhash (that is, tasks that
                    -- are equivalent)
                    SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND outhash=:outhash

                    -- If there is an exact match on the taskhash, return it.
                    -- Otherwise return the oldest matching outhash of any
                    -- taskhash
                    ORDER BY CASE WHEN taskhash=:taskhash THEN 1 ELSE 2 END,
                        created ASC

                    -- Only return one row
                    LIMIT 1
                    ''', {k: data[k] for k in ('method', 'outhash', 'taskhash')})

                row = cursor.fetchone()

                # If no matching outhash was found, or one *was* found but it
                # wasn't an exact match on the taskhash, a new entry for this
                # taskhash should be added
                if row is None or row['taskhash'] != data['taskhash']:
                    # If a row matching the outhash was found, the unihash for
                    # the new taskhash should be the same as that one.
                    # Otherwise the caller provided unihash is used.
                    unihash = data['unihash']
                    if row is not None:
                        unihash = row['unihash']

                    insert_data = {
                        'method': data['method'],
                        'outhash': data['outhash'],
                        'taskhash': data['taskhash'],
                        'unihash': unihash,
                        'created': datetime.now()
                    }

                    for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
                        if k in data:
                            insert_data[k] = data[k]

                    # Column names come only from the fixed key lists above,
                    # never from the request, so interpolating them is safe;
                    # the values themselves are bound as named parameters.
                    columns = sorted(insert_data)
                    cursor.execute('''INSERT INTO tasks_v2 (%s) VALUES (%s)''' % (
                        ', '.join(columns),
                        ', '.join(':' + k for k in columns)),
                        insert_data)

                    logger.info('Adding taskhash %s with unihash %s', data['taskhash'], unihash)

                    self.db.commit()
                    d = {'taskhash': data['taskhash'], 'method': data['method'], 'unihash': unihash}
                else:
                    d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}

            self.send_response(200)
            self.send_header('Content-Type', 'application/json; charset=utf-8')
            self.end_headers()
            self.wfile.write(json.dumps(d).encode('utf-8'))
        except Exception:
            # Report the failure to the client, but let SystemExit and
            # KeyboardInterrupt propagate instead of turning them into a 400.
            logger.exception('Error in POST')
            self.send_error(400, explain=traceback.format_exc())
            return


class ThreadedHTTPServer(HTTPServer):
    """HTTPServer that hands accepted requests to one worker thread.

    ``serve_forever`` accepts connections and queues them; a single handler
    thread takes them off the queue and runs the request handler.
    """

    # Set by server_close() to make the worker thread leave its loop.
    quit = False

    def serve_forever(self):
        """Start the worker thread, install the SIGTERM hook and serve.

        Does not return normally: the process is terminated with
        ``os._exit(0)`` once the accept loop ends.
        """
        self.requestqueue = queue.Queue()
        self.handlerthread = threading.Thread(target=self.process_request_thread)
        self.handlerthread.daemon = False

        self.handlerthread.start()

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        super().serve_forever()
        os._exit(0)

    def sigterm_exception(self, signum, stackframe):
        """SIGTERM handler: close the server and terminate the process."""
        self.server_close()
        os._exit(0)

    def server_bind(self):
        """Bind the listening socket, then enable SO_LINGER with a 0 timeout."""
        HTTPServer.server_bind(self)
        # struct linger { l_onoff = 1, l_linger = 0 }
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 0))

    def process_request_thread(self):
        """Worker loop: handle queued requests until ``quit`` is set.

        A ``(None, None)`` item only wakes the loop so it can re-check
        ``quit``. When the loop ends the process is terminated with
        ``os._exit(0)``.
        """
        while not self.quit:
            try:
                (request, client_address) = self.requestqueue.get(True)
            except queue.Empty:
                continue
            if request is None:
                continue
            try:
                self.finish_request(request, client_address)
            except Exception:
                self.handle_error(request, client_address)
            finally:
                self.shutdown_request(request)
        os._exit(0)

    def process_request(self, request, client_address):
        """Queue an accepted connection for the worker thread."""
        self.requestqueue.put((request, client_address))

    def server_close(self):
        """Close the listening socket and stop the worker thread, if any.

        Safe to call before ``serve_forever`` has run. The base class calls
        this from ``__init__`` when binding fails, at which point neither the
        queue nor the worker thread exists yet.
        """
        super().server_close()
        self.quit = True
        handlerthread = getattr(self, 'handlerthread', None)
        if handlerthread is None:
            return
        # Wake the worker so it notices ``quit``.
        self.requestqueue.put((None, None))
        handlerthread.join()


def create_server(addr, dbname, prefix=''):
    """Create the database schema and return a bound ``ThreadedHTTPServer``.

    Args:
        addr: ``(host, port)`` address passed to the HTTP server.
        dbname: path of the SQLite database; the ``tasks_v2`` table and its
            lookup indexes are created if they do not exist.
        prefix: string prepended to ``/v1/equivalent`` to form the URL path
            the handler answers on.

    Returns:
        The server instance; the caller is expected to run ``serve_forever``.
    """
    class Handler(HashEquivalenceServer):
        pass

    Handler.prefix = prefix
    # Left unset here; the handler opens its own connection on first use.
    Handler.db = None
    Handler.dbname = dbname

    # This connection is only needed for schema setup, so close it afterwards
    # instead of leaving it to the garbage collector.
    with contextlib.closing(sqlite3.connect(dbname)) as db:
        db.row_factory = sqlite3.Row

        with contextlib.closing(db.cursor()) as cursor:
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS tasks_v2 (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    method TEXT NOT NULL,
                    outhash TEXT NOT NULL,
                    taskhash TEXT NOT NULL,
                    unihash TEXT NOT NULL,
                    created DATETIME,

                    -- Optional fields
                    owner TEXT,
                    PN TEXT,
                    PV TEXT,
                    PR TEXT,
                    task TEXT,
                    outhash_siginfo TEXT,

                    UNIQUE(method, outhash, taskhash)
                    )
                ''')
            cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup ON tasks_v2 (method, taskhash)')
            cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup ON tasks_v2 (method, outhash)')

    ret = ThreadedHTTPServer(addr, Handler)

    logger.info('Starting server on %s\n', ret.server_port)

    return ret