xref: /openbmc/openbmc/poky/bitbake/lib/prserv/serv.py (revision 44b3caf2)
1#
2# Copyright BitBake Contributors
3#
4# SPDX-License-Identifier: GPL-2.0-only
5#
6
7import os,sys,logging
8import signal, time
9import socket
10import io
11import sqlite3
12import prserv
13import prserv.db
14import errno
15import bb.asyncrpc
16
17logger = logging.getLogger("BitBake.PRserv")
18
19PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
20singleton = None
21
22class PRServerClient(bb.asyncrpc.AsyncServerConnection):
23    def __init__(self, socket, server):
24        super().__init__(socket, "PRSERVICE", server.logger)
25        self.server = server
26
27        self.handlers.update({
28            "get-pr": self.handle_get_pr,
29            "test-pr": self.handle_test_pr,
30            "test-package": self.handle_test_package,
31            "max-package-pr": self.handle_max_package_pr,
32            "import-one": self.handle_import_one,
33            "export": self.handle_export,
34            "is-readonly": self.handle_is_readonly,
35        })
36
37    def validate_proto_version(self):
38        return (self.proto_version == (1, 0))
39
40    async def dispatch_message(self, msg):
41        try:
42            return await super().dispatch_message(msg)
43        except:
44            self.server.table.sync()
45            raise
46        else:
47            self.server.table.sync_if_dirty()
48
49    async def handle_test_pr(self, request):
50        '''Finds the PR value corresponding to the request. If not found, returns None and doesn't insert a new value'''
51        version = request["version"]
52        pkgarch = request["pkgarch"]
53        checksum = request["checksum"]
54
55        value = self.server.table.find_value(version, pkgarch, checksum)
56        return {"value": value}
57
58    async def handle_test_package(self, request):
59        '''Tells whether there are entries for (version, pkgarch) in the db. Returns True or False'''
60        version = request["version"]
61        pkgarch = request["pkgarch"]
62
63        value = self.server.table.test_package(version, pkgarch)
64        return {"value": value}
65
66    async def handle_max_package_pr(self, request):
67        '''Finds the greatest PR value for (version, pkgarch) in the db. Returns None if no entry was found'''
68        version = request["version"]
69        pkgarch = request["pkgarch"]
70
71        value = self.server.table.find_max_value(version, pkgarch)
72        return {"value": value}
73
74    async def handle_get_pr(self, request):
75        version = request["version"]
76        pkgarch = request["pkgarch"]
77        checksum = request["checksum"]
78
79        response = None
80        try:
81            value = self.server.table.get_value(version, pkgarch, checksum)
82            response = {"value": value}
83        except prserv.NotFoundError:
84            self.logger.error("failure storing value in database for (%s, %s)",version, checksum)
85
86        return response
87
88    async def handle_import_one(self, request):
89        response = None
90        if not self.server.read_only:
91            version = request["version"]
92            pkgarch = request["pkgarch"]
93            checksum = request["checksum"]
94            value = request["value"]
95
96            value = self.server.table.importone(version, pkgarch, checksum, value)
97            if value is not None:
98                response = {"value": value}
99
100        return response
101
102    async def handle_export(self, request):
103        version = request["version"]
104        pkgarch = request["pkgarch"]
105        checksum = request["checksum"]
106        colinfo = request["colinfo"]
107
108        try:
109            (metainfo, datainfo) = self.server.table.export(version, pkgarch, checksum, colinfo)
110        except sqlite3.Error as exc:
111            self.logger.error(str(exc))
112            metainfo = datainfo = None
113
114        return {"metainfo": metainfo, "datainfo": datainfo}
115
116    async def handle_is_readonly(self, request):
117        return {"readonly": self.server.read_only}
118
119class PRServer(bb.asyncrpc.AsyncServer):
120    def __init__(self, dbfile, read_only=False):
121        super().__init__(logger)
122        self.dbfile = dbfile
123        self.table = None
124        self.read_only = read_only
125
126    def accept_client(self, socket):
127        return PRServerClient(socket, self)
128
129    def start(self):
130        tasks = super().start()
131        self.db = prserv.db.PRData(self.dbfile, read_only=self.read_only)
132        self.table = self.db["PRMAIN"]
133
134        self.logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" %
135                     (self.dbfile, self.address, str(os.getpid())))
136
137        return tasks
138
139    async def stop(self):
140        self.table.sync_if_dirty()
141        self.db.disconnect()
142        await super().stop()
143
144    def signal_handler(self):
145        super().signal_handler()
146        if self.table:
147            self.table.sync()
148
149class PRServSingleton(object):
150    def __init__(self, dbfile, logfile, host, port):
151        self.dbfile = dbfile
152        self.logfile = logfile
153        self.host = host
154        self.port = port
155
156    def start(self):
157        self.prserv = PRServer(self.dbfile)
158        self.prserv.start_tcp_server(socket.gethostbyname(self.host), self.port)
159        self.process = self.prserv.serve_as_process(log_level=logging.WARNING)
160
161        if not self.prserv.address:
162            raise PRServiceConfigError
163        if not self.port:
164            self.port = int(self.prserv.address.rsplit(":", 1)[1])
165
166def run_as_daemon(func, pidfile, logfile):
167    """
168    See Advanced Programming in the UNIX, Sec 13.3
169    """
170    try:
171        pid = os.fork()
172        if pid > 0:
173            os.waitpid(pid, 0)
174            #parent return instead of exit to give control
175            return pid
176    except OSError as e:
177        raise Exception("%s [%d]" % (e.strerror, e.errno))
178
179    os.setsid()
180    """
181    fork again to make sure the daemon is not session leader,
182    which prevents it from acquiring controlling terminal
183    """
184    try:
185        pid = os.fork()
186        if pid > 0: #parent
187            os._exit(0)
188    except OSError as e:
189        raise Exception("%s [%d]" % (e.strerror, e.errno))
190
191    os.chdir("/")
192
193    sys.stdout.flush()
194    sys.stderr.flush()
195
196    # We could be called from a python thread with io.StringIO as
197    # stdout/stderr or it could be 'real' unix fd forking where we need
198    # to physically close the fds to prevent the program launching us from
199    # potentially hanging on a pipe. Handle both cases.
200    si = open("/dev/null", "r")
201    try:
202        os.dup2(si.fileno(), sys.stdin.fileno())
203    except (AttributeError, io.UnsupportedOperation):
204        sys.stdin = si
205    so = open(logfile, "a+")
206    try:
207        os.dup2(so.fileno(), sys.stdout.fileno())
208    except (AttributeError, io.UnsupportedOperation):
209        sys.stdout = so
210    try:
211        os.dup2(so.fileno(), sys.stderr.fileno())
212    except (AttributeError, io.UnsupportedOperation):
213        sys.stderr = so
214
215    # Clear out all log handlers prior to the fork() to avoid calling
216    # event handlers not part of the PRserver
217    for logger_iter in logging.Logger.manager.loggerDict.keys():
218        logging.getLogger(logger_iter).handlers = []
219
220    # Ensure logging makes it to the logfile
221    streamhandler = logging.StreamHandler()
222    streamhandler.setLevel(logging.DEBUG)
223    formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s")
224    streamhandler.setFormatter(formatter)
225    logger.addHandler(streamhandler)
226
227    # write pidfile
228    pid = str(os.getpid())
229    with open(pidfile, "w") as pf:
230        pf.write("%s\n" % pid)
231
232    func()
233    os.remove(pidfile)
234    os._exit(0)
235
236def start_daemon(dbfile, host, port, logfile, read_only=False):
237    ip = socket.gethostbyname(host)
238    pidfile = PIDPREFIX % (ip, port)
239    try:
240        with open(pidfile) as pf:
241            pid = int(pf.readline().strip())
242    except IOError:
243        pid = None
244
245    if pid:
246        sys.stderr.write("pidfile %s already exist. Daemon already running?\n"
247                            % pidfile)
248        return 1
249
250    dbfile = os.path.abspath(dbfile)
251    def daemon_main():
252        server = PRServer(dbfile, read_only=read_only)
253        server.start_tcp_server(ip, port)
254        server.serve_forever()
255
256    run_as_daemon(daemon_main, pidfile, os.path.abspath(logfile))
257    return 0
258
259def stop_daemon(host, port):
260    import glob
261    ip = socket.gethostbyname(host)
262    pidfile = PIDPREFIX % (ip, port)
263    try:
264        with open(pidfile) as pf:
265            pid = int(pf.readline().strip())
266    except IOError:
267        pid = None
268
269    if not pid:
270        # when server starts at port=0 (i.e. localhost:0), server actually takes another port,
271        # so at least advise the user which ports the corresponding server is listening
272        ports = []
273        portstr = ""
274        for pf in glob.glob(PIDPREFIX % (ip, "*")):
275            bn = os.path.basename(pf)
276            root, _ = os.path.splitext(bn)
277            ports.append(root.split("_")[-1])
278        if len(ports):
279            portstr = "Wrong port? Other ports listening at %s: %s" % (host, " ".join(ports))
280
281        sys.stderr.write("pidfile %s does not exist. Daemon not running? %s\n"
282                         % (pidfile, portstr))
283        return 1
284
285    try:
286        if is_running(pid):
287            print("Sending SIGTERM to pr-server.")
288            os.kill(pid, signal.SIGTERM)
289            time.sleep(0.1)
290
291        try:
292            os.remove(pidfile)
293        except FileNotFoundError:
294            # The PID file might have been removed by the exiting process
295            pass
296
297    except OSError as e:
298        err = str(e)
299        if err.find("No such process") <= 0:
300            raise e
301
302    return 0
303
304def is_running(pid):
305    try:
306        os.kill(pid, 0)
307    except OSError as err:
308        if err.errno == errno.ESRCH:
309            return False
310    return True
311
312def is_local_special(host, port):
313    if (host == "localhost" or host == "127.0.0.1") and not port:
314        return True
315    else:
316        return False
317
318class PRServiceConfigError(Exception):
319    pass
320
321def auto_start(d):
322    global singleton
323
324    host_params = list(filter(None, (d.getVar("PRSERV_HOST") or "").split(":")))
325    if not host_params:
326        # Shutdown any existing PR Server
327        auto_shutdown()
328        return None
329
330    if len(host_params) != 2:
331        # Shutdown any existing PR Server
332        auto_shutdown()
333        logger.critical("\n".join(["PRSERV_HOST: incorrect format",
334                'Usage: PRSERV_HOST = "<hostname>:<port>"']))
335        raise PRServiceConfigError
336
337    host = host_params[0].strip().lower()
338    port = int(host_params[1])
339    if is_local_special(host, port):
340        import bb.utils
341        cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE"))
342        if not cachedir:
343            logger.critical("Please set the 'PERSISTENT_DIR' or 'CACHE' variable")
344            raise PRServiceConfigError
345        dbfile = os.path.join(cachedir, "prserv.sqlite3")
346        logfile = os.path.join(cachedir, "prserv.log")
347        if singleton:
348            if singleton.dbfile != dbfile:
349               # Shutdown any existing PR Server as doesn't match config
350               auto_shutdown()
351        if not singleton:
352            bb.utils.mkdirhier(cachedir)
353            singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), host, port)
354            singleton.start()
355    if singleton:
356        host = singleton.host
357        port = singleton.port
358
359    try:
360        ping(host, port)
361        return str(host) + ":" + str(port)
362
363    except Exception:
364        logger.critical("PRservice %s:%d not available" % (host, port))
365        raise PRServiceConfigError
366
367def auto_shutdown():
368    global singleton
369    if singleton and singleton.process:
370        singleton.process.terminate()
371        singleton.process.join()
372        singleton = None
373
374def ping(host, port):
375    from . import client
376
377    with client.PRClient() as conn:
378        conn.connect_tcp(host, port)
379        return conn.ping()
380
381def connect(host, port):
382    from . import client
383
384    global singleton
385
386    if host.strip().lower() == "localhost" and not port:
387        host = "localhost"
388        port = singleton.port
389
390    conn = client.PRClient()
391    conn.connect_tcp(host, port)
392    return conn
393