#
# Copyright BitBake Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#

import logging
import bb.asyncrpc

logger = logging.getLogger("BitBake.PRserv")


class PRAsyncClient(bb.asyncrpc.AsyncClient):
    """Asynchronous RPC client built on ``bb.asyncrpc.AsyncClient``.

    Each request method sends a single-key message through ``self.invoke``
    and returns the relevant field(s) of the reply, or ``None`` when the
    reply is falsy.
    """

    def __init__(self):
        # 'PRSERVICE' / '1.0' are handed straight to the base class together
        # with the module logger (presumably protocol name and version --
        # confirm against bb.asyncrpc).
        super().__init__('PRSERVICE', '1.0', logger)

    async def getPR(self, version, pkgarch, checksum):
        """Send a 'get-pr' request; return the reply's 'value' or None."""
        request = {
            'get-pr': {
                'version': version,
                'pkgarch': pkgarch,
                'checksum': checksum,
            },
        }
        reply = await self.invoke(request)
        if not reply:
            return None
        return reply['value']

    async def importone(self, version, pkgarch, checksum, value):
        """Send an 'import-one' request; return the reply's 'value' or None."""
        request = {
            'import-one': {
                'version': version,
                'pkgarch': pkgarch,
                'checksum': checksum,
                'value': value,
            },
        }
        reply = await self.invoke(request)
        if not reply:
            return None
        return reply['value']

    async def export(self, version, pkgarch, checksum, colinfo):
        """Send an 'export' request.

        Returns a ``(metainfo, datainfo)`` tuple taken from the reply, or
        ``None`` when the reply is falsy.
        """
        request = {
            'export': {
                'version': version,
                'pkgarch': pkgarch,
                'checksum': checksum,
                'colinfo': colinfo,
            },
        }
        reply = await self.invoke(request)
        if not reply:
            return None
        return (reply['metainfo'], reply['datainfo'])

    async def is_readonly(self):
        """Send an 'is-readonly' request; return the reply's 'readonly' or None."""
        reply = await self.invoke({'is-readonly': {}})
        if not reply:
            return None
        return reply['readonly']


class PRClient(bb.asyncrpc.Client):
    """Client built on ``bb.asyncrpc.Client`` and backed by ``PRAsyncClient``."""

    def __init__(self):
        super().__init__()
        # Register the async client's request methods by name with the base
        # class (presumably so they are exposed as synchronous wrappers --
        # confirm against bb.asyncrpc.Client._add_methods).
        self._add_methods('getPR', 'importone', 'export', 'is_readonly')

    def _get_async_client(self):
        """Return a new ``PRAsyncClient`` instance."""
        return PRAsyncClient()