From 19c9f14ca95f8dfefe02b32084ea7530c10b9d3e Mon Sep 17 00:00:00 2001 From: unknown Date: Mon, 10 Feb 2020 09:26:48 +0100 Subject: [PATCH] separate sub importer, notify partial finished per subscription --- src/galaxy/api/plugin.py | 88 ++++++++++++++++++++++++++----------- tests/test_subscriptions.py | 15 +++++++ 2 files changed, 77 insertions(+), 26 deletions(-) diff --git a/src/galaxy/api/plugin.py b/src/galaxy/api/plugin.py index 28506c3..59dda07 100644 --- a/src/galaxy/api/plugin.py +++ b/src/galaxy/api/plugin.py @@ -15,7 +15,6 @@ from galaxy.api.types import ( ) from galaxy.task_manager import TaskManager - logger = logging.getLogger(__name__) @@ -34,16 +33,15 @@ class JSONEncoder(json.JSONEncoder): class Importer: def __init__( - self, - task_manger, - name, - get, - prepare_context, - notification_success, - notification_failure, - notification_finished, - complete, - is_generator=False + self, + task_manger, + name, + get, + prepare_context, + notification_success, + notification_failure, + notification_finished, + complete, ): self._task_manager = task_manger self._name = name @@ -55,16 +53,11 @@ class Importer: self._complete = complete self._import_in_progress = False - self._is_generator = is_generator async def _import_element(self, id_, context_): try: - if self._is_generator: - async for element in self._get(id_, context_): - self._notification_success(id_, element) - else: - element = await self._get(id_, context_) - self._notification_success(id_, element) + element = await self._get(id_, context_) + self._notification_success(id_, element) except ApplicationError as error: self._notification_failure(id_, error) except asyncio.CancelledError: @@ -101,6 +94,44 @@ class Importer: raise +class SubscriptionGamesImporter(Importer): + def __init__( + self, + task_manger, + name, + get, + prepare_context, + notification_success, + notification_failure, + notification_finished, + notification_partial_finished, + complete + ): + super(SubscriptionGamesImporter, self).__init__(task_manger, + name, + get, + prepare_context, + notification_success, + notification_failure, + notification_finished, + complete) + self._notification_partial_finished = notification_partial_finished + + async def _import_element(self, id_, context_): + try: + async for element in self._get(id_, context_): + self._notification_success(id_, element) + except ApplicationError as error: + self._notification_failure(id_, error) + except asyncio.CancelledError: + pass + except Exception: + logger.exception("Unexpected exception raised in %s importer", self._name) + self._notification_failure(id_, UnknownError()) + finally: + self._notification_partial_finished() + + class Plugin: """Use and override methods of this class to create a new platform integration.""" @@ -183,7 +214,7 @@ class Plugin: self._local_size_import_finished, self.local_size_import_complete ) - self._subscription_games_importer = Importer( + self._subscription_games_importer = SubscriptionGamesImporter( self._external_task_manager, "subscription games", self.get_subscription_games, @@ -191,8 +222,8 @@ class Plugin: self._subscription_games_import_success, self._subscription_games_import_failure, self._subscription_games_import_finished, - self.subscription_games_import_complete, - is_generator=True + self._subscriptions_games_partial_import_finished, + self.subscription_games_import_complete ) # internal @@ -297,7 +328,8 @@ class Plugin: if self._implements(methods): self._features.add(feature) - def _register_method(self, name, handler, result_name=None, internal=False, immediate=False, sensitive_params=False): + def _register_method(self, name, handler, result_name=None, internal=False, immediate=False, + sensitive_params=False): def wrap_result(result): if result_name: result = { @@ -671,7 +703,8 @@ class Plugin: def _local_size_import_finished(self) -> None: self._connection.send_notification("local_size_import_finished", None) - def _subscription_games_import_success(self, subscription_name: str, subscription_games: Optional[List[SubscriptionGame]]) -> None: + def _subscription_games_import_success(self, subscription_name: str, + subscription_games: Optional[List[SubscriptionGame]]) -> None: self._connection.send_notification( "subscription_games_import_success", { @@ -689,6 +722,9 @@ class Plugin: } ) + def _subscriptions_games_partial_import_finished(self) -> None: + self._connection.send_notification("subscription_games_partial_import_finished", None) + def _subscription_games_import_finished(self) -> None: self._connection.send_notification("subscription_games_import_finished", None) @@ -773,7 +809,7 @@ class Plugin: raise NotImplementedError() async def pass_login_credentials(self, step: str, credentials: Dict[str, str], cookies: List[Dict[str, str]]) \ - -> Union[NextStep, Authentication]: + -> Union[NextStep, Authentication]: """This method is called if we return :class:`~galaxy.api.types.NextStep` from :meth:`.authenticate` or :meth:`.pass_login_credentials`. This method's parameters provide the data extracted from the web page navigation that previous NextStep finished on. @@ -1117,7 +1153,8 @@ class Plugin: """ return None - async def get_subscription_games(self, subscription_name: str, context: Any) -> AsyncGenerator[List[SubscriptionGame], None]: + async def get_subscription_games(self, subscription_name: str, context: Any) -> AsyncGenerator[ + List[SubscriptionGame], None]: """Override this method to provide SubscriptionGames for a given subscription. This method should `yield` a list of SubscriptionGames -> yield [sub_games] @@ -1193,7 +1230,6 @@ def create_and_run_plugin(plugin_class, argv): writer.close() await writer.wait_closed() - try: if sys.platform == "win32": asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) diff --git a/tests/test_subscriptions.py b/tests/test_subscriptions.py index 4da441e..8329590 100644 --- a/tests/test_subscriptions.py +++ b/tests/test_subscriptions.py @@ -140,6 +140,11 @@ async def test_get_subscription_games_success(plugin, read, write): ] } }, + { + 'jsonrpc': '2.0', + 'method': + 'subscription_games_partial_import_finished', 'params': None + }, { "jsonrpc": "2.0", "method": "subscription_games_import_finished", @@ -183,6 +188,11 @@ async def test_get_subscription_games_success_empty(plugin, read, write): "subscription_games": None } }, + { + 'jsonrpc': '2.0', + 'method': + 'subscription_games_partial_import_finished', 'params': None + }, { "jsonrpc": "2.0", "method": "subscription_games_import_finished", @@ -229,6 +239,11 @@ async def test_get_subscription_games_error(exception, code, message, plugin, re } } }, + { + 'jsonrpc': '2.0', + 'method': + 'subscription_games_partial_import_finished', 'params': None + }, { "jsonrpc": "2.0", "method": "subscription_games_import_finished",