108902b01SBrad Bishop# Copyright (C) 2018-2019 Garmin Ltd. 219323693SBrad Bishop# 3c342db35SBrad Bishop# SPDX-License-Identifier: GPL-2.0-only 419323693SBrad Bishop# 519323693SBrad Bishop 6a34c030eSBrad Bishopfrom contextlib import closing 7a34c030eSBrad Bishopimport re 819323693SBrad Bishopimport sqlite3 9*475cb72dSAndrew Geisslerimport itertools 10*475cb72dSAndrew Geisslerimport json 1119323693SBrad Bishop 12a34c030eSBrad BishopUNIX_PREFIX = "unix://" 1319323693SBrad Bishop 14a34c030eSBrad BishopADDR_TYPE_UNIX = 0 15a34c030eSBrad BishopADDR_TYPE_TCP = 1 1619323693SBrad Bishop 17*475cb72dSAndrew Geissler# The Python async server defaults to a 64K receive buffer, so we hardcode our 18*475cb72dSAndrew Geissler# maximum chunk size. It would be better if the client and server reported to 19*475cb72dSAndrew Geissler# each other what the maximum chunk sizes were, but that will slow down the 20*475cb72dSAndrew Geissler# connection setup with a round trip delay so I'd rather not do that unless it 21*475cb72dSAndrew Geissler# is necessary 22*475cb72dSAndrew GeisslerDEFAULT_MAX_CHUNK = 32 * 1024 2308902b01SBrad Bishop 24a34c030eSBrad Bishopdef setup_database(database, sync=True): 25a34c030eSBrad Bishop db = sqlite3.connect(database) 2619323693SBrad Bishop db.row_factory = sqlite3.Row 2719323693SBrad Bishop 28a34c030eSBrad Bishop with closing(db.cursor()) as cursor: 2919323693SBrad Bishop cursor.execute(''' 3008902b01SBrad Bishop CREATE TABLE IF NOT EXISTS tasks_v2 ( 3119323693SBrad Bishop id INTEGER PRIMARY KEY AUTOINCREMENT, 3219323693SBrad Bishop method TEXT NOT NULL, 3319323693SBrad Bishop outhash TEXT NOT NULL, 3419323693SBrad Bishop taskhash TEXT NOT NULL, 3519323693SBrad Bishop unihash TEXT NOT NULL, 3619323693SBrad Bishop created DATETIME, 3719323693SBrad Bishop 3819323693SBrad Bishop -- Optional fields 3919323693SBrad Bishop owner TEXT, 4019323693SBrad Bishop PN TEXT, 4119323693SBrad Bishop PV TEXT, 4219323693SBrad Bishop PR TEXT, 4319323693SBrad Bishop task TEXT, 4408902b01SBrad Bishop outhash_siginfo TEXT, 4508902b01SBrad Bishop 4608902b01SBrad Bishop UNIQUE(method, outhash, taskhash) 4719323693SBrad Bishop ) 4819323693SBrad Bishop ''') 49a34c030eSBrad Bishop cursor.execute('PRAGMA journal_mode = WAL') 50a34c030eSBrad Bishop cursor.execute('PRAGMA synchronous = %s' % ('NORMAL' if sync else 'OFF')) 5119323693SBrad Bishop 52a34c030eSBrad Bishop # Drop old indexes 53a34c030eSBrad Bishop cursor.execute('DROP INDEX IF EXISTS taskhash_lookup') 54a34c030eSBrad Bishop cursor.execute('DROP INDEX IF EXISTS outhash_lookup') 5508902b01SBrad Bishop 56a34c030eSBrad Bishop # Create new indexes 57a34c030eSBrad Bishop cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup_v2 ON tasks_v2 (method, taskhash, created)') 58a34c030eSBrad Bishop cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup_v2 ON tasks_v2 (method, outhash)') 5908902b01SBrad Bishop 60a34c030eSBrad Bishop return db 61a34c030eSBrad Bishop 62a34c030eSBrad Bishop 63a34c030eSBrad Bishopdef parse_address(addr): 64a34c030eSBrad Bishop if addr.startswith(UNIX_PREFIX): 65a34c030eSBrad Bishop return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],)) 66a34c030eSBrad Bishop else: 67a34c030eSBrad Bishop m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr) 68a34c030eSBrad Bishop if m is not None: 69a34c030eSBrad Bishop host = m.group('host') 70a34c030eSBrad Bishop port = m.group('port') 71a34c030eSBrad Bishop else: 72a34c030eSBrad Bishop host, port = addr.split(':') 73a34c030eSBrad Bishop 74a34c030eSBrad Bishop return (ADDR_TYPE_TCP, (host, int(port))) 75a34c030eSBrad Bishop 76a34c030eSBrad Bishop 77*475cb72dSAndrew Geisslerdef chunkify(msg, max_chunk): 78*475cb72dSAndrew Geissler if len(msg) < max_chunk - 1: 79*475cb72dSAndrew Geissler yield ''.join((msg, "\n")) 80*475cb72dSAndrew Geissler else: 81*475cb72dSAndrew Geissler yield ''.join((json.dumps({ 82*475cb72dSAndrew Geissler 'chunk-stream': None 83*475cb72dSAndrew Geissler }), "\n")) 84*475cb72dSAndrew Geissler 85*475cb72dSAndrew Geissler args = [iter(msg)] * (max_chunk - 1) 86*475cb72dSAndrew Geissler for m in map(''.join, itertools.zip_longest(*args, fillvalue='')): 87*475cb72dSAndrew Geissler yield ''.join(itertools.chain(m, "\n")) 88*475cb72dSAndrew Geissler yield "\n" 89*475cb72dSAndrew Geissler 90*475cb72dSAndrew Geissler 91a34c030eSBrad Bishopdef create_server(addr, dbname, *, sync=True): 92a34c030eSBrad Bishop from . import server 93a34c030eSBrad Bishop db = setup_database(dbname, sync=sync) 94a34c030eSBrad Bishop s = server.Server(db) 95a34c030eSBrad Bishop 96a34c030eSBrad Bishop (typ, a) = parse_address(addr) 97a34c030eSBrad Bishop if typ == ADDR_TYPE_UNIX: 98a34c030eSBrad Bishop s.start_unix_server(*a) 99a34c030eSBrad Bishop else: 100a34c030eSBrad Bishop s.start_tcp_server(*a) 101a34c030eSBrad Bishop 102a34c030eSBrad Bishop return s 103a34c030eSBrad Bishop 104a34c030eSBrad Bishop 105a34c030eSBrad Bishopdef create_client(addr): 106a34c030eSBrad Bishop from . import client 107a34c030eSBrad Bishop c = client.Client() 108a34c030eSBrad Bishop 109a34c030eSBrad Bishop (typ, a) = parse_address(addr) 110a34c030eSBrad Bishop if typ == ADDR_TYPE_UNIX: 111a34c030eSBrad Bishop c.connect_unix(*a) 112a34c030eSBrad Bishop else: 113a34c030eSBrad Bishop c.connect_tcp(*a) 114a34c030eSBrad Bishop 115a34c030eSBrad Bishop return c 116