#
# BitBake Process based server.
#
# Copyright (C) 2010 Bob Foerster <robert@erafx.com>
#
# SPDX-License-Identifier: GPL-2.0-only
#

"""
    This module implements a multiprocessing.Process based server for bitbake.
"""

import bb
import bb.event
import logging
import multiprocessing
import threading
import array
import os
import sys
import time
import select
import socket
import subprocess
import errno
import re
import datetime
import pickle
import traceback
import bb.server.xmlrpcserver
from bb import daemonize
from multiprocessing import queues

logger = logging.getLogger('BitBake')

class ProcessTimeout(SystemExit):
    """Raised by ServerCommunicator.runCommand when the server does not reply in time."""
    pass

def serverlog(msg):
    """Print msg to stdout prefixed with the pid and a wall-clock timestamp, then flush."""
    print(str(os.getpid()) + " " + datetime.datetime.now().strftime('%H:%M:%S.%f') + " " + msg)
    sys.stdout.flush()

class ProcessServer():
    """
    Server side main loop.

    Accepts UI connections on a listening socket, receives three file
    descriptors per client (event sink, command source, command reply sink),
    runs the commands through the cooker and calls the registered idle
    functions in between.

    The caller must assign the 'cooker' attribute before calling run().
    """
    profile_filename = "profile.log"
    profile_processed_filename = "profile.log.processed"

    def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface):
        """
        lock             open file object of the bitbake lock file
        lockname         path of that lock file
        sock             listening socket new UI clients connect to
        sockname         path of the socket file (removed on shutdown)
        server_timeout   idle timeout in seconds; a false value means none was given
        xmlrpcinterface  sequence whose first item enables XMLRPC when true
        """
        self.command_channel = False
        self.command_channel_reply = False
        self.quit = False
        self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore.
        self.next_heartbeat = time.time()

        self.event_handle = None
        self.hadanyui = False
        self.haveui = False
        self.maxuiwait = 30
        self.xmlrpc = False

        # Maps idle function -> opaque data handed back to it on each call
        self._idlefuns = {}

        self.bitbake_lock = lock
        self.bitbake_lock_name = lockname
        self.sock = sock
        self.sockname = sockname

        self.server_timeout = server_timeout
        self.timeout = self.server_timeout
        self.xmlrpcinterface = xmlrpcinterface

    def register_idle_function(self, function, data):
        """Register a function to be called while the server is idle"""
        assert hasattr(function, '__call__')
        self._idlefuns[function] = data

    def run(self):
        """
        Start the optional XMLRPC server, record pid (and XMLRPC address) in
        the lock file, then run main(), under the profiler if the cooker
        configuration asks for it.

        Returns whatever main() returns.
        """
        if self.xmlrpcinterface[0]:
            self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self)

            serverlog("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port))

        try:
            self.bitbake_lock.seek(0)
            self.bitbake_lock.truncate()
            if self.xmlrpc:
                self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), self.xmlrpc.host, self.xmlrpc.port))
            else:
                self.bitbake_lock.write("%s\n" % (os.getpid()))
            self.bitbake_lock.flush()
        except Exception as e:
            # Best effort only: failing to update the lock file is logged, not fatal
            serverlog("Error writing to lock file: %s" % str(e))

        if self.cooker.configuration.profile:
            try:
                import cProfile as profile
            except ImportError:
                import profile
            prof = profile.Profile()

            ret = profile.Profile.runcall(prof, self.main)

            prof.dump_stats(self.profile_filename)
            bb.utils.process_profilelog(self.profile_filename)
            serverlog("Raw profiling information saved to %s and processed statistics to %s" % (self.profile_filename, self.profile_processed_filename))

        else:
            ret = self.main()

        return ret

    def main(self):
        """
        Serve clients until self.quit is set, then shut the cooker down and
        release the lock file.

        The final phase loops until the lock file can be re-acquired (and so
        removed) or its contents change because a new server took over; while
        waiting it logs which processes appear to hold the lock.
        """
        self.cooker.pre_serve()

        bb.utils.set_process_name("Cooker")

        ready = []
        newconnections = []

        self.controllersock = False
        fds = [self.sock]
        if self.xmlrpc:
            fds.append(self.xmlrpc)
        seendata = False
        serverlog("Entering server connection loop")

        def disconnect_client(self, fds):
            """Tear down the current client and promote a queued connection, if any."""
            serverlog("Disconnecting Client")
            if self.controllersock:
                fds.remove(self.controllersock)
                self.controllersock.close()
                self.controllersock = False
            if self.haveui:
                fds.remove(self.command_channel)
                bb.event.unregister_UIHhandler(self.event_handle, True)
                self.command_channel_reply.writer.close()
                self.event_writer.writer.close()
                self.command_channel.close()
                self.command_channel = False
                del self.event_writer
                self.lastui = time.time()
                self.cooker.clientComplete()
                self.haveui = False
            # 'ready' is local to this helper; the main loop's list is untouched
            ready = select.select(fds,[],[],0)[0]
            if newconnections:
                serverlog("Starting new client")
                conn = newconnections.pop(-1)
                fds.append(conn)
                self.controllersock = conn
            elif not self.timeout and not ready:
                serverlog("No timeout, exiting.")
                self.quit = True

        self.lastui = time.time()
        while not self.quit:
            if self.sock in ready:
                # Drain every pending connection; only one client is active at a time
                while select.select([self.sock],[],[],0)[0]:
                    controllersock, address = self.sock.accept()
                    if self.controllersock:
                        serverlog("Queuing %s (%s)" % (str(ready), str(newconnections)))
                        newconnections.append(controllersock)
                    else:
                        serverlog("Accepting %s (%s)" % (str(ready), str(newconnections)))
                        self.controllersock = controllersock
                        fds.append(controllersock)
            if self.controllersock in ready:
                try:
                    serverlog("Processing Client")
                    ui_fds = recvfds(self.controllersock, 3)
                    serverlog("Connecting Client")

                    # Where to write events to
                    writer = ConnectionWriter(ui_fds[0])
                    self.event_handle = bb.event.register_UIHhandler(writer, True)
                    self.event_writer = writer

                    # Where to read commands from
                    reader = ConnectionReader(ui_fds[1])
                    fds.append(reader)
                    self.command_channel = reader

                    # Where to send command return values to
                    writer = ConnectionWriter(ui_fds[2])
                    self.command_channel_reply = writer

                    self.haveui = True
                    self.hadanyui = True

                except (EOFError, OSError):
                    disconnect_client(self, fds)

            if not self.timeout == -1.0 and not self.haveui and self.timeout and \
                    (self.lastui + self.timeout) < time.time():
                serverlog("Server timeout, exiting.")
                self.quit = True

            # If we don't see a UI connection within maxuiwait, it's unlikely we're going to see
            # one. We have had issues with processes hanging indefinitely so timing out UI-less
            # servers is useful.
            if not self.hadanyui and not self.xmlrpc and not self.timeout and (self.lastui + self.maxuiwait) < time.time():
                serverlog("No UI connection within max timeout, exiting to avoid infinite loop.")
                self.quit = True

            if self.command_channel in ready:
                try:
                    command = self.command_channel.get()
                except EOFError:
                    # Client connection shutting down
                    ready = []
                    disconnect_client(self, fds)
                    continue
                if command[0] == "terminateServer":
                    self.quit = True
                    continue
                try:
                    serverlog("Running command %s" % command)
                    self.command_channel_reply.send(self.cooker.command.runCommand(command))
                    serverlog("Command Completed")
                except Exception:
                    stack = traceback.format_exc()
                    serverlog('Exception in server main event loop running command %s (%s)' % (command, stack))
                    logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack))

            if self.xmlrpc in ready:
                self.xmlrpc.handle_requests()

            # One-off: pick up settings from the datastore once the cooker has one
            if not seendata and hasattr(self.cooker, "data"):
                heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT')
                if heartbeat_event:
                    try:
                        self.heartbeat_seconds = float(heartbeat_event)
                    except (TypeError, ValueError):
                        bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event)

                self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT')
                try:
                    if self.timeout:
                        self.timeout = float(self.timeout)
                except (TypeError, ValueError):
                    bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout)
                    # Actually ignore it: leaving the unparsed value in place would make
                    # the timeout arithmetic at the top of the loop raise TypeError.
                    self.timeout = None
                seendata = True

            ready = self.idle_commands(.1, fds)

        if len(threading.enumerate()) != 1:
            serverlog("More than one thread left?: " + str(threading.enumerate()))

        serverlog("Exiting")
        # Remove the socket file so we don't get any more connections to avoid races
        try:
            os.unlink(self.sockname)
        except OSError:
            pass
        self.sock.close()

        # Deliberately broad: shutdown is best effort and must not stop the
        # lock file from being released below.
        try:
            self.cooker.shutdown(True)
            self.cooker.notifier.stop()
            self.cooker.confignotifier.stop()
        except:
            pass

        self.cooker.post_serve()

        # Flush logs before we release the lock
        sys.stdout.flush()
        sys.stderr.flush()

        # Finally release the lockfile but warn about other processes holding it open
        lock = self.bitbake_lock
        lockfile = self.bitbake_lock_name

        def get_lock_contents(lockfile):
            """Return the lock file's lines, or None if the file no longer exists."""
            try:
                with open(lockfile, "r") as f:
                    return f.readlines()
            except FileNotFoundError:
                return None

        lockcontents = get_lock_contents(lockfile)
        serverlog("Original lockfile contents: " + str(lockcontents))

        lock.close()
        lock = None

        while not lock:
            i = 0
            lock = None
            # Up to 30 attempts, 0.1s apart, before reporting who holds the lock
            while not lock and i < 30:
                lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False)
                if not lock:
                    newlockcontents = get_lock_contents(lockfile)
                    if newlockcontents != lockcontents:
                        # A new server was started, the lockfile contents changed, we can exit
                        serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents))
                        return
                    time.sleep(0.1)
                i += 1
            if lock:
                # We hold the lock so we can remove the file (hide stale pid data)
                # via unlockfile.
                bb.utils.unlockfile(lock)
                serverlog("Exiting as we could obtain the lock")
                return

            if not lock:
                # Some systems may not have lsof available
                procs = None
                try:
                    procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
                except subprocess.CalledProcessError:
                    # File was deleted?
                    continue
                except OSError as e:
                    if e.errno != errno.ENOENT:
                        raise
                if procs is None:
                    # Fall back to fuser if lsof is unavailable
                    try:
                        procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
                    except subprocess.CalledProcessError:
                        # File was deleted?
                        continue
                    except OSError as e:
                        if e.errno != errno.ENOENT:
                            raise

                msg = "Delaying shutdown due to active processes which appear to be holding bitbake.lock"
                if procs:
                    msg += ":\n%s" % str(procs.decode("utf-8"))
                serverlog(msg)

    def idle_commands(self, delay, fds=None):
        """
        Call every registered idle function once, fire a heartbeat event if
        one is due, then wait on fds.

        An idle function is called as function(self, data, False) and its
        return value is interpreted as:
          False  - the function is unregistered; do not sleep
          True   - do not sleep
          float  - sleep for at most that long
          other  - taken as a list of extra fds to wait on

        An idle function that raises (other than SystemExit, which is
        re-raised) is unregistered and makes the server quit.

        Returns the list of fds that select() reported as readable.
        """
        nextsleep = delay
        if not fds:
            fds = []

        # Iterate over a copy since entries may be deleted while looping
        for function, data in list(self._idlefuns.items()):
            try:
                retval = function(self, data, False)
                if retval is False:
                    del self._idlefuns[function]
                    nextsleep = None
                elif retval is True:
                    nextsleep = None
                elif isinstance(retval, float) and nextsleep:
                    if (retval < nextsleep):
                        nextsleep = retval
                elif nextsleep is None:
                    continue
                else:
                    fds = fds + retval
            except SystemExit:
                raise
            except Exception as exc:
                if not isinstance(exc, bb.BBHandledException):
                    logger.exception('Running idle function')
                del self._idlefuns[function]
                self.quit = True

        # Create new heartbeat event?
        now = time.time()
        if now >= self.next_heartbeat:
            # We might have missed heartbeats. Just trigger once in
            # that case and continue after the usual delay.
            self.next_heartbeat += self.heartbeat_seconds
            if self.next_heartbeat <= now:
                self.next_heartbeat = now + self.heartbeat_seconds
            if hasattr(self.cooker, "data"):
                heartbeat = bb.event.HeartbeatEvent(now)
                try:
                    bb.event.fire(heartbeat, self.cooker.data)
                except Exception as exc:
                    if not isinstance(exc, bb.BBHandledException):
                        logger.exception('Running heartbeat function')
                    self.quit = True
        if nextsleep and now + nextsleep > self.next_heartbeat:
            # Shorten timeout so that we wake up in time for
            # the heartbeat.
            nextsleep = self.next_heartbeat - now

        if nextsleep is not None:
            if self.xmlrpc:
                nextsleep = self.xmlrpc.get_timeout(nextsleep)
            try:
                return select.select(fds,[],[],nextsleep)[0]
            except InterruptedError:
                # Ignore EINTR
                return []
        else:
            return select.select(fds,[],[],0)[0]


class ServerCommunicator():
    """Client side helper which sends commands to the server and reads the replies."""

    def __init__(self, connection, recv):
        # connection: object with send(), used for outgoing commands
        # recv: object with poll()/get(), used for the replies
        self.connection = connection
        self.recv = recv

    def runCommand(self, command):
        """
        Send command and return the (ret, exc) pair read from the reply channel.

        Waits up to 2 x 30 seconds for a reply and raises ProcessTimeout after
        that. Raises bb.BBHandledException if the reported error mentions
        BBHandledException or SystemExit.
        """
        self.connection.send(command)
        if not self.recv.poll(30):
            logger.info("No reply from server in 30s")
            if not self.recv.poll(30):
                raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s)")
        ret, exc = self.recv.get()
        # Should probably turn all exceptions in exc back into exceptions?
        # For now, at least handle BBHandledException
        if exc and ("BBHandledException" in exc or "SystemExit" in exc):
            raise bb.BBHandledException()
        return ret, exc

    def updateFeatureSet(self, featureset):
        """Run the setFeatures command; log and raise BaseException if it reports an error."""
        _, error = self.runCommand(["setFeatures", featureset])
        if error:
            logger.error("Unable to set the cooker to the correct featureset: %s" % error)
            raise BaseException(error)

    def getEventHandle(self):
        """Return the result of getUIHandlerNum; log and raise BaseException on error."""
        handle, error = self.runCommand(["getUIHandlerNum"])
        if error:
            logger.error("Unable to get UI Handler Number: %s" % error)
            raise BaseException(error)

        return handle

    def terminateServer(self):
        """Send terminateServer without waiting for any reply."""
        self.connection.send(['terminateServer'])
        return

class BitBakeProcessServerConnection(object):
    """Bundles the command channel, the event queue and the socket of one client connection."""

    def __init__(self, ui_channel, recv, eq, sock):
        self.connection = ServerCommunicator(ui_channel, recv)
        self.events = eq
        # Save sock so it doesn't get gc'd for the life of our connection
        self.socket_connection = sock

    def terminate(self):
        """Close the socket and both command channel endpoints."""
        self.socket_connection.close()
        self.connection.connection.close()
        self.connection.recv.close()
        return

# Marker line written to the server log at startup; BitBakeServer parses it
# back to find the log lines belonging to the current session.
start_log_format = '--- Starting bitbake server pid %s at %s ---'
start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'

class BitBakeServer(object):
    """Launches the bitbake server as a daemon and waits for it to report readiness."""

    def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface):
        """
        Start the server daemon and block until it writes its ready message
        to a pipe (5 seconds, then a further 90).

        If the server does not report ready, the relevant part of its log
        file is reported through bb.error and SystemExit(1) is raised.
        """

        self.server_timeout = server_timeout
        self.xmlrpcinterface = xmlrpcinterface
        self.featureset = featureset
        self.sockname = sockname
        self.bitbake_lock = lock
        self.readypipe, self.readypipein = os.pipe()

        # Place the log in the builddirectory alongside the lock file
        logfile = os.path.join(os.path.dirname(self.bitbake_lock.name), "bitbake-cookerdaemon.log")
        self.logfile = logfile

        startdatetime = datetime.datetime.now()
        bb.daemonize.createDaemon(self._startServer, logfile)
        self.bitbake_lock.close()
        os.close(self.readypipein)

        ready = ConnectionReader(self.readypipe)
        r = ready.poll(5)
        if not r:
            bb.note("Bitbake server didn't start within 5 seconds, waiting for 90")
            r = ready.poll(90)
        if r:
            try:
                r = ready.get()
            except EOFError:
                # Trap the child exiting/closing the pipe and error out
                r = None
        if not r or r[0] != "r":
            ready.close()
            bb.error("Unable to start bitbake server (%s)" % str(r))
            if os.path.exists(logfile):
                logstart_re = re.compile(start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)'))
                started = False
                # lines: everything logged since this session's start marker
                # lastlines: rolling tail (60 lines) of whatever came before it
                lines = []
                lastlines = []
                with open(logfile, "r") as f:
                    for line in f:
                        if started:
                            lines.append(line)
                        else:
                            lastlines.append(line)
                            res = logstart_re.search(line.rstrip())
                            if res:
                                ldatetime = datetime.datetime.strptime(res.group(2), start_log_datetime_format)
                                if ldatetime >= startdatetime:
                                    started = True
                                    lines.append(line)
                        if len(lastlines) > 60:
                            lastlines = lastlines[-60:]
                if lines:
                    if len(lines) > 60:
                        bb.error("Last 60 lines of server log for this session (%s):\n%s" % (logfile, "".join(lines[-60:])))
                    else:
                        bb.error("Server log for this session (%s):\n%s" % (logfile, "".join(lines)))
                elif lastlines:
                    bb.error("Server didn't start, last 60 loglines (%s):\n%s" % (logfile, "".join(lastlines)))
            else:
                bb.error("%s doesn't exist" % logfile)

            raise SystemExit(1)

        ready.close()

    def _startServer(self):
        """Replace the current process with the bitbake-server script (passed to createDaemon)."""
        os.close(self.readypipe)
        # The lock file and the ready pipe must survive the exec below
        os.set_inheritable(self.bitbake_lock.fileno(), True)
        os.set_inheritable(self.readypipein, True)
        serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server")
        os.execl(sys.executable, "bitbake-server", serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout or 0), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1]))

def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface):
    """
    Entry point of the server process.

    Creates the listening UNIX socket at sockname, builds the cooker, sends
    "r" on the ready pipe once the cooker exists, then runs the ProcessServer
    main loop. Returns None early, without signalling readiness, if creating
    the cooker raises bb.BBHandledException.

    lockfd         fd of the already opened bitbake lock file
    readypipeinfd  write end of the pipe the launching process waits on
    """

    import bb.cookerdata
    import bb.cooker

    serverlog(start_log_format % (os.getpid(), datetime.datetime.now().strftime(start_log_datetime_format)))

    try:
        bitbake_lock = os.fdopen(lockfd, "w")

        # Create server control socket, removing any stale socket file first.
        # Unlink unconditionally instead of checking for existence beforehand
        # so the file cannot vanish between the check and the removal.
        try:
            os.unlink(sockname)
        except FileNotFoundError:
            pass

        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        # AF_UNIX has path length issues so chdir here to workaround
        cwd = os.getcwd()
        try:
            os.chdir(os.path.dirname(sockname))
            sock.bind(os.path.basename(sockname))
        finally:
            os.chdir(cwd)
        sock.listen(1)

        server = ProcessServer(bitbake_lock, lockname, sock, sockname, server_timeout, xmlrpcinterface)
        writer = ConnectionWriter(readypipeinfd)
        try:
            featureset = []
            cooker = bb.cooker.BBCooker(featureset, server.register_idle_function)
        except bb.BBHandledException:
            return None
        writer.send("r")
        writer.close()
        server.cooker = cooker
        serverlog("Started bitbake server pid %d" % os.getpid())

        server.run()
    finally:
        # Flush any messages/errors to the logfile before exit
        sys.stdout.flush()
        sys.stderr.flush()

def connectProcessServer(sockname, featureset):
    """
    Connect to the server listening on sockname and set up the three pipes
    (events, commands, command results) used to talk to it.

    Returns a BitBakeProcessServerConnection. On any failure every resource
    created so far is released and the exception is re-raised.
    """
    # Connect to socket
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    # AF_UNIX has path length issues so chdir here to workaround
    cwd = os.getcwd()

    readfd = writefd = readfd1 = writefd1 = readfd2 = writefd2 = None
    eq = command_chan_recv = command_chan = None

    sock.settimeout(10)

    try:
        try:
            os.chdir(os.path.dirname(sockname))
            # NOTE(review): this used to sit in a retry loop whose EWOULDBLOCK
            # branch fell through to an unconditional raise, so no retry ever
            # happened. Every connect error still propagates to the caller.
            sock.connect(os.path.basename(sockname))
        finally:
            os.chdir(cwd)

        # Send an fd for the remote to write events to
        readfd, writefd = os.pipe()
        eq = BBUIEventQueue(readfd)
        # Send an fd for the remote to receive commands from
        readfd1, writefd1 = os.pipe()
        command_chan = ConnectionWriter(writefd1)
        # Send an fd for the remote to write commands results to
        readfd2, writefd2 = os.pipe()
        command_chan_recv = ConnectionReader(readfd2)

        sendfds(sock, [writefd, readfd1, writefd2])

        server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq, sock)

        # Close the ends of the pipes we won't use. Each one is forgotten as
        # soon as it is closed so the error path below can never close the same
        # number twice (it may have been reused for an unrelated fd by then).
        os.close(writefd)
        writefd = None
        os.close(readfd1)
        readfd1 = None
        os.close(writefd2)
        writefd2 = None

        server_connection.connection.updateFeatureSet(featureset)

    except (Exception, SystemExit):
        if command_chan_recv:
            command_chan_recv.close()
        if command_chan:
            command_chan.close()
        for fd in (writefd, readfd1, writefd2):
            # Compare against None: 0 is a valid file descriptor
            if fd is None:
                continue
            try:
                os.close(fd)
            except OSError:
                pass
        sock.close()
        raise

    return server_connection

def sendfds(sock, fds):
    '''Send an array of fds over an AF_UNIX socket.'''
    fds = array.array('i', fds)
    # One byte of payload carrying the fd count (mod 256); recvfds checks it
    msg = bytes([len(fds) % 256])
    sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])

def recvfds(sock, size):
    '''
    Receive an array of fds over an AF_UNIX socket.

    size is the number of fds expected. Returns them as a list of ints.
    Raises EOFError if nothing at all was received, AssertionError if the
    count byte does not match the number of fds and RuntimeError for any
    other malformed message.
    '''
    a = array.array('i')
    bytes_size = a.itemsize * size
    msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
    if not msg and not ancdata:
        raise EOFError
    try:
        if len(ancdata) != 1:
            raise RuntimeError('received %d items of ancdata' %
                               len(ancdata))
        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
        if (cmsg_level == socket.SOL_SOCKET and
            cmsg_type == socket.SCM_RIGHTS):
            if len(cmsg_data) % a.itemsize != 0:
                raise ValueError
            a.frombytes(cmsg_data)
            # Raised explicitly rather than via an assert statement so the
            # check is still performed when Python runs with -O
            if len(a) % 256 != msg[0]:
                raise AssertionError('Len is %d but msg[0] is %r' % (len(a), msg[0]))
            return list(a)
    except (ValueError, IndexError):
        pass
    raise RuntimeError('Invalid data received')

class BBUIEventQueue:
    """
    Client side queue of events sent by the server.

    A daemon thread reads events from readfd and appends them to the queue;
    consumers fetch them with getEvent() or waitEvent().
    """

    def __init__(self, readfd):

        self.eventQueue = []
        self.eventQueueLock = threading.Lock()
        self.eventQueueNotify = threading.Event()

        self.reader = ConnectionReader(readfd)

        # Thread.setDaemon() is deprecated, pass the flag to the constructor
        self.t = threading.Thread(target=self.startCallbackHandler, daemon=True)
        self.t.start()

    def getEvent(self):
        """Return the oldest queued event, or None if the queue is empty."""
        with self.eventQueueLock:
            if not self.eventQueue:
                return None

            item = self.eventQueue.pop(0)

            # Nothing left: make subsequent waitEvent() calls block again
            if not self.eventQueue:
                self.eventQueueNotify.clear()

            return item

    def waitEvent(self, delay):
        """Wait up to delay seconds for an event to be queued, then behave like getEvent()."""
        self.eventQueueNotify.wait(delay)
        return self.getEvent()

    def queue_event(self, event):
        """Append event to the queue and wake up any waiter."""
        with self.eventQueueLock:
            self.eventQueue.append(event)
            self.eventQueueNotify.set()

    def send_event(self, event):
        """Unpickle event and queue it."""
        # NOTE(review): pickle.loads must only ever see data from a trusted
        # peer - confirm every caller of send_event satisfies that.
        self.queue_event(pickle.loads(event))

    def startCallbackHandler(self):
        """Body of the reader thread: queue incoming events until the pipe reports EOF."""
        bb.utils.set_process_name("UIEventQueue")
        while True:
            try:
                self.reader.wait()
                event = self.reader.get()
                self.queue_event(event)
            except EOFError:
                # Easiest way to exit is to close the file descriptor to cause an exit
                break
        self.reader.close()

class ConnectionReader(object):
    """Read-only wrapper around a file descriptor which yields unpickled objects."""

    def __init__(self, fd):
        self.reader = multiprocessing.connection.Connection(fd, writable=False)
        self.rlock = multiprocessing.Lock()

    def wait(self, timeout=None):
        """Block until data is available or timeout expires (see multiprocessing.connection.wait)."""
        return multiprocessing.connection.wait([self.reader], timeout)

    def poll(self, timeout=None):
        """Return whether there is data available to read within timeout."""
        return self.reader.poll(timeout)

    def get(self):
        """Read one message and return the unpickled object."""
        with self.rlock:
            res = self.reader.recv_bytes()
        return multiprocessing.reduction.ForkingPickler.loads(res)

    def fileno(self):
        """Return the underlying file descriptor, which makes the object usable with select()."""
        return self.reader.fileno()

    def close(self):
        return self.reader.close()


class ConnectionWriter(object):
    """Write-only wrapper around a file descriptor which sends pickled objects."""

    def __init__(self, fd):
        self.writer = multiprocessing.connection.Connection(fd, readable=False)
        self.wlock = multiprocessing.Lock()
        # Why bb.event needs this I have no idea
        self.event = self

    def send(self, obj):
        """Pickle obj and write it as a single message."""
        obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
        with self.wlock:
            self.writer.send_bytes(obj)

    def fileno(self):
        """Return the underlying file descriptor."""
        return self.writer.fileno()

    def close(self):
        return self.writer.close()