1import os,sys,logging 2import signal, time 3from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler 4import threading 5import queue 6import socket 7import io 8import sqlite3 9import bb.server.xmlrpcclient 10import prserv 11import prserv.db 12import errno 13import select 14 15logger = logging.getLogger("BitBake.PRserv") 16 17if sys.hexversion < 0x020600F0: 18 print("Sorry, python 2.6 or later is required.") 19 sys.exit(1) 20 21class Handler(SimpleXMLRPCRequestHandler): 22 def _dispatch(self,method,params): 23 try: 24 value=self.server.funcs[method](*params) 25 except: 26 import traceback 27 traceback.print_exc() 28 raise 29 return value 30 31PIDPREFIX = "/tmp/PRServer_%s_%s.pid" 32singleton = None 33 34 35class PRServer(SimpleXMLRPCServer): 36 def __init__(self, dbfile, logfile, interface, daemon=True): 37 ''' constructor ''' 38 try: 39 SimpleXMLRPCServer.__init__(self, interface, 40 logRequests=False, allow_none=True) 41 except socket.error: 42 ip=socket.gethostbyname(interface[0]) 43 port=interface[1] 44 msg="PR Server unable to bind to %s:%s\n" % (ip, port) 45 sys.stderr.write(msg) 46 raise PRServiceConfigError 47 48 self.dbfile=dbfile 49 self.daemon=daemon 50 self.logfile=logfile 51 self.working_thread=None 52 self.host, self.port = self.socket.getsockname() 53 self.pidfile=PIDPREFIX % (self.host, self.port) 54 55 self.register_function(self.getPR, "getPR") 56 self.register_function(self.quit, "quit") 57 self.register_function(self.ping, "ping") 58 self.register_function(self.export, "export") 59 self.register_function(self.dump_db, "dump_db") 60 self.register_function(self.importone, "importone") 61 self.register_introspection_functions() 62 63 self.quitpipein, self.quitpipeout = os.pipe() 64 65 self.requestqueue = queue.Queue() 66 self.handlerthread = threading.Thread(target = self.process_request_thread) 67 self.handlerthread.daemon = False 68 69 def process_request_thread(self): 70 """Same as in BaseServer but as a thread. 71 72 In addition, exception handling is done here. 73 74 """ 75 iter_count = 1 76 # 60 iterations between syncs or sync if dirty every ~30 seconds 77 iterations_between_sync = 60 78 79 bb.utils.set_process_name("PRServ Handler") 80 81 while not self.quitflag: 82 try: 83 (request, client_address) = self.requestqueue.get(True, 30) 84 except queue.Empty: 85 self.table.sync_if_dirty() 86 continue 87 if request is None: 88 continue 89 try: 90 self.finish_request(request, client_address) 91 self.shutdown_request(request) 92 iter_count = (iter_count + 1) % iterations_between_sync 93 if iter_count == 0: 94 self.table.sync_if_dirty() 95 except: 96 self.handle_error(request, client_address) 97 self.shutdown_request(request) 98 self.table.sync() 99 self.table.sync_if_dirty() 100 101 def sigint_handler(self, signum, stack): 102 if self.table: 103 self.table.sync() 104 105 def sigterm_handler(self, signum, stack): 106 if self.table: 107 self.table.sync() 108 self.quit() 109 self.requestqueue.put((None, None)) 110 111 def process_request(self, request, client_address): 112 self.requestqueue.put((request, client_address)) 113 114 def export(self, version=None, pkgarch=None, checksum=None, colinfo=True): 115 try: 116 return self.table.export(version, pkgarch, checksum, colinfo) 117 except sqlite3.Error as exc: 118 logger.error(str(exc)) 119 return None 120 121 def dump_db(self): 122 """ 123 Returns a script (string) that reconstructs the state of the 124 entire database at the time this function is called. The script 125 language is defined by the backing database engine, which is a 126 function of server configuration. 127 Returns None if the database engine does not support dumping to 128 script or if some other error is encountered in processing. 129 """ 130 buff = io.StringIO() 131 try: 132 self.table.sync() 133 self.table.dump_db(buff) 134 return buff.getvalue() 135 except Exception as exc: 136 logger.error(str(exc)) 137 return None 138 finally: 139 buff.close() 140 141 def importone(self, version, pkgarch, checksum, value): 142 return self.table.importone(version, pkgarch, checksum, value) 143 144 def ping(self): 145 return not self.quitflag 146 147 def getinfo(self): 148 return (self.host, self.port) 149 150 def getPR(self, version, pkgarch, checksum): 151 try: 152 return self.table.getValue(version, pkgarch, checksum) 153 except prserv.NotFoundError: 154 logger.error("can not find value for (%s, %s)",version, checksum) 155 return None 156 except sqlite3.Error as exc: 157 logger.error(str(exc)) 158 return None 159 160 def quit(self): 161 self.quitflag=True 162 os.write(self.quitpipeout, b"q") 163 os.close(self.quitpipeout) 164 return 165 166 def work_forever(self,): 167 self.quitflag = False 168 # This timeout applies to the poll in TCPServer, we need the select 169 # below to wake on our quit pipe closing. We only ever call into handle_request 170 # if there is data there. 171 self.timeout = 0.01 172 173 bb.utils.set_process_name("PRServ") 174 175 # DB connection must be created after all forks 176 self.db = prserv.db.PRData(self.dbfile) 177 self.table = self.db["PRMAIN"] 178 179 logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" % 180 (self.dbfile, self.host, self.port, str(os.getpid()))) 181 182 self.handlerthread.start() 183 while not self.quitflag: 184 ready = select.select([self.fileno(), self.quitpipein], [], [], 30) 185 if self.quitflag: 186 break 187 if self.fileno() in ready[0]: 188 self.handle_request() 189 self.handlerthread.join() 190 self.db.disconnect() 191 logger.info("PRServer: stopping...") 192 self.server_close() 193 os.close(self.quitpipein) 194 return 195 196 def start(self): 197 if self.daemon: 198 pid = self.daemonize() 199 else: 200 pid = self.fork() 201 self.pid = pid 202 203 # Ensure both the parent sees this and the child from the work_forever log entry above 204 logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" % 205 (self.dbfile, self.host, self.port, str(pid))) 206 207 def delpid(self): 208 os.remove(self.pidfile) 209 210 def daemonize(self): 211 """ 212 See Advanced Programming in the UNIX, Sec 13.3 213 """ 214 try: 215 pid = os.fork() 216 if pid > 0: 217 os.waitpid(pid, 0) 218 #parent return instead of exit to give control 219 return pid 220 except OSError as e: 221 raise Exception("%s [%d]" % (e.strerror, e.errno)) 222 223 os.setsid() 224 """ 225 fork again to make sure the daemon is not session leader, 226 which prevents it from acquiring controlling terminal 227 """ 228 try: 229 pid = os.fork() 230 if pid > 0: #parent 231 os._exit(0) 232 except OSError as e: 233 raise Exception("%s [%d]" % (e.strerror, e.errno)) 234 235 self.cleanup_handles() 236 os._exit(0) 237 238 def fork(self): 239 try: 240 pid = os.fork() 241 if pid > 0: 242 return pid 243 except OSError as e: 244 raise Exception("%s [%d]" % (e.strerror, e.errno)) 245 246 bb.utils.signal_on_parent_exit("SIGTERM") 247 self.cleanup_handles() 248 os._exit(0) 249 250 def cleanup_handles(self): 251 signal.signal(signal.SIGINT, self.sigint_handler) 252 signal.signal(signal.SIGTERM, self.sigterm_handler) 253 os.chdir("/") 254 255 sys.stdout.flush() 256 sys.stderr.flush() 257 258 # We could be called from a python thread with io.StringIO as 259 # stdout/stderr or it could be 'real' unix fd forking where we need 260 # to physically close the fds to prevent the program launching us from 261 # potentially hanging on a pipe. Handle both cases. 262 si = open('/dev/null', 'r') 263 try: 264 os.dup2(si.fileno(),sys.stdin.fileno()) 265 except (AttributeError, io.UnsupportedOperation): 266 sys.stdin = si 267 so = open(self.logfile, 'a+') 268 try: 269 os.dup2(so.fileno(),sys.stdout.fileno()) 270 except (AttributeError, io.UnsupportedOperation): 271 sys.stdout = so 272 try: 273 os.dup2(so.fileno(),sys.stderr.fileno()) 274 except (AttributeError, io.UnsupportedOperation): 275 sys.stderr = so 276 277 # Clear out all log handlers prior to the fork() to avoid calling 278 # event handlers not part of the PRserver 279 for logger_iter in logging.Logger.manager.loggerDict.keys(): 280 logging.getLogger(logger_iter).handlers = [] 281 282 # Ensure logging makes it to the logfile 283 streamhandler = logging.StreamHandler() 284 streamhandler.setLevel(logging.DEBUG) 285 formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s") 286 streamhandler.setFormatter(formatter) 287 logger.addHandler(streamhandler) 288 289 # write pidfile 290 pid = str(os.getpid()) 291 pf = open(self.pidfile, 'w') 292 pf.write("%s\n" % pid) 293 pf.close() 294 295 self.work_forever() 296 self.delpid() 297 298class PRServSingleton(object): 299 def __init__(self, dbfile, logfile, interface): 300 self.dbfile = dbfile 301 self.logfile = logfile 302 self.interface = interface 303 self.host = None 304 self.port = None 305 306 def start(self): 307 self.prserv = PRServer(self.dbfile, self.logfile, self.interface, daemon=False) 308 self.prserv.start() 309 self.host, self.port = self.prserv.getinfo() 310 311 def getinfo(self): 312 return (self.host, self.port) 313 314class PRServerConnection(object): 315 def __init__(self, host, port): 316 if is_local_special(host, port): 317 host, port = singleton.getinfo() 318 self.host = host 319 self.port = port 320 self.connection, self.transport = bb.server.xmlrpcclient._create_server(self.host, self.port) 321 322 def terminate(self): 323 try: 324 logger.info("Terminating PRServer...") 325 self.connection.quit() 326 except Exception as exc: 327 sys.stderr.write("%s\n" % str(exc)) 328 329 def getPR(self, version, pkgarch, checksum): 330 return self.connection.getPR(version, pkgarch, checksum) 331 332 def ping(self): 333 return self.connection.ping() 334 335 def export(self,version=None, pkgarch=None, checksum=None, colinfo=True): 336 return self.connection.export(version, pkgarch, checksum, colinfo) 337 338 def dump_db(self): 339 return self.connection.dump_db() 340 341 def importone(self, version, pkgarch, checksum, value): 342 return self.connection.importone(version, pkgarch, checksum, value) 343 344 def getinfo(self): 345 return self.host, self.port 346 347def start_daemon(dbfile, host, port, logfile): 348 ip = socket.gethostbyname(host) 349 pidfile = PIDPREFIX % (ip, port) 350 try: 351 pf = open(pidfile,'r') 352 pid = int(pf.readline().strip()) 353 pf.close() 354 except IOError: 355 pid = None 356 357 if pid: 358 sys.stderr.write("pidfile %s already exist. Daemon already running?\n" 359 % pidfile) 360 return 1 361 362 server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port)) 363 server.start() 364 365 # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with 366 # the one the server actually is listening, so at least warn the user about it 367 _,rport = server.getinfo() 368 if port != rport: 369 sys.stdout.write("Server is listening at port %s instead of %s\n" 370 % (rport,port)) 371 return 0 372 373def stop_daemon(host, port): 374 import glob 375 ip = socket.gethostbyname(host) 376 pidfile = PIDPREFIX % (ip, port) 377 try: 378 pf = open(pidfile,'r') 379 pid = int(pf.readline().strip()) 380 pf.close() 381 except IOError: 382 pid = None 383 384 if not pid: 385 # when server starts at port=0 (i.e. localhost:0), server actually takes another port, 386 # so at least advise the user which ports the corresponding server is listening 387 ports = [] 388 portstr = "" 389 for pf in glob.glob(PIDPREFIX % (ip,'*')): 390 bn = os.path.basename(pf) 391 root, _ = os.path.splitext(bn) 392 ports.append(root.split('_')[-1]) 393 if len(ports): 394 portstr = "Wrong port? Other ports listening at %s: %s" % (host, ' '.join(ports)) 395 396 sys.stderr.write("pidfile %s does not exist. Daemon not running? %s\n" 397 % (pidfile,portstr)) 398 return 1 399 400 try: 401 PRServerConnection(ip, port).terminate() 402 except: 403 logger.critical("Stop PRService %s:%d failed" % (host,port)) 404 405 try: 406 if pid: 407 wait_timeout = 0 408 print("Waiting for pr-server to exit.") 409 while is_running(pid) and wait_timeout < 50: 410 time.sleep(0.1) 411 wait_timeout += 1 412 413 if is_running(pid): 414 print("Sending SIGTERM to pr-server.") 415 os.kill(pid,signal.SIGTERM) 416 time.sleep(0.1) 417 418 if os.path.exists(pidfile): 419 os.remove(pidfile) 420 421 except OSError as e: 422 err = str(e) 423 if err.find("No such process") <= 0: 424 raise e 425 426 return 0 427 428def is_running(pid): 429 try: 430 os.kill(pid, 0) 431 except OSError as err: 432 if err.errno == errno.ESRCH: 433 return False 434 return True 435 436def is_local_special(host, port): 437 if host.strip().upper() == 'localhost'.upper() and (not port): 438 return True 439 else: 440 return False 441 442class PRServiceConfigError(Exception): 443 pass 444 445def auto_start(d): 446 global singleton 447 448 # Shutdown any existing PR Server 449 auto_shutdown() 450 451 host_params = list(filter(None, (d.getVar('PRSERV_HOST') or '').split(':'))) 452 if not host_params: 453 return None 454 455 if len(host_params) != 2: 456 logger.critical('\n'.join(['PRSERV_HOST: incorrect format', 457 'Usage: PRSERV_HOST = "<hostname>:<port>"'])) 458 raise PRServiceConfigError 459 460 if is_local_special(host_params[0], int(host_params[1])) and not singleton: 461 import bb.utils 462 cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE")) 463 if not cachedir: 464 logger.critical("Please set the 'PERSISTENT_DIR' or 'CACHE' variable") 465 raise PRServiceConfigError 466 bb.utils.mkdirhier(cachedir) 467 dbfile = os.path.join(cachedir, "prserv.sqlite3") 468 logfile = os.path.join(cachedir, "prserv.log") 469 singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), ("localhost",0)) 470 singleton.start() 471 if singleton: 472 host, port = singleton.getinfo() 473 else: 474 host = host_params[0] 475 port = int(host_params[1]) 476 477 try: 478 connection = PRServerConnection(host,port) 479 connection.ping() 480 realhost, realport = connection.getinfo() 481 return str(realhost) + ":" + str(realport) 482 483 except Exception: 484 logger.critical("PRservice %s:%d not available" % (host, port)) 485 raise PRServiceConfigError 486 487def auto_shutdown(): 488 global singleton 489 if singleton: 490 host, port = singleton.getinfo() 491 try: 492 PRServerConnection(host, port).terminate() 493 except: 494 logger.critical("Stop PRService %s:%d failed" % (host,port)) 495 496 try: 497 os.waitpid(singleton.prserv.pid, 0) 498 except ChildProcessError: 499 pass 500 singleton = None 501 502def ping(host, port): 503 conn=PRServerConnection(host, port) 504 return conn.ping() 505