1""" 2QMP Events and EventListeners 3 4Asynchronous QMP uses `EventListener` objects to listen for events. An 5`EventListener` is a FIFO event queue that can be pre-filtered to listen 6for only specific events. Each `EventListener` instance receives its own 7copy of events that it hears, so events may be consumed without fear or 8worry for depriving other listeners of events they need to hear. 9 10 11EventListener Tutorial 12---------------------- 13 14In all of the following examples, we assume that we have a `QMPClient` 15instantiated named ``qmp`` that is already connected. 16 17 18`listener()` context blocks with one name 19~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 20 21The most basic usage is by using the `listener()` context manager to 22construct them: 23 24.. code:: python 25 26 with qmp.listener('STOP') as listener: 27 await qmp.execute('stop') 28 await listener.get() 29 30The listener is active only for the duration of the ‘with’ block. This 31instance listens only for ‘STOP’ events. 32 33 34`listener()` context blocks with two or more names 35~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 36 37Multiple events can be selected for by providing any ``Iterable[str]``: 38 39.. code:: python 40 41 with qmp.listener(('STOP', 'RESUME')) as listener: 42 await qmp.execute('stop') 43 event = await listener.get() 44 assert event['event'] == 'STOP' 45 46 await qmp.execute('cont') 47 event = await listener.get() 48 assert event['event'] == 'RESUME' 49 50 51`listener()` context blocks with no names 52~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 53 54By omitting names entirely, you can listen to ALL events. 55 56.. code:: python 57 58 with qmp.listener() as listener: 59 await qmp.execute('stop') 60 event = await listener.get() 61 assert event['event'] == 'STOP' 62 63This isn’t a very good use case for this feature: In a non-trivial 64running system, we may not know what event will arrive next. Grabbing 65the top of a FIFO queue returning multiple kinds of events may be prone 66to error. 67 68 69Using async iterators to retrieve events 70~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 71 72If you’d like to simply watch what events happen to arrive, you can use 73the listener as an async iterator: 74 75.. code:: python 76 77 with qmp.listener() as listener: 78 async for event in listener: 79 print(f"Event arrived: {event['event']}") 80 81This is analogous to the following code: 82 83.. code:: python 84 85 with qmp.listener() as listener: 86 while True: 87 event = listener.get() 88 print(f"Event arrived: {event['event']}") 89 90This event stream will never end, so these blocks will never terminate. 91 92 93Using asyncio.Task to concurrently retrieve events 94~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 95 96Since a listener’s event stream will never terminate, it is not likely 97useful to use that form in a script. For longer-running clients, we can 98create event handlers by using `asyncio.Task` to create concurrent 99coroutines: 100 101.. code:: python 102 103 async def print_events(listener): 104 try: 105 async for event in listener: 106 print(f"Event arrived: {event['event']}") 107 except asyncio.CancelledError: 108 return 109 110 with qmp.listener() as listener: 111 task = asyncio.Task(print_events(listener)) 112 await qmp.execute('stop') 113 await qmp.execute('cont') 114 task.cancel() 115 await task 116 117However, there is no guarantee that these events will be received by the 118time we leave this context block. Once the context block is exited, the 119listener will cease to hear any new events, and becomes inert. 120 121Be mindful of the timing: the above example will *probably*– but does 122not *guarantee*– that both STOP/RESUMED events will be printed. The 123example below outlines how to use listeners outside of a context block. 124 125 126Using `register_listener()` and `remove_listener()` 127~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 128 129To create a listener with a longer lifetime, beyond the scope of a 130single block, create a listener and then call `register_listener()`: 131 132.. code:: python 133 134 class MyClient: 135 def __init__(self, qmp): 136 self.qmp = qmp 137 self.listener = EventListener() 138 139 async def print_events(self): 140 try: 141 async for event in self.listener: 142 print(f"Event arrived: {event['event']}") 143 except asyncio.CancelledError: 144 return 145 146 async def run(self): 147 self.task = asyncio.Task(self.print_events) 148 self.qmp.register_listener(self.listener) 149 await qmp.execute('stop') 150 await qmp.execute('cont') 151 152 async def stop(self): 153 self.task.cancel() 154 await self.task 155 self.qmp.remove_listener(self.listener) 156 157The listener can be deactivated by using `remove_listener()`. When it is 158removed, any possible pending events are cleared and it can be 159re-registered at a later time. 160 161 162Using the built-in all events listener 163~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 164 165The `QMPClient` object creates its own default listener named 166:py:obj:`~Events.events` that can be used for the same purpose without 167having to create your own: 168 169.. code:: python 170 171 async def print_events(listener): 172 try: 173 async for event in listener: 174 print(f"Event arrived: {event['event']}") 175 except asyncio.CancelledError: 176 return 177 178 task = asyncio.Task(print_events(qmp.events)) 179 180 await qmp.execute('stop') 181 await qmp.execute('cont') 182 183 task.cancel() 184 await task 185 186 187Using both .get() and async iterators 188~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 189 190The async iterator and `get()` methods pull events from the same FIFO 191queue. If you mix the usage of both, be aware: Events are emitted 192precisely once per listener. 193 194If multiple contexts try to pull events from the same listener instance, 195events are still emitted only precisely once. 196 197This restriction can be lifted by creating additional listeners. 198 199 200Creating multiple listeners 201~~~~~~~~~~~~~~~~~~~~~~~~~~~ 202 203Additional `EventListener` objects can be created at-will. Each one 204receives its own copy of events, with separate FIFO event queues. 205 206.. code:: python 207 208 my_listener = EventListener() 209 qmp.register_listener(my_listener) 210 211 await qmp.execute('stop') 212 copy1 = await my_listener.get() 213 copy2 = await qmp.events.get() 214 215 assert copy1 == copy2 216 217In this example, we await an event from both a user-created 218`EventListener` and the built-in events listener. Both receive the same 219event. 220 221 222Clearing listeners 223~~~~~~~~~~~~~~~~~~ 224 225`EventListener` objects can be cleared, clearing all events seen thus far: 226 227.. code:: python 228 229 await qmp.execute('stop') 230 qmp.events.clear() 231 await qmp.execute('cont') 232 event = await qmp.events.get() 233 assert event['event'] == 'RESUME' 234 235`EventListener` objects are FIFO queues. If events are not consumed, 236they will remain in the queue until they are witnessed or discarded via 237`clear()`. FIFO queues will be drained automatically upon leaving a 238context block, or when calling `remove_listener()`. 239 240 241Accessing listener history 242~~~~~~~~~~~~~~~~~~~~~~~~~~ 243 244`EventListener` objects record their history. Even after being cleared, 245you can obtain a record of all events seen so far: 246 247.. code:: python 248 249 await qmp.execute('stop') 250 await qmp.execute('cont') 251 qmp.events.clear() 252 253 assert len(qmp.events.history) == 2 254 assert qmp.events.history[0]['event'] == 'STOP' 255 assert qmp.events.history[1]['event'] == 'RESUME' 256 257The history is updated immediately and does not require the event to be 258witnessed first. 259 260 261Using event filters 262~~~~~~~~~~~~~~~~~~~ 263 264`EventListener` objects can be given complex filtering criteria if names 265are not sufficient: 266 267.. code:: python 268 269 def job1_filter(event) -> bool: 270 event_data = event.get('data', {}) 271 event_job_id = event_data.get('id') 272 return event_job_id == "job1" 273 274 with qmp.listener('JOB_STATUS_CHANGE', job1_filter) as listener: 275 await qmp.execute('blockdev-backup', arguments={'job-id': 'job1', ...}) 276 async for event in listener: 277 if event['data']['status'] == 'concluded': 278 break 279 280These filters might be most useful when parameterized. `EventListener` 281objects expect a function that takes only a single argument (the raw 282event, as a `Message`) and returns a bool; True if the event should be 283accepted into the stream. You can create a function that adapts this 284signature to accept configuration parameters: 285 286.. code:: python 287 288 def job_filter(job_id: str) -> EventFilter: 289 def filter(event: Message) -> bool: 290 return event['data']['id'] == job_id 291 return filter 292 293 with qmp.listener('JOB_STATUS_CHANGE', job_filter('job2')) as listener: 294 await qmp.execute('blockdev-backup', arguments={'job-id': 'job2', ...}) 295 async for event in listener: 296 if event['data']['status'] == 'concluded': 297 break 298 299 300Activating an existing listener with `listen()` 301~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 302 303Listeners with complex, long configurations can also be created manually 304and activated temporarily by using `listen()` instead of `listener()`: 305 306.. code:: python 307 308 listener = EventListener(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED', 309 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY', 310 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE')) 311 312 with qmp.listen(listener): 313 await qmp.execute('blockdev-backup', arguments={'job-id': 'job3', ...}) 314 async for event in listener: 315 print(event) 316 if event['event'] == 'BLOCK_JOB_COMPLETED': 317 break 318 319Any events that are not witnessed by the time the block is left will be 320cleared from the queue; entering the block is an implicit 321`register_listener()` and leaving the block is an implicit 322`remove_listener()`. 323 324 325Activating multiple existing listeners with `listen()` 326~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 327 328While `listener()` is only capable of creating a single listener, 329`listen()` is capable of activating multiple listeners simultaneously: 330 331.. code:: python 332 333 def job_filter(job_id: str) -> EventFilter: 334 def filter(event: Message) -> bool: 335 return event['data']['id'] == job_id 336 return filter 337 338 jobA = EventListener('JOB_STATUS_CHANGE', job_filter('jobA')) 339 jobB = EventListener('JOB_STATUS_CHANGE', job_filter('jobB')) 340 341 with qmp.listen(jobA, jobB): 342 qmp.execute('blockdev-create', arguments={'job-id': 'jobA', ...}) 343 qmp.execute('blockdev-create', arguments={'job-id': 'jobB', ...}) 344 345 async for event in jobA.get(): 346 if event['data']['status'] == 'concluded': 347 break 348 async for event in jobB.get(): 349 if event['data']['status'] == 'concluded': 350 break 351 352 353Extending the `EventListener` class 354~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 355 356In the case that a more specialized `EventListener` is desired to 357provide either more functionality or more compact syntax for specialized 358cases, it can be extended. 359 360One of the key methods to extend or override is 361:py:meth:`~EventListener.accept()`. The default implementation checks an 362incoming message for: 363 3641. A qualifying name, if any :py:obj:`~EventListener.names` were 365 specified at initialization time 3662. That :py:obj:`~EventListener.event_filter()` returns True. 367 368This can be modified however you see fit to change the criteria for 369inclusion in the stream. 370 371For convenience, a ``JobListener`` class could be created that simply 372bakes in configuration so it does not need to be repeated: 373 374.. code:: python 375 376 class JobListener(EventListener): 377 def __init__(self, job_id: str): 378 super().__init__(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED', 379 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY', 380 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE')) 381 self.job_id = job_id 382 383 def accept(self, event) -> bool: 384 if not super().accept(event): 385 return False 386 if event['event'] in ('BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'): 387 return event['data']['id'] == job_id 388 return event['data']['device'] == job_id 389 390From here on out, you can conjure up a custom-purpose listener that 391listens only for job-related events for a specific job-id easily: 392 393.. code:: python 394 395 listener = JobListener('job4') 396 with qmp.listener(listener): 397 await qmp.execute('blockdev-backup', arguments={'job-id': 'job4', ...}) 398 async for event in listener: 399 print(event) 400 if event['event'] == 'BLOCK_JOB_COMPLETED': 401 break 402 403 404Experimental Interfaces & Design Issues 405--------------------------------------- 406 407These interfaces are not ones I am sure I will keep or otherwise modify 408heavily. 409 410qmp.listener()’s type signature 411~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 412 413`listener()` does not return anything, because it was assumed the caller 414already had a handle to the listener. However, for 415``qmp.listener(EventListener())`` forms, the caller will not have saved 416a handle to the listener. 417 418Because this function can accept *many* listeners, I found it hard to 419accurately type in a way where it could be used in both “one” or “many” 420forms conveniently and in a statically type-safe manner. 421 422Ultimately, I removed the return altogether, but perhaps with more time 423I can work out a way to re-add it. 424 425 426API Reference 427------------- 428 429""" 430 431import asyncio 432from contextlib import contextmanager 433import logging 434from typing import ( 435 AsyncIterator, 436 Callable, 437 Iterable, 438 Iterator, 439 List, 440 Optional, 441 Set, 442 Tuple, 443 Union, 444) 445 446from .error import QMPError 447from .message import Message 448 449 450EventNames = Union[str, Iterable[str], None] 451EventFilter = Callable[[Message], bool] 452 453 454class ListenerError(QMPError): 455 """ 456 Generic error class for `EventListener`-related problems. 457 """ 458 459 460class EventListener: 461 """ 462 Selectively listens for events with runtime configurable filtering. 463 464 This class is designed to be directly usable for the most common cases, 465 but it can be extended to provide more rigorous control. 466 467 :param names: 468 One or more names of events to listen for. 469 When not provided, listen for ALL events. 470 :param event_filter: 471 An optional event filtering function. 472 When names are also provided, this acts as a secondary filter. 473 474 When ``names`` and ``event_filter`` are both provided, the names 475 will be filtered first, and then the filter function will be called 476 second. The event filter function can assume that the format of the 477 event is a known format. 478 """ 479 def __init__( 480 self, 481 names: EventNames = None, 482 event_filter: Optional[EventFilter] = None, 483 ): 484 # Queue of 'heard' events yet to be witnessed by a caller. 485 self._queue: 'asyncio.Queue[Message]' = asyncio.Queue() 486 487 # Intended as a historical record, NOT a processing queue or backlog. 488 self._history: List[Message] = [] 489 490 #: Primary event filter, based on one or more event names. 491 self.names: Set[str] = set() 492 if isinstance(names, str): 493 self.names.add(names) 494 elif names is not None: 495 self.names.update(names) 496 497 #: Optional, secondary event filter. 498 self.event_filter: Optional[EventFilter] = event_filter 499 500 @property 501 def history(self) -> Tuple[Message, ...]: 502 """ 503 A read-only history of all events seen so far. 504 505 This represents *every* event, including those not yet witnessed 506 via `get()` or ``async for``. It persists between `clear()` 507 calls and is immutable. 508 """ 509 return tuple(self._history) 510 511 def accept(self, event: Message) -> bool: 512 """ 513 Determine if this listener accepts this event. 514 515 This method determines which events will appear in the stream. 516 The default implementation simply checks the event against the 517 list of names and the event_filter to decide if this 518 `EventListener` accepts a given event. It can be 519 overridden/extended to provide custom listener behavior. 520 521 User code is not expected to need to invoke this method. 522 523 :param event: The event under consideration. 524 :return: `True`, if this listener accepts this event. 525 """ 526 name_ok = (not self.names) or (event['event'] in self.names) 527 return name_ok and ( 528 (not self.event_filter) or self.event_filter(event) 529 ) 530 531 async def put(self, event: Message) -> None: 532 """ 533 Conditionally put a new event into the FIFO queue. 534 535 This method is not designed to be invoked from user code, and it 536 should not need to be overridden. It is a public interface so 537 that `QMPClient` has an interface by which it can inform 538 registered listeners of new events. 539 540 The event will be put into the queue if 541 :py:meth:`~EventListener.accept()` returns `True`. 542 543 :param event: The new event to put into the FIFO queue. 544 """ 545 if not self.accept(event): 546 return 547 548 self._history.append(event) 549 await self._queue.put(event) 550 551 async def get(self) -> Message: 552 """ 553 Wait for the very next event in this stream. 554 555 If one is already available, return that one. 556 """ 557 return await self._queue.get() 558 559 def empty(self) -> bool: 560 """ 561 Return `True` if there are no pending events. 562 """ 563 return self._queue.empty() 564 565 def clear(self) -> List[Message]: 566 """ 567 Clear this listener of all pending events. 568 569 Called when an `EventListener` is being unregistered, this clears the 570 pending FIFO queue synchronously. It can be also be used to 571 manually clear any pending events, if desired. 572 573 :return: The cleared events, if any. 574 575 .. warning:: 576 Take care when discarding events. Cleared events will be 577 silently tossed on the floor. All events that were ever 578 accepted by this listener are visible in `history()`. 579 """ 580 events = [] 581 while True: 582 try: 583 events.append(self._queue.get_nowait()) 584 except asyncio.QueueEmpty: 585 break 586 587 return events 588 589 def __aiter__(self) -> AsyncIterator[Message]: 590 return self 591 592 async def __anext__(self) -> Message: 593 """ 594 Enables the `EventListener` to function as an async iterator. 595 596 It may be used like this: 597 598 .. code:: python 599 600 async for event in listener: 601 print(event) 602 603 These iterators will never terminate of their own accord; you 604 must provide break conditions or otherwise prepare to run them 605 in an `asyncio.Task` that can be cancelled. 606 """ 607 return await self.get() 608 609 610class Events: 611 """ 612 Events is a mix-in class that adds event functionality to the QMP class. 613 614 It's designed specifically as a mix-in for `QMPClient`, and it 615 relies upon the class it is being mixed into having a 'logger' 616 property. 617 """ 618 def __init__(self) -> None: 619 self._listeners: List[EventListener] = [] 620 621 #: Default, all-events `EventListener`. 622 self.events: EventListener = EventListener() 623 self.register_listener(self.events) 624 625 # Parent class needs to have a logger 626 self.logger: logging.Logger 627 628 async def _event_dispatch(self, msg: Message) -> None: 629 """ 630 Given a new event, propagate it to all of the active listeners. 631 632 :param msg: The event to propagate. 633 """ 634 for listener in self._listeners: 635 await listener.put(msg) 636 637 def register_listener(self, listener: EventListener) -> None: 638 """ 639 Register and activate an `EventListener`. 640 641 :param listener: The listener to activate. 642 :raise ListenerError: If the given listener is already registered. 643 """ 644 if listener in self._listeners: 645 raise ListenerError("Attempted to re-register existing listener") 646 self.logger.debug("Registering %s.", str(listener)) 647 self._listeners.append(listener) 648 649 def remove_listener(self, listener: EventListener) -> None: 650 """ 651 Unregister and deactivate an `EventListener`. 652 653 The removed listener will have its pending events cleared via 654 `clear()`. The listener can be re-registered later when 655 desired. 656 657 :param listener: The listener to deactivate. 658 :raise ListenerError: If the given listener is not registered. 659 """ 660 if listener == self.events: 661 raise ListenerError("Cannot remove the default listener.") 662 self.logger.debug("Removing %s.", str(listener)) 663 listener.clear() 664 self._listeners.remove(listener) 665 666 @contextmanager 667 def listen(self, *listeners: EventListener) -> Iterator[None]: 668 r""" 669 Context manager: Temporarily listen with an `EventListener`. 670 671 Accepts one or more `EventListener` objects and registers them, 672 activating them for the duration of the context block. 673 674 `EventListener` objects will have any pending events in their 675 FIFO queue cleared upon exiting the context block, when they are 676 deactivated. 677 678 :param \*listeners: One or more EventListeners to activate. 679 :raise ListenerError: If the given listener(s) are already active. 680 """ 681 _added = [] 682 683 try: 684 for listener in listeners: 685 self.register_listener(listener) 686 _added.append(listener) 687 688 yield 689 690 finally: 691 for listener in _added: 692 self.remove_listener(listener) 693 694 @contextmanager 695 def listener( 696 self, 697 names: EventNames = (), 698 event_filter: Optional[EventFilter] = None 699 ) -> Iterator[EventListener]: 700 """ 701 Context manager: Temporarily listen with a new `EventListener`. 702 703 Creates an `EventListener` object and registers it, activating 704 it for the duration of the context block. 705 706 :param names: 707 One or more names of events to listen for. 708 When not provided, listen for ALL events. 709 :param event_filter: 710 An optional event filtering function. 711 When names are also provided, this acts as a secondary filter. 712 713 :return: The newly created and active `EventListener`. 714 """ 715 listener = EventListener(names, event_filter) 716 with self.listen(listener): 717 yield listener 718