1#
2# BitBake Process based server.
3#
4# Copyright (C) 2010 Bob Foerster <robert@erafx.com>
5#
6# SPDX-License-Identifier: GPL-2.0-only
7#
8
9"""
10    This module implements a multiprocessing.Process based server for bitbake.
11"""
12
13import bb
14import bb.event
15import logging
16import multiprocessing
17import threading
18import array
19import os
20import sys
21import time
22import select
23import socket
24import subprocess
25import errno
26import re
27import datetime
28import pickle
29import traceback
30import gc
31import stat
32import bb.server.xmlrpcserver
33from bb import daemonize
34from multiprocessing import queues
35
36logger = logging.getLogger('BitBake')
37
38class ProcessTimeout(SystemExit):
39    pass
40
41def currenttime():
42    return datetime.datetime.now().strftime('%H:%M:%S.%f')
43
44def serverlog(msg):
45    print(str(os.getpid()) + " " +  currenttime() + " " + msg)
46    #Seems a flush here triggers filesytem sync like behaviour and long hangs in the server
47    #sys.stdout.flush()
48
49#
50# When we have lockfile issues, try and find infomation about which process is
51# using the lockfile
52#
53def get_lockfile_process_msg(lockfile):
54    # Some systems may not have lsof available
55    procs = None
56    try:
57        procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
58    except subprocess.CalledProcessError:
59        # File was deleted?
60        pass
61    except OSError as e:
62        if e.errno != errno.ENOENT:
63            raise
64    if procs is None:
65        # Fall back to fuser if lsof is unavailable
66        try:
67            procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
68        except subprocess.CalledProcessError:
69            # File was deleted?
70            pass
71        except OSError as e:
72            if e.errno != errno.ENOENT:
73                raise
74    if procs:
75        return procs.decode("utf-8")
76    return None
77
78class idleFinish():
79    def __init__(self, msg):
80         self.msg = msg
81
82class ProcessServer():
83    profile_filename = "profile.log"
84    profile_processed_filename = "profile.log.processed"
85
86    def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface):
87        self.command_channel = False
88        self.command_channel_reply = False
89        self.quit = False
90        self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore.
91        self.next_heartbeat = time.time()
92
93        self.event_handle = None
94        self.hadanyui = False
95        self.haveui = False
96        self.maxuiwait = 30
97        self.xmlrpc = False
98
99        self.idle = None
100        # Need a lock for _idlefuns changes
101        self._idlefuns = {}
102        self._idlefuncsLock = threading.Lock()
103        self.idle_cond = threading.Condition(self._idlefuncsLock)
104
105        self.bitbake_lock = lock
106        self.bitbake_lock_name = lockname
107        self.sock = sock
108        self.sockname = sockname
109        # It is possible the directory may be renamed. Cache the inode of the socket file
110        # so we can tell if things changed.
111        self.sockinode = os.stat(self.sockname)[stat.ST_INO]
112
113        self.server_timeout = server_timeout
114        self.timeout = self.server_timeout
115        self.xmlrpcinterface = xmlrpcinterface
116
117    def register_idle_function(self, function, data):
118        """Register a function to be called while the server is idle"""
119        assert hasattr(function, '__call__')
120        with bb.utils.lock_timeout(self._idlefuncsLock):
121            self._idlefuns[function] = data
122        serverlog("Registering idle function %s" % str(function))
123
124    def run(self):
125
126        if self.xmlrpcinterface[0]:
127            self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self)
128
129            serverlog("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port))
130
131        try:
132            self.bitbake_lock.seek(0)
133            self.bitbake_lock.truncate()
134            if self.xmlrpc:
135                self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), self.xmlrpc.host, self.xmlrpc.port))
136            else:
137                self.bitbake_lock.write("%s\n" % (os.getpid()))
138            self.bitbake_lock.flush()
139        except Exception as e:
140            serverlog("Error writing to lock file: %s" % str(e))
141            pass
142
143        if self.cooker.configuration.profile:
144            try:
145                import cProfile as profile
146            except:
147                import profile
148            prof = profile.Profile()
149
150            ret = profile.Profile.runcall(prof, self.main)
151
152            prof.dump_stats("profile.log")
153            bb.utils.process_profilelog("profile.log")
154            serverlog("Raw profiling information saved to profile.log and processed statistics to profile.log.processed")
155
156        else:
157            ret = self.main()
158
159        return ret
160
161    def _idle_check(self):
162        return len(self._idlefuns) == 0 and self.cooker.command.currentAsyncCommand is None
163
164    def wait_for_idle(self, timeout=30):
165        # Wait for the idle loop to have cleared
166        with bb.utils.lock_timeout(self._idlefuncsLock):
167            return self.idle_cond.wait_for(self._idle_check, timeout) is not False
168
169    def set_async_cmd(self, cmd):
170        with bb.utils.lock_timeout(self._idlefuncsLock):
171            ret = self.idle_cond.wait_for(self._idle_check, 30)
172            if ret is False:
173                return False
174            self.cooker.command.currentAsyncCommand = cmd
175            return True
176
177    def clear_async_cmd(self):
178        with bb.utils.lock_timeout(self._idlefuncsLock):
179            self.cooker.command.currentAsyncCommand = None
180            self.idle_cond.notify_all()
181
182    def get_async_cmd(self):
183        with bb.utils.lock_timeout(self._idlefuncsLock):
184            return self.cooker.command.currentAsyncCommand
185
186    def main(self):
187        self.cooker.pre_serve()
188
189        bb.utils.set_process_name("Cooker")
190
191        ready = []
192        newconnections = []
193
194        self.controllersock = False
195        fds = [self.sock]
196        if self.xmlrpc:
197            fds.append(self.xmlrpc)
198        seendata = False
199        serverlog("Entering server connection loop")
200        serverlog("Lockfile is: %s\nSocket is %s (%s)" % (self.bitbake_lock_name, self.sockname, os.path.exists(self.sockname)))
201
202        def disconnect_client(self, fds):
203            serverlog("Disconnecting Client (socket: %s)" % os.path.exists(self.sockname))
204            if self.controllersock:
205                fds.remove(self.controllersock)
206                self.controllersock.close()
207                self.controllersock = False
208            if self.haveui:
209                # Wait for the idle loop to have cleared (30s max)
210                if not self.wait_for_idle(30):
211                    serverlog("Idle loop didn't finish queued commands after 30s, exiting.")
212                    self.quit = True
213                fds.remove(self.command_channel)
214                bb.event.unregister_UIHhandler(self.event_handle, True)
215                self.command_channel_reply.writer.close()
216                self.event_writer.writer.close()
217                self.command_channel.close()
218                self.command_channel = False
219                del self.event_writer
220                self.lastui = time.time()
221                self.cooker.clientComplete()
222                self.haveui = False
223            ready = select.select(fds,[],[],0)[0]
224            if newconnections and not self.quit:
225                serverlog("Starting new client")
226                conn = newconnections.pop(-1)
227                fds.append(conn)
228                self.controllersock = conn
229            elif not self.timeout and not ready:
230                serverlog("No timeout, exiting.")
231                self.quit = True
232
233        self.lastui = time.time()
234        while not self.quit:
235            if self.sock in ready:
236                while select.select([self.sock],[],[],0)[0]:
237                    controllersock, address = self.sock.accept()
238                    if self.controllersock:
239                        serverlog("Queuing %s (%s)" % (str(ready), str(newconnections)))
240                        newconnections.append(controllersock)
241                    else:
242                        serverlog("Accepting %s (%s)" % (str(ready), str(newconnections)))
243                        self.controllersock = controllersock
244                        fds.append(controllersock)
245            if self.controllersock in ready:
246                try:
247                    serverlog("Processing Client")
248                    ui_fds = recvfds(self.controllersock, 3)
249                    serverlog("Connecting Client")
250
251                    # Where to write events to
252                    writer = ConnectionWriter(ui_fds[0])
253                    self.event_handle = bb.event.register_UIHhandler(writer, True)
254                    self.event_writer = writer
255
256                    # Where to read commands from
257                    reader = ConnectionReader(ui_fds[1])
258                    fds.append(reader)
259                    self.command_channel = reader
260
261                    # Where to send command return values to
262                    writer = ConnectionWriter(ui_fds[2])
263                    self.command_channel_reply = writer
264
265                    self.haveui = True
266                    self.hadanyui = True
267
268                except (EOFError, OSError):
269                    disconnect_client(self, fds)
270
271            if not self.timeout == -1.0 and not self.haveui and self.timeout and \
272                    (self.lastui + self.timeout) < time.time():
273                serverlog("Server timeout, exiting.")
274                self.quit = True
275
276            # If we don't see a UI connection within maxuiwait, its unlikely we're going to see
277            # one. We have had issue with processes hanging indefinitely so timing out UI-less
278            # servers is useful.
279            if not self.hadanyui and not self.xmlrpc and not self.timeout and (self.lastui + self.maxuiwait) < time.time():
280                serverlog("No UI connection within max timeout, exiting to avoid infinite loop.")
281                self.quit = True
282
283            if self.command_channel in ready:
284                try:
285                    command = self.command_channel.get()
286                except EOFError:
287                    # Client connection shutting down
288                    ready = []
289                    disconnect_client(self, fds)
290                    continue
291                if command[0] == "terminateServer":
292                    self.quit = True
293                    continue
294                try:
295                    serverlog("Running command %s" % command)
296                    reply = self.cooker.command.runCommand(command, self)
297                    serverlog("Sending reply %s" % repr(reply))
298                    self.command_channel_reply.send(reply)
299                    serverlog("Command Completed (socket: %s)" % os.path.exists(self.sockname))
300                except Exception as e:
301                   stack = traceback.format_exc()
302                   serverlog('Exception in server main event loop running command %s (%s)' % (command, stack))
303                   logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack))
304
305            if self.xmlrpc in ready:
306                self.xmlrpc.handle_requests()
307
308            if not seendata and hasattr(self.cooker, "data"):
309                heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT')
310                if heartbeat_event:
311                    try:
312                        self.heartbeat_seconds = float(heartbeat_event)
313                    except:
314                        bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event)
315
316                self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT')
317                try:
318                    if self.timeout:
319                        self.timeout = float(self.timeout)
320                except:
321                    bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout)
322                seendata = True
323
324            ready = self.idle_commands(.1, fds)
325
326        if self.idle:
327            self.idle.join()
328
329        serverlog("Exiting (socket: %s)" % os.path.exists(self.sockname))
330        # Remove the socket file so we don't get any more connections to avoid races
331        # The build directory could have been renamed so if the file isn't the one we created
332        # we shouldn't delete it.
333        try:
334            sockinode = os.stat(self.sockname)[stat.ST_INO]
335            if sockinode == self.sockinode:
336                os.unlink(self.sockname)
337            else:
338                serverlog("bitbake.sock inode mismatch (%s vs %s), not deleting." % (sockinode, self.sockinode))
339        except Exception as err:
340            serverlog("Removing socket file '%s' failed (%s)" % (self.sockname, err))
341        self.sock.close()
342
343        try:
344            self.cooker.shutdown(True, idle=False)
345            self.cooker.notifier.stop()
346            self.cooker.confignotifier.stop()
347        except:
348            pass
349
350        self.cooker.post_serve()
351
352        if len(threading.enumerate()) != 1:
353            serverlog("More than one thread left?: " + str(threading.enumerate()))
354
355        # Flush logs before we release the lock
356        sys.stdout.flush()
357        sys.stderr.flush()
358
359        # Finally release the lockfile but warn about other processes holding it open
360        lock = self.bitbake_lock
361        lockfile = self.bitbake_lock_name
362
363        def get_lock_contents(lockfile):
364            try:
365                with open(lockfile, "r") as f:
366                    return f.readlines()
367            except FileNotFoundError:
368                return None
369
370        lock.close()
371        lock = None
372
373        while not lock:
374            i = 0
375            lock = None
376            if not os.path.exists(os.path.basename(lockfile)):
377                serverlog("Lockfile directory gone, exiting.")
378                return
379
380            while not lock and i < 30:
381                lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False)
382                if not lock:
383                    newlockcontents = get_lock_contents(lockfile)
384                    if not newlockcontents[0].startswith([f"{os.getpid()}\n", f"{os.getpid()} "]):
385                        # A new server was started, the lockfile contents changed, we can exit
386                        serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents))
387                        return
388                    time.sleep(0.1)
389                i += 1
390            if lock:
391                # We hold the lock so we can remove the file (hide stale pid data)
392                # via unlockfile.
393                bb.utils.unlockfile(lock)
394                serverlog("Exiting as we could obtain the lock")
395                return
396
397            if not lock:
398                procs = get_lockfile_process_msg(lockfile)
399                msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"]
400                if procs:
401                    msg.append(":\n%s" % procs)
402                serverlog("".join(msg))
403
404    def idle_thread(self):
405        if self.cooker.configuration.profile:
406            try:
407                import cProfile as profile
408            except:
409                import profile
410            prof = profile.Profile()
411
412            ret = profile.Profile.runcall(prof, self.idle_thread_internal)
413
414            prof.dump_stats("profile-mainloop.log")
415            bb.utils.process_profilelog("profile-mainloop.log")
416            serverlog("Raw profiling information saved to profile-mainloop.log and processed statistics to profile-mainloop.log.processed")
417        else:
418            self.idle_thread_internal()
419
420    def idle_thread_internal(self):
421        def remove_idle_func(function):
422            with bb.utils.lock_timeout(self._idlefuncsLock):
423                del self._idlefuns[function]
424                self.idle_cond.notify_all()
425
426        while not self.quit:
427            nextsleep = 0.1
428            fds = []
429
430            with bb.utils.lock_timeout(self._idlefuncsLock):
431                items = list(self._idlefuns.items())
432
433            for function, data in items:
434                try:
435                    retval = function(self, data, False)
436                    if isinstance(retval, idleFinish):
437                        serverlog("Removing idle function %s at idleFinish" % str(function))
438                        remove_idle_func(function)
439                        self.cooker.command.finishAsyncCommand(retval.msg)
440                        nextsleep = None
441                    elif retval is False:
442                        serverlog("Removing idle function %s" % str(function))
443                        remove_idle_func(function)
444                        nextsleep = None
445                    elif retval is True:
446                        nextsleep = None
447                    elif isinstance(retval, float) and nextsleep:
448                        if (retval < nextsleep):
449                            nextsleep = retval
450                    elif nextsleep is None:
451                        continue
452                    else:
453                        fds = fds + retval
454                except SystemExit:
455                    raise
456                except Exception as exc:
457                    if not isinstance(exc, bb.BBHandledException):
458                        logger.exception('Running idle function')
459                    remove_idle_func(function)
460                    serverlog("Exception %s broke the idle_thread, exiting" % traceback.format_exc())
461                    self.quit = True
462
463            # Create new heartbeat event?
464            now = time.time()
465            if bb.event._heartbeat_enabled and now >= self.next_heartbeat:
466                # We might have missed heartbeats. Just trigger once in
467                # that case and continue after the usual delay.
468                self.next_heartbeat += self.heartbeat_seconds
469                if self.next_heartbeat <= now:
470                    self.next_heartbeat = now + self.heartbeat_seconds
471                if hasattr(self.cooker, "data"):
472                    heartbeat = bb.event.HeartbeatEvent(now)
473                    try:
474                        bb.event.fire(heartbeat, self.cooker.data)
475                    except Exception as exc:
476                        if not isinstance(exc, bb.BBHandledException):
477                            logger.exception('Running heartbeat function')
478                        serverlog("Exception %s broke in idle_thread, exiting" % traceback.format_exc())
479                        self.quit = True
480            if nextsleep and bb.event._heartbeat_enabled and now + nextsleep > self.next_heartbeat:
481                # Shorten timeout so that we we wake up in time for
482                # the heartbeat.
483                nextsleep = self.next_heartbeat - now
484
485            if nextsleep is not None:
486                select.select(fds,[],[],nextsleep)[0]
487
488    def idle_commands(self, delay, fds=None):
489        nextsleep = delay
490        if not fds:
491            fds = []
492
493        if not self.idle:
494            self.idle = threading.Thread(target=self.idle_thread)
495            self.idle.start()
496        elif self.idle and not self.idle.is_alive():
497            serverlog("Idle thread terminated, main thread exiting too")
498            bb.error("Idle thread terminated, main thread exiting too")
499            self.quit = True
500
501        if nextsleep is not None:
502            if self.xmlrpc:
503                nextsleep = self.xmlrpc.get_timeout(nextsleep)
504            try:
505                return select.select(fds,[],[],nextsleep)[0]
506            except InterruptedError:
507                # Ignore EINTR
508                return []
509        else:
510            return select.select(fds,[],[],0)[0]
511
512
513class ServerCommunicator():
514    def __init__(self, connection, recv):
515        self.connection = connection
516        self.recv = recv
517
518    def runCommand(self, command):
519        try:
520            self.connection.send(command)
521        except BrokenPipeError as e:
522            raise BrokenPipeError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e
523        if not self.recv.poll(30):
524            logger.info("No reply from server in 30s (for command %s at %s)" % (command[0], currenttime()))
525            if not self.recv.poll(30):
526                raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s at %s)" % currenttime())
527        try:
528            ret, exc = self.recv.get()
529        except EOFError as e:
530            raise EOFError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e
531        # Should probably turn all exceptions in exc back into exceptions?
532        # For now, at least handle BBHandledException
533        if exc and ("BBHandledException" in exc or "SystemExit" in exc):
534            raise bb.BBHandledException()
535        return ret, exc
536
537    def updateFeatureSet(self, featureset):
538        _, error = self.runCommand(["setFeatures", featureset])
539        if error:
540            logger.error("Unable to set the cooker to the correct featureset: %s" % error)
541            raise BaseException(error)
542
543    def getEventHandle(self):
544        handle, error = self.runCommand(["getUIHandlerNum"])
545        if error:
546            logger.error("Unable to get UI Handler Number: %s" % error)
547            raise BaseException(error)
548
549        return handle
550
551    def terminateServer(self):
552        self.connection.send(['terminateServer'])
553        return
554
555class BitBakeProcessServerConnection(object):
556    def __init__(self, ui_channel, recv, eq, sock):
557        self.connection = ServerCommunicator(ui_channel, recv)
558        self.events = eq
559        # Save sock so it doesn't get gc'd for the life of our connection
560        self.socket_connection = sock
561
562    def terminate(self):
563        self.events.close()
564        self.socket_connection.close()
565        self.connection.connection.close()
566        self.connection.recv.close()
567        return
568
569start_log_format = '--- Starting bitbake server pid %s at %s ---'
570start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'
571
572class BitBakeServer(object):
573
574    def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface, profile):
575
576        self.server_timeout = server_timeout
577        self.xmlrpcinterface = xmlrpcinterface
578        self.featureset = featureset
579        self.sockname = sockname
580        self.bitbake_lock = lock
581        self.profile = profile
582        self.readypipe, self.readypipein = os.pipe()
583
584        # Place the log in the builddirectory alongside the lock file
585        logfile = os.path.join(os.path.dirname(self.bitbake_lock.name), "bitbake-cookerdaemon.log")
586        self.logfile = logfile
587
588        startdatetime = datetime.datetime.now()
589        bb.daemonize.createDaemon(self._startServer, logfile)
590        self.bitbake_lock.close()
591        os.close(self.readypipein)
592
593        ready = ConnectionReader(self.readypipe)
594        r = ready.poll(5)
595        if not r:
596            bb.note("Bitbake server didn't start within 5 seconds, waiting for 90")
597            r = ready.poll(90)
598        if r:
599            try:
600                r = ready.get()
601            except EOFError:
602                # Trap the child exiting/closing the pipe and error out
603                r = None
604        if not r or r[0] != "r":
605            ready.close()
606            bb.error("Unable to start bitbake server (%s)" % str(r))
607            if os.path.exists(logfile):
608                logstart_re = re.compile(start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)'))
609                started = False
610                lines = []
611                lastlines = []
612                with open(logfile, "r") as f:
613                    for line in f:
614                        if started:
615                            lines.append(line)
616                        else:
617                            lastlines.append(line)
618                            res = logstart_re.search(line.rstrip())
619                            if res:
620                                ldatetime = datetime.datetime.strptime(res.group(2), start_log_datetime_format)
621                                if ldatetime >= startdatetime:
622                                    started = True
623                                    lines.append(line)
624                        if len(lastlines) > 60:
625                            lastlines = lastlines[-60:]
626                if lines:
627                    if len(lines) > 60:
628                        bb.error("Last 60 lines of server log for this session (%s):\n%s" % (logfile, "".join(lines[-60:])))
629                    else:
630                        bb.error("Server log for this session (%s):\n%s" % (logfile, "".join(lines)))
631                elif lastlines:
632                        bb.error("Server didn't start, last 60 loglines (%s):\n%s" % (logfile, "".join(lastlines)))
633            else:
634                bb.error("%s doesn't exist" % logfile)
635
636            raise SystemExit(1)
637
638        ready.close()
639
640    def _startServer(self):
641        os.close(self.readypipe)
642        os.set_inheritable(self.bitbake_lock.fileno(), True)
643        os.set_inheritable(self.readypipein, True)
644        serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server")
645        os.execl(sys.executable, sys.executable, serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname,  str(self.server_timeout or 0), str(int(self.profile)), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1]))
646
647def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface, profile):
648
649    import bb.cookerdata
650    import bb.cooker
651
652    serverlog(start_log_format % (os.getpid(), datetime.datetime.now().strftime(start_log_datetime_format)))
653
654    try:
655        bitbake_lock = os.fdopen(lockfd, "w")
656
657        # Create server control socket
658        if os.path.exists(sockname):
659            serverlog("WARNING: removing existing socket file '%s'" % sockname)
660            os.unlink(sockname)
661
662        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
663        # AF_UNIX has path length issues so chdir here to workaround
664        cwd = os.getcwd()
665        try:
666            os.chdir(os.path.dirname(sockname))
667            sock.bind(os.path.basename(sockname))
668        finally:
669            os.chdir(cwd)
670        sock.listen(1)
671
672        server = ProcessServer(bitbake_lock, lockname, sock, sockname, server_timeout, xmlrpcinterface)
673        writer = ConnectionWriter(readypipeinfd)
674        try:
675            featureset = []
676            cooker = bb.cooker.BBCooker(featureset, server)
677            cooker.configuration.profile = profile
678        except bb.BBHandledException:
679            return None
680        writer.send("r")
681        writer.close()
682        server.cooker = cooker
683        serverlog("Started bitbake server pid %d" % os.getpid())
684
685        server.run()
686    finally:
687        # Flush any messages/errors to the logfile before exit
688        sys.stdout.flush()
689        sys.stderr.flush()
690
691def connectProcessServer(sockname, featureset):
692    # Connect to socket
693    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
694    # AF_UNIX has path length issues so chdir here to workaround
695    cwd = os.getcwd()
696
697    readfd = writefd = readfd1 = writefd1 = readfd2 = writefd2 = None
698    eq = command_chan_recv = command_chan = None
699
700    sock.settimeout(10)
701
702    try:
703        try:
704            os.chdir(os.path.dirname(sockname))
705            finished = False
706            while not finished:
707                try:
708                    sock.connect(os.path.basename(sockname))
709                    finished = True
710                except IOError as e:
711                    if e.errno == errno.EWOULDBLOCK:
712                        pass
713                    raise
714        finally:
715            os.chdir(cwd)
716
717        # Send an fd for the remote to write events to
718        readfd, writefd = os.pipe()
719        eq = BBUIEventQueue(readfd)
720        # Send an fd for the remote to recieve commands from
721        readfd1, writefd1 = os.pipe()
722        command_chan = ConnectionWriter(writefd1)
723        # Send an fd for the remote to write commands results to
724        readfd2, writefd2 = os.pipe()
725        command_chan_recv = ConnectionReader(readfd2)
726
727        sendfds(sock, [writefd, readfd1, writefd2])
728
729        server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq, sock)
730
731        # Close the ends of the pipes we won't use
732        for i in [writefd, readfd1, writefd2]:
733            os.close(i)
734
735        server_connection.connection.updateFeatureSet(featureset)
736
737    except (Exception, SystemExit) as e:
738        if command_chan_recv:
739            command_chan_recv.close()
740        if command_chan:
741            command_chan.close()
742        for i in [writefd, readfd1, writefd2]:
743            try:
744                if i:
745                    os.close(i)
746            except OSError:
747                pass
748        sock.close()
749        raise
750
751    return server_connection
752
753def sendfds(sock, fds):
754        '''Send an array of fds over an AF_UNIX socket.'''
755        fds = array.array('i', fds)
756        msg = bytes([len(fds) % 256])
757        sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
758
759def recvfds(sock, size):
760        '''Receive an array of fds over an AF_UNIX socket.'''
761        a = array.array('i')
762        bytes_size = a.itemsize * size
763        msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
764        if not msg and not ancdata:
765            raise EOFError
766        try:
767            if len(ancdata) != 1:
768                raise RuntimeError('received %d items of ancdata' %
769                                   len(ancdata))
770            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
771            if (cmsg_level == socket.SOL_SOCKET and
772                cmsg_type == socket.SCM_RIGHTS):
773                if len(cmsg_data) % a.itemsize != 0:
774                    raise ValueError
775                a.frombytes(cmsg_data)
776                assert len(a) % 256 == msg[0]
777                return list(a)
778        except (ValueError, IndexError):
779            pass
780        raise RuntimeError('Invalid data received')
781
782class BBUIEventQueue:
783    def __init__(self, readfd):
784
785        self.eventQueue = []
786        self.eventQueueLock = threading.Lock()
787        self.eventQueueNotify = threading.Event()
788
789        self.reader = ConnectionReader(readfd)
790
791        self.t = threading.Thread()
792        self.t.run = self.startCallbackHandler
793        self.t.start()
794
795    def getEvent(self):
796        with bb.utils.lock_timeout(self.eventQueueLock):
797            if len(self.eventQueue) == 0:
798                return None
799
800            item = self.eventQueue.pop(0)
801            if len(self.eventQueue) == 0:
802                self.eventQueueNotify.clear()
803
804        return item
805
806    def waitEvent(self, delay):
807        self.eventQueueNotify.wait(delay)
808        return self.getEvent()
809
810    def queue_event(self, event):
811        with bb.utils.lock_timeout(self.eventQueueLock):
812            self.eventQueue.append(event)
813            self.eventQueueNotify.set()
814
815    def send_event(self, event):
816        self.queue_event(pickle.loads(event))
817
818    def startCallbackHandler(self):
819        bb.utils.set_process_name("UIEventQueue")
820        while True:
821            try:
822                ready = self.reader.wait(0.25)
823                if ready:
824                    event = self.reader.get()
825                    self.queue_event(event)
826            except (EOFError, OSError, TypeError):
827                # Easiest way to exit is to close the file descriptor to cause an exit
828                break
829
830    def close(self):
831        self.reader.close()
832        self.t.join()
833
834class ConnectionReader(object):
835
836    def __init__(self, fd):
837        self.reader = multiprocessing.connection.Connection(fd, writable=False)
838        self.rlock = multiprocessing.Lock()
839
840    def wait(self, timeout=None):
841        return multiprocessing.connection.wait([self.reader], timeout)
842
843    def poll(self, timeout=None):
844        return self.reader.poll(timeout)
845
846    def get(self):
847        with bb.utils.lock_timeout(self.rlock):
848            res = self.reader.recv_bytes()
849        return multiprocessing.reduction.ForkingPickler.loads(res)
850
851    def fileno(self):
852        return self.reader.fileno()
853
854    def close(self):
855        return self.reader.close()
856
857
858class ConnectionWriter(object):
859
860    def __init__(self, fd):
861        self.writer = multiprocessing.connection.Connection(fd, readable=False)
862        self.wlock = multiprocessing.Lock()
863        # Why bb.event needs this I have no idea
864        self.event = self
865
866    def _send(self, obj):
867        gc.disable()
868        with bb.utils.lock_timeout(self.wlock):
869            self.writer.send_bytes(obj)
870        gc.enable()
871
872    def send(self, obj):
873        obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
874        # See notes/code in CookerParser
875        # We must not terminate holding this lock else processes will hang.
876        # For SIGTERM, raising afterwards avoids this.
877        # For SIGINT, we don't want to have written partial data to the pipe.
878        # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139
879        process = multiprocessing.current_process()
880        if process and hasattr(process, "queue_signals"):
881            with bb.utils.lock_timeout(process.signal_threadlock):
882                process.queue_signals = True
883                self._send(obj)
884                process.queue_signals = False
885
886                while len(process.signal_received) > 0:
887                    sig = process.signal_received.pop()
888                    process.handle_sig(sig, None)
889        else:
890            self._send(obj)
891
892    def fileno(self):
893        return self.writer.fileno()
894
895    def close(self):
896        return self.writer.close()
897