1#
2# BitBake Process based server.
3#
4# Copyright (C) 2010 Bob Foerster <robert@erafx.com>
5#
6# SPDX-License-Identifier: GPL-2.0-only
7#
8
9"""
10    This module implements a multiprocessing.Process based server for bitbake.
11"""
12
13import bb
14import bb.event
15import logging
16import multiprocessing
17import threading
18import array
19import os
20import sys
21import time
22import select
23import socket
24import subprocess
25import errno
26import re
27import datetime
28import pickle
29import traceback
30import bb.server.xmlrpcserver
31from bb import daemonize
32from multiprocessing import queues
33
34logger = logging.getLogger('BitBake')
35
36class ProcessTimeout(SystemExit):
37    pass
38
39def serverlog(msg):
40    print(str(os.getpid()) + " " +  datetime.datetime.now().strftime('%H:%M:%S.%f') + " " + msg)
41    sys.stdout.flush()
42
43class ProcessServer():
44    profile_filename = "profile.log"
45    profile_processed_filename = "profile.log.processed"
46
47    def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface):
48        self.command_channel = False
49        self.command_channel_reply = False
50        self.quit = False
51        self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore.
52        self.next_heartbeat = time.time()
53
54        self.event_handle = None
55        self.hadanyui = False
56        self.haveui = False
57        self.maxuiwait = 30
58        self.xmlrpc = False
59
60        self._idlefuns = {}
61
62        self.bitbake_lock = lock
63        self.bitbake_lock_name = lockname
64        self.sock = sock
65        self.sockname = sockname
66
67        self.server_timeout = server_timeout
68        self.timeout = self.server_timeout
69        self.xmlrpcinterface = xmlrpcinterface
70
71    def register_idle_function(self, function, data):
72        """Register a function to be called while the server is idle"""
73        assert hasattr(function, '__call__')
74        self._idlefuns[function] = data
75
76    def run(self):
77
78        if self.xmlrpcinterface[0]:
79            self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self)
80
81            serverlog("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port))
82
83        try:
84            self.bitbake_lock.seek(0)
85            self.bitbake_lock.truncate()
86            if self.xmlrpc:
87                self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), self.xmlrpc.host, self.xmlrpc.port))
88            else:
89                self.bitbake_lock.write("%s\n" % (os.getpid()))
90            self.bitbake_lock.flush()
91        except Exception as e:
92            serverlog("Error writing to lock file: %s" % str(e))
93            pass
94
95        if self.cooker.configuration.profile:
96            try:
97                import cProfile as profile
98            except:
99                import profile
100            prof = profile.Profile()
101
102            ret = profile.Profile.runcall(prof, self.main)
103
104            prof.dump_stats("profile.log")
105            bb.utils.process_profilelog("profile.log")
106            serverlog("Raw profiling information saved to profile.log and processed statistics to profile.log.processed")
107
108        else:
109            ret = self.main()
110
111        return ret
112
113    def main(self):
114        self.cooker.pre_serve()
115
116        bb.utils.set_process_name("Cooker")
117
118        ready = []
119        newconnections = []
120
121        self.controllersock = False
122        fds = [self.sock]
123        if self.xmlrpc:
124            fds.append(self.xmlrpc)
125        seendata = False
126        serverlog("Entering server connection loop")
127
128        def disconnect_client(self, fds):
129            serverlog("Disconnecting Client")
130            if self.controllersock:
131                fds.remove(self.controllersock)
132                self.controllersock.close()
133                self.controllersock = False
134            if self.haveui:
135                fds.remove(self.command_channel)
136                bb.event.unregister_UIHhandler(self.event_handle, True)
137                self.command_channel_reply.writer.close()
138                self.event_writer.writer.close()
139                self.command_channel.close()
140                self.command_channel = False
141                del self.event_writer
142                self.lastui = time.time()
143                self.cooker.clientComplete()
144                self.haveui = False
145            ready = select.select(fds,[],[],0)[0]
146            if newconnections:
147                serverlog("Starting new client")
148                conn = newconnections.pop(-1)
149                fds.append(conn)
150                self.controllersock = conn
151            elif not self.timeout and not ready:
152                serverlog("No timeout, exiting.")
153                self.quit = True
154
155        self.lastui = time.time()
156        while not self.quit:
157            if self.sock in ready:
158                while select.select([self.sock],[],[],0)[0]:
159                    controllersock, address = self.sock.accept()
160                    if self.controllersock:
161                        serverlog("Queuing %s (%s)" % (str(ready), str(newconnections)))
162                        newconnections.append(controllersock)
163                    else:
164                        serverlog("Accepting %s (%s)" % (str(ready), str(newconnections)))
165                        self.controllersock = controllersock
166                        fds.append(controllersock)
167            if self.controllersock in ready:
168                try:
169                    serverlog("Processing Client")
170                    ui_fds = recvfds(self.controllersock, 3)
171                    serverlog("Connecting Client")
172
173                    # Where to write events to
174                    writer = ConnectionWriter(ui_fds[0])
175                    self.event_handle = bb.event.register_UIHhandler(writer, True)
176                    self.event_writer = writer
177
178                    # Where to read commands from
179                    reader = ConnectionReader(ui_fds[1])
180                    fds.append(reader)
181                    self.command_channel = reader
182
183                    # Where to send command return values to
184                    writer = ConnectionWriter(ui_fds[2])
185                    self.command_channel_reply = writer
186
187                    self.haveui = True
188                    self.hadanyui = True
189
190                except (EOFError, OSError):
191                    disconnect_client(self, fds)
192
193            if not self.timeout == -1.0 and not self.haveui and self.timeout and \
194                    (self.lastui + self.timeout) < time.time():
195                serverlog("Server timeout, exiting.")
196                self.quit = True
197
198            # If we don't see a UI connection within maxuiwait, its unlikely we're going to see
199            # one. We have had issue with processes hanging indefinitely so timing out UI-less
200            # servers is useful.
201            if not self.hadanyui and not self.xmlrpc and not self.timeout and (self.lastui + self.maxuiwait) < time.time():
202                serverlog("No UI connection within max timeout, exiting to avoid infinite loop.")
203                self.quit = True
204
205            if self.command_channel in ready:
206                try:
207                    command = self.command_channel.get()
208                except EOFError:
209                    # Client connection shutting down
210                    ready = []
211                    disconnect_client(self, fds)
212                    continue
213                if command[0] == "terminateServer":
214                    self.quit = True
215                    continue
216                try:
217                    serverlog("Running command %s" % command)
218                    self.command_channel_reply.send(self.cooker.command.runCommand(command))
219                    serverlog("Command Completed")
220                except Exception as e:
221                   stack = traceback.format_exc()
222                   serverlog('Exception in server main event loop running command %s (%s)' % (command, stack))
223                   logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack))
224
225            if self.xmlrpc in ready:
226                self.xmlrpc.handle_requests()
227
228            if not seendata and hasattr(self.cooker, "data"):
229                heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT')
230                if heartbeat_event:
231                    try:
232                        self.heartbeat_seconds = float(heartbeat_event)
233                    except:
234                        bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event)
235
236                self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT')
237                try:
238                    if self.timeout:
239                        self.timeout = float(self.timeout)
240                except:
241                    bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout)
242                seendata = True
243
244            ready = self.idle_commands(.1, fds)
245
246        if len(threading.enumerate()) != 1:
247            serverlog("More than one thread left?: " + str(threading.enumerate()))
248
249        serverlog("Exiting")
250        # Remove the socket file so we don't get any more connections to avoid races
251        try:
252            os.unlink(self.sockname)
253        except:
254            pass
255        self.sock.close()
256
257        try:
258            self.cooker.shutdown(True)
259            self.cooker.notifier.stop()
260            self.cooker.confignotifier.stop()
261        except:
262            pass
263
264        self.cooker.post_serve()
265
266        # Flush logs before we release the lock
267        sys.stdout.flush()
268        sys.stderr.flush()
269
270        # Finally release the lockfile but warn about other processes holding it open
271        lock = self.bitbake_lock
272        lockfile = self.bitbake_lock_name
273
274        def get_lock_contents(lockfile):
275            try:
276                with open(lockfile, "r") as f:
277                    return f.readlines()
278            except FileNotFoundError:
279                return None
280
281        lockcontents = get_lock_contents(lockfile)
282        serverlog("Original lockfile contents: " + str(lockcontents))
283
284        lock.close()
285        lock = None
286
287        while not lock:
288            i = 0
289            lock = None
290            while not lock and i < 30:
291                lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False)
292                if not lock:
293                    newlockcontents = get_lock_contents(lockfile)
294                    if newlockcontents != lockcontents:
295                        # A new server was started, the lockfile contents changed, we can exit
296                        serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents))
297                        return
298                    time.sleep(0.1)
299                i += 1
300            if lock:
301                # We hold the lock so we can remove the file (hide stale pid data)
302                # via unlockfile.
303                bb.utils.unlockfile(lock)
304                serverlog("Exiting as we could obtain the lock")
305                return
306
307            if not lock:
308                # Some systems may not have lsof available
309                procs = None
310                try:
311                    procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
312                except subprocess.CalledProcessError:
313                    # File was deleted?
314                    continue
315                except OSError as e:
316                    if e.errno != errno.ENOENT:
317                        raise
318                if procs is None:
319                    # Fall back to fuser if lsof is unavailable
320                    try:
321                        procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
322                    except subprocess.CalledProcessError:
323                        # File was deleted?
324                        continue
325                    except OSError as e:
326                        if e.errno != errno.ENOENT:
327                            raise
328
329                msg = "Delaying shutdown due to active processes which appear to be holding bitbake.lock"
330                if procs:
331                    msg += ":\n%s" % str(procs.decode("utf-8"))
332                serverlog(msg)
333
334    def idle_commands(self, delay, fds=None):
335        nextsleep = delay
336        if not fds:
337            fds = []
338
339        for function, data in list(self._idlefuns.items()):
340            try:
341                retval = function(self, data, False)
342                if retval is False:
343                    del self._idlefuns[function]
344                    nextsleep = None
345                elif retval is True:
346                    nextsleep = None
347                elif isinstance(retval, float) and nextsleep:
348                    if (retval < nextsleep):
349                        nextsleep = retval
350                elif nextsleep is None:
351                    continue
352                else:
353                    fds = fds + retval
354            except SystemExit:
355                raise
356            except Exception as exc:
357                if not isinstance(exc, bb.BBHandledException):
358                    logger.exception('Running idle function')
359                del self._idlefuns[function]
360                self.quit = True
361
362        # Create new heartbeat event?
363        now = time.time()
364        if now >= self.next_heartbeat:
365            # We might have missed heartbeats. Just trigger once in
366            # that case and continue after the usual delay.
367            self.next_heartbeat += self.heartbeat_seconds
368            if self.next_heartbeat <= now:
369                self.next_heartbeat = now + self.heartbeat_seconds
370            if hasattr(self.cooker, "data"):
371                heartbeat = bb.event.HeartbeatEvent(now)
372                try:
373                    bb.event.fire(heartbeat, self.cooker.data)
374                except Exception as exc:
375                    if not isinstance(exc, bb.BBHandledException):
376                        logger.exception('Running heartbeat function')
377                    self.quit = True
378        if nextsleep and now + nextsleep > self.next_heartbeat:
379            # Shorten timeout so that we we wake up in time for
380            # the heartbeat.
381            nextsleep = self.next_heartbeat - now
382
383        if nextsleep is not None:
384            if self.xmlrpc:
385                nextsleep = self.xmlrpc.get_timeout(nextsleep)
386            try:
387                return select.select(fds,[],[],nextsleep)[0]
388            except InterruptedError:
389                # Ignore EINTR
390                return []
391        else:
392            return select.select(fds,[],[],0)[0]
393
394
395class ServerCommunicator():
396    def __init__(self, connection, recv):
397        self.connection = connection
398        self.recv = recv
399
400    def runCommand(self, command):
401        self.connection.send(command)
402        if not self.recv.poll(30):
403            logger.info("No reply from server in 30s")
404            if not self.recv.poll(30):
405                raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s)")
406        ret, exc = self.recv.get()
407        # Should probably turn all exceptions in exc back into exceptions?
408        # For now, at least handle BBHandledException
409        if exc and ("BBHandledException" in exc or "SystemExit" in exc):
410            raise bb.BBHandledException()
411        return ret, exc
412
413    def updateFeatureSet(self, featureset):
414        _, error = self.runCommand(["setFeatures", featureset])
415        if error:
416            logger.error("Unable to set the cooker to the correct featureset: %s" % error)
417            raise BaseException(error)
418
419    def getEventHandle(self):
420        handle, error = self.runCommand(["getUIHandlerNum"])
421        if error:
422            logger.error("Unable to get UI Handler Number: %s" % error)
423            raise BaseException(error)
424
425        return handle
426
427    def terminateServer(self):
428        self.connection.send(['terminateServer'])
429        return
430
431class BitBakeProcessServerConnection(object):
432    def __init__(self, ui_channel, recv, eq, sock):
433        self.connection = ServerCommunicator(ui_channel, recv)
434        self.events = eq
435        # Save sock so it doesn't get gc'd for the life of our connection
436        self.socket_connection = sock
437
438    def terminate(self):
439        self.socket_connection.close()
440        self.connection.connection.close()
441        self.connection.recv.close()
442        return
443
444start_log_format = '--- Starting bitbake server pid %s at %s ---'
445start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'
446
447class BitBakeServer(object):
448
449    def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface):
450
451        self.server_timeout = server_timeout
452        self.xmlrpcinterface = xmlrpcinterface
453        self.featureset = featureset
454        self.sockname = sockname
455        self.bitbake_lock = lock
456        self.readypipe, self.readypipein = os.pipe()
457
458        # Place the log in the builddirectory alongside the lock file
459        logfile = os.path.join(os.path.dirname(self.bitbake_lock.name), "bitbake-cookerdaemon.log")
460        self.logfile = logfile
461
462        startdatetime = datetime.datetime.now()
463        bb.daemonize.createDaemon(self._startServer, logfile)
464        self.bitbake_lock.close()
465        os.close(self.readypipein)
466
467        ready = ConnectionReader(self.readypipe)
468        r = ready.poll(5)
469        if not r:
470            bb.note("Bitbake server didn't start within 5 seconds, waiting for 90")
471            r = ready.poll(90)
472        if r:
473            try:
474                r = ready.get()
475            except EOFError:
476                # Trap the child exitting/closing the pipe and error out
477                r = None
478        if not r or r[0] != "r":
479            ready.close()
480            bb.error("Unable to start bitbake server (%s)" % str(r))
481            if os.path.exists(logfile):
482                logstart_re = re.compile(start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)'))
483                started = False
484                lines = []
485                lastlines = []
486                with open(logfile, "r") as f:
487                    for line in f:
488                        if started:
489                            lines.append(line)
490                        else:
491                            lastlines.append(line)
492                            res = logstart_re.search(line.rstrip())
493                            if res:
494                                ldatetime = datetime.datetime.strptime(res.group(2), start_log_datetime_format)
495                                if ldatetime >= startdatetime:
496                                    started = True
497                                    lines.append(line)
498                        if len(lastlines) > 60:
499                            lastlines = lastlines[-60:]
500                if lines:
501                    if len(lines) > 60:
502                        bb.error("Last 60 lines of server log for this session (%s):\n%s" % (logfile, "".join(lines[-60:])))
503                    else:
504                        bb.error("Server log for this session (%s):\n%s" % (logfile, "".join(lines)))
505                elif lastlines:
506                        bb.error("Server didn't start, last 60 loglines (%s):\n%s" % (logfile, "".join(lastlines)))
507            else:
508                bb.error("%s doesn't exist" % logfile)
509
510            raise SystemExit(1)
511
512        ready.close()
513
514    def _startServer(self):
515        os.close(self.readypipe)
516        os.set_inheritable(self.bitbake_lock.fileno(), True)
517        os.set_inheritable(self.readypipein, True)
518        serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server")
519        os.execl(sys.executable, "bitbake-server", serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname,  str(self.server_timeout or 0), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1]))
520
521def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface):
522
523    import bb.cookerdata
524    import bb.cooker
525
526    serverlog(start_log_format % (os.getpid(), datetime.datetime.now().strftime(start_log_datetime_format)))
527
528    try:
529        bitbake_lock = os.fdopen(lockfd, "w")
530
531        # Create server control socket
532        if os.path.exists(sockname):
533            os.unlink(sockname)
534
535        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
536        # AF_UNIX has path length issues so chdir here to workaround
537        cwd = os.getcwd()
538        try:
539            os.chdir(os.path.dirname(sockname))
540            sock.bind(os.path.basename(sockname))
541        finally:
542            os.chdir(cwd)
543        sock.listen(1)
544
545        server = ProcessServer(bitbake_lock, lockname, sock, sockname, server_timeout, xmlrpcinterface)
546        writer = ConnectionWriter(readypipeinfd)
547        try:
548            featureset = []
549            cooker = bb.cooker.BBCooker(featureset, server.register_idle_function)
550        except bb.BBHandledException:
551            return None
552        writer.send("r")
553        writer.close()
554        server.cooker = cooker
555        serverlog("Started bitbake server pid %d" % os.getpid())
556
557        server.run()
558    finally:
559        # Flush any ,essages/errors to the logfile before exit
560        sys.stdout.flush()
561        sys.stderr.flush()
562
563def connectProcessServer(sockname, featureset):
564    # Connect to socket
565    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
566    # AF_UNIX has path length issues so chdir here to workaround
567    cwd = os.getcwd()
568
569    readfd = writefd = readfd1 = writefd1 = readfd2 = writefd2 = None
570    eq = command_chan_recv = command_chan = None
571
572    sock.settimeout(10)
573
574    try:
575        try:
576            os.chdir(os.path.dirname(sockname))
577            finished = False
578            while not finished:
579                try:
580                    sock.connect(os.path.basename(sockname))
581                    finished = True
582                except IOError as e:
583                    if e.errno == errno.EWOULDBLOCK:
584                        pass
585                    raise
586        finally:
587            os.chdir(cwd)
588
589        # Send an fd for the remote to write events to
590        readfd, writefd = os.pipe()
591        eq = BBUIEventQueue(readfd)
592        # Send an fd for the remote to recieve commands from
593        readfd1, writefd1 = os.pipe()
594        command_chan = ConnectionWriter(writefd1)
595        # Send an fd for the remote to write commands results to
596        readfd2, writefd2 = os.pipe()
597        command_chan_recv = ConnectionReader(readfd2)
598
599        sendfds(sock, [writefd, readfd1, writefd2])
600
601        server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq, sock)
602
603        # Close the ends of the pipes we won't use
604        for i in [writefd, readfd1, writefd2]:
605            os.close(i)
606
607        server_connection.connection.updateFeatureSet(featureset)
608
609    except (Exception, SystemExit) as e:
610        if command_chan_recv:
611            command_chan_recv.close()
612        if command_chan:
613            command_chan.close()
614        for i in [writefd, readfd1, writefd2]:
615            try:
616                if i:
617                    os.close(i)
618            except OSError:
619                pass
620        sock.close()
621        raise
622
623    return server_connection
624
625def sendfds(sock, fds):
626        '''Send an array of fds over an AF_UNIX socket.'''
627        fds = array.array('i', fds)
628        msg = bytes([len(fds) % 256])
629        sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
630
631def recvfds(sock, size):
632        '''Receive an array of fds over an AF_UNIX socket.'''
633        a = array.array('i')
634        bytes_size = a.itemsize * size
635        msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
636        if not msg and not ancdata:
637            raise EOFError
638        try:
639            if len(ancdata) != 1:
640                raise RuntimeError('received %d items of ancdata' %
641                                   len(ancdata))
642            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
643            if (cmsg_level == socket.SOL_SOCKET and
644                cmsg_type == socket.SCM_RIGHTS):
645                if len(cmsg_data) % a.itemsize != 0:
646                    raise ValueError
647                a.frombytes(cmsg_data)
648                assert len(a) % 256 == msg[0]
649                return list(a)
650        except (ValueError, IndexError):
651            pass
652        raise RuntimeError('Invalid data received')
653
654class BBUIEventQueue:
655    def __init__(self, readfd):
656
657        self.eventQueue = []
658        self.eventQueueLock = threading.Lock()
659        self.eventQueueNotify = threading.Event()
660
661        self.reader = ConnectionReader(readfd)
662
663        self.t = threading.Thread()
664        self.t.setDaemon(True)
665        self.t.run = self.startCallbackHandler
666        self.t.start()
667
668    def getEvent(self):
669        self.eventQueueLock.acquire()
670
671        if len(self.eventQueue) == 0:
672            self.eventQueueLock.release()
673            return None
674
675        item = self.eventQueue.pop(0)
676
677        if len(self.eventQueue) == 0:
678            self.eventQueueNotify.clear()
679
680        self.eventQueueLock.release()
681        return item
682
683    def waitEvent(self, delay):
684        self.eventQueueNotify.wait(delay)
685        return self.getEvent()
686
687    def queue_event(self, event):
688        self.eventQueueLock.acquire()
689        self.eventQueue.append(event)
690        self.eventQueueNotify.set()
691        self.eventQueueLock.release()
692
693    def send_event(self, event):
694        self.queue_event(pickle.loads(event))
695
696    def startCallbackHandler(self):
697        bb.utils.set_process_name("UIEventQueue")
698        while True:
699            try:
700                self.reader.wait()
701                event = self.reader.get()
702                self.queue_event(event)
703            except EOFError:
704                # Easiest way to exit is to close the file descriptor to cause an exit
705                break
706        self.reader.close()
707
708class ConnectionReader(object):
709
710    def __init__(self, fd):
711        self.reader = multiprocessing.connection.Connection(fd, writable=False)
712        self.rlock = multiprocessing.Lock()
713
714    def wait(self, timeout=None):
715        return multiprocessing.connection.wait([self.reader], timeout)
716
717    def poll(self, timeout=None):
718        return self.reader.poll(timeout)
719
720    def get(self):
721        with self.rlock:
722            res = self.reader.recv_bytes()
723        return multiprocessing.reduction.ForkingPickler.loads(res)
724
725    def fileno(self):
726        return self.reader.fileno()
727
728    def close(self):
729        return self.reader.close()
730
731
732class ConnectionWriter(object):
733
734    def __init__(self, fd):
735        self.writer = multiprocessing.connection.Connection(fd, readable=False)
736        self.wlock = multiprocessing.Lock()
737        # Why bb.event needs this I have no idea
738        self.event = self
739
740    def send(self, obj):
741        obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
742        with self.wlock:
743            self.writer.send_bytes(obj)
744
745    def fileno(self):
746        return self.writer.fileno()
747
748    def close(self):
749        return self.writer.close()
750