1# 2# Copyright BitBake Contributors 3# 4# SPDX-License-Identifier: GPL-2.0-only 5# 6 7import os,sys,logging 8import signal, time 9import socket 10import io 11import sqlite3 12import prserv 13import prserv.db 14import errno 15import bb.asyncrpc 16 17logger = logging.getLogger("BitBake.PRserv") 18 19PIDPREFIX = "/tmp/PRServer_%s_%s.pid" 20singleton = None 21 22class PRServerClient(bb.asyncrpc.AsyncServerConnection): 23 def __init__(self, socket, server): 24 super().__init__(socket, "PRSERVICE", server.logger) 25 self.server = server 26 27 self.handlers.update({ 28 "get-pr": self.handle_get_pr, 29 "test-pr": self.handle_test_pr, 30 "test-package": self.handle_test_package, 31 "max-package-pr": self.handle_max_package_pr, 32 "import-one": self.handle_import_one, 33 "export": self.handle_export, 34 "is-readonly": self.handle_is_readonly, 35 }) 36 37 def validate_proto_version(self): 38 return (self.proto_version == (1, 0)) 39 40 async def dispatch_message(self, msg): 41 try: 42 return await super().dispatch_message(msg) 43 except: 44 self.server.table.sync() 45 raise 46 else: 47 self.server.table.sync_if_dirty() 48 49 async def handle_test_pr(self, request): 50 '''Finds the PR value corresponding to the request. If not found, returns None and doesn't insert a new value''' 51 version = request["version"] 52 pkgarch = request["pkgarch"] 53 checksum = request["checksum"] 54 55 value = self.server.table.find_value(version, pkgarch, checksum) 56 return {"value": value} 57 58 async def handle_test_package(self, request): 59 '''Tells whether there are entries for (version, pkgarch) in the db. Returns True or False''' 60 version = request["version"] 61 pkgarch = request["pkgarch"] 62 63 value = self.server.table.test_package(version, pkgarch) 64 return {"value": value} 65 66 async def handle_max_package_pr(self, request): 67 '''Finds the greatest PR value for (version, pkgarch) in the db. Returns None if no entry was found''' 68 version = request["version"] 69 pkgarch = request["pkgarch"] 70 71 value = self.server.table.find_max_value(version, pkgarch) 72 return {"value": value} 73 74 async def handle_get_pr(self, request): 75 version = request["version"] 76 pkgarch = request["pkgarch"] 77 checksum = request["checksum"] 78 79 response = None 80 try: 81 value = self.server.table.get_value(version, pkgarch, checksum) 82 response = {"value": value} 83 except prserv.NotFoundError: 84 self.logger.error("failure storing value in database for (%s, %s)",version, checksum) 85 86 return response 87 88 async def handle_import_one(self, request): 89 response = None 90 if not self.server.read_only: 91 version = request["version"] 92 pkgarch = request["pkgarch"] 93 checksum = request["checksum"] 94 value = request["value"] 95 96 value = self.server.table.importone(version, pkgarch, checksum, value) 97 if value is not None: 98 response = {"value": value} 99 100 return response 101 102 async def handle_export(self, request): 103 version = request["version"] 104 pkgarch = request["pkgarch"] 105 checksum = request["checksum"] 106 colinfo = request["colinfo"] 107 108 try: 109 (metainfo, datainfo) = self.server.table.export(version, pkgarch, checksum, colinfo) 110 except sqlite3.Error as exc: 111 self.logger.error(str(exc)) 112 metainfo = datainfo = None 113 114 return {"metainfo": metainfo, "datainfo": datainfo} 115 116 async def handle_is_readonly(self, request): 117 return {"readonly": self.server.read_only} 118 119class PRServer(bb.asyncrpc.AsyncServer): 120 def __init__(self, dbfile, read_only=False): 121 super().__init__(logger) 122 self.dbfile = dbfile 123 self.table = None 124 self.read_only = read_only 125 126 def accept_client(self, socket): 127 return PRServerClient(socket, self) 128 129 def start(self): 130 tasks = super().start() 131 self.db = prserv.db.PRData(self.dbfile, read_only=self.read_only) 132 self.table = self.db["PRMAIN"] 133 134 self.logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" % 135 (self.dbfile, self.address, str(os.getpid()))) 136 137 return tasks 138 139 async def stop(self): 140 self.table.sync_if_dirty() 141 self.db.disconnect() 142 await super().stop() 143 144 def signal_handler(self): 145 super().signal_handler() 146 if self.table: 147 self.table.sync() 148 149class PRServSingleton(object): 150 def __init__(self, dbfile, logfile, host, port): 151 self.dbfile = dbfile 152 self.logfile = logfile 153 self.host = host 154 self.port = port 155 156 def start(self): 157 self.prserv = PRServer(self.dbfile) 158 self.prserv.start_tcp_server(socket.gethostbyname(self.host), self.port) 159 self.process = self.prserv.serve_as_process(log_level=logging.WARNING) 160 161 if not self.prserv.address: 162 raise PRServiceConfigError 163 if not self.port: 164 self.port = int(self.prserv.address.rsplit(":", 1)[1]) 165 166def run_as_daemon(func, pidfile, logfile): 167 """ 168 See Advanced Programming in the UNIX, Sec 13.3 169 """ 170 try: 171 pid = os.fork() 172 if pid > 0: 173 os.waitpid(pid, 0) 174 #parent return instead of exit to give control 175 return pid 176 except OSError as e: 177 raise Exception("%s [%d]" % (e.strerror, e.errno)) 178 179 os.setsid() 180 """ 181 fork again to make sure the daemon is not session leader, 182 which prevents it from acquiring controlling terminal 183 """ 184 try: 185 pid = os.fork() 186 if pid > 0: #parent 187 os._exit(0) 188 except OSError as e: 189 raise Exception("%s [%d]" % (e.strerror, e.errno)) 190 191 os.chdir("/") 192 193 sys.stdout.flush() 194 sys.stderr.flush() 195 196 # We could be called from a python thread with io.StringIO as 197 # stdout/stderr or it could be 'real' unix fd forking where we need 198 # to physically close the fds to prevent the program launching us from 199 # potentially hanging on a pipe. Handle both cases. 200 si = open("/dev/null", "r") 201 try: 202 os.dup2(si.fileno(), sys.stdin.fileno()) 203 except (AttributeError, io.UnsupportedOperation): 204 sys.stdin = si 205 so = open(logfile, "a+") 206 try: 207 os.dup2(so.fileno(), sys.stdout.fileno()) 208 except (AttributeError, io.UnsupportedOperation): 209 sys.stdout = so 210 try: 211 os.dup2(so.fileno(), sys.stderr.fileno()) 212 except (AttributeError, io.UnsupportedOperation): 213 sys.stderr = so 214 215 # Clear out all log handlers prior to the fork() to avoid calling 216 # event handlers not part of the PRserver 217 for logger_iter in logging.Logger.manager.loggerDict.keys(): 218 logging.getLogger(logger_iter).handlers = [] 219 220 # Ensure logging makes it to the logfile 221 streamhandler = logging.StreamHandler() 222 streamhandler.setLevel(logging.DEBUG) 223 formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s") 224 streamhandler.setFormatter(formatter) 225 logger.addHandler(streamhandler) 226 227 # write pidfile 228 pid = str(os.getpid()) 229 with open(pidfile, "w") as pf: 230 pf.write("%s\n" % pid) 231 232 func() 233 os.remove(pidfile) 234 os._exit(0) 235 236def start_daemon(dbfile, host, port, logfile, read_only=False): 237 ip = socket.gethostbyname(host) 238 pidfile = PIDPREFIX % (ip, port) 239 try: 240 with open(pidfile) as pf: 241 pid = int(pf.readline().strip()) 242 except IOError: 243 pid = None 244 245 if pid: 246 sys.stderr.write("pidfile %s already exist. Daemon already running?\n" 247 % pidfile) 248 return 1 249 250 dbfile = os.path.abspath(dbfile) 251 def daemon_main(): 252 server = PRServer(dbfile, read_only=read_only) 253 server.start_tcp_server(ip, port) 254 server.serve_forever() 255 256 run_as_daemon(daemon_main, pidfile, os.path.abspath(logfile)) 257 return 0 258 259def stop_daemon(host, port): 260 import glob 261 ip = socket.gethostbyname(host) 262 pidfile = PIDPREFIX % (ip, port) 263 try: 264 with open(pidfile) as pf: 265 pid = int(pf.readline().strip()) 266 except IOError: 267 pid = None 268 269 if not pid: 270 # when server starts at port=0 (i.e. localhost:0), server actually takes another port, 271 # so at least advise the user which ports the corresponding server is listening 272 ports = [] 273 portstr = "" 274 for pf in glob.glob(PIDPREFIX % (ip, "*")): 275 bn = os.path.basename(pf) 276 root, _ = os.path.splitext(bn) 277 ports.append(root.split("_")[-1]) 278 if len(ports): 279 portstr = "Wrong port? Other ports listening at %s: %s" % (host, " ".join(ports)) 280 281 sys.stderr.write("pidfile %s does not exist. Daemon not running? %s\n" 282 % (pidfile, portstr)) 283 return 1 284 285 try: 286 if is_running(pid): 287 print("Sending SIGTERM to pr-server.") 288 os.kill(pid, signal.SIGTERM) 289 time.sleep(0.1) 290 291 try: 292 os.remove(pidfile) 293 except FileNotFoundError: 294 # The PID file might have been removed by the exiting process 295 pass 296 297 except OSError as e: 298 err = str(e) 299 if err.find("No such process") <= 0: 300 raise e 301 302 return 0 303 304def is_running(pid): 305 try: 306 os.kill(pid, 0) 307 except OSError as err: 308 if err.errno == errno.ESRCH: 309 return False 310 return True 311 312def is_local_special(host, port): 313 if (host == "localhost" or host == "127.0.0.1") and not port: 314 return True 315 else: 316 return False 317 318class PRServiceConfigError(Exception): 319 pass 320 321def auto_start(d): 322 global singleton 323 324 host_params = list(filter(None, (d.getVar("PRSERV_HOST") or "").split(":"))) 325 if not host_params: 326 # Shutdown any existing PR Server 327 auto_shutdown() 328 return None 329 330 if len(host_params) != 2: 331 # Shutdown any existing PR Server 332 auto_shutdown() 333 logger.critical("\n".join(["PRSERV_HOST: incorrect format", 334 'Usage: PRSERV_HOST = "<hostname>:<port>"'])) 335 raise PRServiceConfigError 336 337 host = host_params[0].strip().lower() 338 port = int(host_params[1]) 339 if is_local_special(host, port): 340 import bb.utils 341 cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE")) 342 if not cachedir: 343 logger.critical("Please set the 'PERSISTENT_DIR' or 'CACHE' variable") 344 raise PRServiceConfigError 345 dbfile = os.path.join(cachedir, "prserv.sqlite3") 346 logfile = os.path.join(cachedir, "prserv.log") 347 if singleton: 348 if singleton.dbfile != dbfile: 349 # Shutdown any existing PR Server as doesn't match config 350 auto_shutdown() 351 if not singleton: 352 bb.utils.mkdirhier(cachedir) 353 singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), host, port) 354 singleton.start() 355 if singleton: 356 host = singleton.host 357 port = singleton.port 358 359 try: 360 ping(host, port) 361 return str(host) + ":" + str(port) 362 363 except Exception: 364 logger.critical("PRservice %s:%d not available" % (host, port)) 365 raise PRServiceConfigError 366 367def auto_shutdown(): 368 global singleton 369 if singleton and singleton.process: 370 singleton.process.terminate() 371 singleton.process.join() 372 singleton = None 373 374def ping(host, port): 375 from . import client 376 377 with client.PRClient() as conn: 378 conn.connect_tcp(host, port) 379 return conn.ping() 380 381def connect(host, port): 382 from . import client 383 384 global singleton 385 386 if host.strip().lower() == "localhost" and not port: 387 host = "localhost" 388 port = singleton.port 389 390 conn = client.PRClient() 391 conn.connect_tcp(host, port) 392 return conn 393