Compare commits

...

7 Commits
0.59 ... 0.61

Author SHA1 Message Date
Rafal Makagon
4cc0055119 Increment version 2019-12-05 13:58:04 +01:00
Romuald Bierbasz
00164fab67 Correctly set _import_in_progress 2019-12-05 11:39:09 +01:00
Romuald Juchnowicz-Bierbasz
453cd1cc70 Do not send notificaitons when import is cancelled 2019-12-03 14:06:55 +01:00
Romuald Juchnowicz-Bierbasz
1f55253fd7 Wait until writer is closed 2019-12-03 14:04:19 +01:00
Romuald Juchnowicz-Bierbasz
7aa3b01abd Add Importer class (reuse code for importers) 2019-12-03 14:03:53 +01:00
Rafal Makagon
bd14d58bad Increment version 2019-11-28 14:37:46 +01:00
Romuald Juchnowicz-Bierbasz
274b9a2c18 Do not wait for drain 2019-11-28 13:10:58 +01:00
3 changed files with 132 additions and 172 deletions

View File

@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup( setup(
name="galaxy.plugin.api", name="galaxy.plugin.api",
version="0.59", version="0.61",
description="GOG Galaxy Integrations Python API", description="GOG Galaxy Integrations Python API",
author='Galaxy team', author='Galaxy team',
author_email='galaxy@gog.com', author_email='galaxy@gog.com',

View File

@@ -93,7 +93,6 @@ class Connection():
self._methods = {} self._methods = {}
self._notifications = {} self._notifications = {}
self._task_manager = TaskManager("jsonrpc server") self._task_manager = TaskManager("jsonrpc server")
self._write_lock = asyncio.Lock()
self._last_request_id = 0 self._last_request_id = 0
self._requests_futures = {} self._requests_futures = {}
@@ -301,16 +300,11 @@ class Connection():
raise InvalidRequest() raise InvalidRequest()
def _send(self, data): def _send(self, data):
async def send_task(data_):
async with self._write_lock:
self._writer.write(data_)
await self._writer.drain()
try: try:
line = self._encoder.encode(data) line = self._encoder.encode(data)
data = (line + "\n").encode("utf-8") data = (line + "\n").encode("utf-8")
logger.debug("Sending %d byte of data", len(data)) logger.debug("Sending %d byte of data", len(data))
self._task_manager.create_task(send_task(data), "send") self._writer.write(data)
except TypeError as error: except TypeError as error:
logger.error(str(error)) logger.error(str(error))

View File

@@ -31,6 +31,69 @@ class JSONEncoder(json.JSONEncoder):
return super().default(o) return super().default(o)
class Importer:
def __init__(
self,
task_manger,
name,
get,
prepare_context,
notification_success,
notification_failure,
notification_finished,
complete
):
self._task_manager = task_manger
self._name = name
self._get = get
self._prepare_context = prepare_context
self._notification_success = notification_success
self._notification_failure = notification_failure
self._notification_finished = notification_finished
self._complete = complete
self._import_in_progress = False
async def start(self, ids):
if self._import_in_progress:
raise ImportInProgress()
async def import_element(id_, context_):
try:
element = await self._get(id_, context_)
self._notification_success(id_, element)
except ApplicationError as error:
self._notification_failure(id_, error)
except asyncio.CancelledError:
pass
except Exception:
logger.exception("Unexpected exception raised in %s importer", self._name)
self._notification_failure(id_, UnknownError())
async def import_elements(ids_, context_):
try:
imports = [import_element(id_, context_) for id_ in ids_]
await asyncio.gather(*imports)
self._notification_finished()
self._complete()
except asyncio.CancelledError:
logger.debug("Importing %s cancelled", self._name)
finally:
self._import_in_progress = False
self._import_in_progress = True
try:
context = await self._prepare_context(ids)
self._task_manager.create_task(
import_elements(ids, context),
"{} import".format(self._name),
handle_exceptions=False
)
except:
self._import_in_progress = False
raise
class Plugin: class Plugin:
"""Use and override methods of this class to create a new platform integration.""" """Use and override methods of this class to create a new platform integration."""
@@ -48,17 +111,62 @@ class Plugin:
encoder = JSONEncoder() encoder = JSONEncoder()
self._connection = Connection(self._reader, self._writer, encoder) self._connection = Connection(self._reader, self._writer, encoder)
self._achievements_import_in_progress = False
self._game_times_import_in_progress = False
self._game_library_settings_import_in_progress = False
self._os_compatibility_import_in_progress = False
self._user_presence_import_in_progress = False
self._persistent_cache = dict() self._persistent_cache = dict()
self._internal_task_manager = TaskManager("plugin internal") self._internal_task_manager = TaskManager("plugin internal")
self._external_task_manager = TaskManager("plugin external") self._external_task_manager = TaskManager("plugin external")
self._achievements_importer = Importer(
self._external_task_manager,
"achievements",
self.get_unlocked_achievements,
self.prepare_achievements_context,
self._game_achievements_import_success,
self._game_achievements_import_failure,
self._achievements_import_finished,
self.achievements_import_complete
)
self._game_time_importer = Importer(
self._external_task_manager,
"game times",
self.get_game_time,
self.prepare_game_times_context,
self._game_time_import_success,
self._game_time_import_failure,
self._game_times_import_finished,
self.game_times_import_complete
)
self._game_library_settings_importer = Importer(
self._external_task_manager,
"game library settings",
self.get_game_library_settings,
self.prepare_game_library_settings_context,
self._game_library_settings_import_success,
self._game_library_settings_import_failure,
self._game_library_settings_import_finished,
self.game_library_settings_import_complete
)
self._os_compatibility_importer = Importer(
self._external_task_manager,
"os compatibility",
self.get_os_compatibility,
self.prepare_os_compatibility_context,
self._os_compatibility_import_success,
self._os_compatibility_import_failure,
self._os_compatibility_import_finished,
self.os_compatibility_import_complete
)
self._user_presence_importer = Importer(
self._external_task_manager,
"users presence",
self.get_user_presence,
self.prepare_user_presence_context,
self._user_presence_import_success,
self._user_presence_import_failure,
self._user_presence_import_finished,
self.user_presence_import_complete
)
# internal # internal
self._register_method("shutdown", self._shutdown, internal=True) self._register_method("shutdown", self._shutdown, internal=True)
self._register_method("get_capabilities", self._get_capabilities, internal=True, immediate=True) self._register_method("get_capabilities", self._get_capabilities, internal=True, immediate=True)
@@ -435,7 +543,7 @@ class Plugin:
} }
) )
def _game_time_import_success(self, game_time: GameTime) -> None: def _game_time_import_success(self, game_id: str, game_time: GameTime) -> None:
params = {"game_time": game_time} params = {"game_time": game_time}
self._connection.send_notification("game_time_import_success", params) self._connection.send_notification("game_time_import_success", params)
@@ -449,7 +557,7 @@ class Plugin:
def _game_times_import_finished(self) -> None: def _game_times_import_finished(self) -> None:
self._connection.send_notification("game_times_import_finished", None) self._connection.send_notification("game_times_import_finished", None)
def _game_library_settings_import_success(self, game_library_settings: GameLibrarySettings) -> None: def _game_library_settings_import_success(self, game_id: str, game_library_settings: GameLibrarySettings) -> None:
params = {"game_library_settings": game_library_settings} params = {"game_library_settings": game_library_settings}
self._connection.send_notification("game_library_settings_import_success", params) self._connection.send_notification("game_library_settings_import_success", params)
@@ -636,36 +744,7 @@ class Plugin:
raise NotImplementedError() raise NotImplementedError()
async def _start_achievements_import(self, game_ids: List[str]) -> None: async def _start_achievements_import(self, game_ids: List[str]) -> None:
if self._achievements_import_in_progress: await self._achievements_importer.start(game_ids)
raise ImportInProgress()
context = await self.prepare_achievements_context(game_ids)
async def import_game_achievements(game_id, context_):
try:
achievements = await self.get_unlocked_achievements(game_id, context_)
self._game_achievements_import_success(game_id, achievements)
except ApplicationError as error:
self._game_achievements_import_failure(game_id, error)
except Exception:
logger.exception("Unexpected exception raised in import_game_achievements")
self._game_achievements_import_failure(game_id, UnknownError())
async def import_games_achievements(game_ids_, context_):
try:
imports = [import_game_achievements(game_id, context_) for game_id in game_ids_]
await asyncio.gather(*imports)
finally:
self._achievements_import_finished()
self._achievements_import_in_progress = False
self.achievements_import_complete()
self._external_task_manager.create_task(
import_games_achievements(game_ids, context),
"unlocked achievements import",
handle_exceptions=False
)
self._achievements_import_in_progress = True
async def prepare_achievements_context(self, game_ids: List[str]) -> Any: async def prepare_achievements_context(self, game_ids: List[str]) -> Any:
"""Override this method to prepare context for get_unlocked_achievements. """Override this method to prepare context for get_unlocked_achievements.
@@ -800,36 +879,7 @@ class Plugin:
raise NotImplementedError() raise NotImplementedError()
async def _start_game_times_import(self, game_ids: List[str]) -> None: async def _start_game_times_import(self, game_ids: List[str]) -> None:
if self._game_times_import_in_progress: await self._game_time_importer.start(game_ids)
raise ImportInProgress()
context = await self.prepare_game_times_context(game_ids)
async def import_game_time(game_id, context_):
try:
game_time = await self.get_game_time(game_id, context_)
self._game_time_import_success(game_time)
except ApplicationError as error:
self._game_time_import_failure(game_id, error)
except Exception:
logger.exception("Unexpected exception raised in import_game_time")
self._game_time_import_failure(game_id, UnknownError())
async def import_game_times(game_ids_, context_):
try:
imports = [import_game_time(game_id, context_) for game_id in game_ids_]
await asyncio.gather(*imports)
finally:
self._game_times_import_finished()
self._game_times_import_in_progress = False
self.game_times_import_complete()
self._external_task_manager.create_task(
import_game_times(game_ids, context),
"game times import",
handle_exceptions=False
)
self._game_times_import_in_progress = True
async def prepare_game_times_context(self, game_ids: List[str]) -> Any: async def prepare_game_times_context(self, game_ids: List[str]) -> Any:
"""Override this method to prepare context for get_game_time. """Override this method to prepare context for get_game_time.
@@ -858,36 +908,7 @@ class Plugin:
""" """
async def _start_game_library_settings_import(self, game_ids: List[str]) -> None: async def _start_game_library_settings_import(self, game_ids: List[str]) -> None:
if self._game_library_settings_import_in_progress: await self._game_library_settings_importer.start(game_ids)
raise ImportInProgress()
context = await self.prepare_game_library_settings_context(game_ids)
async def import_game_library_settings(game_id, context_):
try:
game_library_settings = await self.get_game_library_settings(game_id, context_)
self._game_library_settings_import_success(game_library_settings)
except ApplicationError as error:
self._game_library_settings_import_failure(game_id, error)
except Exception:
logger.exception("Unexpected exception raised in import_game_library_settings")
self._game_library_settings_import_failure(game_id, UnknownError())
async def import_game_library_settings_set(game_ids_, context_):
try:
imports = [import_game_library_settings(game_id, context_) for game_id in game_ids_]
await asyncio.gather(*imports)
finally:
self._game_library_settings_import_finished()
self._game_library_settings_import_in_progress = False
self.game_library_settings_import_complete()
self._external_task_manager.create_task(
import_game_library_settings_set(game_ids, context),
"game library settings import",
handle_exceptions=False
)
self._game_library_settings_import_in_progress = True
async def prepare_game_library_settings_context(self, game_ids: List[str]) -> Any: async def prepare_game_library_settings_context(self, game_ids: List[str]) -> Any:
"""Override this method to prepare context for get_game_library_settings. """Override this method to prepare context for get_game_library_settings.
@@ -916,37 +937,7 @@ class Plugin:
""" """
async def _start_os_compatibility_import(self, game_ids: List[str]) -> None: async def _start_os_compatibility_import(self, game_ids: List[str]) -> None:
if self._os_compatibility_import_in_progress: await self._os_compatibility_importer.start(game_ids)
raise ImportInProgress()
context = await self.prepare_os_compatibility_context(game_ids)
async def import_os_compatibility(game_id, context_):
try:
os_compatibility = await self.get_os_compatibility(game_id, context_)
self._os_compatibility_import_success(game_id, os_compatibility)
except ApplicationError as error:
self._os_compatibility_import_failure(game_id, error)
except Exception:
logger.exception("Unexpected exception raised in import_os_compatibility")
self._os_compatibility_import_failure(game_id, UnknownError())
async def import_os_compatibility_set(game_ids_, context_):
try:
await asyncio.gather(*[
import_os_compatibility(game_id, context_) for game_id in game_ids_
])
finally:
self._os_compatibility_import_finished()
self._os_compatibility_import_in_progress = False
self.os_compatibility_import_complete()
self._external_task_manager.create_task(
import_os_compatibility_set(game_ids, context),
"game OS compatibility import",
handle_exceptions=False
)
self._os_compatibility_import_in_progress = True
async def prepare_os_compatibility_context(self, game_ids: List[str]) -> Any: async def prepare_os_compatibility_context(self, game_ids: List[str]) -> Any:
"""Override this method to prepare context for get_os_compatibility. """Override this method to prepare context for get_os_compatibility.
@@ -972,37 +963,7 @@ class Plugin:
"""Override this method to handle operations after OS compatibility import is finished (like updating cache).""" """Override this method to handle operations after OS compatibility import is finished (like updating cache)."""
async def _start_user_presence_import(self, user_id_list: List[str]) -> None: async def _start_user_presence_import(self, user_id_list: List[str]) -> None:
if self._user_presence_import_in_progress: await self._user_presence_importer.start(user_id_list)
raise ImportInProgress()
context = await self.prepare_user_presence_context(user_id_list)
async def import_user_presence(user_id, context_) -> None:
try:
self._user_presence_import_success(user_id, await self.get_user_presence(user_id, context_))
except ApplicationError as error:
self._user_presence_import_failure(user_id, error)
except Exception:
logger.exception("Unexpected exception raised in import_user_presence")
self._user_presence_import_failure(user_id, UnknownError())
async def import_user_presence_set(user_id_list_, context_) -> None:
try:
await asyncio.gather(*[
import_user_presence(user_id, context_)
for user_id in user_id_list_
])
finally:
self._user_presence_import_finished()
self._user_presence_import_in_progress = False
self.user_presence_import_complete()
self._external_task_manager.create_task(
import_user_presence_set(user_id_list, context),
"user presence import",
handle_exceptions=False
)
self._user_presence_import_in_progress = True
async def prepare_user_presence_context(self, user_id_list: List[str]) -> Any: async def prepare_user_presence_context(self, user_id_list: List[str]) -> Any:
"""Override this method to prepare context for get_user_presence. """Override this method to prepare context for get_user_presence.
@@ -1067,10 +1028,15 @@ def create_and_run_plugin(plugin_class, argv):
async def coroutine(): async def coroutine():
reader, writer = await asyncio.open_connection("127.0.0.1", port) reader, writer = await asyncio.open_connection("127.0.0.1", port)
extra_info = writer.get_extra_info("sockname") try:
logger.info("Using local address: %s:%u", *extra_info) extra_info = writer.get_extra_info("sockname")
async with plugin_class(reader, writer, token) as plugin: logger.info("Using local address: %s:%u", *extra_info)
await plugin.run() async with plugin_class(reader, writer, token) as plugin:
await plugin.run()
finally:
writer.close()
await writer.wait_closed()
try: try:
if sys.platform == "win32": if sys.platform == "win32":