mirror of
https://github.com/gogcom/galaxy-integrations-python-api.git
synced 2026-01-21 21:28:20 -05:00
adhere to comments, move importers to seperate module
This commit is contained in:
89
src/galaxy/api/importer.py
Normal file
89
src/galaxy/api/importer.py
Normal file
@@ -0,0 +1,89 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from galaxy.api.jsonrpc import ApplicationError
|
||||
from galaxy.api.errors import ImportInProgress, UnknownError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Importer:
|
||||
def __init__(
|
||||
self,
|
||||
task_manger,
|
||||
name,
|
||||
get,
|
||||
prepare_context,
|
||||
notification_success,
|
||||
notification_failure,
|
||||
notification_finished,
|
||||
complete,
|
||||
):
|
||||
self._task_manager = task_manger
|
||||
self._name = name
|
||||
self._get = get
|
||||
self._prepare_context = prepare_context
|
||||
self._notification_success = notification_success
|
||||
self._notification_failure = notification_failure
|
||||
self._notification_finished = notification_finished
|
||||
self._complete = complete
|
||||
|
||||
self._import_in_progress = False
|
||||
|
||||
async def _import_element(self, id_, context_):
|
||||
try:
|
||||
element = await self._get(id_, context_)
|
||||
self._notification_success(id_, element)
|
||||
except ApplicationError as error:
|
||||
self._notification_failure(id_, error)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception:
|
||||
logger.exception("Unexpected exception raised in %s importer", self._name)
|
||||
self._notification_failure(id_, UnknownError())
|
||||
|
||||
async def _import_elements(self, ids_, context_):
|
||||
try:
|
||||
imports = [self._import_element(id_, context_) for id_ in ids_]
|
||||
await asyncio.gather(*imports)
|
||||
self._notification_finished()
|
||||
self._complete()
|
||||
except asyncio.CancelledError:
|
||||
logger.debug("Importing %s cancelled", self._name)
|
||||
finally:
|
||||
self._import_in_progress = False
|
||||
|
||||
async def start(self, ids):
|
||||
if self._import_in_progress:
|
||||
raise ImportInProgress()
|
||||
|
||||
self._import_in_progress = True
|
||||
try:
|
||||
context = await self._prepare_context(ids)
|
||||
self._task_manager.create_task(
|
||||
self._import_elements(ids, context),
|
||||
"{} import".format(self._name),
|
||||
handle_exceptions=False
|
||||
)
|
||||
except:
|
||||
self._import_in_progress = False
|
||||
raise
|
||||
|
||||
|
||||
class CollectionImporter(Importer):
|
||||
def __init__(self, notification_partialy_finished, *args):
|
||||
super().__init__(*args)
|
||||
self._notification_partial_finished = notification_partialy_finished
|
||||
|
||||
async def _import_element(self, id_, context_):
|
||||
try:
|
||||
async for element in self._get(id_, context_):
|
||||
self._notification_success(id_, element)
|
||||
except ApplicationError as error:
|
||||
self._notification_failure(id_, error)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception:
|
||||
logger.exception("Unexpected exception raised in %s importer", self._name)
|
||||
self._notification_failure(id_, UnknownError())
|
||||
finally:
|
||||
self._notification_partial_finished(id_)
|
||||
@@ -7,17 +7,18 @@ from enum import Enum
|
||||
from typing import Any, Dict, List, Optional, Set, Union, AsyncGenerator
|
||||
|
||||
from galaxy.api.consts import Feature, OSCompatibility
|
||||
from galaxy.api.errors import ImportInProgress, UnknownError
|
||||
from galaxy.api.jsonrpc import ApplicationError, Connection
|
||||
from galaxy.api.types import (
|
||||
Achievement, Authentication, Game, GameLibrarySettings, GameTime, LocalGame, NextStep, UserInfo, UserPresence,
|
||||
Subscription, SubscriptionGame
|
||||
)
|
||||
from galaxy.task_manager import TaskManager
|
||||
from galaxy.api.importer import Importer, CollectionImporter
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
||||
class JSONEncoder(json.JSONEncoder):
|
||||
def default(self, o): # pylint: disable=method-hidden
|
||||
if dataclasses.is_dataclass(o):
|
||||
@@ -31,107 +32,6 @@ class JSONEncoder(json.JSONEncoder):
|
||||
return super().default(o)
|
||||
|
||||
|
||||
class Importer:
|
||||
def __init__(
|
||||
self,
|
||||
task_manger,
|
||||
name,
|
||||
get,
|
||||
prepare_context,
|
||||
notification_success,
|
||||
notification_failure,
|
||||
notification_finished,
|
||||
complete,
|
||||
):
|
||||
self._task_manager = task_manger
|
||||
self._name = name
|
||||
self._get = get
|
||||
self._prepare_context = prepare_context
|
||||
self._notification_success = notification_success
|
||||
self._notification_failure = notification_failure
|
||||
self._notification_finished = notification_finished
|
||||
self._complete = complete
|
||||
|
||||
self._import_in_progress = False
|
||||
|
||||
async def _import_element(self, id_, context_):
|
||||
try:
|
||||
element = await self._get(id_, context_)
|
||||
self._notification_success(id_, element)
|
||||
except ApplicationError as error:
|
||||
self._notification_failure(id_, error)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception:
|
||||
logger.exception("Unexpected exception raised in %s importer", self._name)
|
||||
self._notification_failure(id_, UnknownError())
|
||||
|
||||
async def _import_elements(self, ids_, context_):
|
||||
try:
|
||||
imports = [self._import_element(id_, context_) for id_ in ids_]
|
||||
await asyncio.gather(*imports)
|
||||
self._notification_finished()
|
||||
self._complete()
|
||||
except asyncio.CancelledError:
|
||||
logger.debug("Importing %s cancelled", self._name)
|
||||
finally:
|
||||
self._import_in_progress = False
|
||||
|
||||
async def start(self, ids):
|
||||
if self._import_in_progress:
|
||||
raise ImportInProgress()
|
||||
|
||||
self._import_in_progress = True
|
||||
try:
|
||||
context = await self._prepare_context(ids)
|
||||
self._task_manager.create_task(
|
||||
self._import_elements(ids, context),
|
||||
"{} import".format(self._name),
|
||||
handle_exceptions=False
|
||||
)
|
||||
except:
|
||||
self._import_in_progress = False
|
||||
raise
|
||||
|
||||
|
||||
class SubscriptionGamesImporter(Importer):
|
||||
def __init__(
|
||||
self,
|
||||
task_manger,
|
||||
name,
|
||||
get,
|
||||
prepare_context,
|
||||
notification_success,
|
||||
notification_failure,
|
||||
notification_finished,
|
||||
notification_partial_finished,
|
||||
complete
|
||||
):
|
||||
super(SubscriptionGamesImporter, self).__init__(task_manger,
|
||||
name,
|
||||
get,
|
||||
prepare_context,
|
||||
notification_success,
|
||||
notification_failure,
|
||||
notification_finished,
|
||||
complete)
|
||||
self._notification_partial_finished = notification_partial_finished
|
||||
|
||||
async def _import_element(self, id_, context_):
|
||||
try:
|
||||
async for element in self._get(id_, context_):
|
||||
self._notification_success(id_, element)
|
||||
except ApplicationError as error:
|
||||
self._notification_failure(id_, error)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception:
|
||||
logger.exception("Unexpected exception raised in %s importer", self._name)
|
||||
self._notification_failure(id_, UnknownError())
|
||||
finally:
|
||||
self._notification_partial_finished(id_)
|
||||
|
||||
|
||||
class Plugin:
|
||||
"""Use and override methods of this class to create a new platform integration."""
|
||||
|
||||
@@ -214,7 +114,9 @@ class Plugin:
|
||||
self._local_size_import_finished,
|
||||
self.local_size_import_complete
|
||||
)
|
||||
self._subscription_games_importer = SubscriptionGamesImporter(
|
||||
self._subscription_games_importer = CollectionImporter(
|
||||
self._subscriptions_games_partial_import_finished,
|
||||
|
||||
self._external_task_manager,
|
||||
"subscription games",
|
||||
self.get_subscription_games,
|
||||
@@ -222,7 +124,6 @@ class Plugin:
|
||||
self._subscription_games_import_success,
|
||||
self._subscription_games_import_failure,
|
||||
self._subscription_games_import_finished,
|
||||
self._subscriptions_games_partial_import_finished,
|
||||
self.subscription_games_import_complete
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user