# Copyright (C) 2018-2019 Garmin Ltd.
#
# SPDX-License-Identifier: GPL-2.0-only
#
519323693SBrad Bishop
6a34c030eSBrad Bishopfrom contextlib import closing
7a34c030eSBrad Bishopimport re
819323693SBrad Bishopimport sqlite3
9*475cb72dSAndrew Geisslerimport itertools
10*475cb72dSAndrew Geisslerimport json
1119323693SBrad Bishop
# Scheme prefix that marks an address string as a UNIX domain socket path.
UNIX_PREFIX = "unix://"

# Address kinds returned as the first element of parse_address()'s result.
ADDR_TYPE_UNIX = 0
ADDR_TYPE_TCP = 1

# The Python async server defaults to a 64K receive buffer, so we hardcode our
# maximum chunk size. It would be better if the client and server reported to
# each other what the maximum chunk sizes were, but that would slow down the
# connection setup with a round trip delay, so we'd rather not do that unless
# it is necessary.
DEFAULT_MAX_CHUNK = 32 * 1024
2308902b01SBrad Bishop
def setup_database(database, sync=True):
    """Open the SQLite database and make sure its schema is in place.

    Creates the ``tasks_v2`` table if it does not exist, switches the
    journal to WAL, applies the requested ``synchronous`` level, drops the
    old indexes and creates the ``*_v2`` ones.

    Args:
        database: Database path (or any value accepted by
            ``sqlite3.connect``, e.g. ``":memory:"``).
        sync: When true ``PRAGMA synchronous`` is set to ``NORMAL``,
            otherwise to ``OFF``.

    Returns:
        The open ``sqlite3.Connection`` with ``row_factory`` set to
        ``sqlite3.Row``. The caller owns it and must close it.

    Raises:
        sqlite3.Error: If any setup statement fails. The connection is
            closed before the exception propagates.
    """
    db = sqlite3.connect(database)
    try:
        db.row_factory = sqlite3.Row

        with closing(db.cursor()) as cursor:
            cursor.execute('''
            CREATE TABLE IF NOT EXISTS tasks_v2 (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                method TEXT NOT NULL,
                outhash TEXT NOT NULL,
                taskhash TEXT NOT NULL,
                unihash TEXT NOT NULL,
                created DATETIME,

                -- Optional fields
                owner TEXT,
                PN TEXT,
                PV TEXT,
                PR TEXT,
                task TEXT,
                outhash_siginfo TEXT,

                UNIQUE(method, outhash, taskhash)
                )
            ''')
            cursor.execute('PRAGMA journal_mode = WAL')
            # PRAGMA arguments cannot be bound as parameters; the value is
            # chosen from two fixed literals, never from caller text.
            cursor.execute('PRAGMA synchronous = %s' % ('NORMAL' if sync else 'OFF'))

            # Drop old indexes
            cursor.execute('DROP INDEX IF EXISTS taskhash_lookup')
            cursor.execute('DROP INDEX IF EXISTS outhash_lookup')

            # Create new indexes
            cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup_v2 ON tasks_v2 (method, taskhash, created)')
            cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup_v2 ON tasks_v2 (method, outhash)')
    except Exception:
        # Do not leak the connection when schema setup fails part-way.
        db.close()
        raise

    return db
61a34c030eSBrad Bishop
62a34c030eSBrad Bishop
def parse_address(addr):
    """Split an address string into an address type and connection arguments.

    Accepted forms:
      * ``unix://PATH``  -> ``(ADDR_TYPE_UNIX, (PATH,))``
      * ``[HOST]:PORT``  -> ``(ADDR_TYPE_TCP, (HOST, PORT))``; the brackets
        allow HOST itself to contain colons
      * ``HOST:PORT``    -> ``(ADDR_TYPE_TCP, (HOST, PORT))``

    Args:
        addr: The address string to parse.

    Returns:
        A ``(type, args)`` tuple where ``args`` is a tuple suitable for
        unpacking into the matching connect/start call. For TCP the port is
        converted to ``int``.

    Raises:
        ValueError: If a non-UNIX address is not ``HOST:PORT`` or
            ``[HOST]:PORT``, or the port is not an integer.
    """
    if addr.startswith(UNIX_PREFIX):
        return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],))

    m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr)
    if m is not None:
        host = m.group('host')
        port = m.group('port')
    else:
        parts = addr.split(':')
        if len(parts) != 2:
            # Same exception type a failed tuple-unpack would raise, but with
            # a message that names the offending address.
            raise ValueError(
                "Invalid address %r: expected unix://PATH, HOST:PORT or [HOST]:PORT" % (addr,)
            )
        host, port = parts

    return (ADDR_TYPE_TCP, (host, int(port)))
75a34c030eSBrad Bishop
76a34c030eSBrad Bishop
def chunkify(msg, max_chunk):
    """Yield newline-terminated pieces of ``msg`` no longer than ``max_chunk``.

    A message shorter than ``max_chunk - 1`` characters is yielded as a
    single line. Otherwise the generator yields, in order:

      1. a JSON header line ``{"chunk-stream": null}``,
      2. the message cut into pieces of at most ``max_chunk - 1`` characters,
         each followed by a newline,
      3. a lone newline that terminates the stream.

    Args:
        msg: The message text (``str``) to send.
        max_chunk: Maximum length of each yielded piece including its
            trailing newline. Must be at least 2.

    Yields:
        ``str`` pieces ready to be written to the connection.

    Raises:
        ValueError: If ``max_chunk`` is less than 2; no payload character
            would fit in a chunk, and the message would otherwise be
            silently dropped.
    """
    if max_chunk < 2:
        raise ValueError("max_chunk must be at least 2, got %r" % (max_chunk,))

    if len(msg) < max_chunk - 1:
        yield msg + "\n"
        return

    yield json.dumps({'chunk-stream': None}) + "\n"

    # One character of every chunk is reserved for the newline terminator.
    step = max_chunk - 1
    for start in range(0, len(msg), step):
        yield msg[start:start + step] + "\n"
    yield "\n"
89*475cb72dSAndrew Geissler
90*475cb72dSAndrew Geissler
def create_server(addr, dbname, *, sync=True):
    """Create a server backed by ``dbname`` and start it listening on ``addr``.

    Args:
        addr: Address string understood by ``parse_address``.
        dbname: Database passed to ``setup_database``.
        sync: Forwarded to ``setup_database`` (keyword-only).

    Returns:
        The ``server.Server`` instance after its UNIX or TCP listener has
        been started.
    """
    # Imported here rather than at module level, as in the original.
    from . import server

    db = setup_database(dbname, sync=sync)
    s = server.Server(db)

    (typ, a) = parse_address(addr)
    if typ == ADDR_TYPE_UNIX:
        s.start_unix_server(*a)
    else:
        s.start_tcp_server(*a)

    return s
103a34c030eSBrad Bishop
104a34c030eSBrad Bishop
def create_client(addr):
    """Create a client and connect it to the server at ``addr``.

    Args:
        addr: Address string understood by ``parse_address``.

    Returns:
        The ``client.Client`` instance after ``connect_unix`` or
        ``connect_tcp`` has been called on it.
    """
    # Imported here rather than at module level, as in the original.
    from . import client

    c = client.Client()

    (typ, a) = parse_address(addr)
    if typ == ADDR_TYPE_UNIX:
        c.connect_unix(*a)
    else:
        c.connect_tcp(*a)

    return c
116