# xref: /openbmc/openbmc/poky/bitbake/lib/prserv/serv.py (revision eb8dc403)
1import os,sys,logging
2import signal, time
3from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
4import threading
5import queue
6import socket
7import io
8import sqlite3
9import bb.server.xmlrpcclient
10import prserv
11import prserv.db
12import errno
13import select
14
# Module-wide logger; every message emitted by this file goes through the
# "BitBake.PRserv" logger.
logger = logging.getLogger("BitBake.PRserv")

# Refuse to run on interpreters older than 2.6.0 final (0x020600F0 is the
# sys.hexversion encoding of that release).
# NOTE(review): this file imports xmlrpc.server and queue, which are Python 3
# module names, so this guard probably never fires - confirm before relying
# on it or removing it.
if sys.hexversion < 0x020600F0:
    print("Sorry, python 2.6 or later is required.")
    sys.exit(1)
20
class Handler(SimpleXMLRPCRequestHandler):
    """XML-RPC request handler that prints a traceback for failing calls."""

    def _dispatch(self, method, params):
        """Invoke the server function registered under *method*.

        The callable is looked up in ``self.server.funcs`` and called with
        *params* unpacked as positional arguments.  Any failure (including an
        unknown method name) has its traceback printed and is then re-raised
        unchanged.
        """
        try:
            target = self.server.funcs[method]
            return target(*params)
        except BaseException:
            # Deliberately as broad as a bare "except": print, then propagate.
            import traceback
            traceback.print_exc()
            raise
30
# Template for the server pidfile path; the two %s slots are filled with the
# host/IP and the port of the server instance.
PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
# Module-level PRServSingleton for the automatically started local server;
# assigned by auto_start() and reset to None by auto_shutdown().
singleton = None
33
34
class PRServer(SimpleXMLRPCServer):
    """XML-RPC server exposing the PR table of a prserv database.

    The main loop (work_forever) accepts connections and queues them; a
    single handler thread (process_request_thread) services the queued
    requests against ``self.table``.  The server normally runs in a forked
    child, either fully daemonized (``daemon=True``) or as a plain child of
    the calling process (``daemon=False``).
    """

    def __init__(self, dbfile, logfile, interface, daemon=True):
        """Bind the listening socket and register the RPC entry points.

        dbfile    -- path handed to prserv.db.PRData in work_forever()
        logfile   -- file that stdout/stderr are redirected to after forking
        interface -- (host, port) tuple to bind to
        daemon    -- True to daemonize in start(), False to do a single fork

        Raises PRServiceConfigError when the socket cannot be bound.
        """
        try:
            SimpleXMLRPCServer.__init__(self, interface,
                                        logRequests=False, allow_none=True)
        except socket.error:
            ip = socket.gethostbyname(interface[0])
            port = interface[1]
            msg = "PR Server unable to bind to %s:%s\n" % (ip, port)
            sys.stderr.write(msg)
            raise PRServiceConfigError

        self.dbfile = dbfile
        self.daemon = daemon
        self.logfile = logfile
        self.working_thread = None
        self.host, self.port = self.socket.getsockname()
        self.pidfile = PIDPREFIX % (self.host, self.port)

        # State that is only filled in later (work_forever()/start()).  It is
        # initialised here so that ping() and the signal handlers, which may
        # run before work_forever(), never hit an AttributeError.
        self.quitflag = False
        self.db = None
        self.table = None
        self.pid = None

        self.register_function(self.getPR, "getPR")
        self.register_function(self.quit, "quit")
        self.register_function(self.ping, "ping")
        self.register_function(self.export, "export")
        self.register_function(self.dump_db, "dump_db")
        self.register_function(self.importone, "importone")
        self.register_introspection_functions()

        # Closing/writing the write end wakes the select() in work_forever().
        self.quitpipein, self.quitpipeout = os.pipe()

        self.requestqueue = queue.Queue()
        self.handlerthread = threading.Thread(target = self.process_request_thread)
        self.handlerthread.daemon = False

    def process_request_thread(self):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        Runs until self.quitflag is set, pulling (request, client_address)
        pairs off self.requestqueue.  A (None, None) entry carries no request
        and only serves to wake the loop up.
        """
        iter_count = 1
        # 60 iterations between syncs or sync if dirty every ~30 seconds
        iterations_between_sync = 60

        bb.utils.set_process_name("PRServ Handler")

        while not self.quitflag:
            try:
                (request, client_address) = self.requestqueue.get(True, 30)
            except queue.Empty:
                # Idle for 30 seconds: use the time to flush pending changes.
                self.table.sync_if_dirty()
                continue
            if request is None:
                continue
            try:
                self.finish_request(request, client_address)
                self.shutdown_request(request)
                iter_count = (iter_count + 1) % iterations_between_sync
                if iter_count == 0:
                    self.table.sync_if_dirty()
            except:
                # Deliberately broad: whatever went wrong, report it, release
                # the request and force a sync of the table.
                self.handle_error(request, client_address)
                self.shutdown_request(request)
                self.table.sync()
            # NOTE(review): this runs after every request, which makes the
            # iteration counter above look redundant - confirm intent.
            self.table.sync_if_dirty()

    def sigint_handler(self, signum, stack):
        """SIGINT handler: sync the table (if one is open) and keep running."""
        if self.table:
            self.table.sync()

    def sigterm_handler(self, signum, stack):
        """SIGTERM handler: sync the table, request shutdown, wake the handler thread."""
        if self.table:
            self.table.sync()
        self.quit()
        # Dummy entry so the handler thread leaves its blocking queue.get().
        self.requestqueue.put((None, None))

    def process_request(self, request, client_address):
        """Queue the accepted request for the handler thread instead of serving it inline."""
        self.requestqueue.put((request, client_address))

    def export(self, version=None, pkgarch=None, checksum=None, colinfo=True):
        """RPC: forward to self.table.export(); returns None on a sqlite3 error."""
        try:
            return self.table.export(version, pkgarch, checksum, colinfo)
        except sqlite3.Error as exc:
            logger.error(str(exc))
            return None

    def dump_db(self):
        """
        Returns a script (string) that reconstructs the state of the
        entire database at the time this function is called. The script
        language is defined by the backing database engine, which is a
        function of server configuration.
        Returns None if the database engine does not support dumping to
        script or if some other error is encountered in processing.
        """
        buff = io.StringIO()
        try:
            self.table.sync()
            self.table.dump_db(buff)
            return buff.getvalue()
        except Exception as exc:
            logger.error(str(exc))
            return None
        finally:
            buff.close()

    def importone(self, version, pkgarch, checksum, value):
        """RPC: forward to self.table.importone() and return its result."""
        return self.table.importone(version, pkgarch, checksum, value)

    def ping(self):
        """RPC: return True while the server has not been asked to quit."""
        return not self.quitflag

    def getinfo(self):
        """Return the (host, port) the listening socket is actually bound to."""
        return (self.host, self.port)

    def getPR(self, version, pkgarch, checksum):
        """RPC: return self.table.getValue(...) or None when lookup fails.

        Both prserv.NotFoundError and sqlite3 errors are logged and turned
        into a None result.
        """
        try:
            return self.table.getValue(version, pkgarch, checksum)
        except prserv.NotFoundError:
            logger.error("can not find value for (%s, %s)",version, checksum)
            return None
        except sqlite3.Error as exc:
            logger.error(str(exc))
            return None

    def quit(self):
        """RPC / internal: ask the server to stop.

        Sets the quit flag and wakes the main loop through the quit pipe.
        Safe to call more than once (e.g. an XML-RPC quit followed by
        SIGTERM): the pipe is written and closed only on the first call.
        """
        self.quitflag = True
        if self.quitpipeout is None:
            return
        wfd, self.quitpipeout = self.quitpipeout, None
        try:
            os.write(wfd, b"q")
        finally:
            os.close(wfd)
        return

    def work_forever(self):
        """Open the database and serve requests until quit() is called."""
        self.quitflag = False
        # This timeout applies to the poll in TCPServer, we need the select
        # below to wake on our quit pipe closing. We only ever call into handle_request
        # if there is data there.
        self.timeout = 0.01

        bb.utils.set_process_name("PRServ")

        # DB connection must be created after all forks
        self.db = prserv.db.PRData(self.dbfile)
        self.table = self.db["PRMAIN"]

        logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" %
                     (self.dbfile, self.host, self.port, str(os.getpid())))

        self.handlerthread.start()
        while not self.quitflag:
            ready = select.select([self.fileno(), self.quitpipein], [], [], 30)
            if self.quitflag:
                break
            if self.fileno() in ready[0]:
                self.handle_request()
        self.handlerthread.join()
        self.db.disconnect()
        logger.info("PRServer: stopping...")
        self.server_close()
        os.close(self.quitpipein)
        return

    def start(self):
        """Fork the server process and return in the calling process.

        With daemon=True the server is daemonized; otherwise it is a direct
        child whose pid is stored in self.pid.
        """
        if self.daemon:
            pid = self.daemonize()
        else:
            pid = self.fork()
            self.pid = pid

        # Ensure both the parent sees this and the child from the work_forever log entry above
        logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" %
                     (self.dbfile, self.host, self.port, str(pid)))

    def delpid(self):
        """Remove the pidfile; a pidfile that is already gone is not an error.

        stop_daemon() may delete the pidfile itself.  Raising here would make
        cleanup_handles() skip the os._exit() that follows it in the forked
        child, so a missing file is tolerated.
        """
        try:
            os.remove(self.pidfile)
        except FileNotFoundError:
            pass

    def daemonize(self):
        """
        See Advanced Programming in the UNIX, Sec 13.3

        Returns the pid of the first child in the calling process; the
        daemon process itself never returns from this method.
        """
        try:
            pid = os.fork()
            if pid > 0:
                os.waitpid(pid, 0)
                #parent return instead of exit to give control
                return pid
        except OSError as e:
            raise Exception("%s [%d]" % (e.strerror, e.errno))

        os.setsid()
        # fork again to make sure the daemon is not session leader,
        # which prevents it from acquiring controlling terminal
        try:
            pid = os.fork()
            if pid > 0: #parent
                os._exit(0)
        except OSError as e:
            raise Exception("%s [%d]" % (e.strerror, e.errno))

        self.cleanup_handles()
        os._exit(0)

    def fork(self):
        """Fork once; return the child's pid in the parent, serve in the child.

        The child never returns from this method.
        """
        try:
            pid = os.fork()
            if pid > 0:
                return pid
        except OSError as e:
            raise Exception("%s [%d]" % (e.strerror, e.errno))

        bb.utils.signal_on_parent_exit("SIGTERM")
        self.cleanup_handles()
        os._exit(0)

    def cleanup_handles(self):
        """Prepare the forked process, then run the server to completion.

        Installs the signal handlers, redirects stdin/stdout/stderr, resets
        logging so it ends up in the logfile, writes the pidfile, runs
        work_forever() and finally removes the pidfile.
        """
        signal.signal(signal.SIGINT, self.sigint_handler)
        signal.signal(signal.SIGTERM, self.sigterm_handler)
        os.chdir("/")

        sys.stdout.flush()
        sys.stderr.flush()

        # We could be called from a python thread with io.StringIO as
        # stdout/stderr or it could be 'real' unix fd forking where we need
        # to physically close the fds to prevent the program launching us from
        # potentially hanging on a pipe. Handle both cases.
        si = open('/dev/null', 'r')
        try:
            os.dup2(si.fileno(),sys.stdin.fileno())
        except (AttributeError, io.UnsupportedOperation):
            sys.stdin = si
        so = open(self.logfile, 'a+')
        try:
            os.dup2(so.fileno(),sys.stdout.fileno())
        except (AttributeError, io.UnsupportedOperation):
            sys.stdout = so
        try:
            os.dup2(so.fileno(),sys.stderr.fileno())
        except (AttributeError, io.UnsupportedOperation):
            sys.stderr = so

        # Clear out all log handlers prior to the fork() to avoid calling
        # event handlers not part of the PRserver
        for logger_iter in logging.Logger.manager.loggerDict.keys():
            logging.getLogger(logger_iter).handlers = []

        # Ensure logging makes it to the logfile
        streamhandler = logging.StreamHandler()
        streamhandler.setLevel(logging.DEBUG)
        formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s")
        streamhandler.setFormatter(formatter)
        logger.addHandler(streamhandler)

        # write pidfile
        pid = str(os.getpid())
        with open(self.pidfile, 'w') as pf:
            pf.write("%s\n" % pid)

        self.work_forever()
        self.delpid()
297
class PRServSingleton(object):
    """Holder for the single, non-daemonized PRServer started on demand."""

    def __init__(self, dbfile, logfile, interface):
        """Remember the server parameters; nothing is started yet."""
        self.dbfile, self.logfile, self.interface = dbfile, logfile, interface
        # Unknown until start() has bound the server socket.
        self.host = self.port = None

    def start(self):
        """Create and start the PRServer, then record where it listens."""
        server = PRServer(self.dbfile, self.logfile, self.interface, daemon=False)
        self.prserv = server
        server.start()
        self.host, self.port = server.getinfo()

    def getinfo(self):
        """Return the (host, port) pair recorded by start()."""
        return (self.host, self.port)
313
class PRServerConnection(object):
    """Client-side proxy for the RPC functions of a PR server."""

    def __init__(self, host, port):
        """Open an XML-RPC connection to host:port.

        The special "localhost without a port" address is replaced by the
        address reported by the module-level singleton.
        """
        if is_local_special(host, port):
            target = singleton.getinfo()
        else:
            target = (host, port)
        self.host, self.port = target
        proxy, transport = bb.server.xmlrpcclient._create_server(self.host, self.port)
        self.connection = proxy
        self.transport = transport

    def terminate(self):
        """Ask the server to quit; failures are reported on stderr only."""
        try:
            logger.info("Terminating PRServer...")
            self.connection.quit()
        except Exception as exc:
            sys.stderr.write("%s\n" % str(exc))

    def getPR(self, version, pkgarch, checksum):
        """Remote getPR call; returns whatever the server returns."""
        remote = self.connection
        return remote.getPR(version, pkgarch, checksum)

    def ping(self):
        """Remote ping call; returns whatever the server returns."""
        remote = self.connection
        return remote.ping()

    def export(self,version=None, pkgarch=None, checksum=None, colinfo=True):
        """Remote export call; returns whatever the server returns."""
        remote = self.connection
        return remote.export(version, pkgarch, checksum, colinfo)

    def dump_db(self):
        """Remote dump_db call; returns whatever the server returns."""
        remote = self.connection
        return remote.dump_db()

    def importone(self, version, pkgarch, checksum, value):
        """Remote importone call; returns whatever the server returns."""
        remote = self.connection
        return remote.importone(version, pkgarch, checksum, value)

    def getinfo(self):
        """Return the (host, port) this connection talks to."""
        return self.host, self.port
346
def start_daemon(dbfile, host, port, logfile):
    """Start a daemonized PRServer unless its pidfile says one is running.

    Returns 1 (after complaining on stderr) when a pidfile with a pid
    already exists for the resolved address, 0 once the server was started.
    """
    ip = socket.gethostbyname(host)
    pidfile = PIDPREFIX % (ip, port)

    existing_pid = None
    try:
        with open(pidfile, 'r') as handle:
            existing_pid = int(handle.readline().strip())
    except IOError:
        # No readable pidfile: nothing is registered for this address.
        pass

    if existing_pid:
        sys.stderr.write("pidfile %s already exist. Daemon already running?\n"
                            % pidfile)
        return 1

    server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port))
    server.start()

    # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with
    # the one the server actually is listening, so at least warn the user about it
    actual_port = server.getinfo()[1]
    if actual_port != port:
        sys.stdout.write("Server is listening at port %s instead of %s\n"
                         % (actual_port,port))
    return 0
372
def stop_daemon(host, port):
    """Stop the daemonized PR server registered for host:port.

    The server's pid is read from its pidfile.  The server is first asked to
    quit over XML-RPC; if it is still alive after about five seconds it is
    sent SIGTERM.  The pidfile is removed afterwards if it still exists.

    Returns 1 (after writing a hint to stderr) when no pidfile/pid is found,
    0 otherwise.  OSErrors other than "no such process" are propagated.
    """
    import glob
    ip = socket.gethostbyname(host)
    pidfile = PIDPREFIX % (ip, port)
    try:
        with open(pidfile,'r') as pf:
            pid = int(pf.readline().strip())
    except IOError:
        pid = None

    if not pid:
        # when server starts at port=0 (i.e. localhost:0), server actually takes another port,
        # so at least advise the user which ports the corresponding server is listening
        ports = []
        portstr = ""
        for pf in glob.glob(PIDPREFIX % (ip,'*')):
            bn = os.path.basename(pf)
            root, _ = os.path.splitext(bn)
            ports.append(root.split('_')[-1])
        if len(ports):
            portstr = "Wrong port? Other ports listening at %s: %s" % (host, ' '.join(ports))

        sys.stderr.write("pidfile %s does not exist. Daemon not running? %s\n"
                         % (pidfile,portstr))
        return 1

    try:
        PRServerConnection(ip, port).terminate()
    except Exception:
        logger.critical("Stop PRService %s:%d failed" % (host,port))

    try:
        wait_timeout = 0
        print("Waiting for pr-server to exit.")
        # Poll for up to 50 * 0.1s before escalating to SIGTERM.
        while is_running(pid) and wait_timeout < 50:
            time.sleep(0.1)
            wait_timeout += 1

        if is_running(pid):
            print("Sending SIGTERM to pr-server.")
            os.kill(pid,signal.SIGTERM)
            time.sleep(0.1)

        # The server removes its own pidfile on a clean exit, so the file may
        # disappear at any moment; a missing file is not an error.
        try:
            os.remove(pidfile)
        except FileNotFoundError:
            pass

    except OSError as e:
        # The process vanishing between the liveness check and the kill is
        # fine.  Compare the error code rather than the message text, which
        # depends on the platform and locale.
        if e.errno != errno.ESRCH:
            raise

    return 0
427
def is_running(pid):
    """Return False only when signalling *pid* reports "no such process".

    Signal 0 performs the existence/permission check without delivering a
    signal.  Any other OSError is treated as "the process exists".
    """
    try:
        os.kill(pid, 0)
    except OSError as exc:
        return exc.errno != errno.ESRCH
    return True
435
def is_local_special(host, port):
    """Return True for the "localhost with no port" address, else False.

    The host comparison ignores case and surrounding whitespace; any falsy
    port (0, None, "") counts as "no port".
    """
    names_localhost = host.strip().upper() == 'localhost'.upper()
    return bool(names_localhost and not port)
441
class PRServiceConfigError(Exception):
    """Raised when the PR service is misconfigured, cannot bind or cannot be reached."""
444
def auto_start(d):
    """Make sure the PR service named by PRSERV_HOST is available.

    d -- object providing getVar(name); PRSERV_HOST, PERSISTENT_DIR and
         CACHE are read from it.

    Any previously auto-started server is shut down first.  If PRSERV_HOST is
    unset or empty, None is returned.  For the special "localhost:0" setting
    a local server is started with its database and log file in
    PERSISTENT_DIR (or CACHE).  The service is then pinged.

    Returns "<host>:<port>" of the reachable service.
    Raises PRServiceConfigError when PRSERV_HOST is malformed (including a
    non-numeric port), when no cache directory is configured, or when the
    service does not answer.
    """
    global singleton

    # Shutdown any existing PR Server
    auto_shutdown()

    host_params = list(filter(None, (d.getVar('PRSERV_HOST') or '').split(':')))
    if not host_params:
        return None

    usage = '\n'.join(['PRSERV_HOST: incorrect format',
            'Usage: PRSERV_HOST = "<hostname>:<port>"'])

    if len(host_params) != 2:
        logger.critical(usage)
        raise PRServiceConfigError

    # Parse the port once; a non-numeric port is a configuration error and
    # must be reported as such rather than escaping as a bare ValueError.
    try:
        cfg_port = int(host_params[1])
    except ValueError:
        logger.critical(usage)
        raise PRServiceConfigError

    if is_local_special(host_params[0], cfg_port) and not singleton:
        import bb.utils
        cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE"))
        if not cachedir:
            logger.critical("Please set the 'PERSISTENT_DIR' or 'CACHE' variable")
            raise PRServiceConfigError
        bb.utils.mkdirhier(cachedir)
        dbfile = os.path.join(cachedir, "prserv.sqlite3")
        logfile = os.path.join(cachedir, "prserv.log")
        singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), ("localhost",0))
        singleton.start()
    if singleton:
        host, port = singleton.getinfo()
    else:
        host = host_params[0]
        port = cfg_port

    try:
        connection = PRServerConnection(host,port)
        connection.ping()
        realhost, realport = connection.getinfo()
        return str(realhost) + ":" + str(realport)

    except Exception:
        logger.critical("PRservice %s:%d not available" % (host, port))
        raise PRServiceConfigError
486
def auto_shutdown():
    """Stop the auto-started local PR server, if there is one.

    Asks the server to terminate, waits for the child process to exit and
    clears the module-level singleton.
    """
    global singleton
    if not singleton:
        return

    host, port = singleton.getinfo()
    try:
        PRServerConnection(host, port).terminate()
    except BaseException:
        # Catch-all on purpose: a failed stop request is only logged.
        logger.critical("Stop PRService %s:%d failed" % (host,port))

    child_pid = singleton.prserv.pid
    try:
        os.waitpid(child_pid, 0)
    except ChildProcessError:
        # No such child to wait for.
        pass
    singleton = None
501
def ping(host, port):
    """Open a connection to the PR server at host:port and return its ping() reply."""
    return PRServerConnection(host, port).ping()
505