xref: /openbmc/qemu/python/qemu/qmp/events.py (revision 37094b6d)
1*37094b6dSJohn Snow"""
2*37094b6dSJohn SnowQMP Events and EventListeners
3*37094b6dSJohn Snow
4*37094b6dSJohn SnowAsynchronous QMP uses `EventListener` objects to listen for events. An
5*37094b6dSJohn Snow`EventListener` is a FIFO event queue that can be pre-filtered to listen
6*37094b6dSJohn Snowfor only specific events. Each `EventListener` instance receives its own
7*37094b6dSJohn Snowcopy of events that it hears, so events may be consumed without fear or
8*37094b6dSJohn Snowworry for depriving other listeners of events they need to hear.
9*37094b6dSJohn Snow
10*37094b6dSJohn Snow
11*37094b6dSJohn SnowEventListener Tutorial
12*37094b6dSJohn Snow----------------------
13*37094b6dSJohn Snow
14*37094b6dSJohn SnowIn all of the following examples, we assume that we have a `QMPClient`
15*37094b6dSJohn Snowinstantiated named ``qmp`` that is already connected.
16*37094b6dSJohn Snow
17*37094b6dSJohn Snow
18*37094b6dSJohn Snow`listener()` context blocks with one name
19*37094b6dSJohn Snow~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
20*37094b6dSJohn Snow
21*37094b6dSJohn SnowThe most basic usage is by using the `listener()` context manager to
22*37094b6dSJohn Snowconstruct them:
23*37094b6dSJohn Snow
24*37094b6dSJohn Snow.. code:: python
25*37094b6dSJohn Snow
26*37094b6dSJohn Snow   with qmp.listener('STOP') as listener:
27*37094b6dSJohn Snow       await qmp.execute('stop')
28*37094b6dSJohn Snow       await listener.get()
29*37094b6dSJohn Snow
30*37094b6dSJohn SnowThe listener is active only for the duration of the ‘with’ block. This
31*37094b6dSJohn Snowinstance listens only for ‘STOP’ events.
32*37094b6dSJohn Snow
33*37094b6dSJohn Snow
34*37094b6dSJohn Snow`listener()` context blocks with two or more names
35*37094b6dSJohn Snow~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
36*37094b6dSJohn Snow
37*37094b6dSJohn SnowMultiple events can be selected for by providing any ``Iterable[str]``:
38*37094b6dSJohn Snow
39*37094b6dSJohn Snow.. code:: python
40*37094b6dSJohn Snow
41*37094b6dSJohn Snow   with qmp.listener(('STOP', 'RESUME')) as listener:
42*37094b6dSJohn Snow       await qmp.execute('stop')
43*37094b6dSJohn Snow       event = await listener.get()
44*37094b6dSJohn Snow       assert event['event'] == 'STOP'
45*37094b6dSJohn Snow
46*37094b6dSJohn Snow       await qmp.execute('cont')
47*37094b6dSJohn Snow       event = await listener.get()
48*37094b6dSJohn Snow       assert event['event'] == 'RESUME'
49*37094b6dSJohn Snow
50*37094b6dSJohn Snow
51*37094b6dSJohn Snow`listener()` context blocks with no names
52*37094b6dSJohn Snow~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
53*37094b6dSJohn Snow
54*37094b6dSJohn SnowBy omitting names entirely, you can listen to ALL events.
55*37094b6dSJohn Snow
56*37094b6dSJohn Snow.. code:: python
57*37094b6dSJohn Snow
58*37094b6dSJohn Snow   with qmp.listener() as listener:
59*37094b6dSJohn Snow       await qmp.execute('stop')
60*37094b6dSJohn Snow       event = await listener.get()
61*37094b6dSJohn Snow       assert event['event'] == 'STOP'
62*37094b6dSJohn Snow
63*37094b6dSJohn SnowThis isn’t a very good use case for this feature: In a non-trivial
64*37094b6dSJohn Snowrunning system, we may not know what event will arrive next. Grabbing
65*37094b6dSJohn Snowthe top of a FIFO queue returning multiple kinds of events may be prone
66*37094b6dSJohn Snowto error.
67*37094b6dSJohn Snow
68*37094b6dSJohn Snow
69*37094b6dSJohn SnowUsing async iterators to retrieve events
70*37094b6dSJohn Snow~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
71*37094b6dSJohn Snow
72*37094b6dSJohn SnowIf you’d like to simply watch what events happen to arrive, you can use
73*37094b6dSJohn Snowthe listener as an async iterator:
74*37094b6dSJohn Snow
75*37094b6dSJohn Snow.. code:: python
76*37094b6dSJohn Snow
77*37094b6dSJohn Snow   with qmp.listener() as listener:
78*37094b6dSJohn Snow       async for event in listener:
79*37094b6dSJohn Snow           print(f"Event arrived: {event['event']}")
80*37094b6dSJohn Snow
81*37094b6dSJohn SnowThis is analogous to the following code:
82*37094b6dSJohn Snow
83*37094b6dSJohn Snow.. code:: python
84*37094b6dSJohn Snow
85*37094b6dSJohn Snow   with qmp.listener() as listener:
86*37094b6dSJohn Snow       while True:
87*37094b6dSJohn Snow           event = listener.get()
88*37094b6dSJohn Snow           print(f"Event arrived: {event['event']}")
89*37094b6dSJohn Snow
90*37094b6dSJohn SnowThis event stream will never end, so these blocks will never terminate.
91*37094b6dSJohn Snow
92*37094b6dSJohn Snow
93*37094b6dSJohn SnowUsing asyncio.Task to concurrently retrieve events
94*37094b6dSJohn Snow~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
95*37094b6dSJohn Snow
96*37094b6dSJohn SnowSince a listener’s event stream will never terminate, it is not likely
97*37094b6dSJohn Snowuseful to use that form in a script. For longer-running clients, we can
98*37094b6dSJohn Snowcreate event handlers by using `asyncio.Task` to create concurrent
99*37094b6dSJohn Snowcoroutines:
100*37094b6dSJohn Snow
101*37094b6dSJohn Snow.. code:: python
102*37094b6dSJohn Snow
103*37094b6dSJohn Snow   async def print_events(listener):
104*37094b6dSJohn Snow       try:
105*37094b6dSJohn Snow           async for event in listener:
106*37094b6dSJohn Snow               print(f"Event arrived: {event['event']}")
107*37094b6dSJohn Snow       except asyncio.CancelledError:
108*37094b6dSJohn Snow           return
109*37094b6dSJohn Snow
110*37094b6dSJohn Snow   with qmp.listener() as listener:
111*37094b6dSJohn Snow       task = asyncio.Task(print_events(listener))
112*37094b6dSJohn Snow       await qmp.execute('stop')
113*37094b6dSJohn Snow       await qmp.execute('cont')
114*37094b6dSJohn Snow       task.cancel()
115*37094b6dSJohn Snow       await task
116*37094b6dSJohn Snow
117*37094b6dSJohn SnowHowever, there is no guarantee that these events will be received by the
118*37094b6dSJohn Snowtime we leave this context block. Once the context block is exited, the
119*37094b6dSJohn Snowlistener will cease to hear any new events, and becomes inert.
120*37094b6dSJohn Snow
121*37094b6dSJohn SnowBe mindful of the timing: the above example will *probably*– but does
122*37094b6dSJohn Snownot *guarantee*– that both STOP/RESUMED events will be printed. The
123*37094b6dSJohn Snowexample below outlines how to use listeners outside of a context block.
124*37094b6dSJohn Snow
125*37094b6dSJohn Snow
126*37094b6dSJohn SnowUsing `register_listener()` and `remove_listener()`
127*37094b6dSJohn Snow~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
128*37094b6dSJohn Snow
129*37094b6dSJohn SnowTo create a listener with a longer lifetime, beyond the scope of a
130*37094b6dSJohn Snowsingle block, create a listener and then call `register_listener()`:
131*37094b6dSJohn Snow
132*37094b6dSJohn Snow.. code:: python
133*37094b6dSJohn Snow
134*37094b6dSJohn Snow   class MyClient:
135*37094b6dSJohn Snow       def __init__(self, qmp):
136*37094b6dSJohn Snow           self.qmp = qmp
137*37094b6dSJohn Snow           self.listener = EventListener()
138*37094b6dSJohn Snow
139*37094b6dSJohn Snow       async def print_events(self):
140*37094b6dSJohn Snow           try:
141*37094b6dSJohn Snow               async for event in self.listener:
142*37094b6dSJohn Snow                   print(f"Event arrived: {event['event']}")
143*37094b6dSJohn Snow           except asyncio.CancelledError:
144*37094b6dSJohn Snow               return
145*37094b6dSJohn Snow
146*37094b6dSJohn Snow       async def run(self):
147*37094b6dSJohn Snow           self.task = asyncio.Task(self.print_events)
148*37094b6dSJohn Snow           self.qmp.register_listener(self.listener)
149*37094b6dSJohn Snow           await qmp.execute('stop')
150*37094b6dSJohn Snow           await qmp.execute('cont')
151*37094b6dSJohn Snow
152*37094b6dSJohn Snow       async def stop(self):
153*37094b6dSJohn Snow           self.task.cancel()
154*37094b6dSJohn Snow           await self.task
155*37094b6dSJohn Snow           self.qmp.remove_listener(self.listener)
156*37094b6dSJohn Snow
157*37094b6dSJohn SnowThe listener can be deactivated by using `remove_listener()`. When it is
158*37094b6dSJohn Snowremoved, any possible pending events are cleared and it can be
159*37094b6dSJohn Snowre-registered at a later time.
160*37094b6dSJohn Snow
161*37094b6dSJohn Snow
162*37094b6dSJohn SnowUsing the built-in all events listener
163*37094b6dSJohn Snow~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
164*37094b6dSJohn Snow
165*37094b6dSJohn SnowThe `QMPClient` object creates its own default listener named
166*37094b6dSJohn Snow:py:obj:`~Events.events` that can be used for the same purpose without
167*37094b6dSJohn Snowhaving to create your own:
168*37094b6dSJohn Snow
169*37094b6dSJohn Snow.. code:: python
170*37094b6dSJohn Snow
171*37094b6dSJohn Snow   async def print_events(listener):
172*37094b6dSJohn Snow       try:
173*37094b6dSJohn Snow           async for event in listener:
174*37094b6dSJohn Snow               print(f"Event arrived: {event['event']}")
175*37094b6dSJohn Snow       except asyncio.CancelledError:
176*37094b6dSJohn Snow           return
177*37094b6dSJohn Snow
178*37094b6dSJohn Snow   task = asyncio.Task(print_events(qmp.events))
179*37094b6dSJohn Snow
180*37094b6dSJohn Snow   await qmp.execute('stop')
181*37094b6dSJohn Snow   await qmp.execute('cont')
182*37094b6dSJohn Snow
183*37094b6dSJohn Snow   task.cancel()
184*37094b6dSJohn Snow   await task
185*37094b6dSJohn Snow
186*37094b6dSJohn Snow
187*37094b6dSJohn SnowUsing both .get() and async iterators
188*37094b6dSJohn Snow~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
189*37094b6dSJohn Snow
190*37094b6dSJohn SnowThe async iterator and `get()` methods pull events from the same FIFO
191*37094b6dSJohn Snowqueue. If you mix the usage of both, be aware: Events are emitted
192*37094b6dSJohn Snowprecisely once per listener.
193*37094b6dSJohn Snow
194*37094b6dSJohn SnowIf multiple contexts try to pull events from the same listener instance,
195*37094b6dSJohn Snowevents are still emitted only precisely once.
196*37094b6dSJohn Snow
197*37094b6dSJohn SnowThis restriction can be lifted by creating additional listeners.
198*37094b6dSJohn Snow
199*37094b6dSJohn Snow
200*37094b6dSJohn SnowCreating multiple listeners
201*37094b6dSJohn Snow~~~~~~~~~~~~~~~~~~~~~~~~~~~
202*37094b6dSJohn Snow
203*37094b6dSJohn SnowAdditional `EventListener` objects can be created at-will. Each one
204*37094b6dSJohn Snowreceives its own copy of events, with separate FIFO event queues.
205*37094b6dSJohn Snow
206*37094b6dSJohn Snow.. code:: python
207*37094b6dSJohn Snow
208*37094b6dSJohn Snow   my_listener = EventListener()
209*37094b6dSJohn Snow   qmp.register_listener(my_listener)
210*37094b6dSJohn Snow
211*37094b6dSJohn Snow   await qmp.execute('stop')
212*37094b6dSJohn Snow   copy1 = await my_listener.get()
213*37094b6dSJohn Snow   copy2 = await qmp.events.get()
214*37094b6dSJohn Snow
215*37094b6dSJohn Snow   assert copy1 == copy2
216*37094b6dSJohn Snow
217*37094b6dSJohn SnowIn this example, we await an event from both a user-created
218*37094b6dSJohn Snow`EventListener` and the built-in events listener. Both receive the same
219*37094b6dSJohn Snowevent.
220*37094b6dSJohn Snow
221*37094b6dSJohn Snow
222*37094b6dSJohn SnowClearing listeners
223*37094b6dSJohn Snow~~~~~~~~~~~~~~~~~~
224*37094b6dSJohn Snow
225*37094b6dSJohn Snow`EventListener` objects can be cleared, clearing all events seen thus far:
226*37094b6dSJohn Snow
227*37094b6dSJohn Snow.. code:: python
228*37094b6dSJohn Snow
229*37094b6dSJohn Snow   await qmp.execute('stop')
230*37094b6dSJohn Snow   qmp.events.clear()
231*37094b6dSJohn Snow   await qmp.execute('cont')
232*37094b6dSJohn Snow   event = await qmp.events.get()
233*37094b6dSJohn Snow   assert event['event'] == 'RESUME'
234*37094b6dSJohn Snow
235*37094b6dSJohn Snow`EventListener` objects are FIFO queues. If events are not consumed,
236*37094b6dSJohn Snowthey will remain in the queue until they are witnessed or discarded via
237*37094b6dSJohn Snow`clear()`. FIFO queues will be drained automatically upon leaving a
238*37094b6dSJohn Snowcontext block, or when calling `remove_listener()`.
239*37094b6dSJohn Snow
240*37094b6dSJohn Snow
241*37094b6dSJohn SnowAccessing listener history
242*37094b6dSJohn Snow~~~~~~~~~~~~~~~~~~~~~~~~~~
243*37094b6dSJohn Snow
244*37094b6dSJohn Snow`EventListener` objects record their history. Even after being cleared,
245*37094b6dSJohn Snowyou can obtain a record of all events seen so far:
246*37094b6dSJohn Snow
247*37094b6dSJohn Snow.. code:: python
248*37094b6dSJohn Snow
249*37094b6dSJohn Snow   await qmp.execute('stop')
250*37094b6dSJohn Snow   await qmp.execute('cont')
251*37094b6dSJohn Snow   qmp.events.clear()
252*37094b6dSJohn Snow
253*37094b6dSJohn Snow   assert len(qmp.events.history) == 2
254*37094b6dSJohn Snow   assert qmp.events.history[0]['event'] == 'STOP'
255*37094b6dSJohn Snow   assert qmp.events.history[1]['event'] == 'RESUME'
256*37094b6dSJohn Snow
257*37094b6dSJohn SnowThe history is updated immediately and does not require the event to be
258*37094b6dSJohn Snowwitnessed first.
259*37094b6dSJohn Snow
260*37094b6dSJohn Snow
261*37094b6dSJohn SnowUsing event filters
262*37094b6dSJohn Snow~~~~~~~~~~~~~~~~~~~
263*37094b6dSJohn Snow
264*37094b6dSJohn Snow`EventListener` objects can be given complex filtering criteria if names
265*37094b6dSJohn Snoware not sufficient:
266*37094b6dSJohn Snow
267*37094b6dSJohn Snow.. code:: python
268*37094b6dSJohn Snow
269*37094b6dSJohn Snow   def job1_filter(event) -> bool:
270*37094b6dSJohn Snow       event_data = event.get('data', {})
271*37094b6dSJohn Snow       event_job_id = event_data.get('id')
272*37094b6dSJohn Snow       return event_job_id == "job1"
273*37094b6dSJohn Snow
274*37094b6dSJohn Snow   with qmp.listener('JOB_STATUS_CHANGE', job1_filter) as listener:
275*37094b6dSJohn Snow       await qmp.execute('blockdev-backup', arguments={'job-id': 'job1', ...})
276*37094b6dSJohn Snow       async for event in listener:
277*37094b6dSJohn Snow           if event['data']['status'] == 'concluded':
278*37094b6dSJohn Snow               break
279*37094b6dSJohn Snow
280*37094b6dSJohn SnowThese filters might be most useful when parameterized. `EventListener`
281*37094b6dSJohn Snowobjects expect a function that takes only a single argument (the raw
282*37094b6dSJohn Snowevent, as a `Message`) and returns a bool; True if the event should be
283*37094b6dSJohn Snowaccepted into the stream. You can create a function that adapts this
284*37094b6dSJohn Snowsignature to accept configuration parameters:
285*37094b6dSJohn Snow
286*37094b6dSJohn Snow.. code:: python
287*37094b6dSJohn Snow
288*37094b6dSJohn Snow   def job_filter(job_id: str) -> EventFilter:
289*37094b6dSJohn Snow       def filter(event: Message) -> bool:
290*37094b6dSJohn Snow           return event['data']['id'] == job_id
291*37094b6dSJohn Snow       return filter
292*37094b6dSJohn Snow
293*37094b6dSJohn Snow   with qmp.listener('JOB_STATUS_CHANGE', job_filter('job2')) as listener:
294*37094b6dSJohn Snow       await qmp.execute('blockdev-backup', arguments={'job-id': 'job2', ...})
295*37094b6dSJohn Snow       async for event in listener:
296*37094b6dSJohn Snow           if event['data']['status'] == 'concluded':
297*37094b6dSJohn Snow               break
298*37094b6dSJohn Snow
299*37094b6dSJohn Snow
300*37094b6dSJohn SnowActivating an existing listener with `listen()`
301*37094b6dSJohn Snow~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
302*37094b6dSJohn Snow
303*37094b6dSJohn SnowListeners with complex, long configurations can also be created manually
304*37094b6dSJohn Snowand activated temporarily by using `listen()` instead of `listener()`:
305*37094b6dSJohn Snow
306*37094b6dSJohn Snow.. code:: python
307*37094b6dSJohn Snow
308*37094b6dSJohn Snow   listener = EventListener(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
309*37094b6dSJohn Snow                             'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
310*37094b6dSJohn Snow                             'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
311*37094b6dSJohn Snow
312*37094b6dSJohn Snow   with qmp.listen(listener):
313*37094b6dSJohn Snow       await qmp.execute('blockdev-backup', arguments={'job-id': 'job3', ...})
314*37094b6dSJohn Snow       async for event in listener:
315*37094b6dSJohn Snow           print(event)
316*37094b6dSJohn Snow           if event['event'] == 'BLOCK_JOB_COMPLETED':
317*37094b6dSJohn Snow               break
318*37094b6dSJohn Snow
319*37094b6dSJohn SnowAny events that are not witnessed by the time the block is left will be
320*37094b6dSJohn Snowcleared from the queue; entering the block is an implicit
321*37094b6dSJohn Snow`register_listener()` and leaving the block is an implicit
322*37094b6dSJohn Snow`remove_listener()`.
323*37094b6dSJohn Snow
324*37094b6dSJohn Snow
325*37094b6dSJohn SnowActivating multiple existing listeners with `listen()`
326*37094b6dSJohn Snow~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
327*37094b6dSJohn Snow
328*37094b6dSJohn SnowWhile `listener()` is only capable of creating a single listener,
329*37094b6dSJohn Snow`listen()` is capable of activating multiple listeners simultaneously:
330*37094b6dSJohn Snow
331*37094b6dSJohn Snow.. code:: python
332*37094b6dSJohn Snow
333*37094b6dSJohn Snow   def job_filter(job_id: str) -> EventFilter:
334*37094b6dSJohn Snow       def filter(event: Message) -> bool:
335*37094b6dSJohn Snow           return event['data']['id'] == job_id
336*37094b6dSJohn Snow       return filter
337*37094b6dSJohn Snow
338*37094b6dSJohn Snow   jobA = EventListener('JOB_STATUS_CHANGE', job_filter('jobA'))
339*37094b6dSJohn Snow   jobB = EventListener('JOB_STATUS_CHANGE', job_filter('jobB'))
340*37094b6dSJohn Snow
341*37094b6dSJohn Snow   with qmp.listen(jobA, jobB):
342*37094b6dSJohn Snow       qmp.execute('blockdev-create', arguments={'job-id': 'jobA', ...})
343*37094b6dSJohn Snow       qmp.execute('blockdev-create', arguments={'job-id': 'jobB', ...})
344*37094b6dSJohn Snow
345*37094b6dSJohn Snow       async for event in jobA.get():
346*37094b6dSJohn Snow           if event['data']['status'] == 'concluded':
347*37094b6dSJohn Snow               break
348*37094b6dSJohn Snow       async for event in jobB.get():
349*37094b6dSJohn Snow           if event['data']['status'] == 'concluded':
350*37094b6dSJohn Snow               break
351*37094b6dSJohn Snow
352*37094b6dSJohn Snow
353*37094b6dSJohn SnowExtending the `EventListener` class
354*37094b6dSJohn Snow~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
355*37094b6dSJohn Snow
356*37094b6dSJohn SnowIn the case that a more specialized `EventListener` is desired to
357*37094b6dSJohn Snowprovide either more functionality or more compact syntax for specialized
358*37094b6dSJohn Snowcases, it can be extended.
359*37094b6dSJohn Snow
360*37094b6dSJohn SnowOne of the key methods to extend or override is
361*37094b6dSJohn Snow:py:meth:`~EventListener.accept()`. The default implementation checks an
362*37094b6dSJohn Snowincoming message for:
363*37094b6dSJohn Snow
364*37094b6dSJohn Snow1. A qualifying name, if any :py:obj:`~EventListener.names` were
365*37094b6dSJohn Snow   specified at initialization time
366*37094b6dSJohn Snow2. That :py:obj:`~EventListener.event_filter()` returns True.
367*37094b6dSJohn Snow
368*37094b6dSJohn SnowThis can be modified however you see fit to change the criteria for
369*37094b6dSJohn Snowinclusion in the stream.
370*37094b6dSJohn Snow
371*37094b6dSJohn SnowFor convenience, a ``JobListener`` class could be created that simply
372*37094b6dSJohn Snowbakes in configuration so it does not need to be repeated:
373*37094b6dSJohn Snow
374*37094b6dSJohn Snow.. code:: python
375*37094b6dSJohn Snow
376*37094b6dSJohn Snow   class JobListener(EventListener):
377*37094b6dSJohn Snow       def __init__(self, job_id: str):
378*37094b6dSJohn Snow           super().__init__(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
379*37094b6dSJohn Snow                             'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
380*37094b6dSJohn Snow                             'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
381*37094b6dSJohn Snow           self.job_id = job_id
382*37094b6dSJohn Snow
383*37094b6dSJohn Snow       def accept(self, event) -> bool:
384*37094b6dSJohn Snow           if not super().accept(event):
385*37094b6dSJohn Snow               return False
386*37094b6dSJohn Snow           if event['event'] in ('BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'):
387*37094b6dSJohn Snow               return event['data']['id'] == job_id
388*37094b6dSJohn Snow           return event['data']['device'] == job_id
389*37094b6dSJohn Snow
390*37094b6dSJohn SnowFrom here on out, you can conjure up a custom-purpose listener that
391*37094b6dSJohn Snowlistens only for job-related events for a specific job-id easily:
392*37094b6dSJohn Snow
393*37094b6dSJohn Snow.. code:: python
394*37094b6dSJohn Snow
395*37094b6dSJohn Snow   listener = JobListener('job4')
396*37094b6dSJohn Snow   with qmp.listener(listener):
397*37094b6dSJohn Snow       await qmp.execute('blockdev-backup', arguments={'job-id': 'job4', ...})
398*37094b6dSJohn Snow       async for event in listener:
399*37094b6dSJohn Snow           print(event)
400*37094b6dSJohn Snow           if event['event'] == 'BLOCK_JOB_COMPLETED':
401*37094b6dSJohn Snow               break
402*37094b6dSJohn Snow
403*37094b6dSJohn Snow
404*37094b6dSJohn SnowExperimental Interfaces & Design Issues
405*37094b6dSJohn Snow---------------------------------------
406*37094b6dSJohn Snow
407*37094b6dSJohn SnowThese interfaces are not ones I am sure I will keep or otherwise modify
408*37094b6dSJohn Snowheavily.
409*37094b6dSJohn Snow
410*37094b6dSJohn Snowqmp.listener()’s type signature
411*37094b6dSJohn Snow~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
412*37094b6dSJohn Snow
413*37094b6dSJohn Snow`listener()` does not return anything, because it was assumed the caller
414*37094b6dSJohn Snowalready had a handle to the listener. However, for
415*37094b6dSJohn Snow``qmp.listener(EventListener())`` forms, the caller will not have saved
416*37094b6dSJohn Snowa handle to the listener.
417*37094b6dSJohn Snow
418*37094b6dSJohn SnowBecause this function can accept *many* listeners, I found it hard to
419*37094b6dSJohn Snowaccurately type in a way where it could be used in both “one” or “many”
420*37094b6dSJohn Snowforms conveniently and in a statically type-safe manner.
421*37094b6dSJohn Snow
422*37094b6dSJohn SnowUltimately, I removed the return altogether, but perhaps with more time
423*37094b6dSJohn SnowI can work out a way to re-add it.
424*37094b6dSJohn Snow
425*37094b6dSJohn Snow
426*37094b6dSJohn SnowAPI Reference
427*37094b6dSJohn Snow-------------
428*37094b6dSJohn Snow
429*37094b6dSJohn Snow"""
430*37094b6dSJohn Snow
431*37094b6dSJohn Snowimport asyncio
432*37094b6dSJohn Snowfrom contextlib import contextmanager
433*37094b6dSJohn Snowimport logging
434*37094b6dSJohn Snowfrom typing import (
435*37094b6dSJohn Snow    AsyncIterator,
436*37094b6dSJohn Snow    Callable,
437*37094b6dSJohn Snow    Iterable,
438*37094b6dSJohn Snow    Iterator,
439*37094b6dSJohn Snow    List,
440*37094b6dSJohn Snow    Optional,
441*37094b6dSJohn Snow    Set,
442*37094b6dSJohn Snow    Tuple,
443*37094b6dSJohn Snow    Union,
444*37094b6dSJohn Snow)
445*37094b6dSJohn Snow
446*37094b6dSJohn Snowfrom .error import QMPError
447*37094b6dSJohn Snowfrom .message import Message
448*37094b6dSJohn Snow
449*37094b6dSJohn Snow
450*37094b6dSJohn SnowEventNames = Union[str, Iterable[str], None]
451*37094b6dSJohn SnowEventFilter = Callable[[Message], bool]
452*37094b6dSJohn Snow
453*37094b6dSJohn Snow
454*37094b6dSJohn Snowclass ListenerError(QMPError):
455*37094b6dSJohn Snow    """
456*37094b6dSJohn Snow    Generic error class for `EventListener`-related problems.
457*37094b6dSJohn Snow    """
458*37094b6dSJohn Snow
459*37094b6dSJohn Snow
460*37094b6dSJohn Snowclass EventListener:
461*37094b6dSJohn Snow    """
462*37094b6dSJohn Snow    Selectively listens for events with runtime configurable filtering.
463*37094b6dSJohn Snow
464*37094b6dSJohn Snow    This class is designed to be directly usable for the most common cases,
465*37094b6dSJohn Snow    but it can be extended to provide more rigorous control.
466*37094b6dSJohn Snow
467*37094b6dSJohn Snow    :param names:
468*37094b6dSJohn Snow        One or more names of events to listen for.
469*37094b6dSJohn Snow        When not provided, listen for ALL events.
470*37094b6dSJohn Snow    :param event_filter:
471*37094b6dSJohn Snow        An optional event filtering function.
472*37094b6dSJohn Snow        When names are also provided, this acts as a secondary filter.
473*37094b6dSJohn Snow
474*37094b6dSJohn Snow    When ``names`` and ``event_filter`` are both provided, the names
475*37094b6dSJohn Snow    will be filtered first, and then the filter function will be called
476*37094b6dSJohn Snow    second. The event filter function can assume that the format of the
477*37094b6dSJohn Snow    event is a known format.
478*37094b6dSJohn Snow    """
479*37094b6dSJohn Snow    def __init__(
480*37094b6dSJohn Snow        self,
481*37094b6dSJohn Snow        names: EventNames = None,
482*37094b6dSJohn Snow        event_filter: Optional[EventFilter] = None,
483*37094b6dSJohn Snow    ):
484*37094b6dSJohn Snow        # Queue of 'heard' events yet to be witnessed by a caller.
485*37094b6dSJohn Snow        self._queue: 'asyncio.Queue[Message]' = asyncio.Queue()
486*37094b6dSJohn Snow
487*37094b6dSJohn Snow        # Intended as a historical record, NOT a processing queue or backlog.
488*37094b6dSJohn Snow        self._history: List[Message] = []
489*37094b6dSJohn Snow
490*37094b6dSJohn Snow        #: Primary event filter, based on one or more event names.
491*37094b6dSJohn Snow        self.names: Set[str] = set()
492*37094b6dSJohn Snow        if isinstance(names, str):
493*37094b6dSJohn Snow            self.names.add(names)
494*37094b6dSJohn Snow        elif names is not None:
495*37094b6dSJohn Snow            self.names.update(names)
496*37094b6dSJohn Snow
497*37094b6dSJohn Snow        #: Optional, secondary event filter.
498*37094b6dSJohn Snow        self.event_filter: Optional[EventFilter] = event_filter
499*37094b6dSJohn Snow
500*37094b6dSJohn Snow    @property
501*37094b6dSJohn Snow    def history(self) -> Tuple[Message, ...]:
502*37094b6dSJohn Snow        """
503*37094b6dSJohn Snow        A read-only history of all events seen so far.
504*37094b6dSJohn Snow
505*37094b6dSJohn Snow        This represents *every* event, including those not yet witnessed
506*37094b6dSJohn Snow        via `get()` or ``async for``. It persists between `clear()`
507*37094b6dSJohn Snow        calls and is immutable.
508*37094b6dSJohn Snow        """
509*37094b6dSJohn Snow        return tuple(self._history)
510*37094b6dSJohn Snow
511*37094b6dSJohn Snow    def accept(self, event: Message) -> bool:
512*37094b6dSJohn Snow        """
513*37094b6dSJohn Snow        Determine if this listener accepts this event.
514*37094b6dSJohn Snow
515*37094b6dSJohn Snow        This method determines which events will appear in the stream.
516*37094b6dSJohn Snow        The default implementation simply checks the event against the
517*37094b6dSJohn Snow        list of names and the event_filter to decide if this
518*37094b6dSJohn Snow        `EventListener` accepts a given event. It can be
519*37094b6dSJohn Snow        overridden/extended to provide custom listener behavior.
520*37094b6dSJohn Snow
521*37094b6dSJohn Snow        User code is not expected to need to invoke this method.
522*37094b6dSJohn Snow
523*37094b6dSJohn Snow        :param event: The event under consideration.
524*37094b6dSJohn Snow        :return: `True`, if this listener accepts this event.
525*37094b6dSJohn Snow        """
526*37094b6dSJohn Snow        name_ok = (not self.names) or (event['event'] in self.names)
527*37094b6dSJohn Snow        return name_ok and (
528*37094b6dSJohn Snow            (not self.event_filter) or self.event_filter(event)
529*37094b6dSJohn Snow        )
530*37094b6dSJohn Snow
531*37094b6dSJohn Snow    async def put(self, event: Message) -> None:
532*37094b6dSJohn Snow        """
533*37094b6dSJohn Snow        Conditionally put a new event into the FIFO queue.
534*37094b6dSJohn Snow
535*37094b6dSJohn Snow        This method is not designed to be invoked from user code, and it
536*37094b6dSJohn Snow        should not need to be overridden. It is a public interface so
537*37094b6dSJohn Snow        that `QMPClient` has an interface by which it can inform
538*37094b6dSJohn Snow        registered listeners of new events.
539*37094b6dSJohn Snow
540*37094b6dSJohn Snow        The event will be put into the queue if
541*37094b6dSJohn Snow        :py:meth:`~EventListener.accept()` returns `True`.
542*37094b6dSJohn Snow
543*37094b6dSJohn Snow        :param event: The new event to put into the FIFO queue.
544*37094b6dSJohn Snow        """
545*37094b6dSJohn Snow        if not self.accept(event):
546*37094b6dSJohn Snow            return
547*37094b6dSJohn Snow
548*37094b6dSJohn Snow        self._history.append(event)
549*37094b6dSJohn Snow        await self._queue.put(event)
550*37094b6dSJohn Snow
551*37094b6dSJohn Snow    async def get(self) -> Message:
552*37094b6dSJohn Snow        """
553*37094b6dSJohn Snow        Wait for the very next event in this stream.
554*37094b6dSJohn Snow
555*37094b6dSJohn Snow        If one is already available, return that one.
556*37094b6dSJohn Snow        """
557*37094b6dSJohn Snow        return await self._queue.get()
558*37094b6dSJohn Snow
559*37094b6dSJohn Snow    def empty(self) -> bool:
560*37094b6dSJohn Snow        """
561*37094b6dSJohn Snow        Return `True` if there are no pending events.
562*37094b6dSJohn Snow        """
563*37094b6dSJohn Snow        return self._queue.empty()
564*37094b6dSJohn Snow
565*37094b6dSJohn Snow    def clear(self) -> List[Message]:
566*37094b6dSJohn Snow        """
567*37094b6dSJohn Snow        Clear this listener of all pending events.
568*37094b6dSJohn Snow
569*37094b6dSJohn Snow        Called when an `EventListener` is being unregistered, this clears the
570*37094b6dSJohn Snow        pending FIFO queue synchronously. It can be also be used to
571*37094b6dSJohn Snow        manually clear any pending events, if desired.
572*37094b6dSJohn Snow
573*37094b6dSJohn Snow        :return: The cleared events, if any.
574*37094b6dSJohn Snow
575*37094b6dSJohn Snow        .. warning::
576*37094b6dSJohn Snow            Take care when discarding events. Cleared events will be
577*37094b6dSJohn Snow            silently tossed on the floor. All events that were ever
578*37094b6dSJohn Snow            accepted by this listener are visible in `history()`.
579*37094b6dSJohn Snow        """
580*37094b6dSJohn Snow        events = []
581*37094b6dSJohn Snow        while True:
582*37094b6dSJohn Snow            try:
583*37094b6dSJohn Snow                events.append(self._queue.get_nowait())
584*37094b6dSJohn Snow            except asyncio.QueueEmpty:
585*37094b6dSJohn Snow                break
586*37094b6dSJohn Snow
587*37094b6dSJohn Snow        return events
588*37094b6dSJohn Snow
589*37094b6dSJohn Snow    def __aiter__(self) -> AsyncIterator[Message]:
590*37094b6dSJohn Snow        return self
591*37094b6dSJohn Snow
592*37094b6dSJohn Snow    async def __anext__(self) -> Message:
593*37094b6dSJohn Snow        """
594*37094b6dSJohn Snow        Enables the `EventListener` to function as an async iterator.
595*37094b6dSJohn Snow
596*37094b6dSJohn Snow        It may be used like this:
597*37094b6dSJohn Snow
598*37094b6dSJohn Snow        .. code:: python
599*37094b6dSJohn Snow
600*37094b6dSJohn Snow            async for event in listener:
601*37094b6dSJohn Snow                print(event)
602*37094b6dSJohn Snow
603*37094b6dSJohn Snow        These iterators will never terminate of their own accord; you
604*37094b6dSJohn Snow        must provide break conditions or otherwise prepare to run them
605*37094b6dSJohn Snow        in an `asyncio.Task` that can be cancelled.
606*37094b6dSJohn Snow        """
607*37094b6dSJohn Snow        return await self.get()
608*37094b6dSJohn Snow
609*37094b6dSJohn Snow
610*37094b6dSJohn Snowclass Events:
611*37094b6dSJohn Snow    """
612*37094b6dSJohn Snow    Events is a mix-in class that adds event functionality to the QMP class.
613*37094b6dSJohn Snow
614*37094b6dSJohn Snow    It's designed specifically as a mix-in for `QMPClient`, and it
615*37094b6dSJohn Snow    relies upon the class it is being mixed into having a 'logger'
616*37094b6dSJohn Snow    property.
617*37094b6dSJohn Snow    """
618*37094b6dSJohn Snow    def __init__(self) -> None:
619*37094b6dSJohn Snow        self._listeners: List[EventListener] = []
620*37094b6dSJohn Snow
621*37094b6dSJohn Snow        #: Default, all-events `EventListener`.
622*37094b6dSJohn Snow        self.events: EventListener = EventListener()
623*37094b6dSJohn Snow        self.register_listener(self.events)
624*37094b6dSJohn Snow
625*37094b6dSJohn Snow        # Parent class needs to have a logger
626*37094b6dSJohn Snow        self.logger: logging.Logger
627*37094b6dSJohn Snow
628*37094b6dSJohn Snow    async def _event_dispatch(self, msg: Message) -> None:
629*37094b6dSJohn Snow        """
630*37094b6dSJohn Snow        Given a new event, propagate it to all of the active listeners.
631*37094b6dSJohn Snow
632*37094b6dSJohn Snow        :param msg: The event to propagate.
633*37094b6dSJohn Snow        """
634*37094b6dSJohn Snow        for listener in self._listeners:
635*37094b6dSJohn Snow            await listener.put(msg)
636*37094b6dSJohn Snow
637*37094b6dSJohn Snow    def register_listener(self, listener: EventListener) -> None:
638*37094b6dSJohn Snow        """
639*37094b6dSJohn Snow        Register and activate an `EventListener`.
640*37094b6dSJohn Snow
641*37094b6dSJohn Snow        :param listener: The listener to activate.
642*37094b6dSJohn Snow        :raise ListenerError: If the given listener is already registered.
643*37094b6dSJohn Snow        """
644*37094b6dSJohn Snow        if listener in self._listeners:
645*37094b6dSJohn Snow            raise ListenerError("Attempted to re-register existing listener")
646*37094b6dSJohn Snow        self.logger.debug("Registering %s.", str(listener))
647*37094b6dSJohn Snow        self._listeners.append(listener)
648*37094b6dSJohn Snow
649*37094b6dSJohn Snow    def remove_listener(self, listener: EventListener) -> None:
650*37094b6dSJohn Snow        """
651*37094b6dSJohn Snow        Unregister and deactivate an `EventListener`.
652*37094b6dSJohn Snow
653*37094b6dSJohn Snow        The removed listener will have its pending events cleared via
654*37094b6dSJohn Snow        `clear()`. The listener can be re-registered later when
655*37094b6dSJohn Snow        desired.
656*37094b6dSJohn Snow
657*37094b6dSJohn Snow        :param listener: The listener to deactivate.
658*37094b6dSJohn Snow        :raise ListenerError: If the given listener is not registered.
659*37094b6dSJohn Snow        """
660*37094b6dSJohn Snow        if listener == self.events:
661*37094b6dSJohn Snow            raise ListenerError("Cannot remove the default listener.")
662*37094b6dSJohn Snow        self.logger.debug("Removing %s.", str(listener))
663*37094b6dSJohn Snow        listener.clear()
664*37094b6dSJohn Snow        self._listeners.remove(listener)
665*37094b6dSJohn Snow
666*37094b6dSJohn Snow    @contextmanager
667*37094b6dSJohn Snow    def listen(self, *listeners: EventListener) -> Iterator[None]:
668*37094b6dSJohn Snow        r"""
669*37094b6dSJohn Snow        Context manager: Temporarily listen with an `EventListener`.
670*37094b6dSJohn Snow
671*37094b6dSJohn Snow        Accepts one or more `EventListener` objects and registers them,
672*37094b6dSJohn Snow        activating them for the duration of the context block.
673*37094b6dSJohn Snow
674*37094b6dSJohn Snow        `EventListener` objects will have any pending events in their
675*37094b6dSJohn Snow        FIFO queue cleared upon exiting the context block, when they are
676*37094b6dSJohn Snow        deactivated.
677*37094b6dSJohn Snow
678*37094b6dSJohn Snow        :param \*listeners: One or more EventListeners to activate.
679*37094b6dSJohn Snow        :raise ListenerError: If the given listener(s) are already active.
680*37094b6dSJohn Snow        """
681*37094b6dSJohn Snow        _added = []
682*37094b6dSJohn Snow
683*37094b6dSJohn Snow        try:
684*37094b6dSJohn Snow            for listener in listeners:
685*37094b6dSJohn Snow                self.register_listener(listener)
686*37094b6dSJohn Snow                _added.append(listener)
687*37094b6dSJohn Snow
688*37094b6dSJohn Snow            yield
689*37094b6dSJohn Snow
690*37094b6dSJohn Snow        finally:
691*37094b6dSJohn Snow            for listener in _added:
692*37094b6dSJohn Snow                self.remove_listener(listener)
693*37094b6dSJohn Snow
694*37094b6dSJohn Snow    @contextmanager
695*37094b6dSJohn Snow    def listener(
696*37094b6dSJohn Snow        self,
697*37094b6dSJohn Snow        names: EventNames = (),
698*37094b6dSJohn Snow        event_filter: Optional[EventFilter] = None
699*37094b6dSJohn Snow    ) -> Iterator[EventListener]:
700*37094b6dSJohn Snow        """
701*37094b6dSJohn Snow        Context manager: Temporarily listen with a new `EventListener`.
702*37094b6dSJohn Snow
703*37094b6dSJohn Snow        Creates an `EventListener` object and registers it, activating
704*37094b6dSJohn Snow        it for the duration of the context block.
705*37094b6dSJohn Snow
706*37094b6dSJohn Snow        :param names:
707*37094b6dSJohn Snow            One or more names of events to listen for.
708*37094b6dSJohn Snow            When not provided, listen for ALL events.
709*37094b6dSJohn Snow        :param event_filter:
710*37094b6dSJohn Snow            An optional event filtering function.
711*37094b6dSJohn Snow            When names are also provided, this acts as a secondary filter.
712*37094b6dSJohn Snow
713*37094b6dSJohn Snow        :return: The newly created and active `EventListener`.
714*37094b6dSJohn Snow        """
715*37094b6dSJohn Snow        listener = EventListener(names, event_filter)
716*37094b6dSJohn Snow        with self.listen(listener):
717*37094b6dSJohn Snow            yield listener
718