1#
2# BitBake Process based server.
3#
4# Copyright (C) 2010 Bob Foerster <robert@erafx.com>
5#
6# SPDX-License-Identifier: GPL-2.0-only
7#
8
9"""
10    This module implements a multiprocessing.Process based server for bitbake.
11"""
12
13import bb
14import bb.event
15import logging
16import multiprocessing
17import threading
18import array
19import os
20import sys
21import time
22import select
23import socket
24import subprocess
25import errno
26import re
27import datetime
28import pickle
29import traceback
30import gc
31import stat
32import bb.server.xmlrpcserver
33from bb import daemonize
34from multiprocessing import queues
35
# Logger named 'BitBake', used by this module for info/error/exception reporting
logger = logging.getLogger('BitBake')
37
class ProcessTimeout(SystemExit):
    """Raised when no reply arrives from the bitbake server in time."""
40
def currenttime():
    """Return the current local time formatted as ``HH:MM:SS.ffffff``."""
    now = datetime.datetime.now()
    return now.strftime('%H:%M:%S.%f')
43
def serverlog(msg):
    """Print ``msg`` to stdout prefixed with our pid and the current time.

    stdout is flushed immediately after the line is written.
    """
    fields = [str(os.getpid()), currenttime(), msg]
    print(" ".join(fields))
    sys.stdout.flush()
47
48#
# When we have lockfile issues, try to find information about which process is
# using the lockfile
51#
def get_lockfile_process_msg(lockfile):
    """Describe which processes are using ``lockfile``.

    Runs ``lsof -w`` on the file and, if that produced no result (the tool is
    missing or exited with an error), falls back to ``fuser -v``.

    Returns the decoded tool output, or None when nothing could be determined.
    OSErrors other than ENOENT from launching a tool are propagated.
    """
    def query(cmd):
        # None means "no answer from this tool"
        try:
            return subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError:
            # File was deleted?
            return None
        except OSError as e:
            # Some systems may not have the tool available
            if e.errno != errno.ENOENT:
                raise
            return None

    output = query(["lsof", '-w', lockfile])
    if output is None:
        # Fall back to fuser if lsof is unavailable
        output = query(["fuser", '-v', lockfile])

    if not output:
        return None
    return output.decode("utf-8")
76
class idleFinish():
    """Return value with which an idle function reports it has finished.

    The idle thread removes the function that returned it and passes ``msg``
    on to the cooker's finishAsyncCommand().
    """
    def __init__(self, msg):
        self.msg = msg
80
class ProcessServer():
    """Server side of the process based bitbake server.

    Accepts UI connections on a UNIX socket, reads commands from the connected
    UI and runs them through the cooker, while a helper thread drives the
    registered idle functions and heartbeat events.

    The ``cooker`` attribute is not set by ``__init__``; it must be assigned
    before ``run()`` is called.
    """
    profile_filename = "profile.log"
    profile_processed_filename = "profile.log.processed"

    def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface):
        """Store the lockfile/socket handles and initialise the idle state.

        ``sockname`` is stat()ed to record its inode, so the socket file must
        already exist.
        """
        self.command_channel = False
        self.command_channel_reply = False
        self.quit = False
        self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore.
        self.next_heartbeat = time.time()

        self.event_handle = None
        self.hadanyui = False
        self.haveui = False
        self.maxuiwait = 30
        self.xmlrpc = False

        self.idle = None
        # Need a lock for _idlefuns changes
        self._idlefuns = {}
        self._idlefuncsLock = threading.Lock()
        self.idle_cond = threading.Condition(self._idlefuncsLock)

        self.bitbake_lock = lock
        self.bitbake_lock_name = lockname
        self.sock = sock
        self.sockname = sockname
        # It is possible the directory may be renamed. Cache the inode of the socket file
        # so we can tell if things changed.
        self.sockinode = os.stat(self.sockname)[stat.ST_INO]

        self.server_timeout = server_timeout
        self.timeout = self.server_timeout
        self.xmlrpcinterface = xmlrpcinterface

    def register_idle_function(self, function, data):
        """Register a function to be called while the server is idle"""
        assert hasattr(function, '__call__')
        with bb.utils.lock_timeout(self._idlefuncsLock):
            self._idlefuns[function] = data
        serverlog("Registering idle function %s" % str(function))

    def run(self):
        """Record our pid in the lockfile and run main(), optionally profiled.

        Starts the XMLRPC server first when an XMLRPC interface was requested.
        Returns whatever main() returns.
        """
        if self.xmlrpcinterface[0]:
            self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self)

            serverlog("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port))

        try:
            self.bitbake_lock.seek(0)
            self.bitbake_lock.truncate()
            if self.xmlrpc:
                self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), self.xmlrpc.host, self.xmlrpc.port))
            else:
                self.bitbake_lock.write("%s\n" % (os.getpid()))
            self.bitbake_lock.flush()
        except Exception as e:
            # Not fatal: the server can still run without the pid recorded
            serverlog("Error writing to lock file: %s" % str(e))

        if self.cooker.configuration.profile:
            try:
                import cProfile as profile
            except ImportError:
                import profile
            prof = profile.Profile()

            ret = prof.runcall(self.main)

            prof.dump_stats(self.profile_filename)
            bb.utils.process_profilelog(self.profile_filename)
            serverlog("Raw profiling information saved to %s and processed statistics to %s" % (self.profile_filename, self.profile_processed_filename))

        else:
            ret = self.main()

        return ret

    def _idle_check(self):
        """Return True when no idle functions are registered and no async command is set."""
        return len(self._idlefuns) == 0 and self.cooker.command.currentAsyncCommand is None

    def wait_for_idle(self, timeout=30):
        """Wait up to ``timeout`` seconds for _idle_check() to hold; return whether it did."""
        # Wait for the idle loop to have cleared
        with bb.utils.lock_timeout(self._idlefuncsLock):
            return self.idle_cond.wait_for(self._idle_check, timeout) is not False

    def set_async_cmd(self, cmd):
        """Set ``cmd`` as the current async command once the server is idle.

        Returns False if the server did not become idle within 30 seconds,
        True once the command has been stored.
        """
        with bb.utils.lock_timeout(self._idlefuncsLock):
            ret = self.idle_cond.wait_for(self._idle_check, 30)
            if ret is False:
                return False
            self.cooker.command.currentAsyncCommand = cmd
            return True

    def clear_async_cmd(self):
        """Clear the current async command and wake anything waiting for idle."""
        with bb.utils.lock_timeout(self._idlefuncsLock):
            self.cooker.command.currentAsyncCommand = None
            self.idle_cond.notify_all()

    def get_async_cmd(self):
        """Return the current async command (read under the idle lock)."""
        with bb.utils.lock_timeout(self._idlefuncsLock):
            return self.cooker.command.currentAsyncCommand

    def main(self):
        """Serve UI connections and commands until told to quit, then clean up.

        After the loop ends this joins the idle thread, removes the socket
        file (if it is still the one we created), shuts the cooker down and
        finally releases the lockfile.
        """
        self.cooker.pre_serve()

        bb.utils.set_process_name("Cooker")

        ready = []
        newconnections = []

        self.controllersock = False
        fds = [self.sock]
        if self.xmlrpc:
            fds.append(self.xmlrpc)
        seendata = False
        serverlog("Entering server connection loop")
        serverlog("Lockfile is: %s\nSocket is %s (%s)" % (self.bitbake_lock_name, self.sockname, os.path.exists(self.sockname)))

        def disconnect_client(self, fds):
            serverlog("Disconnecting Client (socket: %s)" % os.path.exists(self.sockname))
            if self.controllersock:
                fds.remove(self.controllersock)
                self.controllersock.close()
                self.controllersock = False
            if self.haveui:
                # Wait for the idle loop to have cleared (30s max)
                if not self.wait_for_idle(30):
                    serverlog("Idle loop didn't finish queued commands after 30s, exiting.")
                    self.quit = True
                fds.remove(self.command_channel)
                bb.event.unregister_UIHhandler(self.event_handle, True)
                self.command_channel_reply.writer.close()
                self.event_writer.writer.close()
                self.command_channel.close()
                self.command_channel = False
                del self.event_writer
                self.lastui = time.time()
                self.cooker.clientComplete()
                self.haveui = False
            # Note: local to this helper, the caller's 'ready' list is untouched
            ready = select.select(fds,[],[],0)[0]
            if newconnections and not self.quit:
                serverlog("Starting new client")
                conn = newconnections.pop(-1)
                fds.append(conn)
                self.controllersock = conn
            elif not self.timeout and not ready:
                serverlog("No timeout, exiting.")
                self.quit = True

        self.lastui = time.time()
        while not self.quit:
            if self.sock in ready:
                while select.select([self.sock],[],[],0)[0]:
                    controllersock, address = self.sock.accept()
                    if self.controllersock:
                        serverlog("Queuing %s (%s)" % (str(ready), str(newconnections)))
                        newconnections.append(controllersock)
                    else:
                        serverlog("Accepting %s (%s)" % (str(ready), str(newconnections)))
                        self.controllersock = controllersock
                        fds.append(controllersock)
            if self.controllersock in ready:
                try:
                    serverlog("Processing Client")
                    ui_fds = recvfds(self.controllersock, 3)
                    serverlog("Connecting Client")

                    # Where to write events to
                    writer = ConnectionWriter(ui_fds[0])
                    self.event_handle = bb.event.register_UIHhandler(writer, True)
                    self.event_writer = writer

                    # Where to read commands from
                    reader = ConnectionReader(ui_fds[1])
                    fds.append(reader)
                    self.command_channel = reader

                    # Where to send command return values to
                    writer = ConnectionWriter(ui_fds[2])
                    self.command_channel_reply = writer

                    self.haveui = True
                    self.hadanyui = True

                except (EOFError, OSError):
                    disconnect_client(self, fds)

            if self.timeout != -1.0 and not self.haveui and self.timeout and \
                    (self.lastui + self.timeout) < time.time():
                serverlog("Server timeout, exiting.")
                self.quit = True

            # If we don't see a UI connection within maxuiwait, its unlikely we're going to see
            # one. We have had issue with processes hanging indefinitely so timing out UI-less
            # servers is useful.
            if not self.hadanyui and not self.xmlrpc and not self.timeout and (self.lastui + self.maxuiwait) < time.time():
                serverlog("No UI connection within max timeout, exiting to avoid infinite loop.")
                self.quit = True

            if self.command_channel in ready:
                try:
                    command = self.command_channel.get()
                except EOFError:
                    # Client connection shutting down
                    ready = []
                    disconnect_client(self, fds)
                    continue
                if command[0] == "terminateServer":
                    self.quit = True
                    continue
                try:
                    serverlog("Running command %s" % command)
                    reply = self.cooker.command.runCommand(command, self)
                    serverlog("Sending reply %s" % repr(reply))
                    self.command_channel_reply.send(reply)
                    serverlog("Command Completed (socket: %s)" % os.path.exists(self.sockname))
                except Exception:
                    stack = traceback.format_exc()
                    serverlog('Exception in server main event loop running command %s (%s)' % (command, stack))
                    logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack))

            if self.xmlrpc in ready:
                self.xmlrpc.handle_requests()

            # Pick up datastore driven settings once the cooker has a datastore
            if not seendata and hasattr(self.cooker, "data"):
                heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT')
                if heartbeat_event:
                    try:
                        self.heartbeat_seconds = float(heartbeat_event)
                    except (TypeError, ValueError):
                        bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event)

                self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT')
                try:
                    if self.timeout:
                        self.timeout = float(self.timeout)
                except (TypeError, ValueError):
                    bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout)
                    # Really ignore it: a non-numeric value left here would
                    # break the timeout arithmetic in this loop.
                    self.timeout = None
                seendata = True

            ready = self.idle_commands(.1, fds)

        if self.idle:
            self.idle.join()

        serverlog("Exiting (socket: %s)" % os.path.exists(self.sockname))
        # Remove the socket file so we don't get any more connections to avoid races
        # The build directory could have been renamed so if the file isn't the one we created
        # we shouldn't delete it.
        try:
            sockinode = os.stat(self.sockname)[stat.ST_INO]
            if sockinode == self.sockinode:
                os.unlink(self.sockname)
            else:
                serverlog("bitbake.sock inode mismatch (%s vs %s), not deleting." % (sockinode, self.sockinode))
        except Exception as err:
            serverlog("Removing socket file '%s' failed (%s)" % (self.sockname, err))
        self.sock.close()

        try:
            self.cooker.shutdown(True, idle=False)
            self.cooker.notifier.stop()
            self.cooker.confignotifier.stop()
        except BaseException:
            # Deliberately broad best effort: nothing raised here may stop the
            # rest of the shutdown sequence (post_serve, lock release).
            pass

        self.cooker.post_serve()

        if len(threading.enumerate()) != 1:
            serverlog("More than one thread left?: " + str(threading.enumerate()))

        # Flush logs before we release the lock
        sys.stdout.flush()
        sys.stderr.flush()

        # Finally release the lockfile but warn about other processes holding it open
        self._release_lockfile()

    @staticmethod
    def _get_lock_contents(lockfile):
        """Return the lines of ``lockfile``, or None if the file does not exist."""
        try:
            with open(lockfile, "r") as f:
                return f.readlines()
        except FileNotFoundError:
            return None

    @staticmethod
    def _lock_contents_are_ours(contents, pid):
        """Return True if lockfile ``contents`` start with the entry run() writes for ``pid``.

        ``contents`` is a list of lines (or None/empty when the file is
        missing or blank); the first line is either "<pid>\\n" or
        "<pid> <host>:<port>\\n".
        """
        if not contents:
            return False
        # str.startswith() takes a tuple of prefixes, not a list
        return contents[0].startswith((f"{pid}\n", f"{pid} "))

    def _release_lockfile(self):
        """Close our lockfile handle, then wait until the lock can be retaken.

        Returns once the lock was obtained (and released again via
        unlockfile), once the lockfile no longer carries our pid, or once the
        lockfile directory has disappeared. While other processes keep the
        lock held, a message naming them is logged after every 30 attempts.
        """
        lockfile = self.bitbake_lock_name
        # dirname is "" for a bare filename, which means the current directory
        lockdir = os.path.dirname(lockfile) or "."
        pid = os.getpid()

        self.bitbake_lock.close()

        while True:
            if not os.path.exists(lockdir):
                serverlog("Lockfile directory gone, exiting.")
                return

            lock = None
            i = 0
            while not lock and i < 30:
                lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False)
                if not lock:
                    newlockcontents = self._get_lock_contents(lockfile)
                    if not self._lock_contents_are_ours(newlockcontents, pid):
                        # A new server was started, the lockfile contents changed, we can exit
                        serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents))
                        return
                    time.sleep(0.1)
                i += 1
            if lock:
                # We hold the lock so we can remove the file (hide stale pid data)
                # via unlockfile.
                bb.utils.unlockfile(lock)
                serverlog("Exiting as we could obtain the lock")
                return

            procs = get_lockfile_process_msg(lockfile)
            msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"]
            if procs:
                msg.append(":\n%s" % procs)
            serverlog("".join(msg))

    def idle_thread(self):
        """Body of the idle thread: run idle functions and heartbeats until quit.

        Each registered function is called as ``function(self, data, False)``.
        Its return value decides what happens next: an idleFinish or False
        removes the function, True asks for an immediate re-run, a float is a
        requested sleep time and anything else is treated as a list of fds to
        wait on. Any exception from a function stops the server.
        """
        def remove_idle_func(function):
            with bb.utils.lock_timeout(self._idlefuncsLock):
                del self._idlefuns[function]
                self.idle_cond.notify_all()

        while not self.quit:
            nextsleep = 0.1
            fds = []

            try:
                self.cooker.process_inotify_updates()
            except Exception:
                serverlog("Exception %s in inofify updates broke the idle_thread, exiting" % traceback.format_exc())
                self.quit = True

            # Work on a snapshot so functions can be (un)registered meanwhile
            with bb.utils.lock_timeout(self._idlefuncsLock):
                items = list(self._idlefuns.items())

            for function, data in items:
                try:
                    retval = function(self, data, False)
                    if isinstance(retval, idleFinish):
                        serverlog("Removing idle function %s at idleFinish" % str(function))
                        remove_idle_func(function)
                        self.cooker.command.finishAsyncCommand(retval.msg)
                        nextsleep = None
                    elif retval is False:
                        serverlog("Removing idle function %s" % str(function))
                        remove_idle_func(function)
                        nextsleep = None
                    elif retval is True:
                        nextsleep = None
                    elif isinstance(retval, float) and nextsleep:
                        if (retval < nextsleep):
                            nextsleep = retval
                    elif nextsleep is None:
                        continue
                    else:
                        fds = fds + retval
                except SystemExit:
                    raise
                except Exception as exc:
                    if not isinstance(exc, bb.BBHandledException):
                        logger.exception('Running idle function')
                    remove_idle_func(function)
                    serverlog("Exception %s broke the idle_thread, exiting" % traceback.format_exc())
                    self.quit = True

            # Create new heartbeat event?
            now = time.time()
            if bb.event._heartbeat_enabled and now >= self.next_heartbeat:
                # We might have missed heartbeats. Just trigger once in
                # that case and continue after the usual delay.
                self.next_heartbeat += self.heartbeat_seconds
                if self.next_heartbeat <= now:
                    self.next_heartbeat = now + self.heartbeat_seconds
                if hasattr(self.cooker, "data"):
                    heartbeat = bb.event.HeartbeatEvent(now)
                    try:
                        bb.event.fire(heartbeat, self.cooker.data)
                    except Exception as exc:
                        if not isinstance(exc, bb.BBHandledException):
                            logger.exception('Running heartbeat function')
                        serverlog("Exception %s broke in idle_thread, exiting" % traceback.format_exc())
                        self.quit = True
            if nextsleep and bb.event._heartbeat_enabled and now + nextsleep > self.next_heartbeat:
                # Shorten timeout so that we we wake up in time for
                # the heartbeat.
                nextsleep = self.next_heartbeat - now

            if nextsleep is not None:
                select.select(fds,[],[],nextsleep)

    def idle_commands(self, delay, fds=None):
        """Make sure the idle thread is running, then wait on ``fds``.

        Waits up to ``delay`` seconds (possibly adjusted by the XMLRPC
        server's get_timeout()) and returns the list of readable fds. Sets
        ``quit`` if the idle thread was started earlier but has terminated.
        """
        nextsleep = delay
        if not fds:
            fds = []

        if not self.idle:
            self.idle = threading.Thread(target=self.idle_thread)
            self.idle.start()
        elif not self.idle.is_alive():
            serverlog("Idle thread terminated, main thread exiting too")
            bb.error("Idle thread terminated, main thread exiting too")
            self.quit = True

        if nextsleep is not None:
            if self.xmlrpc:
                nextsleep = self.xmlrpc.get_timeout(nextsleep)
            try:
                return select.select(fds,[],[],nextsleep)[0]
            except InterruptedError:
                # Ignore EINTR
                return []
        else:
            return select.select(fds,[],[],0)[0]
500
501
class ServerCommunicator():
    """Sends commands over ``connection`` and reads the replies from ``recv``."""

    def __init__(self, connection, recv):
        self.connection = connection
        self.recv = recv

    def runCommand(self, command):
        """Send ``command`` and return the ``(result, error)`` reply.

        Waits for the reply in two 30 second windows, logging after the first
        one expires and raising ProcessTimeout after the second. Raises
        bb.BBHandledException when the error text mentions BBHandledException
        or SystemExit.
        """
        self.connection.send(command)

        replied = self.recv.poll(30)
        if not replied:
            logger.info("No reply from server in 30s (for command %s at %s)" % (command[0], currenttime()))
            replied = self.recv.poll(30)
        if not replied:
            raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s at %s)" % currenttime())

        result, error = self.recv.get()
        # Should probably turn all exceptions in exc back into exceptions?
        # For now, at least handle BBHandledException
        handled = error and ("BBHandledException" in error or "SystemExit" in error)
        if handled:
            raise bb.BBHandledException()
        return result, error

    def updateFeatureSet(self, featureset):
        """Ask the server to apply ``featureset``; raise BaseException on error."""
        error = self.runCommand(["setFeatures", featureset])[1]
        if not error:
            return
        logger.error("Unable to set the cooker to the correct featureset: %s" % error)
        raise BaseException(error)

    def getEventHandle(self):
        """Return the UI handler number from the server; raise BaseException on error."""
        handle, error = self.runCommand(["getUIHandlerNum"])
        if not error:
            return handle
        logger.error("Unable to get UI Handler Number: %s" % error)
        raise BaseException(error)

    def terminateServer(self):
        """Send the terminateServer command without waiting for a reply."""
        self.connection.send(['terminateServer'])
537
class BitBakeProcessServerConnection(object):
    """Bundles the command channel, event queue and socket of one server connection."""

    def __init__(self, ui_channel, recv, eq, sock):
        self.connection = ServerCommunicator(ui_channel, recv)
        self.events = eq
        # Save sock so it doesn't get gc'd for the life of our connection
        self.socket_connection = sock

    def terminate(self):
        """Close the event queue, the socket and both command channel ends."""
        self.events.close()
        self.socket_connection.close()
        communicator = self.connection
        communicator.connection.close()
        communicator.recv.close()
551
# Marker line written to the server log when a server starts (pid, timestamp).
# BitBakeServer turns it into a regex to find the log lines of its own session.
start_log_format = '--- Starting bitbake server pid %s at %s ---'
# strftime/strptime format of the timestamp embedded in start_log_format
start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'
554
class BitBakeServer(object):
    """Starts a daemonised bitbake server and waits for it to report ready."""

    def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface, profile):
        """Launch the server and block until it signals readiness.

        The server gets 5 seconds and then a further 90 seconds to send its
        ready message over a pipe. If it does not, the relevant part of the
        server log is reported through bb.error and SystemExit(1) is raised.
        """
        self.server_timeout = server_timeout
        self.xmlrpcinterface = xmlrpcinterface
        self.featureset = featureset
        self.sockname = sockname
        self.bitbake_lock = lock
        self.profile = profile
        self.readypipe, self.readypipein = os.pipe()

        # Place the log in the builddirectory alongside the lock file
        self.logfile = os.path.join(os.path.dirname(self.bitbake_lock.name), "bitbake-cookerdaemon.log")

        startdatetime = datetime.datetime.now()
        bb.daemonize.createDaemon(self._startServer, self.logfile)
        # The daemon has its own copies of these now
        self.bitbake_lock.close()
        os.close(self.readypipein)

        ready = ConnectionReader(self.readypipe)
        r = ready.poll(5)
        if not r:
            bb.note("Bitbake server didn't start within 5 seconds, waiting for 90")
            r = ready.poll(90)
        if r:
            try:
                r = ready.get()
            except EOFError:
                # Trap the child exiting/closing the pipe and error out
                r = None

        ready.close()
        if r and r[0] == "r":
            return

        bb.error("Unable to start bitbake server (%s)" % str(r))
        self._report_server_log(startdatetime)
        raise SystemExit(1)

    def _report_server_log(self, startdatetime):
        """Report the server log of this session (or its tail) via bb.error.

        Lines from the first start marker stamped at or after
        ``startdatetime`` count as this session; without such a marker the
        last 60 lines of the log are reported instead.
        """
        logfile = self.logfile
        if not os.path.exists(logfile):
            bb.error("%s doesn't exist" % logfile)
            return

        logstart_re = re.compile(start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)'))
        session_lines = []
        earlier_lines = []
        in_session = False
        with open(logfile, "r") as f:
            for line in f:
                if in_session:
                    session_lines.append(line)
                    continue
                earlier_lines.append(line)
                match = logstart_re.search(line.rstrip())
                if match:
                    stamp = datetime.datetime.strptime(match.group(2), start_log_datetime_format)
                    if stamp >= startdatetime:
                        in_session = True
                        session_lines.append(line)
                # Only ever keep the most recent 60 pre-session lines
                earlier_lines = earlier_lines[-60:]

        if session_lines:
            if len(session_lines) > 60:
                bb.error("Last 60 lines of server log for this session (%s):\n%s" % (logfile, "".join(session_lines[-60:])))
            else:
                bb.error("Server log for this session (%s):\n%s" % (logfile, "".join(session_lines)))
        elif earlier_lines:
            bb.error("Server didn't start, last 60 loglines (%s):\n%s" % (logfile, "".join(earlier_lines)))

    def _startServer(self):
        """Daemon entry point: exec the bitbake-server script with our settings."""
        os.close(self.readypipe)
        lockfd = self.bitbake_lock.fileno()
        # Both fds must survive the exec below
        os.set_inheritable(lockfd, True)
        os.set_inheritable(self.readypipein, True)
        serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server")
        argv = [
            "bitbake-server",
            serverscript,
            "decafbad",
            str(lockfd),
            str(self.readypipein),
            self.logfile,
            self.bitbake_lock.name,
            self.sockname,
            str(self.server_timeout or 0),
            str(int(self.profile)),
            str(self.xmlrpcinterface[0]),
            str(self.xmlrpcinterface[1]),
        ]
        os.execl(sys.executable, *argv)
629
def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface, profile):
    """Create the control socket and cooker, signal readiness and run the server.

    ``lockfd`` and ``readypipeinfd`` are file descriptors for the lockfile and
    for the pipe on which "r" is sent once the cooker has been created.
    Returns None without running the server if creating the cooker raises
    bb.BBHandledException. stdout and stderr are always flushed on the way out.
    """
    import bb.cookerdata
    import bb.cooker

    serverlog(start_log_format % (os.getpid(), datetime.datetime.now().strftime(start_log_datetime_format)))

    try:
        bitbake_lock = os.fdopen(lockfd, "w")

        # Create server control socket
        if os.path.exists(sockname):
            serverlog("WARNING: removing existing socket file '%s'" % sockname)
            os.unlink(sockname)

        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        # AF_UNIX has path length issues so chdir here to workaround
        origdir = os.getcwd()
        try:
            os.chdir(os.path.dirname(sockname))
            sock.bind(os.path.basename(sockname))
        finally:
            os.chdir(origdir)
        sock.listen(1)

        server = ProcessServer(bitbake_lock, lockname, sock, sockname, server_timeout, xmlrpcinterface)
        readywriter = ConnectionWriter(readypipeinfd)
        try:
            cooker = bb.cooker.BBCooker([], server)
            cooker.configuration.profile = profile
        except bb.BBHandledException:
            return None

        # Tell the process that launched us that the server is ready
        readywriter.send("r")
        readywriter.close()

        server.cooker = cooker
        serverlog("Started bitbake server pid %d" % os.getpid())

        server.run()
    finally:
        # Flush any messages/errors to the logfile before exit
        sys.stdout.flush()
        sys.stderr.flush()
673
def connectProcessServer(sockname, featureset):
    """Connect to the server listening on ``sockname`` and set ``featureset``.

    Creates three pipes (events, commands, command replies), passes the remote
    ends to the server over the socket and returns a
    BitBakeProcessServerConnection wrapping the local ends.

    On any failure everything opened so far is closed again, including the
    event queue and its reader thread, and the exception is re-raised. A
    failing connect() is not retried.
    """
    # Connect to socket
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    # AF_UNIX has path length issues so chdir here to workaround
    cwd = os.getcwd()

    # Pipe ends destined for the server which we still have to close ourselves
    remote_fds = []
    eq = command_chan_recv = command_chan = None

    sock.settimeout(10)

    try:
        try:
            os.chdir(os.path.dirname(sockname))
            sock.connect(os.path.basename(sockname))
        finally:
            os.chdir(cwd)

        # Send an fd for the remote to write events to
        readfd, writefd = os.pipe()
        remote_fds.append(writefd)
        eq = BBUIEventQueue(readfd)
        # Send an fd for the remote to receive commands from
        readfd1, writefd1 = os.pipe()
        remote_fds.append(readfd1)
        command_chan = ConnectionWriter(writefd1)
        # Send an fd for the remote to write commands results to
        readfd2, writefd2 = os.pipe()
        remote_fds.append(writefd2)
        command_chan_recv = ConnectionReader(readfd2)

        sendfds(sock, remote_fds)

        server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq, sock)

        # Close the ends of the pipes we won't use. Each fd is dropped from
        # the list first so the error path below can never close it again
        # (the number may already have been reused for something else).
        while remote_fds:
            os.close(remote_fds.pop(0))

        server_connection.connection.updateFeatureSet(featureset)

    except (Exception, SystemExit):
        if command_chan_recv is not None:
            command_chan_recv.close()
        if command_chan is not None:
            command_chan.close()
        if eq is not None:
            # Also stops the event reader thread, which would otherwise keep
            # running after the failed connection attempt.
            eq.close()
        for fd in remote_fds:
            try:
                os.close(fd)
            except OSError:
                pass
        sock.close()
        raise

    return server_connection
735
def sendfds(sock, fds):
    '''Send an array of fds over an AF_UNIX socket.'''
    fd_array = array.array('i', fds)
    # The one byte payload carries the number of fds (modulo 256)
    count_byte = bytes([len(fd_array) % 256])
    ancillary = [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fd_array)]
    sock.sendmsg([count_byte], ancillary)
741
def recvfds(sock, size):
    '''Receive an array of fds over an AF_UNIX socket.

    Expects one message carrying up to ``size`` fds as SCM_RIGHTS ancillary
    data plus a single payload byte holding the fd count modulo 256.

    Returns the received fds as a list. Raises EOFError if neither payload
    nor ancillary data arrived, AssertionError if the fd count does not match
    the payload byte and RuntimeError for any other malformed message.
    '''
    a = array.array('i')
    bytes_size = a.itemsize * size
    msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
    if not msg and not ancdata:
        raise EOFError
    try:
        if len(ancdata) != 1:
            raise RuntimeError('received %d items of ancdata' %
                               len(ancdata))
        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
        if (cmsg_level == socket.SOL_SOCKET and
                cmsg_type == socket.SCM_RIGHTS):
            if len(cmsg_data) % a.itemsize != 0:
                raise ValueError
            a.frombytes(cmsg_data)
            # Raised explicitly rather than via 'assert' so the check on
            # received data is still performed under 'python -O'.
            if len(a) % 256 != msg[0]:
                raise AssertionError("Len is {0:n} but msg[0] is {1!r}".format(len(a), msg[0]))
            return list(a)
    except (ValueError, IndexError):
        # Fall through to the generic error below
        pass
    raise RuntimeError('Invalid data received')
764
class BBUIEventQueue:
    """Queue of events read from a pipe fd by a background thread."""

    def __init__(self, readfd):
        """Wrap ``readfd`` in a ConnectionReader and start the reader thread."""
        self.eventQueue = []
        self.eventQueueLock = threading.Lock()
        self.eventQueueNotify = threading.Event()

        self.reader = ConnectionReader(readfd)

        self.t = threading.Thread()
        self.t.run = self.startCallbackHandler
        self.t.start()

    def getEvent(self):
        """Pop and return the oldest queued event, or None if the queue is empty."""
        with bb.utils.lock_timeout(self.eventQueueLock):
            if not self.eventQueue:
                return None

            oldest = self.eventQueue.pop(0)
            # Nothing left: make waitEvent() block again
            if not self.eventQueue:
                self.eventQueueNotify.clear()

        return oldest

    def waitEvent(self, delay):
        """Wait up to ``delay`` seconds for an event, then behave like getEvent()."""
        self.eventQueueNotify.wait(delay)
        return self.getEvent()

    def queue_event(self, event):
        """Append ``event`` to the queue and wake up any waiter."""
        with bb.utils.lock_timeout(self.eventQueueLock):
            self.eventQueue.append(event)
            self.eventQueueNotify.set()

    def send_event(self, event):
        """Unpickle ``event`` and queue the resulting object."""
        self.queue_event(pickle.loads(event))

    def startCallbackHandler(self):
        """Thread body: move events from the reader into the queue until it fails."""
        bb.utils.set_process_name("UIEventQueue")
        while True:
            try:
                if self.reader.wait(0.25):
                    self.queue_event(self.reader.get())
            except (EOFError, OSError, TypeError):
                # Easiest way to exit is to close the file descriptor to cause an exit
                break

    def close(self):
        """Close the reader, which ends the thread, and wait for the thread to exit."""
        self.reader.close()
        self.t.join()
816
class ConnectionReader(object):
    """Read-only multiprocessing Connection wrapper around a file descriptor."""

    def __init__(self, fd):
        self.reader = multiprocessing.connection.Connection(fd, writable=False)
        self.rlock = multiprocessing.Lock()

    def wait(self, timeout=None):
        """Return the list of ready connections (ours or none) after at most ``timeout``."""
        return multiprocessing.connection.wait([self.reader], timeout)

    def poll(self, timeout=None):
        """Return whether data is available to read within ``timeout``."""
        return self.reader.poll(timeout)

    def get(self):
        """Receive one message under the read lock and return the unpickled object."""
        with bb.utils.lock_timeout(self.rlock):
            payload = self.reader.recv_bytes()
        return multiprocessing.reduction.ForkingPickler.loads(payload)

    def fileno(self):
        """Return the file descriptor of the underlying connection."""
        return self.reader.fileno()

    def close(self):
        """Close the underlying connection."""
        return self.reader.close()
839
840
class ConnectionWriter(object):
    """Write-only multiprocessing Connection wrapper around a file descriptor."""

    def __init__(self, fd):
        self.writer = multiprocessing.connection.Connection(fd, readable=False)
        self.wlock = multiprocessing.Lock()
        # Why bb.event needs this I have no idea
        self.event = self

    def _send(self, obj):
        """Write the bytes ``obj`` under the write lock with the gc disabled.

        The garbage collector is re-enabled even if the write fails.
        """
        gc.disable()
        try:
            with bb.utils.lock_timeout(self.wlock):
                self.writer.send_bytes(obj)
        finally:
            gc.enable()

    def send(self, obj):
        """Pickle ``obj`` and write it to the connection.

        If the current process has a ``queue_signals`` attribute, signals are
        queued for the duration of the write and handled afterwards.
        """
        obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
        # See notes/code in CookerParser
        # We must not terminate holding this lock else processes will hang.
        # For SIGTERM, raising afterwards avoids this.
        # For SIGINT, we don't want to have written partial data to the pipe.
        # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139
        process = multiprocessing.current_process()
        if process and hasattr(process, "queue_signals"):
            with bb.utils.lock_timeout(process.signal_threadlock):
                process.queue_signals = True
                try:
                    self._send(obj)
                finally:
                    # Never leave signal queueing switched on if the write
                    # failed, or later signals would be queued indefinitely.
                    process.queue_signals = False

                while len(process.signal_received) > 0:
                    sig = process.signal_received.pop()
                    process.handle_sig(sig, None)
        else:
            self._send(obj)

    def fileno(self):
        """Return the file descriptor of the underlying connection."""
        return self.writer.fileno()

    def close(self):
        """Close the underlying connection."""
        return self.writer.close()
880