Compare commits

...

10 Commits
0.60 ... 0.63

Author SHA1 Message Date
Aleksej Pawlowskij
26102dd832 Increment version 2019-12-17 15:56:37 +01:00
Aleksej Pawlowskij
cdcebda529 SDK-3136: Relax install requirements 2019-12-17 15:43:47 +01:00
Romuald Bierbasz
a83f348d7d Increment version 2019-12-10 16:02:40 +01:00
Romuald Bierbasz
1c196d60d5 SDK-3199: Log response json 2019-12-10 16:00:46 +01:00
Aleksej Pawlowskij
deb125ec48 Add missing psutil setup requirement 2019-12-05 16:22:26 +01:00
Rafal Makagon
4cc0055119 Increment version 2019-12-05 13:58:04 +01:00
Romuald Bierbasz
00164fab67 Correctly set _import_in_progress 2019-12-05 11:39:09 +01:00
Romuald Juchnowicz-Bierbasz
453cd1cc70 Do not send notificaitons when import is cancelled 2019-12-03 14:06:55 +01:00
Romuald Juchnowicz-Bierbasz
1f55253fd7 Wait until writer is closed 2019-12-03 14:04:19 +01:00
Romuald Juchnowicz-Bierbasz
7aa3b01abd Add Importer class (reuse code for importers) 2019-12-03 14:03:53 +01:00
3 changed files with 143 additions and 173 deletions

View File

@@ -2,14 +2,15 @@ from setuptools import setup, find_packages
setup( setup(
name="galaxy.plugin.api", name="galaxy.plugin.api",
version="0.60", version="0.63",
description="GOG Galaxy Integrations Python API", description="GOG Galaxy Integrations Python API",
author='Galaxy team', author='Galaxy team',
author_email='galaxy@gog.com', author_email='galaxy@gog.com',
packages=find_packages("src"), packages=find_packages("src"),
package_dir={'': 'src'}, package_dir={'': 'src'},
install_requires=[ install_requires=[
"aiohttp==3.5.4", "aiohttp>=3.5.4",
"certifi==2019.3.9" "certifi>=2019.3.9",
"psutil>=5.6.3; sys_platform == 'darwin'"
] ]
) )

View File

@@ -299,11 +299,14 @@ class Connection():
except TypeError: except TypeError:
raise InvalidRequest() raise InvalidRequest()
def _send(self, data): def _send(self, data, sensitive=True):
try: try:
line = self._encoder.encode(data) line = self._encoder.encode(data)
data = (line + "\n").encode("utf-8") data = (line + "\n").encode("utf-8")
logger.debug("Sending %d byte of data", len(data)) if sensitive:
logger.debug("Sending %d bytes of data", len(data))
else:
logging.debug("Sending data: %s", line)
self._writer.write(data) self._writer.write(data)
except TypeError as error: except TypeError as error:
logger.error(str(error)) logger.error(str(error))
@@ -314,7 +317,7 @@ class Connection():
"id": request_id, "id": request_id,
"result": result "result": result
} }
self._send(response) self._send(response, sensitive=False)
def _send_error(self, request_id, error): def _send_error(self, request_id, error):
response = { response = {
@@ -323,7 +326,7 @@ class Connection():
"error": error.json() "error": error.json()
} }
self._send(response) self._send(response, sensitive=False)
def _send_request(self, request_id, method, params): def _send_request(self, request_id, method, params):
request = { request = {
@@ -332,7 +335,7 @@ class Connection():
"id": request_id, "id": request_id,
"params": params "params": params
} }
self._send(request) self._send(request, sensitive=True)
def _send_notification(self, method, params): def _send_notification(self, method, params):
notification = { notification = {
@@ -340,7 +343,7 @@ class Connection():
"method": method, "method": method,
"params": params "params": params
} }
self._send(notification) self._send(notification, sensitive=True)
@staticmethod @staticmethod
def _log_request(request, sensitive_params): def _log_request(request, sensitive_params):

View File

@@ -31,6 +31,69 @@ class JSONEncoder(json.JSONEncoder):
return super().default(o) return super().default(o)
class Importer:
def __init__(
self,
task_manger,
name,
get,
prepare_context,
notification_success,
notification_failure,
notification_finished,
complete
):
self._task_manager = task_manger
self._name = name
self._get = get
self._prepare_context = prepare_context
self._notification_success = notification_success
self._notification_failure = notification_failure
self._notification_finished = notification_finished
self._complete = complete
self._import_in_progress = False
async def start(self, ids):
if self._import_in_progress:
raise ImportInProgress()
async def import_element(id_, context_):
try:
element = await self._get(id_, context_)
self._notification_success(id_, element)
except ApplicationError as error:
self._notification_failure(id_, error)
except asyncio.CancelledError:
pass
except Exception:
logger.exception("Unexpected exception raised in %s importer", self._name)
self._notification_failure(id_, UnknownError())
async def import_elements(ids_, context_):
try:
imports = [import_element(id_, context_) for id_ in ids_]
await asyncio.gather(*imports)
self._notification_finished()
self._complete()
except asyncio.CancelledError:
logger.debug("Importing %s cancelled", self._name)
finally:
self._import_in_progress = False
self._import_in_progress = True
try:
context = await self._prepare_context(ids)
self._task_manager.create_task(
import_elements(ids, context),
"{} import".format(self._name),
handle_exceptions=False
)
except:
self._import_in_progress = False
raise
class Plugin: class Plugin:
"""Use and override methods of this class to create a new platform integration.""" """Use and override methods of this class to create a new platform integration."""
@@ -48,17 +111,62 @@ class Plugin:
encoder = JSONEncoder() encoder = JSONEncoder()
self._connection = Connection(self._reader, self._writer, encoder) self._connection = Connection(self._reader, self._writer, encoder)
self._achievements_import_in_progress = False
self._game_times_import_in_progress = False
self._game_library_settings_import_in_progress = False
self._os_compatibility_import_in_progress = False
self._user_presence_import_in_progress = False
self._persistent_cache = dict() self._persistent_cache = dict()
self._internal_task_manager = TaskManager("plugin internal") self._internal_task_manager = TaskManager("plugin internal")
self._external_task_manager = TaskManager("plugin external") self._external_task_manager = TaskManager("plugin external")
self._achievements_importer = Importer(
self._external_task_manager,
"achievements",
self.get_unlocked_achievements,
self.prepare_achievements_context,
self._game_achievements_import_success,
self._game_achievements_import_failure,
self._achievements_import_finished,
self.achievements_import_complete
)
self._game_time_importer = Importer(
self._external_task_manager,
"game times",
self.get_game_time,
self.prepare_game_times_context,
self._game_time_import_success,
self._game_time_import_failure,
self._game_times_import_finished,
self.game_times_import_complete
)
self._game_library_settings_importer = Importer(
self._external_task_manager,
"game library settings",
self.get_game_library_settings,
self.prepare_game_library_settings_context,
self._game_library_settings_import_success,
self._game_library_settings_import_failure,
self._game_library_settings_import_finished,
self.game_library_settings_import_complete
)
self._os_compatibility_importer = Importer(
self._external_task_manager,
"os compatibility",
self.get_os_compatibility,
self.prepare_os_compatibility_context,
self._os_compatibility_import_success,
self._os_compatibility_import_failure,
self._os_compatibility_import_finished,
self.os_compatibility_import_complete
)
self._user_presence_importer = Importer(
self._external_task_manager,
"users presence",
self.get_user_presence,
self.prepare_user_presence_context,
self._user_presence_import_success,
self._user_presence_import_failure,
self._user_presence_import_finished,
self.user_presence_import_complete
)
# internal # internal
self._register_method("shutdown", self._shutdown, internal=True) self._register_method("shutdown", self._shutdown, internal=True)
self._register_method("get_capabilities", self._get_capabilities, internal=True, immediate=True) self._register_method("get_capabilities", self._get_capabilities, internal=True, immediate=True)
@@ -435,7 +543,7 @@ class Plugin:
} }
) )
def _game_time_import_success(self, game_time: GameTime) -> None: def _game_time_import_success(self, game_id: str, game_time: GameTime) -> None:
params = {"game_time": game_time} params = {"game_time": game_time}
self._connection.send_notification("game_time_import_success", params) self._connection.send_notification("game_time_import_success", params)
@@ -449,7 +557,7 @@ class Plugin:
def _game_times_import_finished(self) -> None: def _game_times_import_finished(self) -> None:
self._connection.send_notification("game_times_import_finished", None) self._connection.send_notification("game_times_import_finished", None)
def _game_library_settings_import_success(self, game_library_settings: GameLibrarySettings) -> None: def _game_library_settings_import_success(self, game_id: str, game_library_settings: GameLibrarySettings) -> None:
params = {"game_library_settings": game_library_settings} params = {"game_library_settings": game_library_settings}
self._connection.send_notification("game_library_settings_import_success", params) self._connection.send_notification("game_library_settings_import_success", params)
@@ -636,36 +744,7 @@ class Plugin:
raise NotImplementedError() raise NotImplementedError()
async def _start_achievements_import(self, game_ids: List[str]) -> None: async def _start_achievements_import(self, game_ids: List[str]) -> None:
if self._achievements_import_in_progress: await self._achievements_importer.start(game_ids)
raise ImportInProgress()
context = await self.prepare_achievements_context(game_ids)
async def import_game_achievements(game_id, context_):
try:
achievements = await self.get_unlocked_achievements(game_id, context_)
self._game_achievements_import_success(game_id, achievements)
except ApplicationError as error:
self._game_achievements_import_failure(game_id, error)
except Exception:
logger.exception("Unexpected exception raised in import_game_achievements")
self._game_achievements_import_failure(game_id, UnknownError())
async def import_games_achievements(game_ids_, context_):
try:
imports = [import_game_achievements(game_id, context_) for game_id in game_ids_]
await asyncio.gather(*imports)
finally:
self._achievements_import_finished()
self._achievements_import_in_progress = False
self.achievements_import_complete()
self._external_task_manager.create_task(
import_games_achievements(game_ids, context),
"unlocked achievements import",
handle_exceptions=False
)
self._achievements_import_in_progress = True
async def prepare_achievements_context(self, game_ids: List[str]) -> Any: async def prepare_achievements_context(self, game_ids: List[str]) -> Any:
"""Override this method to prepare context for get_unlocked_achievements. """Override this method to prepare context for get_unlocked_achievements.
@@ -800,36 +879,7 @@ class Plugin:
raise NotImplementedError() raise NotImplementedError()
async def _start_game_times_import(self, game_ids: List[str]) -> None: async def _start_game_times_import(self, game_ids: List[str]) -> None:
if self._game_times_import_in_progress: await self._game_time_importer.start(game_ids)
raise ImportInProgress()
context = await self.prepare_game_times_context(game_ids)
async def import_game_time(game_id, context_):
try:
game_time = await self.get_game_time(game_id, context_)
self._game_time_import_success(game_time)
except ApplicationError as error:
self._game_time_import_failure(game_id, error)
except Exception:
logger.exception("Unexpected exception raised in import_game_time")
self._game_time_import_failure(game_id, UnknownError())
async def import_game_times(game_ids_, context_):
try:
imports = [import_game_time(game_id, context_) for game_id in game_ids_]
await asyncio.gather(*imports)
finally:
self._game_times_import_finished()
self._game_times_import_in_progress = False
self.game_times_import_complete()
self._external_task_manager.create_task(
import_game_times(game_ids, context),
"game times import",
handle_exceptions=False
)
self._game_times_import_in_progress = True
async def prepare_game_times_context(self, game_ids: List[str]) -> Any: async def prepare_game_times_context(self, game_ids: List[str]) -> Any:
"""Override this method to prepare context for get_game_time. """Override this method to prepare context for get_game_time.
@@ -858,36 +908,7 @@ class Plugin:
""" """
async def _start_game_library_settings_import(self, game_ids: List[str]) -> None: async def _start_game_library_settings_import(self, game_ids: List[str]) -> None:
if self._game_library_settings_import_in_progress: await self._game_library_settings_importer.start(game_ids)
raise ImportInProgress()
context = await self.prepare_game_library_settings_context(game_ids)
async def import_game_library_settings(game_id, context_):
try:
game_library_settings = await self.get_game_library_settings(game_id, context_)
self._game_library_settings_import_success(game_library_settings)
except ApplicationError as error:
self._game_library_settings_import_failure(game_id, error)
except Exception:
logger.exception("Unexpected exception raised in import_game_library_settings")
self._game_library_settings_import_failure(game_id, UnknownError())
async def import_game_library_settings_set(game_ids_, context_):
try:
imports = [import_game_library_settings(game_id, context_) for game_id in game_ids_]
await asyncio.gather(*imports)
finally:
self._game_library_settings_import_finished()
self._game_library_settings_import_in_progress = False
self.game_library_settings_import_complete()
self._external_task_manager.create_task(
import_game_library_settings_set(game_ids, context),
"game library settings import",
handle_exceptions=False
)
self._game_library_settings_import_in_progress = True
async def prepare_game_library_settings_context(self, game_ids: List[str]) -> Any: async def prepare_game_library_settings_context(self, game_ids: List[str]) -> Any:
"""Override this method to prepare context for get_game_library_settings. """Override this method to prepare context for get_game_library_settings.
@@ -916,37 +937,7 @@ class Plugin:
""" """
async def _start_os_compatibility_import(self, game_ids: List[str]) -> None: async def _start_os_compatibility_import(self, game_ids: List[str]) -> None:
if self._os_compatibility_import_in_progress: await self._os_compatibility_importer.start(game_ids)
raise ImportInProgress()
context = await self.prepare_os_compatibility_context(game_ids)
async def import_os_compatibility(game_id, context_):
try:
os_compatibility = await self.get_os_compatibility(game_id, context_)
self._os_compatibility_import_success(game_id, os_compatibility)
except ApplicationError as error:
self._os_compatibility_import_failure(game_id, error)
except Exception:
logger.exception("Unexpected exception raised in import_os_compatibility")
self._os_compatibility_import_failure(game_id, UnknownError())
async def import_os_compatibility_set(game_ids_, context_):
try:
await asyncio.gather(*[
import_os_compatibility(game_id, context_) for game_id in game_ids_
])
finally:
self._os_compatibility_import_finished()
self._os_compatibility_import_in_progress = False
self.os_compatibility_import_complete()
self._external_task_manager.create_task(
import_os_compatibility_set(game_ids, context),
"game OS compatibility import",
handle_exceptions=False
)
self._os_compatibility_import_in_progress = True
async def prepare_os_compatibility_context(self, game_ids: List[str]) -> Any: async def prepare_os_compatibility_context(self, game_ids: List[str]) -> Any:
"""Override this method to prepare context for get_os_compatibility. """Override this method to prepare context for get_os_compatibility.
@@ -972,37 +963,7 @@ class Plugin:
"""Override this method to handle operations after OS compatibility import is finished (like updating cache).""" """Override this method to handle operations after OS compatibility import is finished (like updating cache)."""
async def _start_user_presence_import(self, user_id_list: List[str]) -> None: async def _start_user_presence_import(self, user_id_list: List[str]) -> None:
if self._user_presence_import_in_progress: await self._user_presence_importer.start(user_id_list)
raise ImportInProgress()
context = await self.prepare_user_presence_context(user_id_list)
async def import_user_presence(user_id, context_) -> None:
try:
self._user_presence_import_success(user_id, await self.get_user_presence(user_id, context_))
except ApplicationError as error:
self._user_presence_import_failure(user_id, error)
except Exception:
logger.exception("Unexpected exception raised in import_user_presence")
self._user_presence_import_failure(user_id, UnknownError())
async def import_user_presence_set(user_id_list_, context_) -> None:
try:
await asyncio.gather(*[
import_user_presence(user_id, context_)
for user_id in user_id_list_
])
finally:
self._user_presence_import_finished()
self._user_presence_import_in_progress = False
self.user_presence_import_complete()
self._external_task_manager.create_task(
import_user_presence_set(user_id_list, context),
"user presence import",
handle_exceptions=False
)
self._user_presence_import_in_progress = True
async def prepare_user_presence_context(self, user_id_list: List[str]) -> Any: async def prepare_user_presence_context(self, user_id_list: List[str]) -> Any:
"""Override this method to prepare context for get_user_presence. """Override this method to prepare context for get_user_presence.
@@ -1067,10 +1028,15 @@ def create_and_run_plugin(plugin_class, argv):
async def coroutine(): async def coroutine():
reader, writer = await asyncio.open_connection("127.0.0.1", port) reader, writer = await asyncio.open_connection("127.0.0.1", port)
extra_info = writer.get_extra_info("sockname") try:
logger.info("Using local address: %s:%u", *extra_info) extra_info = writer.get_extra_info("sockname")
async with plugin_class(reader, writer, token) as plugin: logger.info("Using local address: %s:%u", *extra_info)
await plugin.run() async with plugin_class(reader, writer, token) as plugin:
await plugin.run()
finally:
writer.close()
await writer.wait_closed()
try: try:
if sys.platform == "win32": if sys.platform == "win32":