diff --git a/src/galaxy/api/importer.py b/src/galaxy/api/importer.py new file mode 100644 index 0000000..9fa193a --- /dev/null +++ b/src/galaxy/api/importer.py @@ -0,0 +1,89 @@ +import asyncio +import logging +from galaxy.api.jsonrpc import ApplicationError +from galaxy.api.errors import ImportInProgress, UnknownError + +logger = logging.getLogger(__name__) + + +class Importer: + def __init__( + self, + task_manger, + name, + get, + prepare_context, + notification_success, + notification_failure, + notification_finished, + complete, + ): + self._task_manager = task_manger + self._name = name + self._get = get + self._prepare_context = prepare_context + self._notification_success = notification_success + self._notification_failure = notification_failure + self._notification_finished = notification_finished + self._complete = complete + + self._import_in_progress = False + + async def _import_element(self, id_, context_): + try: + element = await self._get(id_, context_) + self._notification_success(id_, element) + except ApplicationError as error: + self._notification_failure(id_, error) + except asyncio.CancelledError: + pass + except Exception: + logger.exception("Unexpected exception raised in %s importer", self._name) + self._notification_failure(id_, UnknownError()) + + async def _import_elements(self, ids_, context_): + try: + imports = [self._import_element(id_, context_) for id_ in ids_] + await asyncio.gather(*imports) + self._notification_finished() + self._complete() + except asyncio.CancelledError: + logger.debug("Importing %s cancelled", self._name) + finally: + self._import_in_progress = False + + async def start(self, ids): + if self._import_in_progress: + raise ImportInProgress() + + self._import_in_progress = True + try: + context = await self._prepare_context(ids) + self._task_manager.create_task( + self._import_elements(ids, context), + "{} import".format(self._name), + handle_exceptions=False + ) + except: + self._import_in_progress = False + raise + + +class CollectionImporter(Importer): + def __init__(self, notification_partialy_finished, *args): + super().__init__(*args) + self._notification_partial_finished = notification_partialy_finished + + async def _import_element(self, id_, context_): + try: + async for element in self._get(id_, context_): + self._notification_success(id_, element) + except ApplicationError as error: + self._notification_failure(id_, error) + except asyncio.CancelledError: + pass + except Exception: + logger.exception("Unexpected exception raised in %s importer", self._name) + self._notification_failure(id_, UnknownError()) + finally: + self._notification_partial_finished(id_) diff --git a/src/galaxy/api/plugin.py b/src/galaxy/api/plugin.py index fab5b97..bb0d3d0 100644 --- a/src/galaxy/api/plugin.py +++ b/src/galaxy/api/plugin.py @@ -7,17 +7,18 @@ from enum import Enum from typing import Any, Dict, List, Optional, Set, Union, AsyncGenerator from galaxy.api.consts import Feature, OSCompatibility -from galaxy.api.errors import ImportInProgress, UnknownError from galaxy.api.jsonrpc import ApplicationError, Connection from galaxy.api.types import ( Achievement, Authentication, Game, GameLibrarySettings, GameTime, LocalGame, NextStep, UserInfo, UserPresence, Subscription, SubscriptionGame ) from galaxy.task_manager import TaskManager +from galaxy.api.importer import Importer, CollectionImporter logger = logging.getLogger(__name__) + class JSONEncoder(json.JSONEncoder): def default(self, o): # pylint: disable=method-hidden if dataclasses.is_dataclass(o): @@ -31,107 +32,6 @@ class JSONEncoder(json.JSONEncoder): return super().default(o) -class Importer: - def __init__( - self, - task_manger, - name, - get, - prepare_context, - notification_success, - notification_failure, - notification_finished, - complete, - ): - self._task_manager = task_manger - self._name = name - self._get = get - self._prepare_context = prepare_context - self._notification_success = notification_success - self._notification_failure = notification_failure - self._notification_finished = notification_finished - self._complete = complete - - self._import_in_progress = False - - async def _import_element(self, id_, context_): - try: - element = await self._get(id_, context_) - self._notification_success(id_, element) - except ApplicationError as error: - self._notification_failure(id_, error) - except asyncio.CancelledError: - pass - except Exception: - logger.exception("Unexpected exception raised in %s importer", self._name) - self._notification_failure(id_, UnknownError()) - - async def _import_elements(self, ids_, context_): - try: - imports = [self._import_element(id_, context_) for id_ in ids_] - await asyncio.gather(*imports) - self._notification_finished() - self._complete() - except asyncio.CancelledError: - logger.debug("Importing %s cancelled", self._name) - finally: - self._import_in_progress = False - - async def start(self, ids): - if self._import_in_progress: - raise ImportInProgress() - - self._import_in_progress = True - try: - context = await self._prepare_context(ids) - self._task_manager.create_task( - self._import_elements(ids, context), - "{} import".format(self._name), - handle_exceptions=False - ) - except: - self._import_in_progress = False - raise - - -class SubscriptionGamesImporter(Importer): - def __init__( - self, - task_manger, - name, - get, - prepare_context, - notification_success, - notification_failure, - notification_finished, - notification_partial_finished, - complete - ): - super(SubscriptionGamesImporter, self).__init__(task_manger, - name, - get, - prepare_context, - notification_success, - notification_failure, - notification_finished, - complete) - self._notification_partial_finished = notification_partial_finished - - async def _import_element(self, id_, context_): - try: - async for element in self._get(id_, context_): - self._notification_success(id_, element) - except ApplicationError as error: - self._notification_failure(id_, error) - except asyncio.CancelledError: - pass - except Exception: - logger.exception("Unexpected exception raised in %s importer", self._name) - self._notification_failure(id_, UnknownError()) - finally: - self._notification_partial_finished(id_) - - class Plugin: """Use and override methods of this class to create a new platform integration.""" @@ -214,7 +114,9 @@ class Plugin: self._local_size_import_finished, self.local_size_import_complete ) - self._subscription_games_importer = SubscriptionGamesImporter( + self._subscription_games_importer = CollectionImporter( + self._subscriptions_games_partial_import_finished, + self._external_task_manager, "subscription games", self.get_subscription_games, @@ -222,7 +124,6 @@ class Plugin: self._subscription_games_import_success, self._subscription_games_import_failure, self._subscription_games_import_finished, - self._subscriptions_games_partial_import_finished, self.subscription_games_import_complete )