separate sub importer, notify partial finished per subscription

This commit is contained in:
unknown
2020-02-10 09:26:48 +01:00
parent f5683d222a
commit 19c9f14ca9
2 changed files with 77 additions and 26 deletions

View File

@@ -15,7 +15,6 @@ from galaxy.api.types import (
)
from galaxy.task_manager import TaskManager
logger = logging.getLogger(__name__)
@@ -34,16 +33,15 @@ class JSONEncoder(json.JSONEncoder):
class Importer:
def __init__(
self,
task_manger,
name,
get,
prepare_context,
notification_success,
notification_failure,
notification_finished,
complete,
is_generator=False
self,
task_manger,
name,
get,
prepare_context,
notification_success,
notification_failure,
notification_finished,
complete,
):
self._task_manager = task_manger
self._name = name
@@ -55,16 +53,11 @@ class Importer:
self._complete = complete
self._import_in_progress = False
self._is_generator = is_generator
async def _import_element(self, id_, context_):
try:
if self._is_generator:
async for element in self._get(id_, context_):
self._notification_success(id_, element)
else:
element = await self._get(id_, context_)
self._notification_success(id_, element)
element = await self._get(id_, context_)
self._notification_success(id_, element)
except ApplicationError as error:
self._notification_failure(id_, error)
except asyncio.CancelledError:
@@ -101,6 +94,44 @@ class Importer:
raise
class SubscriptionGamesImporter(Importer):
def __init__(
self,
task_manger,
name,
get,
prepare_context,
notification_success,
notification_failure,
notification_finished,
notification_partial_finished,
complete
):
super(SubscriptionGamesImporter, self).__init__(task_manger,
name,
get,
prepare_context,
notification_success,
notification_failure,
notification_finished,
complete)
self._notification_partial_finished = notification_partial_finished
async def _import_element(self, id_, context_):
try:
async for element in self._get(id_, context_):
self._notification_success(id_, element)
except ApplicationError as error:
self._notification_failure(id_, error)
except asyncio.CancelledError:
pass
except Exception:
logger.exception("Unexpected exception raised in %s importer", self._name)
self._notification_failure(id_, UnknownError())
finally:
self._notification_partial_finished()
class Plugin:
"""Use and override methods of this class to create a new platform integration."""
@@ -183,7 +214,7 @@ class Plugin:
self._local_size_import_finished,
self.local_size_import_complete
)
self._subscription_games_importer = Importer(
self._subscription_games_importer = SubscriptionGamesImporter(
self._external_task_manager,
"subscription games",
self.get_subscription_games,
@@ -191,8 +222,8 @@ class Plugin:
self._subscription_games_import_success,
self._subscription_games_import_failure,
self._subscription_games_import_finished,
self.subscription_games_import_complete,
is_generator=True
self._subscriptions_games_partial_import_finished,
self.subscription_games_import_complete
)
# internal
@@ -297,7 +328,8 @@ class Plugin:
if self._implements(methods):
self._features.add(feature)
def _register_method(self, name, handler, result_name=None, internal=False, immediate=False, sensitive_params=False):
def _register_method(self, name, handler, result_name=None, internal=False, immediate=False,
sensitive_params=False):
def wrap_result(result):
if result_name:
result = {
@@ -671,7 +703,8 @@ class Plugin:
def _local_size_import_finished(self) -> None:
self._connection.send_notification("local_size_import_finished", None)
def _subscription_games_import_success(self, subscription_name: str, subscription_games: Optional[List[SubscriptionGame]]) -> None:
def _subscription_games_import_success(self, subscription_name: str,
subscription_games: Optional[List[SubscriptionGame]]) -> None:
self._connection.send_notification(
"subscription_games_import_success",
{
@@ -689,6 +722,9 @@ class Plugin:
}
)
def _subscriptions_games_partial_import_finished(self) -> None:
self._connection.send_notification("subscription_games_partial_import_finished", None)
def _subscription_games_import_finished(self) -> None:
self._connection.send_notification("subscription_games_import_finished", None)
@@ -773,7 +809,7 @@ class Plugin:
raise NotImplementedError()
async def pass_login_credentials(self, step: str, credentials: Dict[str, str], cookies: List[Dict[str, str]]) \
-> Union[NextStep, Authentication]:
-> Union[NextStep, Authentication]:
"""This method is called if we return :class:`~galaxy.api.types.NextStep` from :meth:`.authenticate`
or :meth:`.pass_login_credentials`.
This method's parameters provide the data extracted from the web page navigation that previous NextStep finished on.
@@ -1117,7 +1153,8 @@ class Plugin:
"""
return None
async def get_subscription_games(self, subscription_name: str, context: Any) -> AsyncGenerator[List[SubscriptionGame], None]:
async def get_subscription_games(self, subscription_name: str, context: Any) -> AsyncGenerator[
List[SubscriptionGame], None]:
"""Override this method to provide SubscriptionGames for a given subscription.
This method should `yield` a list of SubscriptionGames -> yield [sub_games]
@@ -1193,7 +1230,6 @@ def create_and_run_plugin(plugin_class, argv):
writer.close()
await writer.wait_closed()
try:
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

View File

@@ -140,6 +140,11 @@ async def test_get_subscription_games_success(plugin, read, write):
]
}
},
{
'jsonrpc': '2.0',
'method':
'subscription_games_partial_import_finished', 'params': None
},
{
"jsonrpc": "2.0",
"method": "subscription_games_import_finished",
@@ -183,6 +188,11 @@ async def test_get_subscription_games_success_empty(plugin, read, write):
"subscription_games": None
}
},
{
'jsonrpc': '2.0',
'method':
'subscription_games_partial_import_finished', 'params': None
},
{
"jsonrpc": "2.0",
"method": "subscription_games_import_finished",
@@ -229,6 +239,11 @@ async def test_get_subscription_games_error(exception, code, message, plugin, re
}
}
},
{
'jsonrpc': '2.0',
'method':
'subscription_games_partial_import_finished', 'params': None
},
{
"jsonrpc": "2.0",
"method": "subscription_games_import_finished",