"""
QMP Events and EventListeners

Asynchronous QMP uses `EventListener` objects to listen for events. An
`EventListener` is a FIFO event queue that can be pre-filtered to listen
for only specific events. Each `EventListener` instance receives its own
copy of events that it hears, so events may be consumed without fear of
depriving other listeners of events they need to hear.


EventListener Tutorial
----------------------

In all of the following examples, we assume that we have a `QMPClient`
instantiated named ``qmp`` that is already connected.


`listener()` context blocks with one name
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The most basic way to construct a listener is with the `listener()`
context manager:

.. code:: python

   with qmp.listener('STOP') as listener:
       await qmp.execute('stop')
       await listener.get()

The listener is active only for the duration of the ‘with’ block. This
instance listens only for ‘STOP’ events.


`listener()` context blocks with two or more names
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Multiple events can be selected for by providing any ``Iterable[str]``:

.. code:: python

   with qmp.listener(('STOP', 'RESUME')) as listener:
       await qmp.execute('stop')
       event = await listener.get()
       assert event['event'] == 'STOP'

       await qmp.execute('cont')
       event = await listener.get()
       assert event['event'] == 'RESUME'


`listener()` context blocks with no names
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

By omitting names entirely, you can listen to ALL events.

.. code:: python

   with qmp.listener() as listener:
       await qmp.execute('stop')
       event = await listener.get()
       assert event['event'] == 'STOP'

This isn’t a very good use case for this feature: In a non-trivial
running system, we may not know what event will arrive next. Grabbing
the top of a FIFO queue returning multiple kinds of events may be prone
to error.


Using async iterators to retrieve events
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you’d like to simply watch what events happen to arrive, you can use
the listener as an async iterator:

.. code:: python

   with qmp.listener() as listener:
       async for event in listener:
           print(f"Event arrived: {event['event']}")

This is analogous to the following code:

.. code:: python

   with qmp.listener() as listener:
       while True:
           event = await listener.get()
           print(f"Event arrived: {event['event']}")

This event stream will never end, so these blocks will never terminate.


Using asyncio.Task to concurrently retrieve events
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Since a listener’s event stream will never terminate, it is not likely
useful to use that form in a script. For longer-running clients, we can
create event handlers by using `asyncio.Task` to create concurrent
coroutines:

.. code:: python

   async def print_events(listener):
       try:
           async for event in listener:
               print(f"Event arrived: {event['event']}")
       except asyncio.CancelledError:
           return

   with qmp.listener() as listener:
       task = asyncio.Task(print_events(listener))
       await qmp.execute('stop')
       await qmp.execute('cont')
       task.cancel()
       await task

However, there is no guarantee that these events will be received by the
time we leave this context block. Once the context block is exited, the
listener will cease to hear any new events, and becomes inert.

Be mindful of the timing: the above example will *probably* print both
the STOP and RESUME events, but it does not *guarantee* that it will.
The example below outlines how to use listeners outside of a context
block.


Using `register_listener()` and `remove_listener()`
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

To create a listener with a longer lifetime, beyond the scope of a
single block, create a listener and then call `register_listener()`:

.. code:: python

   class MyClient:
       def __init__(self, qmp):
           self.qmp = qmp
           self.listener = EventListener()

       async def print_events(self):
           try:
               async for event in self.listener:
                   print(f"Event arrived: {event['event']}")
           except asyncio.CancelledError:
               return

       async def run(self):
           self.task = asyncio.Task(self.print_events())
           self.qmp.register_listener(self.listener)
           await self.qmp.execute('stop')
           await self.qmp.execute('cont')

       async def stop(self):
           self.task.cancel()
           await self.task
           self.qmp.remove_listener(self.listener)

The listener can be deactivated by using `remove_listener()`. When it is
removed, any possible pending events are cleared and it can be
re-registered at a later time.


Using the built-in all events listener
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The `QMPClient` object creates its own default listener named
:py:obj:`~Events.events` that can be used for the same purpose without
having to create your own:

.. code:: python

   async def print_events(listener):
       try:
           async for event in listener:
               print(f"Event arrived: {event['event']}")
       except asyncio.CancelledError:
           return

   task = asyncio.Task(print_events(qmp.events))

   await qmp.execute('stop')
   await qmp.execute('cont')

   task.cancel()
   await task


Using both .get() and async iterators
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The async iterator and `get()` methods pull events from the same FIFO
queue. If you mix the usage of both, be aware: Events are emitted
precisely once per listener.

If multiple contexts try to pull events from the same listener instance,
events are still emitted only precisely once.

This restriction can be lifted by creating additional listeners.


Creating multiple listeners
~~~~~~~~~~~~~~~~~~~~~~~~~~~

Additional `EventListener` objects can be created at will. Each one
receives its own copy of events, with separate FIFO event queues.

.. code:: python

   my_listener = EventListener()
   qmp.register_listener(my_listener)

   await qmp.execute('stop')
   copy1 = await my_listener.get()
   copy2 = await qmp.events.get()

   assert copy1 == copy2

In this example, we await an event from both a user-created
`EventListener` and the built-in events listener. Both receive the same
event.


Clearing listeners
~~~~~~~~~~~~~~~~~~

`EventListener` objects can be cleared, discarding all pending events
seen thus far:

.. code:: python

   await qmp.execute('stop')
   qmp.events.clear()
   await qmp.execute('cont')
   event = await qmp.events.get()
   assert event['event'] == 'RESUME'

`EventListener` objects are FIFO queues. If events are not consumed,
they will remain in the queue until they are witnessed or discarded via
`clear()`. FIFO queues will be drained automatically upon leaving a
context block, or when calling `remove_listener()`.


Accessing listener history
~~~~~~~~~~~~~~~~~~~~~~~~~~

`EventListener` objects record their history. Even after being cleared,
you can obtain a record of all events seen so far:

.. code:: python

   await qmp.execute('stop')
   await qmp.execute('cont')
   qmp.events.clear()

   assert len(qmp.events.history) == 2
   assert qmp.events.history[0]['event'] == 'STOP'
   assert qmp.events.history[1]['event'] == 'RESUME'

The history is updated immediately and does not require the event to be
witnessed first.


Using event filters
~~~~~~~~~~~~~~~~~~~

`EventListener` objects can be given complex filtering criteria if names
are not sufficient:

.. code:: python

   def job1_filter(event) -> bool:
       event_data = event.get('data', {})
       event_job_id = event_data.get('id')
       return event_job_id == "job1"

   with qmp.listener('JOB_STATUS_CHANGE', job1_filter) as listener:
       await qmp.execute('blockdev-backup', arguments={'job-id': 'job1', ...})
       async for event in listener:
           if event['data']['status'] == 'concluded':
               break

These filters might be most useful when parameterized. `EventListener`
objects expect a function that takes only a single argument (the raw
event, as a `Message`) and returns a bool; True if the event should be
accepted into the stream. You can create a function that adapts this
signature to accept configuration parameters:

.. code:: python

   def job_filter(job_id: str) -> EventFilter:
       def filter(event: Message) -> bool:
           return event['data']['id'] == job_id
       return filter

   with qmp.listener('JOB_STATUS_CHANGE', job_filter('job2')) as listener:
       await qmp.execute('blockdev-backup', arguments={'job-id': 'job2', ...})
       async for event in listener:
           if event['data']['status'] == 'concluded':
               break


Activating an existing listener with `listen()`
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Listeners with complex, long configurations can also be created manually
and activated temporarily by using `listen()` instead of `listener()`:

.. code:: python

   listener = EventListener(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
                             'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
                             'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))

   with qmp.listen(listener):
       await qmp.execute('blockdev-backup', arguments={'job-id': 'job3', ...})
       async for event in listener:
           print(event)
           if event['event'] == 'BLOCK_JOB_COMPLETED':
               break

Any events that are not witnessed by the time the block is left will be
cleared from the queue; entering the block is an implicit
`register_listener()` and leaving the block is an implicit
`remove_listener()`.


Activating multiple existing listeners with `listen()`
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

While `listener()` is only capable of creating a single listener,
`listen()` is capable of activating multiple listeners simultaneously:

.. code:: python

   def job_filter(job_id: str) -> EventFilter:
       def filter(event: Message) -> bool:
           return event['data']['id'] == job_id
       return filter

   jobA = EventListener('JOB_STATUS_CHANGE', job_filter('jobA'))
   jobB = EventListener('JOB_STATUS_CHANGE', job_filter('jobB'))

   with qmp.listen(jobA, jobB):
       await qmp.execute('blockdev-create', arguments={'job-id': 'jobA', ...})
       await qmp.execute('blockdev-create', arguments={'job-id': 'jobB', ...})

       async for event in jobA:
           if event['data']['status'] == 'concluded':
               break
       async for event in jobB:
           if event['data']['status'] == 'concluded':
               break


Extending the `EventListener` class
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In the case that a more specialized `EventListener` is desired to
provide either more functionality or more compact syntax for specialized
cases, it can be extended.

One of the key methods to extend or override is
:py:meth:`~EventListener.accept()`. The default implementation checks an
incoming message for:

1. A qualifying name, if any :py:obj:`~EventListener.names` were
   specified at initialization time
2. That :py:obj:`~EventListener.event_filter()` returns True.
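
As a rough illustration of these two checks, the sketch below restates
the decision as a free function. The name ``default_accept`` and the
plain-dict events are hypothetical stand-ins chosen so the snippet runs
on its own; they are not part of this module, and the real logic lives
in `accept()`.

```python
from typing import Any, Callable, Dict, Iterable, Optional

# Hypothetical stand-in: a plain dict plays the role of a QMP event Message.
Event = Dict[str, Any]


def default_accept(
    event: Event,
    names: Iterable[str] = (),
    event_filter: Optional[Callable[[Event], bool]] = None,
) -> bool:
    # 1. Name check: an empty set of names means "accept every name".
    allowed = set(names)
    name_ok = (not allowed) or (event['event'] in allowed)
    # 2. Secondary filter: only consulted when the name check passed.
    return name_ok and ((event_filter is None) or event_filter(event))


stop = {'event': 'STOP', 'data': {}}
job = {'event': 'JOB_STATUS_CHANGE', 'data': {'id': 'job1'}}

assert default_accept(stop)
assert default_accept(stop, names=('STOP', 'RESUME'))
assert not default_accept(job, names=('STOP',))
assert default_accept(job, ('JOB_STATUS_CHANGE',),
                      lambda e: e['data']['id'] == 'job1')
assert not default_accept(job, ('JOB_STATUS_CHANGE',),
                          lambda e: e['data']['id'] == 'job2')
```

Because the name check runs first, a filter function only ever sees
events whose names already qualified, so it can index into the event
data without guarding against unrelated event types.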

This can be modified however you see fit to change the criteria for
inclusion in the stream.

For convenience, a ``JobListener`` class could be created that simply
bakes in configuration so it does not need to be repeated:

.. code:: python

   class JobListener(EventListener):
       def __init__(self, job_id: str):
           super().__init__(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
                             'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
                             'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
           self.job_id = job_id

       def accept(self, event) -> bool:
           if not super().accept(event):
               return False
           if event['event'] in ('BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'):
               return event['data']['id'] == self.job_id
           return event['data']['device'] == self.job_id

From here on out, you can conjure up a custom-purpose listener that
listens only for job-related events for a specific job-id easily:

.. code:: python

   listener = JobListener('job4')
   with qmp.listen(listener):
       await qmp.execute('blockdev-backup', arguments={'job-id': 'job4', ...})
       async for event in listener:
           print(event)
           if event['event'] == 'BLOCK_JOB_COMPLETED':
               break


Experimental Interfaces & Design Issues
---------------------------------------

These are interfaces that I am not sure I will keep, or that I may
otherwise modify heavily.

qmp.listen()’s type signature
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

`listen()` does not return anything, because it was assumed the caller
already had a handle to the listener. However, for
``qmp.listen(EventListener())`` forms, the caller will not have saved
a handle to the listener.

Because this function can accept *many* listeners, I found it hard to
accurately type in a way where it could be used in both “one” or “many”
forms conveniently and in a statically type-safe manner.

Ultimately, I removed the return altogether, but perhaps with more time
I can work out a way to re-add it.


API Reference
-------------

"""

import asyncio
from contextlib import contextmanager
import logging
from typing import (
    AsyncIterator,
    Callable,
    Iterable,
    Iterator,
    List,
    Optional,
    Set,
    Tuple,
    Union,
)

from .error import QMPError
from .message import Message


EventNames = Union[str, Iterable[str], None]
EventFilter = Callable[[Message], bool]


class ListenerError(QMPError):
    """
    Generic error class for `EventListener`-related problems.
    """


class EventListener:
    """
    Selectively listens for events with runtime configurable filtering.

    This class is designed to be directly usable for the most common cases,
    but it can be extended to provide more rigorous control.

    :param names:
        One or more names of events to listen for.
        When not provided, listen for ALL events.
    :param event_filter:
        An optional event filtering function.
        When names are also provided, this acts as a secondary filter.

    When ``names`` and ``event_filter`` are both provided, the names
    will be filtered first, and then the filter function will be called
    second. The event filter function can assume that the format of the
    event is a known format.
    """
    def __init__(
        self,
        names: EventNames = None,
        event_filter: Optional[EventFilter] = None,
    ):
        # Queue of 'heard' events yet to be witnessed by a caller.
        self._queue: 'asyncio.Queue[Message]' = asyncio.Queue()

        # Intended as a historical record, NOT a processing queue or backlog.
        self._history: List[Message] = []

        #: Primary event filter, based on one or more event names.
        self.names: Set[str] = set()
        if isinstance(names, str):
            self.names.add(names)
        elif names is not None:
            self.names.update(names)

        #: Optional, secondary event filter.
        self.event_filter: Optional[EventFilter] = event_filter

    @property
    def history(self) -> Tuple[Message, ...]:
        """
        A read-only history of all events seen so far.

        This represents *every* event, including those not yet witnessed
        via `get()` or ``async for``. It persists between `clear()`
        calls and is immutable.
        """
        return tuple(self._history)

    def accept(self, event: Message) -> bool:
        """
        Determine if this listener accepts this event.

        This method determines which events will appear in the stream.
        The default implementation simply checks the event against the
        list of names and the event_filter to decide if this
        `EventListener` accepts a given event. It can be
        overridden/extended to provide custom listener behavior.

        User code is not expected to need to invoke this method.

        :param event: The event under consideration.
        :return: `True`, if this listener accepts this event.
        """
        name_ok = (not self.names) or (event['event'] in self.names)
        return name_ok and (
            (not self.event_filter) or self.event_filter(event)
        )

    async def put(self, event: Message) -> None:
        """
        Conditionally put a new event into the FIFO queue.

        This method is not designed to be invoked from user code, and it
        should not need to be overridden. It is a public interface so
        that `QMPClient` has an interface by which it can inform
        registered listeners of new events.

        The event will be put into the queue if
        :py:meth:`~EventListener.accept()` returns `True`.

        :param event: The new event to put into the FIFO queue.
        """
        if not self.accept(event):
            return

        self._history.append(event)
        await self._queue.put(event)

    async def get(self) -> Message:
        """
        Wait for the very next event in this stream.

        If one is already available, return that one.
        """
        return await self._queue.get()

    def empty(self) -> bool:
        """
        Return `True` if there are no pending events.
        """
        return self._queue.empty()

    def clear(self) -> List[Message]:
        """
        Clear this listener of all pending events.

        Called when an `EventListener` is being unregistered, this clears the
        pending FIFO queue synchronously. It can also be used to
        manually clear any pending events, if desired.

        :return: The cleared events, if any.

        .. warning::
            Take care when discarding events. Cleared events will be
            silently tossed on the floor. All events that were ever
            accepted by this listener are visible in `history`.
        """
        events = []
        while True:
            try:
                events.append(self._queue.get_nowait())
            except asyncio.QueueEmpty:
                break

        return events

    def __aiter__(self) -> AsyncIterator[Message]:
        return self

    async def __anext__(self) -> Message:
        """
        Enables the `EventListener` to function as an async iterator.

        It may be used like this:

        .. code:: python

            async for event in listener:
                print(event)

        These iterators will never terminate of their own accord; you
        must provide break conditions or otherwise prepare to run them
        in an `asyncio.Task` that can be cancelled.
        """
        return await self.get()


class Events:
    """
    Events is a mix-in class that adds event functionality to the QMP class.

    It's designed specifically as a mix-in for `QMPClient`, and it
    relies upon the class it is being mixed into having a 'logger'
    property.
    """
    def __init__(self) -> None:
        self._listeners: List[EventListener] = []

        #: Default, all-events `EventListener`.
        self.events: EventListener = EventListener()
        self.register_listener(self.events)

        # Parent class needs to have a logger
        self.logger: logging.Logger

    async def _event_dispatch(self, msg: Message) -> None:
        """
        Given a new event, propagate it to all of the active listeners.

        :param msg: The event to propagate.
        """
        for listener in self._listeners:
            await listener.put(msg)

    def register_listener(self, listener: EventListener) -> None:
        """
        Register and activate an `EventListener`.

        :param listener: The listener to activate.
        :raise ListenerError: If the given listener is already registered.
        """
        if listener in self._listeners:
            raise ListenerError("Attempted to re-register existing listener")
        self.logger.debug("Registering %s.", str(listener))
        self._listeners.append(listener)

    def remove_listener(self, listener: EventListener) -> None:
        """
        Unregister and deactivate an `EventListener`.

        The removed listener will have its pending events cleared via
        `clear()`. The listener can be re-registered later when
        desired.

        :param listener: The listener to deactivate.
        :raise ListenerError: If the given listener is not registered.
        """
        if listener == self.events:
            raise ListenerError("Cannot remove the default listener.")
        self.logger.debug("Removing %s.", str(listener))
        listener.clear()
        self._listeners.remove(listener)

    @contextmanager
    def listen(self, *listeners: EventListener) -> Iterator[None]:
        r"""
        Context manager: Temporarily listen with an `EventListener`.

        Accepts one or more `EventListener` objects and registers them,
        activating them for the duration of the context block.

        `EventListener` objects will have any pending events in their
        FIFO queue cleared upon exiting the context block, when they are
        deactivated.

        :param \*listeners: One or more EventListeners to activate.
        :raise ListenerError: If the given listener(s) are already active.
        """
        _added = []

        try:
            for listener in listeners:
                self.register_listener(listener)
                _added.append(listener)

            yield

        finally:
            for listener in _added:
                self.remove_listener(listener)

    @contextmanager
    def listener(
            self,
            names: EventNames = (),
            event_filter: Optional[EventFilter] = None
    ) -> Iterator[EventListener]:
        """
        Context manager: Temporarily listen with a new `EventListener`.

        Creates an `EventListener` object and registers it, activating
        it for the duration of the context block.

        :param names:
            One or more names of events to listen for.
            When not provided, listen for ALL events.
        :param event_filter:
            An optional event filtering function.
            When names are also provided, this acts as a secondary filter.

        :return: The newly created and active `EventListener`.
        """
        listener = EventListener(names, event_filter)
        with self.listen(listener):
            yield listener