xref: /openbmc/openbmc/poky/bitbake/lib/bb/server/process.py (revision c124f4f2e04dca16a428a76c89677328bc7bf908)
1#
2# BitBake Process based server.
3#
4# Copyright (C) 2010 Bob Foerster <robert@erafx.com>
5#
6# SPDX-License-Identifier: GPL-2.0-only
7#
8
9"""
10    This module implements a multiprocessing.Process based server for bitbake.
11"""
12
13import bb
14import bb.event
15import logging
16import multiprocessing
17import threading
18import array
19import os
20import sys
21import time
22import select
23import socket
24import subprocess
25import errno
26import re
27import datetime
28import pickle
29import traceback
30import gc
31import stat
32import bb.server.xmlrpcserver
33from bb import daemonize
34from multiprocessing import queues
35
# Module-wide logger; all messages from this file go to the 'BitBake' logger.
logger = logging.getLogger('BitBake')
37
class ProcessTimeout(SystemExit):
    """Raised when the bitbake server fails to reply to a command in time.

    Derives from SystemExit, so handlers that only catch Exception will not
    intercept it.
    """
40
def currenttime():
    """Return the local wall-clock time formatted as HH:MM:SS.microseconds."""
    now = datetime.datetime.now()
    return now.strftime('%H:%M:%S.%f')
43
def serverlog(msg):
    """Print msg to stdout prefixed with this process's pid and the current time."""
    print(" ".join((str(os.getpid()), currenttime(), msg)))
    # Seems a flush here triggers filesystem sync like behaviour and long hangs in the server
    #sys.stdout.flush()
48
#
# When we have lockfile issues, try to find information about which process is
# using the lockfile
#
def get_lockfile_process_msg(lockfile):
    """Describe the processes that currently have lockfile open.

    Runs ``lsof`` and, if that fails or is not installed, ``fuser``. Returns
    the decoded output of the first tool that runs successfully, or None when
    neither produced any output.

    Raises OSError if a tool could not be executed for any reason other than
    not being installed (ENOENT).
    """
    output = None
    # Some systems may not have lsof available, so fuser is the fallback
    candidates = (
        ["lsof", '-w', lockfile],
        ["fuser", '-v', lockfile],
    )
    for cmdline in candidates:
        try:
            output = subprocess.check_output(cmdline, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError:
            # File was deleted?
            continue
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise
            continue
        # The tool ran successfully; do not try the next one even if it
        # printed nothing.
        break
    return output.decode("utf-8") if output else None
77
class idleFinish:
    """Return value an idle function uses to signal it has finished.

    The idle loop removes the function and passes ``msg`` to the cooker's
    finishAsyncCommand().
    """

    def __init__(self, msg):
        self.msg = msg
81
class ProcessServer():
    """Server side of the bitbake client/server split.

    main() accepts UI connections on a UNIX socket and executes their commands,
    while a separate thread (idle_thread) runs the registered idle functions.

    The constructor does not set ``self.cooker``; the caller must assign it
    before calling run().
    """
    profile_filename = "profile.log"
    profile_processed_filename = "profile.log.processed"

    def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface):
        """Store the server's resources and initialise connection/idle state.

        lock / lockname     -- open lock file object and its path
        sock / sockname     -- listening control socket and its path (the path
                               must exist, it is stat()ed here)
        server_timeout      -- idle timeout; may later be replaced by
                               BB_SERVER_TIMEOUT once a datastore is available
        xmlrpcinterface     -- indexable; element 0 being truthy enables XMLRPC
        """
        self.command_channel = False
        self.command_channel_reply = False
        self.quit = False
        self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore.
        self.next_heartbeat = time.time()

        self.event_handle = None
        self.hadanyui = False
        self.haveui = False
        self.maxuiwait = 30
        self.xmlrpc = False

        self.idle = None
        # Need a lock for _idlefuns changes
        self._idlefuns = {}
        self._idlefuncsLock = threading.Lock()
        self.idle_cond = threading.Condition(self._idlefuncsLock)

        self.bitbake_lock = lock
        self.bitbake_lock_name = lockname
        self.sock = sock
        self.sockname = sockname
        # It is possible the directory may be renamed. Cache the inode of the socket file
        # so we can tell if things changed.
        self.sockinode = os.stat(self.sockname)[stat.ST_INO]

        self.server_timeout = server_timeout
        self.timeout = self.server_timeout
        self.xmlrpcinterface = xmlrpcinterface

    def register_idle_function(self, function, data):
        """Register a function to be called while the server is idle"""
        assert hasattr(function, '__call__')
        with bb.utils.lock_timeout(self._idlefuncsLock):
            self._idlefuns[function] = data
        serverlog("Registering idle function %s" % str(function))

    def run(self):
        """Record our pid in the lock file and run main(), optionally profiled.

        Returns whatever main() returns.
        """

        if self.xmlrpcinterface[0]:
            self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self)

            serverlog("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port))

        # Best effort: a failure to update the lock file is logged, not fatal
        try:
            self.bitbake_lock.seek(0)
            self.bitbake_lock.truncate()
            if self.xmlrpc:
                self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), self.xmlrpc.host, self.xmlrpc.port))
            else:
                self.bitbake_lock.write("%s\n" % (os.getpid()))
            self.bitbake_lock.flush()
        except Exception as e:
            serverlog("Error writing to lock file: %s" % str(e))

        if self.cooker.configuration.profile:
            try:
                import cProfile as profile
            except ImportError:
                import profile
            prof = profile.Profile()

            ret = profile.Profile.runcall(prof, self.main)

            prof.dump_stats("profile.log")
            bb.utils.process_profilelog("profile.log")
            serverlog("Raw profiling information saved to profile.log and processed statistics to profile.log.processed")

        else:
            ret = self.main()

        return ret

    def _idle_check(self):
        """Return True when no idle functions remain and no async command is set."""
        return len(self._idlefuns) == 0 and self.cooker.command.currentAsyncCommand is None

    def wait_for_idle(self, timeout=30):
        """Block until the idle loop has nothing left to do.

        Returns False if ``timeout`` seconds elapsed first, True otherwise.
        """
        # Wait for the idle loop to have cleared
        with bb.utils.lock_timeout(self._idlefuncsLock):
            return self.idle_cond.wait_for(self._idle_check, timeout) is not False

    def set_async_cmd(self, cmd):
        """Install cmd as the current async command once the idle loop is clear.

        Returns False if the idle loop did not clear within 30 seconds.
        """
        with bb.utils.lock_timeout(self._idlefuncsLock):
            ret = self.idle_cond.wait_for(self._idle_check, 30)
            if ret is False:
                return False
            self.cooker.command.currentAsyncCommand = cmd
            return True

    def clear_async_cmd(self):
        """Clear the current async command and wake anything waiting for idle."""
        with bb.utils.lock_timeout(self._idlefuncsLock):
            self.cooker.command.currentAsyncCommand = None
            self.idle_cond.notify_all()

    def get_async_cmd(self):
        """Return the current async command (None when there is none)."""
        with bb.utils.lock_timeout(self._idlefuncsLock):
            return self.cooker.command.currentAsyncCommand

    @staticmethod
    def _lockfile_owned_by(contents, pid):
        """Return True if lock file ``contents`` still name ``pid`` as the owner.

        ``contents`` is the list of lines read from the lock file, or None if
        the file could not be read. run() writes either "<pid>\\n" or
        "<pid> <host>:<port>\\n" as the first line, so only those two prefixes
        count as a match. Missing or empty contents are treated as not owned.
        """
        if not contents:
            return False
        # str.startswith() needs a tuple (not a list) to test several prefixes
        return contents[0].startswith((f"{pid}\n", f"{pid} "))

    def main(self):
        """Serve UI connections until asked to quit, then shut everything down.

        After the loop ends this joins the idle thread, removes the socket
        file (only if it is still the one we created), shuts the cooker down
        and finally waits until the lock file can be retaken so stale pid data
        is removed.
        """
        self.cooker.pre_serve()

        bb.utils.set_process_name("Cooker")

        ready = []
        newconnections = []

        self.controllersock = False
        fds = [self.sock]
        if self.xmlrpc:
            fds.append(self.xmlrpc)
        seendata = False
        serverlog("Entering server connection loop")
        serverlog("Lockfile is: %s\nSocket is %s (%s)" % (self.bitbake_lock_name, self.sockname, os.path.exists(self.sockname)))

        def disconnect_client(self, fds):
            # Tear down the current client and either promote a queued
            # connection or, when no timeout is configured, ask to quit.
            serverlog("Disconnecting Client (socket: %s)" % os.path.exists(self.sockname))
            if self.controllersock:
                fds.remove(self.controllersock)
                self.controllersock.close()
                self.controllersock = False
            if self.haveui:
                # Wait for the idle loop to have cleared (30s max)
                if not self.wait_for_idle(30):
                    serverlog("Idle loop didn't finish queued commands after 30s, exiting.")
                    self.quit = True
                fds.remove(self.command_channel)
                bb.event.unregister_UIHhandler(self.event_handle, True)
                self.command_channel_reply.writer.close()
                self.event_writer.writer.close()
                self.command_channel.close()
                self.command_channel = False
                del self.event_writer
                self.lastui = time.time()
                self.cooker.clientComplete()
                self.haveui = False
            # Local to this helper: only used for the "no timeout" check below
            ready = select.select(fds,[],[],0)[0]
            if newconnections and not self.quit:
                serverlog("Starting new client")
                conn = newconnections.pop(-1)
                fds.append(conn)
                self.controllersock = conn
            elif not self.timeout and not ready:
                serverlog("No timeout, exiting.")
                self.quit = True

        self.lastui = time.time()
        while not self.quit:
            if self.sock in ready:
                # Accept everything pending; only one client is active at a
                # time, the others are queued.
                while select.select([self.sock],[],[],0)[0]:
                    controllersock, address = self.sock.accept()
                    if self.controllersock:
                        serverlog("Queuing %s (%s)" % (str(ready), str(newconnections)))
                        newconnections.append(controllersock)
                    else:
                        serverlog("Accepting %s (%s)" % (str(ready), str(newconnections)))
                        self.controllersock = controllersock
                        fds.append(controllersock)
            if self.controllersock in ready:
                try:
                    serverlog("Processing Client")
                    ui_fds = recvfds(self.controllersock, 3)
                    serverlog("Connecting Client")

                    # Where to write events to
                    writer = ConnectionWriter(ui_fds[0])
                    self.event_handle = bb.event.register_UIHhandler(writer, True)
                    self.event_writer = writer

                    # Where to read commands from
                    reader = ConnectionReader(ui_fds[1])
                    fds.append(reader)
                    self.command_channel = reader

                    # Where to send command return values to
                    writer = ConnectionWriter(ui_fds[2])
                    self.command_channel_reply = writer

                    self.haveui = True
                    self.hadanyui = True

                except (EOFError, OSError):
                    disconnect_client(self, fds)

            if not self.timeout == -1.0 and not self.haveui and self.timeout and \
                    (self.lastui + self.timeout) < time.time():
                serverlog("Server timeout, exiting.")
                self.quit = True

            # If we don't see a UI connection within maxuiwait, it's unlikely we're going to see
            # one. We have had issues with processes hanging indefinitely so timing out UI-less
            # servers is useful.
            if not self.hadanyui and not self.xmlrpc and not self.timeout and (self.lastui + self.maxuiwait) < time.time():
                serverlog("No UI connection within max timeout, exiting to avoid infinite loop.")
                self.quit = True

            if self.command_channel in ready:
                try:
                    command = self.command_channel.get()
                except EOFError:
                    # Client connection shutting down
                    ready = []
                    disconnect_client(self, fds)
                    continue
                if command[0] == "terminateServer":
                    self.quit = True
                    continue
                try:
                    serverlog("Running command %s" % command)
                    reply = self.cooker.command.runCommand(command, self)
                    serverlog("Sending reply %s" % repr(reply))
                    self.command_channel_reply.send(reply)
                    serverlog("Command Completed (socket: %s)" % os.path.exists(self.sockname))
                except Exception:
                    stack = traceback.format_exc()
                    serverlog('Exception in server main event loop running command %s (%s)' % (command, stack))
                    logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack))

            if self.xmlrpc in ready:
                self.xmlrpc.handle_requests()

            # Pick up datastore-driven settings once, as soon as the cooker
            # has a datastore.
            if not seendata and hasattr(self.cooker, "data"):
                heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT')
                if heartbeat_event:
                    try:
                        self.heartbeat_seconds = float(heartbeat_event)
                    except:
                        bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event)

                self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT')
                try:
                    if self.timeout:
                        self.timeout = float(self.timeout)
                except:
                    bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout)
                seendata = True

            if not self.idle:
                self.idle = threading.Thread(target=self.idle_thread)
                self.idle.start()
            elif self.idle and not self.idle.is_alive():
                serverlog("Idle thread terminated, main thread exiting too")
                bb.error("Idle thread terminated, main thread exiting too")
                self.quit = True

            nextsleep = 1.0
            if self.xmlrpc:
                nextsleep = self.xmlrpc.get_timeout(nextsleep)
            try:
                ready = select.select(fds,[],[],nextsleep)[0]
            except InterruptedError:
                # Ignore EINTR
                ready = []

        if self.idle:
            self.idle.join()

        serverlog("Exiting (socket: %s)" % os.path.exists(self.sockname))
        # Remove the socket file so we don't get any more connections to avoid races
        # The build directory could have been renamed so if the file isn't the one we created
        # we shouldn't delete it.
        try:
            sockinode = os.stat(self.sockname)[stat.ST_INO]
            if sockinode == self.sockinode:
                os.unlink(self.sockname)
            else:
                serverlog("bitbake.sock inode mismatch (%s vs %s), not deleting." % (sockinode, self.sockinode))
        except Exception as err:
            serverlog("Removing socket file '%s' failed (%s)" % (self.sockname, err))
        self.sock.close()

        # Best-effort cooker shutdown; failures here must not prevent the
        # lock file from being released below.
        try:
            self.cooker.shutdown(True, idle=False)
            self.cooker.notifier.stop()
            self.cooker.confignotifier.stop()
        except:
            pass

        self.cooker.post_serve()

        if len(threading.enumerate()) != 1:
            serverlog("More than one thread left?: " + str(threading.enumerate()))

        # Flush logs before we release the lock
        sys.stdout.flush()
        sys.stderr.flush()

        # Finally release the lockfile but warn about other processes holding it open
        lock = self.bitbake_lock
        lockfile = self.bitbake_lock_name

        def get_lock_contents(lockfile):
            # Returns the lock file's lines, or None if it no longer exists
            try:
                with open(lockfile, "r") as f:
                    return f.readlines()
            except FileNotFoundError:
                return None

        lock.close()
        lock = None

        while not lock:
            i = 0
            lock = None
            # NOTE(review): this tests the lock file's basename relative to the
            # current working directory, while the message talks about the
            # directory; confirm whether os.path.dirname() was intended.
            if not os.path.exists(os.path.basename(lockfile)):
                serverlog("Lockfile directory gone, exiting.")
                return

            while not lock and i < 30:
                lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False)
                if not lock:
                    newlockcontents = get_lock_contents(lockfile)
                    if not self._lockfile_owned_by(newlockcontents, os.getpid()):
                        # A new server was started, the lockfile contents changed, we can exit
                        serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents))
                        return
                    time.sleep(0.1)
                i += 1
            if lock:
                # We hold the lock so we can remove the file (hide stale pid data)
                # via unlockfile.
                bb.utils.unlockfile(lock)
                serverlog("Exiting as we could obtain the lock")
                return

            if not lock:
                procs = get_lockfile_process_msg(lockfile)
                msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"]
                if procs:
                    msg.append(":\n%s" % procs)
                serverlog("".join(msg))

    def idle_thread(self):
        """Thread entry point: run idle_thread_internal(), optionally profiled."""
        if self.cooker.configuration.profile:
            try:
                import cProfile as profile
            except ImportError:
                import profile
            prof = profile.Profile()

            profile.Profile.runcall(prof, self.idle_thread_internal)

            prof.dump_stats("profile-mainloop.log")
            bb.utils.process_profilelog("profile-mainloop.log")
            serverlog("Raw profiling information saved to profile-mainloop.log and processed statistics to profile-mainloop.log.processed")
        else:
            self.idle_thread_internal()

    def idle_thread_internal(self):
        """Repeatedly call the registered idle functions until self.quit is set.

        Each function is called as function(server, data, False). Its return
        value steers the loop: an idleFinish or False removes the function,
        True requests an immediate re-run, a float is a requested sleep in
        seconds, and anything else is treated as a list of file descriptors to
        wait on. Heartbeat events are fired from here while idle functions
        exist and heartbeats are enabled.
        """
        def remove_idle_func(function):
            with bb.utils.lock_timeout(self._idlefuncsLock):
                # pop() rather than del: the exception path below may ask for
                # a function that has already been removed.
                self._idlefuns.pop(function, None)
                self.idle_cond.notify_all()

        while not self.quit:
            nextsleep = 1.0
            fds = []

            # Work on a snapshot so functions can be (un)registered meanwhile
            with bb.utils.lock_timeout(self._idlefuncsLock):
                items = list(self._idlefuns.items())

            for function, data in items:
                try:
                    retval = function(self, data, False)
                    if isinstance(retval, idleFinish):
                        serverlog("Removing idle function %s at idleFinish" % str(function))
                        remove_idle_func(function)
                        self.cooker.command.finishAsyncCommand(retval.msg)
                        nextsleep = None
                    elif retval is False:
                        serverlog("Removing idle function %s" % str(function))
                        remove_idle_func(function)
                        nextsleep = None
                    elif retval is True:
                        nextsleep = None
                    elif isinstance(retval, float) and nextsleep:
                        if (retval < nextsleep):
                            nextsleep = retval
                    elif nextsleep is None:
                        continue
                    else:
                        fds = fds + retval
                except SystemExit:
                    raise
                except Exception as exc:
                    if not isinstance(exc, bb.BBHandledException):
                        logger.exception('Running idle function')
                    remove_idle_func(function)
                    serverlog("Exception %s broke the idle_thread, exiting" % traceback.format_exc())
                    self.quit = True

            # Create new heartbeat event?
            now = time.time()
            if items and bb.event._heartbeat_enabled and now >= self.next_heartbeat:
                # We might have missed heartbeats. Just trigger once in
                # that case and continue after the usual delay.
                self.next_heartbeat += self.heartbeat_seconds
                if self.next_heartbeat <= now:
                    self.next_heartbeat = now + self.heartbeat_seconds
                if hasattr(self.cooker, "data"):
                    heartbeat = bb.event.HeartbeatEvent(now)
                    try:
                        bb.event.fire(heartbeat, self.cooker.data)
                    except Exception as exc:
                        if not isinstance(exc, bb.BBHandledException):
                            logger.exception('Running heartbeat function')
                        serverlog("Exception %s broke in idle_thread, exiting" % traceback.format_exc())
                        self.quit = True
            if nextsleep and bb.event._heartbeat_enabled and now < self.next_heartbeat < now + nextsleep:
                # Shorten timeout so that we wake up in time for
                # the heartbeat. next_heartbeat is only advanced above while
                # idle functions exist, so it can lie in the past; in that case
                # keep the normal sleep rather than passing select() a
                # negative timeout, which it rejects with ValueError.
                nextsleep = self.next_heartbeat - now

            if nextsleep is not None:
                select.select(fds,[],[],nextsleep)[0]
502
class ServerCommunicator():
    """Client-side helper that sends commands to the server and reads replies.

    ``connection`` must offer send(); ``recv`` must offer poll(timeout) and
    get(), where get() yields a (result, error) pair.
    """

    def __init__(self, connection, recv):
        self.connection = connection
        self.recv = recv

    def runCommand(self, command):
        """Send command and return the server's (result, error) pair.

        Waits up to two 30 second periods for the reply.

        Raises BrokenPipeError / EOFError if the pipe to the server is gone,
        ProcessTimeout if no reply arrives, and bb.BBHandledException if the
        error text names BBHandledException or SystemExit.
        """
        try:
            self.connection.send(command)
        except BrokenPipeError as e:
            raise BrokenPipeError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e

        have_reply = self.recv.poll(30)
        if not have_reply:
            logger.info("No reply from server in 30s (for command %s at %s)" % (command[0], currenttime()))
            have_reply = self.recv.poll(30)
        if not have_reply:
            raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s at %s)" % currenttime())

        try:
            reply = self.recv.get()
        except EOFError as e:
            raise EOFError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e
        ret, exc = reply

        # Should probably turn all exceptions in exc back into exceptions?
        # For now, at least handle BBHandledException
        if exc:
            if "BBHandledException" in exc or "SystemExit" in exc:
                raise bb.BBHandledException()
        return ret, exc

    def updateFeatureSet(self, featureset):
        """Ask the server to apply featureset; raises BaseException on error."""
        _, failure = self.runCommand(["setFeatures", featureset])
        if not failure:
            return
        logger.error("Unable to set the cooker to the correct featureset: %s" % failure)
        raise BaseException(failure)

    def getEventHandle(self):
        """Return the UI handler number reported by the server.

        Raises BaseException if the server reports an error.
        """
        handle, failure = self.runCommand(["getUIHandlerNum"])
        if failure:
            logger.error("Unable to get UI Handler Number: %s" % failure)
            raise BaseException(failure)
        return handle

    def terminateServer(self):
        """Send the terminateServer request; no reply is awaited."""
        self.connection.send(['terminateServer'])
543        return
544
class BitBakeProcessServerConnection(object):
    """Bundle of the objects that make up one client connection to the server."""

    def __init__(self, ui_channel, recv, eq, sock):
        # Keep a reference to sock so it doesn't get gc'd for the life of our connection
        self.socket_connection = sock
        self.events = eq
        self.connection = ServerCommunicator(ui_channel, recv)

    def terminate(self):
        """Close the event queue, the socket and both command channels, in that order."""
        endpoints = (
            self.events,
            self.socket_connection,
            self.connection.connection,
            self.connection.recv,
        )
        for endpoint in endpoints:
            endpoint.close()
557        return
558
# Marker line written to the server log when a server starts, formatted with
# the server pid and start time. BitBakeServer also turns it into a regex to
# find the current session in the log when startup fails.
start_log_format = '--- Starting bitbake server pid %s at %s ---'
# strftime/strptime format of the timestamp embedded in start_log_format
start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'
561
class BitBakeServer(object):
    """Start a daemonised bitbake server and wait for it to signal readiness.

    Construction blocks until the server writes its ready message to a pipe.
    If that does not happen, the relevant part of the server log is reported
    through bb.error() and SystemExit(1) is raised.
    """

    def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface, profile):
        self.bitbake_lock = lock
        self.sockname = sockname
        self.featureset = featureset
        self.server_timeout = server_timeout
        self.xmlrpcinterface = xmlrpcinterface
        self.profile = profile
        self.readypipe, self.readypipein = os.pipe()

        # Place the log in the build directory alongside the lock file
        lockdir = os.path.dirname(self.bitbake_lock.name)
        logfile = os.path.join(lockdir, "bitbake-cookerdaemon.log")
        self.logfile = logfile

        startdatetime = datetime.datetime.now()
        bb.daemonize.createDaemon(self._startServer, logfile)
        # The parent keeps only the read end of the ready pipe
        self.bitbake_lock.close()
        os.close(self.readypipein)

        ready = ConnectionReader(self.readypipe)
        reply = ready.poll(5)
        if not reply:
            bb.note("Bitbake server didn't start within 5 seconds, waiting for 90")
            reply = ready.poll(90)
        if reply:
            try:
                reply = ready.get()
            except EOFError:
                # Trap the child exiting/closing the pipe and error out
                reply = None

        if reply and reply[0] == "r":
            # Server reported ready
            ready.close()
            return

        ready.close()
        bb.error("Unable to start bitbake server (%s)" % str(reply))
        if not os.path.exists(logfile):
            bb.error("%s doesn't exist" % logfile)
            raise SystemExit(1)

        logstart_re = re.compile(start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)'))
        # session_lines: everything from this session's start marker onwards.
        # recent_lines: rolling window of the last 60 lines seen before it.
        in_session = False
        session_lines = []
        recent_lines = []
        with open(logfile, "r") as f:
            for line in f:
                if in_session:
                    session_lines.append(line)
                else:
                    recent_lines.append(line)
                    found = logstart_re.search(line.rstrip())
                    if found:
                        stamp = datetime.datetime.strptime(found.group(2), start_log_datetime_format)
                        if stamp >= startdatetime:
                            in_session = True
                            session_lines.append(line)
                if len(recent_lines) > 60:
                    del recent_lines[:-60]

        if session_lines:
            if len(session_lines) > 60:
                bb.error("Last 60 lines of server log for this session (%s):\n%s" % (logfile, "".join(session_lines[-60:])))
            else:
                bb.error("Server log for this session (%s):\n%s" % (logfile, "".join(session_lines)))
        elif recent_lines:
            bb.error("Server didn't start, last 60 loglines (%s):\n%s" % (logfile, "".join(recent_lines)))

        raise SystemExit(1)

    def _startServer(self):
        """Replace the current (daemon) process with the bitbake-server script.

        The lock file descriptor and the write end of the ready pipe are made
        inheritable so they survive the exec.
        """
        os.close(self.readypipe)
        lockfd = self.bitbake_lock.fileno()
        os.set_inheritable(lockfd, True)
        os.set_inheritable(self.readypipein, True)
        serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server")
        argv = [
            sys.executable,
            serverscript,
            # NOTE(review): presumably a marker the server script checks - confirm
            "decafbad",
            str(lockfd),
            str(self.readypipein),
            self.logfile,
            self.bitbake_lock.name,
            self.sockname,
            str(self.server_timeout or 0),
            str(int(self.profile)),
            str(self.xmlrpcinterface[0]),
            str(self.xmlrpcinterface[1]),
        ]
        os.execl(sys.executable, *argv)
636
def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface, profile):
    """Set up and run the bitbake server in the current process.

    Binds the control socket at sockname, creates the cooker, writes "r" to
    the ready pipe and then runs the server until it exits. Returns None; if
    cooker creation raises bb.BBHandledException it returns before the ready
    message is sent.
    """

    import bb.cookerdata
    import bb.cooker

    started_at = datetime.datetime.now().strftime(start_log_datetime_format)
    serverlog(start_log_format % (os.getpid(), started_at))

    try:
        bitbake_lock = os.fdopen(lockfd, "w")

        # Create server control socket, replacing any stale socket file
        if os.path.exists(sockname):
            serverlog("WARNING: removing existing socket file '%s'" % sockname)
            os.unlink(sockname)

        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        # AF_UNIX has path length issues so chdir here to workaround
        sockdir, sockfile = os.path.split(sockname)
        cwd = os.getcwd()
        try:
            os.chdir(sockdir)
            sock.bind(sockfile)
        finally:
            os.chdir(cwd)
        sock.listen(1)

        server = ProcessServer(bitbake_lock, lockname, sock, sockname, server_timeout, xmlrpcinterface)
        writer = ConnectionWriter(readypipeinfd)
        try:
            cooker = bb.cooker.BBCooker([], server)
            cooker.configuration.profile = profile
        except bb.BBHandledException:
            return None

        # Tell the launching process that we are up
        writer.send("r")
        writer.close()
        server.cooker = cooker
        serverlog("Started bitbake server pid %d" % os.getpid())

        server.run()
    finally:
        # Flush any messages/errors to the logfile before exit
        for stream in (sys.stdout, sys.stderr):
            stream.flush()
680
def connectProcessServer(sockname, featureset):
    """Connect to the server socket at sockname and set up a client connection.

    Creates three pipes (events, commands, command replies), passes the
    server's ends over the socket, then asks the server to apply featureset.

    Returns a BitBakeProcessServerConnection. On any failure the channels, the
    socket and any pipe ends still owned by this function are closed and the
    exception is re-raised.
    """
    # Connect to socket
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    # AF_UNIX has path length issues so chdir here to workaround
    cwd = os.getcwd()

    # Pipe ends destined for the server that we still have to close. Each fd
    # is dropped from this list as soon as it is closed so the error path can
    # never close a descriptor twice.
    remote_fds = []
    command_chan_recv = command_chan = None

    sock.settimeout(10)

    try:
        try:
            os.chdir(os.path.dirname(sockname))
            finished = False
            while not finished:
                try:
                    sock.connect(os.path.basename(sockname))
                    finished = True
                except IOError as e:
                    # NOTE(review): every IOError is re-raised, including
                    # EWOULDBLOCK, so this loop never actually retries.
                    # Confirm whether a retry was intended.
                    if e.errno == errno.EWOULDBLOCK:
                        pass
                    raise
        finally:
            os.chdir(cwd)

        # Send an fd for the remote to write events to
        readfd, writefd = os.pipe()
        remote_fds.append(writefd)
        eq = BBUIEventQueue(readfd)
        # Send an fd for the remote to receive commands from
        readfd1, writefd1 = os.pipe()
        remote_fds.append(readfd1)
        command_chan = ConnectionWriter(writefd1)
        # Send an fd for the remote to write command results to
        readfd2, writefd2 = os.pipe()
        remote_fds.append(writefd2)
        command_chan_recv = ConnectionReader(readfd2)

        sendfds(sock, remote_fds)

        server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq, sock)

        # Close the ends of the pipes we won't use
        while remote_fds:
            os.close(remote_fds.pop(0))

        server_connection.connection.updateFeatureSet(featureset)

    except (Exception, SystemExit):
        if command_chan_recv:
            command_chan_recv.close()
        if command_chan:
            command_chan.close()
        for fd in remote_fds:
            try:
                os.close(fd)
            except OSError:
                pass
        sock.close()
        raise

    return server_connection
741    return server_connection
742
def sendfds(sock, fds):
    '''Send an array of fds over an AF_UNIX socket.

    The single payload byte carries the number of descriptors modulo 256; the
    descriptors themselves travel as SCM_RIGHTS ancillary data.
    '''
    fd_array = array.array('i', fds)
    count_byte = bytes([len(fd_array) % 256])
    ancillary = [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fd_array)]
    sock.sendmsg([count_byte], ancillary)
748
def recvfds(sock, size):
    '''Receive an array of fds over an AF_UNIX socket.

    size is the number of descriptors expected. Returns them as a list.

    Raises EOFError if the peer closed the connection, and RuntimeError if the
    ancillary data is not a single well-formed SCM_RIGHTS message.
    '''
    fd_array = array.array('i')
    bufsize = socket.CMSG_LEN(fd_array.itemsize * size)
    payload, ancillary, _flags, _addr = sock.recvmsg(1, bufsize)
    if not (payload or ancillary):
        raise EOFError
    if len(ancillary) != 1:
        raise RuntimeError('received %d items of ancdata' %
                           len(ancillary))
    try:
        level, kind, raw = ancillary[0]
        if level == socket.SOL_SOCKET and kind == socket.SCM_RIGHTS:
            if len(raw) % fd_array.itemsize:
                raise ValueError
            fd_array.frombytes(raw)
            # The payload byte must match the number of fds received (mod 256)
            assert len(fd_array) % 256 == payload[0]
            return list(fd_array)
    except (ValueError, IndexError):
        pass
    raise RuntimeError('Invalid data received')
771
class BBUIEventQueue:
    """Queue of events filled by a background thread reading from a pipe.

    A thread started in the constructor pulls events from a ConnectionReader
    wrapped around readfd and appends them to eventQueue. Consumers fetch
    them with getEvent() or waitEvent().
    """

    def __init__(self, readfd):
        self.eventQueue = []
        self.eventQueueLock = threading.Lock()
        # Set while eventQueue holds at least one event
        self.eventQueueNotify = threading.Event()

        self.reader = ConnectionReader(readfd)

        # Reader thread; close() is what makes it exit and joins it
        self.t = threading.Thread(target=self.startCallbackHandler)
        self.t.start()

    def getEvent(self):
        """Remove and return the oldest queued event, or None if the queue is empty."""
        with bb.utils.lock_timeout(self.eventQueueLock):
            if not self.eventQueue:
                return None

            oldest = self.eventQueue.pop(0)
            if not self.eventQueue:
                # Queue drained, so waitEvent() has to block again
                self.eventQueueNotify.clear()
            return oldest

    def waitEvent(self, delay):
        """Wait up to delay seconds for the notify flag, then return getEvent()."""
        self.eventQueueNotify.wait(delay)
        return self.getEvent()

    def queue_event(self, event):
        """Append event to the queue and set the notify flag."""
        with bb.utils.lock_timeout(self.eventQueueLock):
            self.eventQueue.append(event)
            self.eventQueueNotify.set()

    def send_event(self, event):
        """Unpickle event and queue the resulting object."""
        unpickled = pickle.loads(event)
        self.queue_event(unpickled)

    def startCallbackHandler(self):
        """Body of the reader thread: move events from the pipe to the queue."""
        bb.utils.set_process_name("UIEventQueue")
        while True:
            try:
                # Poll in 0.25s slices rather than blocking indefinitely
                if not self.reader.wait(0.25):
                    continue
                self.queue_event(self.reader.get())
            except (EOFError, OSError, TypeError):
                # Easiest way to exit is to close the file descriptor to cause an exit
                break

    def close(self):
        """Close the reader and wait for the reader thread to finish."""
        self.reader.close()
        self.t.join()
823
class ConnectionReader(object):
    """Read-only multiprocessing Connection built on a file descriptor."""

    def __init__(self, fd):
        self.reader = multiprocessing.connection.Connection(fd, writable=False)
        # Held while receiving bytes in get()
        self.rlock = multiprocessing.Lock()

    def wait(self, timeout=None):
        """Return the result of multiprocessing.connection.wait() on the reader."""
        ready = multiprocessing.connection.wait([self.reader], timeout)
        return ready

    def poll(self, timeout=None):
        """Return whether data is available, waiting up to timeout seconds."""
        has_data = self.reader.poll(timeout)
        return has_data

    def get(self):
        """Receive one message under rlock and return the unpickled object."""
        with bb.utils.lock_timeout(self.rlock):
            payload = self.reader.recv_bytes()
        # Unpickling happens after the lock has been released
        return multiprocessing.reduction.ForkingPickler.loads(payload)

    def fileno(self):
        """Return the file descriptor of the underlying connection."""
        return self.reader.fileno()

    def close(self):
        """Close the underlying connection."""
        return self.reader.close()
846
847
class ConnectionWriter(object):
    """Write-only multiprocessing Connection built on a file descriptor."""

    def __init__(self, fd):
        self.writer = multiprocessing.connection.Connection(fd, readable=False)
        # Held while writing bytes in _send()
        self.wlock = multiprocessing.Lock()
        # Why bb.event needs this I have no idea
        self.event = self

    def _send(self, obj):
        """Write the already pickled bytes obj to the pipe while holding wlock.

        The garbage collector is disabled for the duration of the write and
        enabled again afterwards, including when taking the lock or writing
        raises.
        """
        # NOTE(review): presumably gc is disabled so that a collection cannot
        # run finalizers while wlock is held - confirm.
        gc.disable()
        try:
            with bb.utils.lock_timeout(self.wlock):
                self.writer.send_bytes(obj)
        finally:
            # Without this a failed write would leave gc off for the rest of
            # the process.
            gc.enable()

    def send(self, obj):
        """Pickle obj and write it to the pipe.

        When the current process has a queue_signals attribute, signals are
        queued while the write is in progress and passed to
        process.handle_sig() once it has finished.
        """
        obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
        # See notes/code in CookerParser
        # We must not terminate holding this lock else processes will hang.
        # For SIGTERM, raising afterwards avoids this.
        # For SIGINT, we don't want to have written partial data to the pipe.
        # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139
        process = multiprocessing.current_process()
        if process and hasattr(process, "queue_signals"):
            with bb.utils.lock_timeout(process.signal_threadlock):
                process.queue_signals = True
                try:
                    self._send(obj)
                finally:
                    # Stop queueing and replay whatever arrived even when the
                    # write failed, otherwise queue_signals would stay True
                    # and later signals would never be handled.
                    process.queue_signals = False

                    while len(process.signal_received) > 0:
                        sig = process.signal_received.pop()
                        process.handle_sig(sig, None)
        else:
            self._send(obj)

    def fileno(self):
        """Return the file descriptor of the underlying connection."""
        return self.writer.fileno()

    def close(self):
        """Close the underlying connection."""
        return self.writer.close()
887