1#
2# BitBake Process based server.
3#
4# Copyright (C) 2010 Bob Foerster <robert@erafx.com>
5#
6# SPDX-License-Identifier: GPL-2.0-only
7#
8
9"""
10    This module implements a multiprocessing.Process based server for bitbake.
11"""
12
13import bb
14import bb.event
15import logging
16import multiprocessing
17import threading
18import array
19import os
20import sys
21import time
22import select
23import socket
24import subprocess
25import errno
26import re
27import datetime
28import pickle
29import traceback
30import gc
31import stat
32import bb.server.xmlrpcserver
33from bb import daemonize
34from multiprocessing import queues
35
36logger = logging.getLogger('BitBake')
37
38class ProcessTimeout(SystemExit):
39    pass
40
41def serverlog(msg):
42    print(str(os.getpid()) + " " +  datetime.datetime.now().strftime('%H:%M:%S.%f') + " " + msg)
43    sys.stdout.flush()
44
45#
46# When we have lockfile issues, try and find infomation about which process is
47# using the lockfile
48#
49def get_lockfile_process_msg(lockfile):
50    # Some systems may not have lsof available
51    procs = None
52    try:
53        procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
54    except subprocess.CalledProcessError:
55        # File was deleted?
56        pass
57    except OSError as e:
58        if e.errno != errno.ENOENT:
59            raise
60    if procs is None:
61        # Fall back to fuser if lsof is unavailable
62        try:
63            procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
64        except subprocess.CalledProcessError:
65            # File was deleted?
66            pass
67        except OSError as e:
68            if e.errno != errno.ENOENT:
69                raise
70    if procs:
71        return procs.decode("utf-8")
72    return None
73
74class idleFinish():
75    def __init__(self, msg):
76         self.msg = msg
77
78class ProcessServer():
79    profile_filename = "profile.log"
80    profile_processed_filename = "profile.log.processed"
81
82    def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface):
83        self.command_channel = False
84        self.command_channel_reply = False
85        self.quit = False
86        self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore.
87        self.next_heartbeat = time.time()
88
89        self.event_handle = None
90        self.hadanyui = False
91        self.haveui = False
92        self.maxuiwait = 30
93        self.xmlrpc = False
94
95        self.idle = None
96        # Need a lock for _idlefuns changes
97        self._idlefuns = {}
98        self._idlefuncsLock = threading.Lock()
99        self.idle_cond = threading.Condition(self._idlefuncsLock)
100
101        self.bitbake_lock = lock
102        self.bitbake_lock_name = lockname
103        self.sock = sock
104        self.sockname = sockname
105        # It is possible the directory may be renamed. Cache the inode of the socket file
106        # so we can tell if things changed.
107        self.sockinode = os.stat(self.sockname)[stat.ST_INO]
108
109        self.server_timeout = server_timeout
110        self.timeout = self.server_timeout
111        self.xmlrpcinterface = xmlrpcinterface
112
113    def register_idle_function(self, function, data):
114        """Register a function to be called while the server is idle"""
115        assert hasattr(function, '__call__')
116        with bb.utils.lock_timeout(self._idlefuncsLock):
117            self._idlefuns[function] = data
118        serverlog("Registering idle function %s" % str(function))
119
120    def run(self):
121
122        if self.xmlrpcinterface[0]:
123            self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self)
124
125            serverlog("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port))
126
127        try:
128            self.bitbake_lock.seek(0)
129            self.bitbake_lock.truncate()
130            if self.xmlrpc:
131                self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), self.xmlrpc.host, self.xmlrpc.port))
132            else:
133                self.bitbake_lock.write("%s\n" % (os.getpid()))
134            self.bitbake_lock.flush()
135        except Exception as e:
136            serverlog("Error writing to lock file: %s" % str(e))
137            pass
138
139        if self.cooker.configuration.profile:
140            try:
141                import cProfile as profile
142            except:
143                import profile
144            prof = profile.Profile()
145
146            ret = profile.Profile.runcall(prof, self.main)
147
148            prof.dump_stats("profile.log")
149            bb.utils.process_profilelog("profile.log")
150            serverlog("Raw profiling information saved to profile.log and processed statistics to profile.log.processed")
151
152        else:
153            ret = self.main()
154
155        return ret
156
157    def _idle_check(self):
158        return len(self._idlefuns) == 0 and self.cooker.command.currentAsyncCommand is None
159
160    def wait_for_idle(self, timeout=30):
161        # Wait for the idle loop to have cleared
162        with bb.utils.lock_timeout(self._idlefuncsLock):
163            return self.idle_cond.wait_for(self._idle_check, timeout) is not False
164
165    def set_async_cmd(self, cmd):
166        with bb.utils.lock_timeout(self._idlefuncsLock):
167            ret = self.idle_cond.wait_for(self._idle_check, 30)
168            if ret is False:
169                return False
170            self.cooker.command.currentAsyncCommand = cmd
171            return True
172
173    def clear_async_cmd(self):
174        with bb.utils.lock_timeout(self._idlefuncsLock):
175            self.cooker.command.currentAsyncCommand = None
176            self.idle_cond.notify_all()
177
178    def get_async_cmd(self):
179        with bb.utils.lock_timeout(self._idlefuncsLock):
180            return self.cooker.command.currentAsyncCommand
181
182    def main(self):
183        self.cooker.pre_serve()
184
185        bb.utils.set_process_name("Cooker")
186
187        ready = []
188        newconnections = []
189
190        self.controllersock = False
191        fds = [self.sock]
192        if self.xmlrpc:
193            fds.append(self.xmlrpc)
194        seendata = False
195        serverlog("Entering server connection loop")
196        serverlog("Lockfile is: %s\nSocket is %s (%s)" % (self.bitbake_lock_name, self.sockname, os.path.exists(self.sockname)))
197
198        def disconnect_client(self, fds):
199            serverlog("Disconnecting Client (socket: %s)" % os.path.exists(self.sockname))
200            if self.controllersock:
201                fds.remove(self.controllersock)
202                self.controllersock.close()
203                self.controllersock = False
204            if self.haveui:
205                # Wait for the idle loop to have cleared (30s max)
206                if not self.wait_for_idle(30):
207                    serverlog("Idle loop didn't finish queued commands after 30s, exiting.")
208                    self.quit = True
209                fds.remove(self.command_channel)
210                bb.event.unregister_UIHhandler(self.event_handle, True)
211                self.command_channel_reply.writer.close()
212                self.event_writer.writer.close()
213                self.command_channel.close()
214                self.command_channel = False
215                del self.event_writer
216                self.lastui = time.time()
217                self.cooker.clientComplete()
218                self.haveui = False
219            ready = select.select(fds,[],[],0)[0]
220            if newconnections and not self.quit:
221                serverlog("Starting new client")
222                conn = newconnections.pop(-1)
223                fds.append(conn)
224                self.controllersock = conn
225            elif not self.timeout and not ready:
226                serverlog("No timeout, exiting.")
227                self.quit = True
228
229        self.lastui = time.time()
230        while not self.quit:
231            if self.sock in ready:
232                while select.select([self.sock],[],[],0)[0]:
233                    controllersock, address = self.sock.accept()
234                    if self.controllersock:
235                        serverlog("Queuing %s (%s)" % (str(ready), str(newconnections)))
236                        newconnections.append(controllersock)
237                    else:
238                        serverlog("Accepting %s (%s)" % (str(ready), str(newconnections)))
239                        self.controllersock = controllersock
240                        fds.append(controllersock)
241            if self.controllersock in ready:
242                try:
243                    serverlog("Processing Client")
244                    ui_fds = recvfds(self.controllersock, 3)
245                    serverlog("Connecting Client")
246
247                    # Where to write events to
248                    writer = ConnectionWriter(ui_fds[0])
249                    self.event_handle = bb.event.register_UIHhandler(writer, True)
250                    self.event_writer = writer
251
252                    # Where to read commands from
253                    reader = ConnectionReader(ui_fds[1])
254                    fds.append(reader)
255                    self.command_channel = reader
256
257                    # Where to send command return values to
258                    writer = ConnectionWriter(ui_fds[2])
259                    self.command_channel_reply = writer
260
261                    self.haveui = True
262                    self.hadanyui = True
263
264                except (EOFError, OSError):
265                    disconnect_client(self, fds)
266
267            if not self.timeout == -1.0 and not self.haveui and self.timeout and \
268                    (self.lastui + self.timeout) < time.time():
269                serverlog("Server timeout, exiting.")
270                self.quit = True
271
272            # If we don't see a UI connection within maxuiwait, its unlikely we're going to see
273            # one. We have had issue with processes hanging indefinitely so timing out UI-less
274            # servers is useful.
275            if not self.hadanyui and not self.xmlrpc and not self.timeout and (self.lastui + self.maxuiwait) < time.time():
276                serverlog("No UI connection within max timeout, exiting to avoid infinite loop.")
277                self.quit = True
278
279            if self.command_channel in ready:
280                try:
281                    command = self.command_channel.get()
282                except EOFError:
283                    # Client connection shutting down
284                    ready = []
285                    disconnect_client(self, fds)
286                    continue
287                if command[0] == "terminateServer":
288                    self.quit = True
289                    continue
290                try:
291                    serverlog("Running command %s" % command)
292                    self.command_channel_reply.send(self.cooker.command.runCommand(command, self))
293                    serverlog("Command Completed (socket: %s)" % os.path.exists(self.sockname))
294                except Exception as e:
295                   stack = traceback.format_exc()
296                   serverlog('Exception in server main event loop running command %s (%s)' % (command, stack))
297                   logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack))
298
299            if self.xmlrpc in ready:
300                self.xmlrpc.handle_requests()
301
302            if not seendata and hasattr(self.cooker, "data"):
303                heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT')
304                if heartbeat_event:
305                    try:
306                        self.heartbeat_seconds = float(heartbeat_event)
307                    except:
308                        bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event)
309
310                self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT')
311                try:
312                    if self.timeout:
313                        self.timeout = float(self.timeout)
314                except:
315                    bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout)
316                seendata = True
317
318            ready = self.idle_commands(.1, fds)
319
320        if self.idle:
321            self.idle.join()
322
323        serverlog("Exiting (socket: %s)" % os.path.exists(self.sockname))
324        # Remove the socket file so we don't get any more connections to avoid races
325        # The build directory could have been renamed so if the file isn't the one we created
326        # we shouldn't delete it.
327        try:
328            sockinode = os.stat(self.sockname)[stat.ST_INO]
329            if sockinode == self.sockinode:
330                os.unlink(self.sockname)
331            else:
332                serverlog("bitbake.sock inode mismatch (%s vs %s), not deleting." % (sockinode, self.sockinode))
333        except Exception as err:
334            serverlog("Removing socket file '%s' failed (%s)" % (self.sockname, err))
335        self.sock.close()
336
337        try:
338            self.cooker.shutdown(True, idle=False)
339            self.cooker.notifier.stop()
340            self.cooker.confignotifier.stop()
341        except:
342            pass
343
344        self.cooker.post_serve()
345
346        if len(threading.enumerate()) != 1:
347            serverlog("More than one thread left?: " + str(threading.enumerate()))
348
349        # Flush logs before we release the lock
350        sys.stdout.flush()
351        sys.stderr.flush()
352
353        # Finally release the lockfile but warn about other processes holding it open
354        lock = self.bitbake_lock
355        lockfile = self.bitbake_lock_name
356
357        def get_lock_contents(lockfile):
358            try:
359                with open(lockfile, "r") as f:
360                    return f.readlines()
361            except FileNotFoundError:
362                return None
363
364        lock.close()
365        lock = None
366
367        while not lock:
368            i = 0
369            lock = None
370            if not os.path.exists(os.path.basename(lockfile)):
371                serverlog("Lockfile directory gone, exiting.")
372                return
373
374            while not lock and i < 30:
375                lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False)
376                if not lock:
377                    newlockcontents = get_lock_contents(lockfile)
378                    if not newlockcontents[0].startswith([os.getpid() + "\n", os.getpid() + " "]):
379                        # A new server was started, the lockfile contents changed, we can exit
380                        serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents))
381                        return
382                    time.sleep(0.1)
383                i += 1
384            if lock:
385                # We hold the lock so we can remove the file (hide stale pid data)
386                # via unlockfile.
387                bb.utils.unlockfile(lock)
388                serverlog("Exiting as we could obtain the lock")
389                return
390
391            if not lock:
392                procs = get_lockfile_process_msg(lockfile)
393                msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"]
394                if procs:
395                    msg.append(":\n%s" % procs)
396                serverlog("".join(msg))
397
398    def idle_thread(self):
399        def remove_idle_func(function):
400            with bb.utils.lock_timeout(self._idlefuncsLock):
401                del self._idlefuns[function]
402                self.idle_cond.notify_all()
403
404        while not self.quit:
405            nextsleep = 0.1
406            fds = []
407
408            self.cooker.process_inotify_updates()
409
410            with bb.utils.lock_timeout(self._idlefuncsLock):
411                items = list(self._idlefuns.items())
412
413            for function, data in items:
414                try:
415                    retval = function(self, data, False)
416                    if isinstance(retval, idleFinish):
417                        serverlog("Removing idle function %s at idleFinish" % str(function))
418                        remove_idle_func(function)
419                        self.cooker.command.finishAsyncCommand(retval.msg)
420                        nextsleep = None
421                    elif retval is False:
422                        serverlog("Removing idle function %s" % str(function))
423                        remove_idle_func(function)
424                        nextsleep = None
425                    elif retval is True:
426                        nextsleep = None
427                    elif isinstance(retval, float) and nextsleep:
428                        if (retval < nextsleep):
429                            nextsleep = retval
430                    elif nextsleep is None:
431                        continue
432                    else:
433                        fds = fds + retval
434                except SystemExit:
435                    raise
436                except Exception as exc:
437                    if not isinstance(exc, bb.BBHandledException):
438                        logger.exception('Running idle function')
439                    remove_idle_func(function)
440                    serverlog("Exception %s broke the idle_thread, exiting" % traceback.format_exc())
441                    self.quit = True
442
443            # Create new heartbeat event?
444            now = time.time()
445            if bb.event._heartbeat_enabled and now >= self.next_heartbeat:
446                # We might have missed heartbeats. Just trigger once in
447                # that case and continue after the usual delay.
448                self.next_heartbeat += self.heartbeat_seconds
449                if self.next_heartbeat <= now:
450                    self.next_heartbeat = now + self.heartbeat_seconds
451                if hasattr(self.cooker, "data"):
452                    heartbeat = bb.event.HeartbeatEvent(now)
453                    try:
454                        bb.event.fire(heartbeat, self.cooker.data)
455                    except Exception as exc:
456                        if not isinstance(exc, bb.BBHandledException):
457                            logger.exception('Running heartbeat function')
458                        serverlog("Exception %s broke in idle_thread, exiting" % traceback.format_exc())
459                        self.quit = True
460            if nextsleep and bb.event._heartbeat_enabled and now + nextsleep > self.next_heartbeat:
461                # Shorten timeout so that we we wake up in time for
462                # the heartbeat.
463                nextsleep = self.next_heartbeat - now
464
465            if nextsleep is not None:
466                select.select(fds,[],[],nextsleep)[0]
467
468    def idle_commands(self, delay, fds=None):
469        nextsleep = delay
470        if not fds:
471            fds = []
472
473        if not self.idle:
474            self.idle = threading.Thread(target=self.idle_thread)
475            self.idle.start()
476
477        if nextsleep is not None:
478            if self.xmlrpc:
479                nextsleep = self.xmlrpc.get_timeout(nextsleep)
480            try:
481                return select.select(fds,[],[],nextsleep)[0]
482            except InterruptedError:
483                # Ignore EINTR
484                return []
485        else:
486            return select.select(fds,[],[],0)[0]
487
488
489class ServerCommunicator():
490    def __init__(self, connection, recv):
491        self.connection = connection
492        self.recv = recv
493
494    def runCommand(self, command):
495        self.connection.send(command)
496        if not self.recv.poll(30):
497            logger.info("No reply from server in 30s")
498            if not self.recv.poll(30):
499                raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s)")
500        ret, exc = self.recv.get()
501        # Should probably turn all exceptions in exc back into exceptions?
502        # For now, at least handle BBHandledException
503        if exc and ("BBHandledException" in exc or "SystemExit" in exc):
504            raise bb.BBHandledException()
505        return ret, exc
506
507    def updateFeatureSet(self, featureset):
508        _, error = self.runCommand(["setFeatures", featureset])
509        if error:
510            logger.error("Unable to set the cooker to the correct featureset: %s" % error)
511            raise BaseException(error)
512
513    def getEventHandle(self):
514        handle, error = self.runCommand(["getUIHandlerNum"])
515        if error:
516            logger.error("Unable to get UI Handler Number: %s" % error)
517            raise BaseException(error)
518
519        return handle
520
521    def terminateServer(self):
522        self.connection.send(['terminateServer'])
523        return
524
525class BitBakeProcessServerConnection(object):
526    def __init__(self, ui_channel, recv, eq, sock):
527        self.connection = ServerCommunicator(ui_channel, recv)
528        self.events = eq
529        # Save sock so it doesn't get gc'd for the life of our connection
530        self.socket_connection = sock
531
532    def terminate(self):
533        self.events.close()
534        self.socket_connection.close()
535        self.connection.connection.close()
536        self.connection.recv.close()
537        return
538
539start_log_format = '--- Starting bitbake server pid %s at %s ---'
540start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'
541
542class BitBakeServer(object):
543
544    def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface, profile):
545
546        self.server_timeout = server_timeout
547        self.xmlrpcinterface = xmlrpcinterface
548        self.featureset = featureset
549        self.sockname = sockname
550        self.bitbake_lock = lock
551        self.profile = profile
552        self.readypipe, self.readypipein = os.pipe()
553
554        # Place the log in the builddirectory alongside the lock file
555        logfile = os.path.join(os.path.dirname(self.bitbake_lock.name), "bitbake-cookerdaemon.log")
556        self.logfile = logfile
557
558        startdatetime = datetime.datetime.now()
559        bb.daemonize.createDaemon(self._startServer, logfile)
560        self.bitbake_lock.close()
561        os.close(self.readypipein)
562
563        ready = ConnectionReader(self.readypipe)
564        r = ready.poll(5)
565        if not r:
566            bb.note("Bitbake server didn't start within 5 seconds, waiting for 90")
567            r = ready.poll(90)
568        if r:
569            try:
570                r = ready.get()
571            except EOFError:
572                # Trap the child exiting/closing the pipe and error out
573                r = None
574        if not r or r[0] != "r":
575            ready.close()
576            bb.error("Unable to start bitbake server (%s)" % str(r))
577            if os.path.exists(logfile):
578                logstart_re = re.compile(start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)'))
579                started = False
580                lines = []
581                lastlines = []
582                with open(logfile, "r") as f:
583                    for line in f:
584                        if started:
585                            lines.append(line)
586                        else:
587                            lastlines.append(line)
588                            res = logstart_re.search(line.rstrip())
589                            if res:
590                                ldatetime = datetime.datetime.strptime(res.group(2), start_log_datetime_format)
591                                if ldatetime >= startdatetime:
592                                    started = True
593                                    lines.append(line)
594                        if len(lastlines) > 60:
595                            lastlines = lastlines[-60:]
596                if lines:
597                    if len(lines) > 60:
598                        bb.error("Last 60 lines of server log for this session (%s):\n%s" % (logfile, "".join(lines[-60:])))
599                    else:
600                        bb.error("Server log for this session (%s):\n%s" % (logfile, "".join(lines)))
601                elif lastlines:
602                        bb.error("Server didn't start, last 60 loglines (%s):\n%s" % (logfile, "".join(lastlines)))
603            else:
604                bb.error("%s doesn't exist" % logfile)
605
606            raise SystemExit(1)
607
608        ready.close()
609
610    def _startServer(self):
611        os.close(self.readypipe)
612        os.set_inheritable(self.bitbake_lock.fileno(), True)
613        os.set_inheritable(self.readypipein, True)
614        serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server")
615        os.execl(sys.executable, "bitbake-server", serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname,  str(self.server_timeout or 0), str(int(self.profile)), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1]))
616
617def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface, profile):
618
619    import bb.cookerdata
620    import bb.cooker
621
622    serverlog(start_log_format % (os.getpid(), datetime.datetime.now().strftime(start_log_datetime_format)))
623
624    try:
625        bitbake_lock = os.fdopen(lockfd, "w")
626
627        # Create server control socket
628        if os.path.exists(sockname):
629            serverlog("WARNING: removing existing socket file '%s'" % sockname)
630            os.unlink(sockname)
631
632        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
633        # AF_UNIX has path length issues so chdir here to workaround
634        cwd = os.getcwd()
635        try:
636            os.chdir(os.path.dirname(sockname))
637            sock.bind(os.path.basename(sockname))
638        finally:
639            os.chdir(cwd)
640        sock.listen(1)
641
642        server = ProcessServer(bitbake_lock, lockname, sock, sockname, server_timeout, xmlrpcinterface)
643        writer = ConnectionWriter(readypipeinfd)
644        try:
645            featureset = []
646            cooker = bb.cooker.BBCooker(featureset, server)
647            cooker.configuration.profile = profile
648        except bb.BBHandledException:
649            return None
650        writer.send("r")
651        writer.close()
652        server.cooker = cooker
653        serverlog("Started bitbake server pid %d" % os.getpid())
654
655        server.run()
656    finally:
657        # Flush any messages/errors to the logfile before exit
658        sys.stdout.flush()
659        sys.stderr.flush()
660
661def connectProcessServer(sockname, featureset):
662    # Connect to socket
663    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
664    # AF_UNIX has path length issues so chdir here to workaround
665    cwd = os.getcwd()
666
667    readfd = writefd = readfd1 = writefd1 = readfd2 = writefd2 = None
668    eq = command_chan_recv = command_chan = None
669
670    sock.settimeout(10)
671
672    try:
673        try:
674            os.chdir(os.path.dirname(sockname))
675            finished = False
676            while not finished:
677                try:
678                    sock.connect(os.path.basename(sockname))
679                    finished = True
680                except IOError as e:
681                    if e.errno == errno.EWOULDBLOCK:
682                        pass
683                    raise
684        finally:
685            os.chdir(cwd)
686
687        # Send an fd for the remote to write events to
688        readfd, writefd = os.pipe()
689        eq = BBUIEventQueue(readfd)
690        # Send an fd for the remote to recieve commands from
691        readfd1, writefd1 = os.pipe()
692        command_chan = ConnectionWriter(writefd1)
693        # Send an fd for the remote to write commands results to
694        readfd2, writefd2 = os.pipe()
695        command_chan_recv = ConnectionReader(readfd2)
696
697        sendfds(sock, [writefd, readfd1, writefd2])
698
699        server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq, sock)
700
701        # Close the ends of the pipes we won't use
702        for i in [writefd, readfd1, writefd2]:
703            os.close(i)
704
705        server_connection.connection.updateFeatureSet(featureset)
706
707    except (Exception, SystemExit) as e:
708        if command_chan_recv:
709            command_chan_recv.close()
710        if command_chan:
711            command_chan.close()
712        for i in [writefd, readfd1, writefd2]:
713            try:
714                if i:
715                    os.close(i)
716            except OSError:
717                pass
718        sock.close()
719        raise
720
721    return server_connection
722
723def sendfds(sock, fds):
724        '''Send an array of fds over an AF_UNIX socket.'''
725        fds = array.array('i', fds)
726        msg = bytes([len(fds) % 256])
727        sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
728
729def recvfds(sock, size):
730        '''Receive an array of fds over an AF_UNIX socket.'''
731        a = array.array('i')
732        bytes_size = a.itemsize * size
733        msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
734        if not msg and not ancdata:
735            raise EOFError
736        try:
737            if len(ancdata) != 1:
738                raise RuntimeError('received %d items of ancdata' %
739                                   len(ancdata))
740            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
741            if (cmsg_level == socket.SOL_SOCKET and
742                cmsg_type == socket.SCM_RIGHTS):
743                if len(cmsg_data) % a.itemsize != 0:
744                    raise ValueError
745                a.frombytes(cmsg_data)
746                assert len(a) % 256 == msg[0]
747                return list(a)
748        except (ValueError, IndexError):
749            pass
750        raise RuntimeError('Invalid data received')
751
752class BBUIEventQueue:
753    def __init__(self, readfd):
754
755        self.eventQueue = []
756        self.eventQueueLock = threading.Lock()
757        self.eventQueueNotify = threading.Event()
758
759        self.reader = ConnectionReader(readfd)
760
761        self.t = threading.Thread()
762        self.t.run = self.startCallbackHandler
763        self.t.start()
764
765    def getEvent(self):
766        with bb.utils.lock_timeout(self.eventQueueLock):
767            if len(self.eventQueue) == 0:
768                return None
769
770            item = self.eventQueue.pop(0)
771            if len(self.eventQueue) == 0:
772                self.eventQueueNotify.clear()
773
774        return item
775
776    def waitEvent(self, delay):
777        self.eventQueueNotify.wait(delay)
778        return self.getEvent()
779
780    def queue_event(self, event):
781        with bb.utils.lock_timeout(self.eventQueueLock):
782            self.eventQueue.append(event)
783            self.eventQueueNotify.set()
784
785    def send_event(self, event):
786        self.queue_event(pickle.loads(event))
787
788    def startCallbackHandler(self):
789        bb.utils.set_process_name("UIEventQueue")
790        while True:
791            try:
792                ready = self.reader.wait(0.25)
793                if ready:
794                    event = self.reader.get()
795                    self.queue_event(event)
796            except (EOFError, OSError, TypeError):
797                # Easiest way to exit is to close the file descriptor to cause an exit
798                break
799
800    def close(self):
801        self.reader.close()
802        self.t.join()
803
804class ConnectionReader(object):
805
806    def __init__(self, fd):
807        self.reader = multiprocessing.connection.Connection(fd, writable=False)
808        self.rlock = multiprocessing.Lock()
809
810    def wait(self, timeout=None):
811        return multiprocessing.connection.wait([self.reader], timeout)
812
813    def poll(self, timeout=None):
814        return self.reader.poll(timeout)
815
816    def get(self):
817        with bb.utils.lock_timeout(self.rlock):
818            res = self.reader.recv_bytes()
819        return multiprocessing.reduction.ForkingPickler.loads(res)
820
821    def fileno(self):
822        return self.reader.fileno()
823
824    def close(self):
825        return self.reader.close()
826
827
828class ConnectionWriter(object):
829
830    def __init__(self, fd):
831        self.writer = multiprocessing.connection.Connection(fd, readable=False)
832        self.wlock = multiprocessing.Lock()
833        # Why bb.event needs this I have no idea
834        self.event = self
835
836    def _send(self, obj):
837        gc.disable()
838        with bb.utils.lock_timeout(self.wlock):
839            self.writer.send_bytes(obj)
840        gc.enable()
841
842    def send(self, obj):
843        obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
844        # See notes/code in CookerParser
845        # We must not terminate holding this lock else processes will hang.
846        # For SIGTERM, raising afterwards avoids this.
847        # For SIGINT, we don't want to have written partial data to the pipe.
848        # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139
849        process = multiprocessing.current_process()
850        if process and hasattr(process, "queue_signals"):
851            with bb.utils.lock_timeout(process.signal_threadlock):
852                process.queue_signals = True
853                self._send(obj)
854                process.queue_signals = False
855                try:
856                    for sig in process.signal_received.pop():
857                        process.handle_sig(sig, None)
858                except IndexError:
859                    pass
860        else:
861            self._send(obj)
862
863    def fileno(self):
864        return self.writer.fileno()
865
866    def close(self):
867        return self.writer.close()
868