Compare commits

...

9 Commits
0.43 ... 0.45

Author SHA1 Message Date
Romuald Juchnowicz-Bierbasz
e33dd09a8d Increment version 2019-08-02 12:57:13 +02:00
Aleksej Pawlowskij
f4ea2af924 Make 'handshake_complete' safe 2019-08-02 12:29:48 +02:00
Romuald Bierbasz
3bcc674518 Use MagicMocks with async_return_value and pytest.mark.asyncio 2019-08-02 12:28:13 +02:00
Romuald Juchnowicz-Bierbasz
b14595bef5 Fix types 2019-08-02 12:20:27 +02:00
Romuald Bierbasz
a9acb7a0db SDK-2931: Refactor import methods 2019-08-02 10:10:58 +02:00
Mesco
d95aacb9d8 Fix typo in docstring 2019-08-01 09:53:50 +02:00
Aleksej Pawlowskij
49ae2beab9 SDK-2983: Split entry methods from feature detection 2019-07-29 12:59:45 +02:00
Aleksej Pawlowskij
c9e190772c Increment version 2019-07-26 16:28:47 +02:00
Aleksej Pawlowskij
789415d31b SDK-2974: add process info tools 2019-07-26 13:41:43 +02:00
24 changed files with 1097 additions and 894 deletions

View File

@@ -5,4 +5,5 @@ pytest-mock==1.10.3
pytest-flakes==4.0.0 pytest-flakes==4.0.0
# because of pip bug https://github.com/pypa/pip/issues/4780 # because of pip bug https://github.com/pypa/pip/issues/4780
aiohttp==3.5.4 aiohttp==3.5.4
certifi==2019.3.9 certifi==2019.3.9
psutil==5.6.3; sys_platform == 'darwin'

View File

@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup( setup(
name="galaxy.plugin.api", name="galaxy.plugin.api",
version="0.43", version="0.45",
description="GOG Galaxy Integrations Python API", description="GOG Galaxy Integrations Python API",
author='Galaxy team', author='Galaxy team',
author_email='galaxy@gog.com', author_email='galaxy@gog.com',

View File

@@ -1,21 +1,18 @@
import asyncio import asyncio
import dataclasses
import json import json
import logging import logging
import logging.handlers import logging.handlers
import dataclasses
from enum import Enum
from collections import OrderedDict
from itertools import count
import sys import sys
from collections import OrderedDict
from enum import Enum
from itertools import count
from typing import Any, Dict, List, Optional, Set, Union
from typing import Any, List, Dict, Optional, Union
from galaxy.api.types import Achievement, Game, LocalGame, FriendInfo, GameTime
from galaxy.api.jsonrpc import Server, NotificationClient, ApplicationError
from galaxy.api.consts import Feature from galaxy.api.consts import Feature
from galaxy.api.errors import UnknownError, ImportInProgress from galaxy.api.errors import ImportInProgress, UnknownError
from galaxy.api.types import Authentication, NextStep from galaxy.api.jsonrpc import ApplicationError, NotificationClient, Server
from galaxy.api.types import Achievement, Authentication, FriendInfo, Game, GameTime, LocalGame, NextStep
class JSONEncoder(json.JSONEncoder): class JSONEncoder(json.JSONEncoder):
@@ -24,6 +21,7 @@ class JSONEncoder(json.JSONEncoder):
# filter None values # filter None values
def dict_factory(elements): def dict_factory(elements):
return {k: v for k, v in elements if v is not None} return {k: v for k, v in elements if v is not None}
return dataclasses.asdict(o, dict_factory=dict_factory) return dataclasses.asdict(o, dict_factory=dict_factory)
if isinstance(o, Enum): if isinstance(o, Enum):
return o.value return o.value
@@ -32,12 +30,13 @@ class JSONEncoder(json.JSONEncoder):
class Plugin: class Plugin:
"""Use and override methods of this class to create a new platform integration.""" """Use and override methods of this class to create a new platform integration."""
def __init__(self, platform, version, reader, writer, handshake_token): def __init__(self, platform, version, reader, writer, handshake_token):
logging.info("Creating plugin for platform %s, version %s", platform.value, version) logging.info("Creating plugin for platform %s, version %s", platform.value, version)
self._platform = platform self._platform = platform
self._version = version self._version = version
self._feature_methods = OrderedDict() self._features: Set[Feature] = set()
self._active = True self._active = True
self._pass_control_task = None self._pass_control_task = None
@@ -50,6 +49,7 @@ class Plugin:
def eof_handler(): def eof_handler():
self._shutdown() self._shutdown()
self._server.register_eof(eof_handler) self._server.register_eof(eof_handler)
self._achievements_import_in_progress = False self._achievements_import_in_progress = False
@@ -85,63 +85,37 @@ class Plugin:
self._register_method( self._register_method(
"import_owned_games", "import_owned_games",
self.get_owned_games, self.get_owned_games,
result_name="owned_games", result_name="owned_games"
feature=Feature.ImportOwnedGames
)
self._register_method(
"import_unlocked_achievements",
self.get_unlocked_achievements,
result_name="unlocked_achievements",
feature=Feature.ImportAchievements
)
self._register_method(
"start_achievements_import",
self.start_achievements_import,
)
self._register_method(
"import_local_games",
self.get_local_games,
result_name="local_games",
feature=Feature.ImportInstalledGames
)
self._register_notification("launch_game", self.launch_game, feature=Feature.LaunchGame)
self._register_notification("install_game", self.install_game, feature=Feature.InstallGame)
self._register_notification(
"uninstall_game",
self.uninstall_game,
feature=Feature.UninstallGame
)
self._register_notification(
"shutdown_platform_client",
self.shutdown_platform_client,
feature=Feature.ShutdownPlatformClient
)
self._register_method(
"import_friends",
self.get_friends,
result_name="friend_info_list",
feature=Feature.ImportFriends
)
self._register_method(
"import_game_times",
self.get_game_times,
result_name="game_times",
feature=Feature.ImportGameTime
)
self._register_method(
"start_game_times_import",
self.start_game_times_import,
) )
self._detect_feature(Feature.ImportOwnedGames, ["get_owned_games"])
self._register_method("start_achievements_import", self._start_achievements_import)
self._detect_feature(Feature.ImportAchievements, ["get_unlocked_achievements"])
self._register_method("import_local_games", self.get_local_games, result_name="local_games")
self._detect_feature(Feature.ImportInstalledGames, ["get_local_games"])
self._register_notification("launch_game", self.launch_game)
self._detect_feature(Feature.LaunchGame, ["launch_game"])
self._register_notification("install_game", self.install_game)
self._detect_feature(Feature.InstallGame, ["install_game"])
self._register_notification("uninstall_game", self.uninstall_game)
self._detect_feature(Feature.UninstallGame, ["uninstall_game"])
self._register_notification("shutdown_platform_client", self.shutdown_platform_client)
self._detect_feature(Feature.ShutdownPlatformClient, ["shutdown_platform_client"])
self._register_method("import_friends", self.get_friends, result_name="friend_info_list")
self._detect_feature(Feature.ImportFriends, ["get_friends"])
self._register_method("start_game_times_import", self._start_game_times_import)
self._detect_feature(Feature.ImportGameTime, ["get_game_time"])
@property @property
def features(self): def features(self) -> List[Feature]:
features = [] return list(self._features)
if self.__class__ != Plugin:
for feature, handlers in self._feature_methods.items():
if self._implements(handlers):
features.append(feature)
return features
@property @property
def persistent_cache(self) -> Dict: def persistent_cache(self) -> Dict:
@@ -149,13 +123,17 @@ class Plugin:
""" """
return self._persistent_cache return self._persistent_cache
def _implements(self, handlers): def _implements(self, methods: List[str]) -> bool:
for handler in handlers: for method in methods:
if handler.__name__ not in self.__class__.__dict__: if method not in self.__class__.__dict__:
return False return False
return True return True
def _register_method(self, name, handler, result_name=None, internal=False, sensitive_params=False, feature=None): def _detect_feature(self, feature: Feature, methods: List[str]):
if self._implements(methods):
self._features.add(feature)
def _register_method(self, name, handler, result_name=None, internal=False, sensitive_params=False):
if internal: if internal:
def method(*args, **kwargs): def method(*args, **kwargs):
result = handler(*args, **kwargs) result = handler(*args, **kwargs)
@@ -164,6 +142,7 @@ class Plugin:
result_name: result result_name: result
} }
return result return result
self._server.register_method(name, method, True, sensitive_params) self._server.register_method(name, method, True, sensitive_params)
else: else:
async def method(*args, **kwargs): async def method(*args, **kwargs):
@@ -173,17 +152,12 @@ class Plugin:
result_name: result result_name: result
} }
return result return result
self._server.register_method(name, method, False, sensitive_params) self._server.register_method(name, method, False, sensitive_params)
if feature is not None: def _register_notification(self, name, handler, internal=False, sensitive_params=False):
self._feature_methods.setdefault(feature, []).append(handler)
def _register_notification(self, name, handler, internal=False, sensitive_params=False, feature=None):
self._server.register_notification(name, handler, internal, sensitive_params) self._server.register_notification(name, handler, internal, sensitive_params)
if feature is not None:
self._feature_methods.setdefault(feature, []).append(handler)
async def run(self): async def run(self):
"""Plugin's main coroutine.""" """Plugin's main coroutine."""
await self._server.run() await self._server.run()
@@ -192,6 +166,7 @@ class Plugin:
def create_task(self, coro, description): def create_task(self, coro, description):
"""Wrapper around asyncio.create_task - takes care of canceling tasks on shutdown""" """Wrapper around asyncio.create_task - takes care of canceling tasks on shutdown"""
async def task_wrapper(task_id): async def task_wrapper(task_id):
try: try:
return await coro return await coro
@@ -233,7 +208,10 @@ class Plugin:
def _initialize_cache(self, data: Dict): def _initialize_cache(self, data: Dict):
self._persistent_cache = data self._persistent_cache = data
self.handshake_complete() try:
self.handshake_complete()
except Exception:
logging.exception("Unhandled exception during `handshake_complete` step")
self._pass_control_task = asyncio.create_task(self._pass_control()) self._pass_control_task = asyncio.create_task(self._pass_control())
@staticmethod @staticmethod
@@ -331,26 +309,14 @@ class Plugin:
} }
self._notification_client.notify("achievement_unlocked", params) self._notification_client.notify("achievement_unlocked", params)
def game_achievements_import_success(self, game_id: str, achievements: List[Achievement]) -> None: def _game_achievements_import_success(self, game_id: str, achievements: List[Achievement]) -> None:
"""Notify the client that import of achievements for a given game has succeeded.
This method is called by import_games_achievements.
:param game_id: id of the game for which the achievements were imported
:param achievements: list of imported achievements
"""
params = { params = {
"game_id": game_id, "game_id": game_id,
"unlocked_achievements": achievements "unlocked_achievements": achievements
} }
self._notification_client.notify("game_achievements_import_success", params) self._notification_client.notify("game_achievements_import_success", params)
def game_achievements_import_failure(self, game_id: str, error: ApplicationError) -> None: def _game_achievements_import_failure(self, game_id: str, error: ApplicationError) -> None:
"""Notify the client that import of achievements for a given game has failed.
This method is called by import_games_achievements.
:param game_id: id of the game for which the achievements import failed
:param error: error which prevented the achievements import
"""
params = { params = {
"game_id": game_id, "game_id": game_id,
"error": { "error": {
@@ -360,9 +326,7 @@ class Plugin:
} }
self._notification_client.notify("game_achievements_import_failure", params) self._notification_client.notify("game_achievements_import_failure", params)
def achievements_import_finished(self) -> None: def _achievements_import_finished(self) -> None:
"""Notify the client that importing achievements has finished.
This method is called by import_games_achievements_task"""
self._notification_client.notify("achievements_import_finished", None) self._notification_client.notify("achievements_import_finished", None)
def update_local_game_status(self, local_game: LocalGame) -> None: def update_local_game_status(self, local_game: LocalGame) -> None:
@@ -382,7 +346,7 @@ class Plugin:
continue continue
self.update_local_game_status(LocalGame(game.id, game.status)) self.update_local_game_status(LocalGame(game.id, game.status))
self._cached_games_statuses[game.id] = game.status self._cached_games_statuses[game.id] = game.status
asyncio.sleep(5) # interval await asyncio.sleep(5) # interval
def tick(self): def tick(self):
if self._check_statuses_task is None or self._check_statuses_task.done(): if self._check_statuses_task is None or self._check_statuses_task.done():
@@ -415,22 +379,11 @@ class Plugin:
params = {"game_time": game_time} params = {"game_time": game_time}
self._notification_client.notify("game_time_updated", params) self._notification_client.notify("game_time_updated", params)
def game_time_import_success(self, game_time: GameTime) -> None: def _game_time_import_success(self, game_time: GameTime) -> None:
"""Notify the client that import of a given game_time has succeeded.
This method is called by import_game_times.
:param game_time: game_time which was imported
"""
params = {"game_time": game_time} params = {"game_time": game_time}
self._notification_client.notify("game_time_import_success", params) self._notification_client.notify("game_time_import_success", params)
def game_time_import_failure(self, game_id: str, error: ApplicationError) -> None: def _game_time_import_failure(self, game_id: str, error: ApplicationError) -> None:
"""Notify the client that import of a game time for a given game has failed.
This method is called by import_game_times.
:param game_id: id of the game for which the game time could not be imported
:param error: error which prevented the game time import
"""
params = { params = {
"game_id": game_id, "game_id": game_id,
"error": { "error": {
@@ -440,10 +393,7 @@ class Plugin:
} }
self._notification_client.notify("game_time_import_failure", params) self._notification_client.notify("game_time_import_failure", params)
def game_times_import_finished(self) -> None: def _game_times_import_finished(self) -> None:
"""Notify the client that importing game times has finished.
This method is called by :meth:`~.import_game_times_task`.
"""
self._notification_client.notify("game_times_import_finished", None) self._notification_client.notify("game_times_import_finished", None)
def lost_authentication(self) -> None: def lost_authentication(self) -> None:
@@ -524,7 +474,7 @@ class Plugin:
raise NotImplementedError() raise NotImplementedError()
async def pass_login_credentials(self, step: str, credentials: Dict[str, str], cookies: List[Dict[str, str]]) \ async def pass_login_credentials(self, step: str, credentials: Dict[str, str], cookies: List[Dict[str, str]]) \
-> Union[NextStep, Authentication]: -> Union[NextStep, Authentication]:
"""This method is called if we return galaxy.api.types.NextStep from authenticate or from pass_login_credentials. """This method is called if we return galaxy.api.types.NextStep from authenticate or from pass_login_credentials.
This method's parameters provide the data extracted from the web page navigation that previous NextStep finished on. This method's parameters provide the data extracted from the web page navigation that previous NextStep finished on.
This method should either return galaxy.api.types.Authentication if the authentication is finished This method should either return galaxy.api.types.Authentication if the authentication is finished
@@ -572,50 +522,51 @@ class Plugin:
""" """
raise NotImplementedError() raise NotImplementedError()
async def get_unlocked_achievements(self, game_id: str) -> List[Achievement]: async def _start_achievements_import(self, game_ids: List[str]) -> None:
"""
.. deprecated:: 0.33
Use :meth:`~.import_games_achievements`.
"""
raise NotImplementedError()
async def start_achievements_import(self, game_ids: List[str]) -> None:
"""Starts the task of importing achievements.
This method is called by the GOG Galaxy Client.
:param game_ids: ids of the games for which the achievements are imported
"""
if self._achievements_import_in_progress: if self._achievements_import_in_progress:
raise ImportInProgress() raise ImportInProgress()
async def import_games_achievements_task(game_ids): context = await self.prepare_achievements_context(game_ids)
async def import_game_achievements(game_id, context_):
try: try:
await self.import_games_achievements(game_ids) achievements = await self.get_unlocked_achievements(game_id, context_)
self._game_achievements_import_success(game_id, achievements)
except ApplicationError as error:
self._game_achievements_import_failure(game_id, error)
except Exception:
logging.exception("Unexpected exception raised in import_game_achievements")
self._game_achievements_import_failure(game_id, UnknownError())
async def import_games_achievements(game_ids_, context_):
try:
imports = [import_game_achievements(game_id, context_) for game_id in game_ids_]
await asyncio.gather(*imports)
finally: finally:
self.achievements_import_finished() self._achievements_import_finished()
self._achievements_import_in_progress = False self._achievements_import_in_progress = False
asyncio.create_task(import_games_achievements_task(game_ids)) self.create_task(import_games_achievements(game_ids, context), "Games unlocked achievements import")
self._achievements_import_in_progress = True self._achievements_import_in_progress = True
async def import_games_achievements(self, game_ids: List[str]) -> None: async def prepare_achievements_context(self, game_ids: List[str]) -> Any:
""" """Override this method to prepare context for get_unlocked_achievements.
Override this method to return the unlocked achievements
of the user that is currently logged in to the plugin.
Call game_achievements_import_success/game_achievements_import_failure for each game_id on the list.
This method is called by the GOG Galaxy Client.
:param game_ids: ids of the games for which to import unlocked achievements This allows for optimizations like batch requests to platform API.
Default implementation returns None.
""" """
async def import_game_achievements(game_id): return None
try:
achievements = await self.get_unlocked_achievements(game_id)
self.game_achievements_import_success(game_id, achievements)
except Exception as error:
self.game_achievements_import_failure(game_id, error)
imports = [import_game_achievements(game_id) for game_id in game_ids] async def get_unlocked_achievements(self, game_id: str, context: Any) -> List[Achievement]:
await asyncio.gather(*imports) """Override this method to return list of unlocked achievements
for the game identified by the provided game_id.
This method is called by import task initialized by GOG Galaxy Client.
:param game_id:
:param context: Value return from :meth:`prepare_achievements_context`
:return:
"""
raise NotImplementedError()
async def get_local_games(self) -> List[LocalGame]: async def get_local_games(self) -> List[LocalGame]:
"""Override this method to return the list of """Override this method to return the list of
@@ -718,54 +669,50 @@ class Plugin:
""" """
raise NotImplementedError() raise NotImplementedError()
async def get_game_times(self) -> List[GameTime]: async def _start_game_times_import(self, game_ids: List[str]) -> None:
"""
.. deprecated:: 0.33
Use :meth:`~.import_game_times`.
"""
raise NotImplementedError()
async def start_game_times_import(self, game_ids: List[str]) -> None:
"""Starts the task of importing game times
This method is called by the GOG Galaxy Client.
:param game_ids: ids of the games for which the game time is imported
"""
if self._game_times_import_in_progress: if self._game_times_import_in_progress:
raise ImportInProgress() raise ImportInProgress()
async def import_game_times_task(game_ids): context = await self.prepare_game_times_context(game_ids)
async def import_game_time(game_id, context_):
try: try:
await self.import_game_times(game_ids) game_time = await self.get_game_time(game_id, context_)
self._game_time_import_success(game_time)
except ApplicationError as error:
self._game_time_import_failure(game_id, error)
except Exception:
logging.exception("Unexpected exception raised in import_game_time")
self._game_time_import_failure(game_id, UnknownError())
async def import_game_times(game_ids_, context_):
try:
imports = [import_game_time(game_id, context_) for game_id in game_ids_]
await asyncio.gather(*imports)
finally: finally:
self.game_times_import_finished() self._game_times_import_finished()
self._game_times_import_in_progress = False self._game_times_import_in_progress = False
asyncio.create_task(import_game_times_task(game_ids)) self.create_task(import_game_times(game_ids, context), "Game times import")
self._game_times_import_in_progress = True self._game_times_import_in_progress = True
async def import_game_times(self, game_ids: List[str]) -> None: async def prepare_game_times_context(self, game_ids: List[str]) -> Any:
"""Override this method to prepare context for get_game_time.
This allows for optimizations like batch requests to platform API.
Default implementation returns None.
""" """
Override this method to return game times for return None
games owned by the currently authenticated user.
Call game_time_import_success/game_time_import_failure for each game_id on the list.
This method is called by GOG Galaxy Client.
:param game_ids: ids of the games for which the game time is imported async def get_game_time(self, game_id: str, context: Any) -> GameTime:
"""Override this method to return the game time for the game
identified by the provided game_id.
This method is called by import task initialized by GOG Galaxy Client.
:param game_id:
:param context: Value return from :meth:`prepare_game_times_context`
:return:
""" """
try: raise NotImplementedError()
game_times = await self.get_game_times()
game_ids_set = set(game_ids)
for game_time in game_times:
if game_time.game_id not in game_ids_set:
continue
self.game_time_import_success(game_time)
game_ids_set.discard(game_time.game_id)
for game_id in game_ids_set:
self.game_time_import_failure(game_id, UnknownError())
except Exception as error:
for game_id in game_ids:
self.game_time_import_failure(game_id, error)
def create_and_run_plugin(plugin_class, argv): def create_and_run_plugin(plugin_class, argv):

View File

@@ -61,7 +61,6 @@ class NextStep():
:param auth_params: configuration options: {"window_title": :class:`str`, "window_width": :class:`str`, "window_height": :class:`int`, "start_uri": :class:`int`, "end_uri_regex": :class:`str`} :param auth_params: configuration options: {"window_title": :class:`str`, "window_width": :class:`str`, "window_height": :class:`int`, "start_uri": :class:`int`, "end_uri_regex": :class:`str`}
:param cookies: browser initial set of cookies :param cookies: browser initial set of cookies
:param js: a map of the url regex patterns into the list of *js* scripts that should be executed on every document at given step of internal browser authentication. :param js: a map of the url regex patterns into the list of *js* scripts that should be executed on every document at given step of internal browser authentication.
""" """
next_step: str next_step: str
auth_params: Dict[str, str] auth_params: Dict[str, str]

91
src/galaxy/proc_tools.py Normal file
View File

@@ -0,0 +1,91 @@
import platform
from dataclasses import dataclass
from typing import Iterable, NewType, Optional, Set
def is_windows():
return platform.system() == "Windows"
ProcessId = NewType("ProcessId", int)
@dataclass
class ProcessInfo:
pid: ProcessId
binary_path: Optional[str]
if is_windows():
from ctypes import byref, sizeof, windll, create_unicode_buffer, FormatError, WinError
from ctypes.wintypes import DWORD
def pids() -> Iterable[ProcessId]:
_PROC_ID_T = DWORD
list_size = 4096
def try_get_pids(list_size: int) -> Set[ProcessId]:
result_size = DWORD()
proc_id_list = (_PROC_ID_T * list_size)()
if not windll.psapi.EnumProcesses(byref(proc_id_list), sizeof(proc_id_list), byref(result_size)):
raise WinError(descr="Failed to get process ID list: %s" % FormatError())
return proc_id_list[:int(result_size.value / sizeof(_PROC_ID_T()))]
while True:
proc_ids = try_get_pids(list_size)
if len(proc_ids) < list_size:
return proc_ids
list_size *= 2
def get_process_info(pid: ProcessId) -> Optional[ProcessInfo]:
_PROC_QUERY_LIMITED_INFORMATION = 0x1000
process_info = ProcessInfo(pid=pid, binary_path=None)
h_process = windll.kernel32.OpenProcess(_PROC_QUERY_LIMITED_INFORMATION, False, pid)
if not h_process:
return process_info
try:
def get_exe_path() -> Optional[str]:
_MAX_PATH = 260
_WIN32_PATH_FORMAT = 0x0000
exe_path_buffer = create_unicode_buffer(_MAX_PATH)
exe_path_len = DWORD(len(exe_path_buffer))
return exe_path_buffer[:exe_path_len.value] if windll.kernel32.QueryFullProcessImageNameW(
h_process, _WIN32_PATH_FORMAT, exe_path_buffer, byref(exe_path_len)
) else None
process_info.binary_path = get_exe_path()
finally:
windll.kernel32.CloseHandle(h_process)
return process_info
else:
import psutil
def pids() -> Iterable[ProcessId]:
for pid in psutil.pids():
yield pid
def get_process_info(pid: ProcessId) -> Optional[ProcessInfo]:
process_info = ProcessInfo(pid=pid, binary_path=None)
try:
process_info.binary_path = psutil.Process(pid=pid).as_dict(attrs=["exe"])["exe"]
except psutil.NoSuchProcess:
pass
finally:
return process_info
def process_iter() -> Iterable[ProcessInfo]:
for pid in pids():
yield get_process_info(pid)

View File

@@ -3,6 +3,7 @@ import os
import zipfile import zipfile
from glob import glob from glob import glob
def zip_folder(folder): def zip_folder(folder):
files = glob(os.path.join(folder, "**"), recursive=True) files = glob(os.path.join(folder, "**"), recursive=True)
files = [file.replace(folder + os.sep, "") for file in files] files = [file.replace(folder + os.sep, "") for file in files]
@@ -14,6 +15,7 @@ def zip_folder(folder):
zipf.write(os.path.join(folder, file), arcname=file) zipf.write(os.path.join(folder, file), arcname=file)
return zip_buffer return zip_buffer
def zip_folder_to_file(folder, filename): def zip_folder_to_file(folder, filename):
zip_content = zip_folder(folder).getbuffer() zip_content = zip_folder(folder).getbuffer()
with open(filename, "wb") as archive: with open(filename, "wb") as archive:

View File

@@ -1,12 +1,31 @@
from asyncio import coroutine import asyncio
from unittest.mock import MagicMock from unittest.mock import MagicMock
class AsyncMock(MagicMock): class AsyncMock(MagicMock):
"""
..deprecated:: 0.45
Use: :class:`MagicMock` with meth:`~.async_return_value`.
"""
async def __call__(self, *args, **kwargs): async def __call__(self, *args, **kwargs):
return super(AsyncMock, self).__call__(*args, **kwargs) return super(AsyncMock, self).__call__(*args, **kwargs)
def coroutine_mock(): def coroutine_mock():
"""
..deprecated:: 0.45
Use: :class:`MagicMock` with meth:`~.async_return_value`.
"""
coro = MagicMock(name="CoroutineResult") coro = MagicMock(name="CoroutineResult")
corofunc = MagicMock(name="CoroutineFunction", side_effect=coroutine(coro)) corofunc = MagicMock(name="CoroutineFunction", side_effect=asyncio.coroutine(coro))
corofunc.coro = coro corofunc.coro = coro
return corofunc return corofunc
async def skip_loop(iterations=1):
for _ in range(iterations):
await asyncio.sleep(0)
async def async_return_value(return_value, loop_iterations_delay=0):
await skip_loop(loop_iterations_delay)
return return_value

View File

@@ -0,0 +1,19 @@
import json
def create_message(request):
return json.dumps(request).encode() + b"\n"
def get_messages(write_mock):
messages = []
print("call_args_list", write_mock.call_args_list)
for call_args in write_mock.call_args_list:
print("call_args", call_args)
data = call_args[0][0]
print("data", data)
for line in data.splitlines():
message = json.loads(line)
messages.append(message)
return messages

View File

@@ -6,19 +6,18 @@ import pytest
from galaxy.api.plugin import Plugin from galaxy.api.plugin import Plugin
from galaxy.api.consts import Platform from galaxy.api.consts import Platform
from galaxy.unittest.mock import AsyncMock, coroutine_mock
@pytest.fixture() @pytest.fixture()
def reader(): def reader():
stream = MagicMock(name="stream_reader") stream = MagicMock(name="stream_reader")
stream.read = AsyncMock() stream.read = MagicMock()
yield stream yield stream
@pytest.fixture() @pytest.fixture()
def writer(): async def writer():
stream = MagicMock(name="stream_writer") stream = MagicMock(name="stream_writer")
stream.write = MagicMock() stream.write = MagicMock()
stream.drain = AsyncMock() stream.drain = MagicMock()
yield stream yield stream
@pytest.fixture() @pytest.fixture()
@@ -32,32 +31,30 @@ def write(writer):
@pytest.fixture() @pytest.fixture()
def plugin(reader, writer): def plugin(reader, writer):
"""Return plugin instance with all feature methods mocked""" """Return plugin instance with all feature methods mocked"""
async_methods = ( methods = (
"handshake_complete", "handshake_complete",
"authenticate", "authenticate",
"get_owned_games", "get_owned_games",
"prepare_achievements_context",
"get_unlocked_achievements", "get_unlocked_achievements",
"get_local_games", "get_local_games",
"launch_game", "launch_game",
"install_game", "install_game",
"uninstall_game", "uninstall_game",
"get_friends", "get_friends",
"get_game_times", "get_game_time",
"shutdown_platform_client" "prepare_game_times_context",
) "shutdown_platform_client",
methods = (
"shutdown", "shutdown",
"tick" "tick"
) )
with ExitStack() as stack: with ExitStack() as stack:
for method in async_methods:
stack.enter_context(patch.object(Plugin, method, new_callable=coroutine_mock))
for method in methods: for method in methods:
stack.enter_context(patch.object(Plugin, method)) stack.enter_context(patch.object(Plugin, method))
yield Plugin(Platform.Generic, "0.1", reader, writer, "token") yield Plugin(Platform.Generic, "0.1", reader, writer, "token")
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def my_caplog(caplog): def my_caplog(caplog):
caplog.set_level(logging.DEBUG) caplog.set_level(logging.DEBUG)

View File

@@ -1,94 +1,203 @@
import asyncio
import json import json
from unittest.mock import call
import pytest import pytest
from pytest import raises from pytest import raises
from galaxy.api.types import Achievement from galaxy.api.types import Achievement
from galaxy.api.errors import UnknownError, ImportInProgress, BackendError from galaxy.api.errors import BackendError
from galaxy.unittest.mock import async_return_value
from tests import create_message, get_messages
def test_initialization_no_unlock_time(): def test_initialization_no_unlock_time():
with raises(Exception): with raises(Exception):
Achievement(achievement_id="lvl30", achievement_name="Got level 30") Achievement(achievement_id="lvl30", achievement_name="Got level 30")
def test_initialization_no_id_nor_name(): def test_initialization_no_id_nor_name():
with raises(AssertionError): with raises(AssertionError):
Achievement(unlock_time=1234567890) Achievement(unlock_time=1234567890)
def test_success(plugin, read, write):
@pytest.mark.asyncio
async def test_get_unlocked_achievements_success(plugin, read, write):
plugin.prepare_achievements_context.return_value = async_return_value(5)
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": "3", "id": "3",
"method": "import_unlocked_achievements", "method": "start_achievements_import",
"params": { "params": {
"game_id": "14" "game_ids": ["14"]
} }
} }
read.side_effect = [json.dumps(request).encode() + b"\n", b""] read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_unlocked_achievements.coro.return_value = [ plugin.get_unlocked_achievements.return_value = async_return_value([
Achievement(achievement_id="lvl10", unlock_time=1548421241), Achievement(achievement_id="lvl10", unlock_time=1548421241),
Achievement(achievement_name="Got level 20", unlock_time=1548422395), Achievement(achievement_name="Got level 20", unlock_time=1548422395),
Achievement(achievement_id="lvl30", achievement_name="Got level 30", unlock_time=1548495633) Achievement(achievement_id="lvl30", achievement_name="Got level 30", unlock_time=1548495633)
] ])
asyncio.run(plugin.run()) await plugin.run()
plugin.get_unlocked_achievements.assert_called_with(game_id="14") plugin.prepare_achievements_context.assert_called_with(["14"])
response = json.loads(write.call_args[0][0]) plugin.get_unlocked_achievements.assert_called_with("14", 5)
assert response == { assert get_messages(write) == [
"jsonrpc": "2.0", {
"id": "3", "jsonrpc": "2.0",
"result": { "id": "3",
"unlocked_achievements": [ "result": None
{ },
"achievement_id": "lvl10", {
"unlock_time": 1548421241 "jsonrpc": "2.0",
}, "method": "game_achievements_import_success",
{ "params": {
"achievement_name": "Got level 20", "game_id": "14",
"unlock_time": 1548422395 "unlocked_achievements": [
}, {
{ "achievement_id": "lvl10",
"achievement_id": "lvl30", "unlock_time": 1548421241
"achievement_name": "Got level 30", },
"unlock_time": 1548495633 {
} "achievement_name": "Got level 20",
] "unlock_time": 1548422395
},
{
"achievement_id": "lvl30",
"achievement_name": "Got level 30",
"unlock_time": 1548495633
}
]
}
},
{
"jsonrpc": "2.0",
"method": "achievements_import_finished",
"params": None
} }
} ]
def test_failure(plugin, read, write):
@pytest.mark.asyncio
@pytest.mark.parametrize("exception,code,message", [
(BackendError, 4, "Backend error"),
(KeyError, 0, "Unknown error")
])
async def test_get_unlocked_achievements_error(exception, code, message, plugin, read, write):
plugin.prepare_achievements_context.return_value = async_return_value(None)
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": "3", "id": "3",
"method": "import_unlocked_achievements", "method": "start_achievements_import",
"params": { "params": {
"game_id": "14" "game_ids": ["14"]
} }
} }
read.side_effect = [json.dumps(request).encode() + b"\n", b""] read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_unlocked_achievements.coro.side_effect = UnknownError() plugin.get_unlocked_achievements.side_effect = exception
asyncio.run(plugin.run()) await plugin.run()
plugin.get_unlocked_achievements.assert_called() plugin.get_unlocked_achievements.assert_called()
response = json.loads(write.call_args[0][0])
assert response == { assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"result": None
},
{
"jsonrpc": "2.0",
"method": "game_achievements_import_failure",
"params": {
"game_id": "14",
"error": {
"code": code,
"message": message
}
}
},
{
"jsonrpc": "2.0",
"method": "achievements_import_finished",
"params": None
}
]
@pytest.mark.asyncio
async def test_prepare_get_unlocked_achievements_context_error(plugin, read, write):
plugin.prepare_achievements_context.side_effect = BackendError()
request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": "3", "id": "3",
"error": { "method": "start_achievements_import",
"code": 0, "params": {
"message": "Unknown error" "game_ids": ["14"]
} }
} }
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
await plugin.run()
def test_unlock_achievement(plugin, write): assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": 4,
"message": "Backend error"
}
}
]
@pytest.mark.asyncio
async def test_import_in_progress(plugin, read, write):
plugin.prepare_achievements_context.return_value = async_return_value(None)
requests = [
{
"jsonrpc": "2.0",
"id": "3",
"method": "start_achievements_import",
"params": {
"game_ids": ["14"]
}
},
{
"jsonrpc": "2.0",
"id": "4",
"method": "start_achievements_import",
"params": {
"game_ids": ["15"]
}
}
]
read.side_effect = [
async_return_value(create_message(requests[0])),
async_return_value(create_message(requests[1])),
async_return_value(b"")
]
await plugin.run()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"result": None
},
{
"jsonrpc": "2.0",
"id": "4",
"error": {
"code": 600,
"message": "Import already in progress"
}
}
]
@pytest.mark.asyncio
async def test_unlock_achievement(plugin, write):
achievement = Achievement(achievement_id="lvl20", unlock_time=1548422395) achievement = Achievement(achievement_id="lvl20", unlock_time=1548422395)
plugin.unlock_achievement("14", achievement)
async def couritine():
plugin.unlock_achievement("14", achievement)
asyncio.run(couritine())
response = json.loads(write.call_args[0][0]) response = json.loads(write.call_args[0][0])
assert response == { assert response == {
@@ -102,92 +211,3 @@ def test_unlock_achievement(plugin, write):
} }
} }
} }
@pytest.mark.asyncio
async def test_game_achievements_import_success(plugin, write):
achievements = [
Achievement(achievement_id="lvl10", unlock_time=1548421241),
Achievement(achievement_name="Got level 20", unlock_time=1548422395)
]
plugin.game_achievements_import_success("134", achievements)
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "game_achievements_import_success",
"params": {
"game_id": "134",
"unlocked_achievements": [
{
"achievement_id": "lvl10",
"unlock_time": 1548421241
},
{
"achievement_name": "Got level 20",
"unlock_time": 1548422395
}
]
}
}
@pytest.mark.asyncio
async def test_game_achievements_import_failure(plugin, write):
plugin.game_achievements_import_failure("134", ImportInProgress())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "game_achievements_import_failure",
"params": {
"game_id": "134",
"error": {
"code": 600,
"message": "Import already in progress"
}
}
}
@pytest.mark.asyncio
async def test_achievements_import_finished(plugin, write):
plugin.achievements_import_finished()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "achievements_import_finished",
"params": None
}
@pytest.mark.asyncio
async def test_start_achievements_import(plugin, write, mocker):
game_achievements_import_success = mocker.patch.object(plugin, "game_achievements_import_success")
game_achievements_import_failure = mocker.patch.object(plugin, "game_achievements_import_failure")
achievements_import_finished = mocker.patch.object(plugin, "achievements_import_finished")
game_ids = ["1", "5", "9"]
error = BackendError()
achievements = [
Achievement(achievement_id="lvl10", unlock_time=1548421241),
Achievement(achievement_name="Got level 20", unlock_time=1548422395)
]
plugin.get_unlocked_achievements.coro.side_effect = [
achievements,
[],
error
]
await plugin.start_achievements_import(game_ids)
with pytest.raises(ImportInProgress):
await plugin.start_achievements_import(["4", "8"])
# wait until all tasks are finished
for _ in range(4):
await asyncio.sleep(0)
plugin.get_unlocked_achievements.coro.assert_has_calls([call("1"), call("5"), call("9")])
game_achievements_import_success.assert_has_calls([
call("1", achievements),
call("5", [])
])
game_achievements_import_failure.assert_called_once_with("9", error)
achievements_import_finished.assert_called_once_with()

View File

@@ -1,6 +1,3 @@
import asyncio
import json
import pytest import pytest
from galaxy.api.types import Authentication from galaxy.api.types import Authentication
@@ -8,29 +5,36 @@ from galaxy.api.errors import (
UnknownError, InvalidCredentials, NetworkError, LoggedInElsewhere, ProtocolError, UnknownError, InvalidCredentials, NetworkError, LoggedInElsewhere, ProtocolError,
BackendNotAvailable, BackendTimeout, BackendError, TemporaryBlocked, Banned, AccessDenied BackendNotAvailable, BackendTimeout, BackendError, TemporaryBlocked, Banned, AccessDenied
) )
from galaxy.unittest.mock import async_return_value
def test_success(plugin, read, write): from tests import create_message, get_messages
@pytest.mark.asyncio
async def test_success(plugin, read, write):
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": "3", "id": "3",
"method": "init_authentication" "method": "init_authentication"
} }
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
read.side_effect = [json.dumps(request).encode() + b"\n", b""] plugin.authenticate.return_value = async_return_value(Authentication("132", "Zenek"))
plugin.authenticate.coro.return_value = Authentication("132", "Zenek") await plugin.run()
asyncio.run(plugin.run())
plugin.authenticate.assert_called_with() plugin.authenticate.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == { assert get_messages(write) == [
"jsonrpc": "2.0", {
"id": "3", "jsonrpc": "2.0",
"result": { "id": "3",
"user_id": "132", "result": {
"user_name": "Zenek" "user_id": "132",
"user_name": "Zenek"
}
} }
} ]
@pytest.mark.asyncio
@pytest.mark.parametrize("error,code,message", [ @pytest.mark.parametrize("error,code,message", [
pytest.param(UnknownError, 0, "Unknown error", id="unknown_error"), pytest.param(UnknownError, 0, "Unknown error", id="unknown_error"),
pytest.param(BackendNotAvailable, 2, "Backend not available", id="backend_not_available"), pytest.param(BackendNotAvailable, 2, "Backend not available", id="backend_not_available"),
@@ -44,29 +48,32 @@ def test_success(plugin, read, write):
pytest.param(Banned, 105, "Banned", id="banned"), pytest.param(Banned, 105, "Banned", id="banned"),
pytest.param(AccessDenied, 106, "Access denied", id="access_denied"), pytest.param(AccessDenied, 106, "Access denied", id="access_denied"),
]) ])
def test_failure(plugin, read, write, error, code, message): async def test_failure(plugin, read, write, error, code, message):
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": "3", "id": "3",
"method": "init_authentication" "method": "init_authentication"
} }
read.side_effect = [json.dumps(request).encode() + b"\n", b""] read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
plugin.authenticate.coro.side_effect = error() plugin.authenticate.side_effect = error()
asyncio.run(plugin.run()) await plugin.run()
plugin.authenticate.assert_called_with() plugin.authenticate.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == { assert get_messages(write) == [
"jsonrpc": "2.0", {
"id": "3", "jsonrpc": "2.0",
"error": { "id": "3",
"code": code, "error": {
"message": message "code": code,
"message": message
}
} }
} ]
def test_stored_credentials(plugin, read, write):
@pytest.mark.asyncio
async def test_stored_credentials(plugin, read, write):
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": "3", "id": "3",
@@ -77,39 +84,37 @@ def test_stored_credentials(plugin, read, write):
} }
} }
} }
read.side_effect = [json.dumps(request).encode() + b"\n", b""] read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
plugin.authenticate.coro.return_value = Authentication("132", "Zenek") plugin.authenticate.return_value = async_return_value(Authentication("132", "Zenek"))
asyncio.run(plugin.run()) await plugin.run()
plugin.authenticate.assert_called_with(stored_credentials={"token": "ABC"}) plugin.authenticate.assert_called_with(stored_credentials={"token": "ABC"})
write.assert_called() write.assert_called()
def test_store_credentials(plugin, write):
@pytest.mark.asyncio
async def test_store_credentials(plugin, write):
credentials = { credentials = {
"token": "ABC" "token": "ABC"
} }
plugin.store_credentials(credentials)
async def couritine(): assert get_messages(write) == [
plugin.store_credentials(credentials) {
"jsonrpc": "2.0",
"method": "store_credentials",
"params": credentials
}
]
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == { @pytest.mark.asyncio
"jsonrpc": "2.0", async def test_lost_authentication(plugin, write):
"method": "store_credentials", plugin.lost_authentication()
"params": credentials
}
def test_lost_authentication(plugin, write): assert get_messages(write) == [
{
async def couritine(): "jsonrpc": "2.0",
plugin.lost_authentication() "method": "authentication_lost",
"params": None
asyncio.run(couritine()) }
response = json.loads(write.call_args[0][0]) ]
assert response == {
"jsonrpc": "2.0",
"method": "authentication_lost",
"params": None
}

View File

@@ -1,7 +1,12 @@
import asyncio
import json import json
def test_chunked_messages(plugin, read): import pytest
from galaxy.unittest.mock import async_return_value
@pytest.mark.asyncio
async def test_chunked_messages(plugin, read):
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": "install_game", "method": "install_game",
@@ -11,11 +16,13 @@ def test_chunked_messages(plugin, read):
} }
message = json.dumps(request).encode() + b"\n" message = json.dumps(request).encode() + b"\n"
read.side_effect = [message[:5], message[5:], b""] read.side_effect = [async_return_value(message[:5]), async_return_value(message[5:]), async_return_value(b"")]
asyncio.run(plugin.run()) await plugin.run()
plugin.install_game.assert_called_with(game_id="3") plugin.install_game.assert_called_with(game_id="3")
def test_joined_messages(plugin, read):
@pytest.mark.asyncio
async def test_joined_messages(plugin, read):
requests = [ requests = [
{ {
"jsonrpc": "2.0", "jsonrpc": "2.0",
@@ -34,12 +41,14 @@ def test_joined_messages(plugin, read):
] ]
data = b"".join([json.dumps(request).encode() + b"\n" for request in requests]) data = b"".join([json.dumps(request).encode() + b"\n" for request in requests])
read.side_effect = [data, b""] read.side_effect = [async_return_value(data), async_return_value(b"")]
asyncio.run(plugin.run()) await plugin.run()
plugin.install_game.assert_called_with(game_id="3") plugin.install_game.assert_called_with(game_id="3")
plugin.launch_game.assert_called_with(game_id="3") plugin.launch_game.assert_called_with(game_id="3")
def test_not_finished(plugin, read):
@pytest.mark.asyncio
async def test_not_finished(plugin, read):
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": "install_game", "method": "install_game",
@@ -49,6 +58,6 @@ def test_not_finished(plugin, read):
} }
message = json.dumps(request).encode() # no new line message = json.dumps(request).encode() # no new line
read.side_effect = [message, b""] read.side_effect = [async_return_value(message), async_return_value(b"")]
asyncio.run(plugin.run()) await plugin.run()
plugin.install_game.assert_not_called() plugin.install_game.assert_not_called()

View File

@@ -1,21 +1,49 @@
from galaxy.api.consts import Feature, Platform
from galaxy.api.plugin import Plugin from galaxy.api.plugin import Plugin
from galaxy.api.consts import Platform, Feature
def test_base_class(): def test_base_class():
plugin = Plugin(Platform.Generic, "0.1", None, None, None) plugin = Plugin(Platform.Generic, "0.1", None, None, None)
assert plugin.features == [] assert set(plugin.features) == {
Feature.ImportInstalledGames,
Feature.ImportOwnedGames,
Feature.LaunchGame,
Feature.InstallGame,
Feature.UninstallGame,
Feature.ImportAchievements,
Feature.ImportGameTime,
Feature.ImportFriends,
Feature.ShutdownPlatformClient
}
def test_no_overloads(): def test_no_overloads():
class PluginImpl(Plugin): #pylint: disable=abstract-method class PluginImpl(Plugin): # pylint: disable=abstract-method
pass pass
plugin = PluginImpl(Platform.Generic, "0.1", None, None, None) plugin = PluginImpl(Platform.Generic, "0.1", None, None, None)
assert plugin.features == [] assert plugin.features == []
def test_one_method_feature(): def test_one_method_feature():
class PluginImpl(Plugin): #pylint: disable=abstract-method class PluginImpl(Plugin): # pylint: disable=abstract-method
async def get_owned_games(self): async def get_owned_games(self):
pass pass
plugin = PluginImpl(Platform.Generic, "0.1", None, None, None) plugin = PluginImpl(Platform.Generic, "0.1", None, None, None)
assert plugin.features == [Feature.ImportOwnedGames] assert plugin.features == [Feature.ImportOwnedGames]
def test_multi_features():
class PluginImpl(Plugin): # pylint: disable=abstract-method
async def get_owned_games(self):
pass
async def get_unlocked_achievements(self, game_id, context):
pass
async def get_game_time(self, game_id, context):
pass
plugin = PluginImpl(Platform.Generic, "0.1", None, None, None)
assert set(plugin.features) == {Feature.ImportAchievements, Feature.ImportOwnedGames, Feature.ImportGameTime}

View File

@@ -1,90 +1,94 @@
import asyncio
import json
from galaxy.api.types import FriendInfo from galaxy.api.types import FriendInfo
from galaxy.api.errors import UnknownError from galaxy.api.errors import UnknownError
from galaxy.unittest.mock import async_return_value
import pytest
from tests import create_message, get_messages
def test_get_friends_success(plugin, read, write): @pytest.mark.asyncio
async def test_get_friends_success(plugin, read, write):
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": "3", "id": "3",
"method": "import_friends" "method": "import_friends"
} }
read.side_effect = [json.dumps(request).encode() + b"\n", b""] read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
plugin.get_friends.coro.return_value = [ plugin.get_friends.return_value = async_return_value([
FriendInfo("3", "Jan"), FriendInfo("3", "Jan"),
FriendInfo("5", "Ola") FriendInfo("5", "Ola")
] ])
asyncio.run(plugin.run()) await plugin.run()
plugin.get_friends.assert_called_with() plugin.get_friends.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == { assert get_messages(write) == [
"jsonrpc": "2.0", {
"id": "3", "jsonrpc": "2.0",
"result": { "id": "3",
"friend_info_list": [ "result": {
{"user_id": "3", "user_name": "Jan"}, "friend_info_list": [
{"user_id": "5", "user_name": "Ola"} {"user_id": "3", "user_name": "Jan"},
] {"user_id": "5", "user_name": "Ola"}
]
}
} }
} ]
def test_get_friends_failure(plugin, read, write): @pytest.mark.asyncio
async def test_get_friends_failure(plugin, read, write):
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": "3", "id": "3",
"method": "import_friends" "method": "import_friends"
} }
read.side_effect = [json.dumps(request).encode() + b"\n", b""] read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
plugin.get_friends.coro.side_effect = UnknownError() plugin.get_friends.side_effect = UnknownError()
asyncio.run(plugin.run()) await plugin.run()
plugin.get_friends.assert_called_with() plugin.get_friends.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == { assert get_messages(write) == [
"jsonrpc": "2.0", {
"id": "3", "jsonrpc": "2.0",
"error": { "id": "3",
"code": 0, "error": {
"message": "Unknown error", "code": 0,
"message": "Unknown error",
}
} }
} ]
def test_add_friend(plugin, write): @pytest.mark.asyncio
async def test_add_friend(plugin, write):
friend = FriendInfo("7", "Kuba") friend = FriendInfo("7", "Kuba")
async def couritine(): plugin.add_friend(friend)
plugin.add_friend(friend)
asyncio.run(couritine()) assert get_messages(write) == [
response = json.loads(write.call_args[0][0]) {
"jsonrpc": "2.0",
assert response == { "method": "friend_added",
"jsonrpc": "2.0", "params": {
"method": "friend_added", "friend_info": {"user_id": "7", "user_name": "Kuba"}
"params": { }
"friend_info": {"user_id": "7", "user_name": "Kuba"}
} }
} ]
def test_remove_friend(plugin, write): @pytest.mark.asyncio
async def couritine(): async def test_remove_friend(plugin, write):
plugin.remove_friend("5") plugin.remove_friend("5")
asyncio.run(couritine()) assert get_messages(write) == [
response = json.loads(write.call_args[0][0]) {
"jsonrpc": "2.0",
assert response == { "method": "friend_removed",
"jsonrpc": "2.0", "params": {
"method": "friend_removed", "user_id": "5"
"params": { }
"user_id": "5"
} }
} ]

View File

@@ -1,179 +1,214 @@
import asyncio
import json
from unittest.mock import call from unittest.mock import call
import pytest import pytest
from galaxy.api.types import GameTime from galaxy.api.types import GameTime
from galaxy.api.errors import UnknownError, ImportInProgress, BackendError from galaxy.api.errors import BackendError
from galaxy.unittest.mock import async_return_value
def test_success(plugin, read, write): from tests import create_message, get_messages
@pytest.mark.asyncio
async def test_get_game_time_success(plugin, read, write):
plugin.prepare_game_times_context.return_value = async_return_value("abc")
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": "3", "id": "3",
"method": "import_game_times" "method": "start_game_times_import",
"params": {
"game_ids": ["3", "5", "7"]
}
} }
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
read.side_effect = [json.dumps(request).encode() + b"\n", b""] plugin.get_game_time.side_effect = [
plugin.get_game_times.coro.return_value = [ async_return_value(GameTime("3", 60, 1549550504)),
GameTime("3", 60, 1549550504), async_return_value(GameTime("5", 10, None)),
GameTime("5", 10, None), async_return_value(GameTime("7", None, 1549550502)),
GameTime("7", None, 1549550502),
] ]
asyncio.run(plugin.run()) await plugin.run()
plugin.get_game_times.assert_called_with() plugin.get_game_time.assert_has_calls([
response = json.loads(write.call_args[0][0]) call("3", "abc"),
call("5", "abc"),
call("7", "abc"),
])
assert response == { assert get_messages(write) == [
"jsonrpc": "2.0", {
"id": "3", "jsonrpc": "2.0",
"result": { "id": "3",
"game_times": [ "result": None
{ },
{
"jsonrpc": "2.0",
"method": "game_time_import_success",
"params": {
"game_time": {
"game_id": "3", "game_id": "3",
"time_played": 60, "last_played_time": 1549550504,
"last_played_time": 1549550504 "time_played": 60
},
{
"game_id": "5",
"time_played": 10,
},
{
"game_id": "7",
"last_played_time": 1549550502
} }
] }
},
{
"jsonrpc": "2.0",
"method": "game_time_import_success",
"params": {
"game_time": {
"game_id": "5",
"time_played": 10
}
}
},
{
"jsonrpc": "2.0",
"method": "game_time_import_success",
"params": {
"game_time": {
"game_id": "7",
"last_played_time": 1549550502
}
}
},
{
"jsonrpc": "2.0",
"method": "game_times_import_finished",
"params": None
} }
} ]
def test_failure(plugin, read, write):
@pytest.mark.asyncio
@pytest.mark.parametrize("exception,code,message", [
(BackendError, 4, "Backend error"),
(KeyError, 0, "Unknown error")
])
async def test_get_game_time_error(exception, code, message, plugin, read, write):
plugin.prepare_game_times_context.return_value = async_return_value(None)
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": "3", "id": "3",
"method": "import_game_times" "method": "start_game_times_import",
"params": {
"game_ids": ["6"]
}
} }
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_game_time.side_effect = exception
await plugin.run()
plugin.get_game_time.assert_called()
read.side_effect = [json.dumps(request).encode() + b"\n", b""] assert get_messages(write) == [
plugin.get_game_times.coro.side_effect = UnknownError() {
asyncio.run(plugin.run()) "jsonrpc": "2.0",
plugin.get_game_times.assert_called_with() "id": "3",
response = json.loads(write.call_args[0][0]) "result": None
},
{
"jsonrpc": "2.0",
"method": "game_time_import_failure",
"params": {
"game_id": "6",
"error": {
"code": code,
"message": message
}
}
},
{
"jsonrpc": "2.0",
"method": "game_times_import_finished",
"params": None
}
]
assert response == {
@pytest.mark.asyncio
async def test_prepare_get_game_time_context_error(plugin, read, write):
plugin.prepare_game_times_context.side_effect = BackendError()
request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": "3", "id": "3",
"error": { "method": "start_game_times_import",
"code": 0, "params": {
"message": "Unknown error", "game_ids": ["6"]
} }
} }
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
await plugin.run()
def test_update_game(plugin, write): assert get_messages(write) == [
game_time = GameTime("3", 60, 1549550504) {
"jsonrpc": "2.0",
async def couritine(): "id": "3",
plugin.update_game_time(game_time) "error": {
"code": 4,
asyncio.run(couritine()) "message": "Backend error"
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "game_time_updated",
"params": {
"game_time": {
"game_id": "3",
"time_played": 60,
"last_played_time": 1549550504
} }
} }
} ]
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_game_time_import_success(plugin, write): async def test_import_in_progress(plugin, read, write):
plugin.game_time_import_success(GameTime("3", 60, 1549550504)) plugin.prepare_game_times_context.return_value = async_return_value(None)
response = json.loads(write.call_args[0][0]) requests = [
{
assert response == { "jsonrpc": "2.0",
"jsonrpc": "2.0", "id": "3",
"method": "game_time_import_success", "method": "start_game_times_import",
"params": { "params": {
"game_time": { "game_ids": ["6"]
"game_id": "3", }
"time_played": 60, },
"last_played_time": 1549550504 {
"jsonrpc": "2.0",
"id": "4",
"method": "start_game_times_import",
"params": {
"game_ids": ["7"]
} }
} }
} ]
read.side_effect = [
async_return_value(create_message(requests[0])),
async_return_value(create_message(requests[1])),
async_return_value(b"")
]
@pytest.mark.asyncio await plugin.run()
async def test_game_time_import_failure(plugin, write):
plugin.game_time_import_failure("134", ImportInProgress())
response = json.loads(write.call_args[0][0])
assert response == { assert get_messages(write) == [
"jsonrpc": "2.0", {
"method": "game_time_import_failure", "jsonrpc": "2.0",
"params": { "id": "3",
"game_id": "134", "result": None
},
{
"jsonrpc": "2.0",
"id": "4",
"error": { "error": {
"code": 600, "code": 600,
"message": "Import already in progress" "message": "Import already in progress"
} }
} }
}
@pytest.mark.asyncio
async def test_game_times_import_finished(plugin, write):
plugin.game_times_import_finished()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "game_times_import_finished",
"params": None
}
@pytest.mark.asyncio
async def test_start_game_times_import(plugin, write, mocker):
game_time_import_success = mocker.patch.object(plugin, "game_time_import_success")
game_time_import_failure = mocker.patch.object(plugin, "game_time_import_failure")
game_times_import_finished = mocker.patch.object(plugin, "game_times_import_finished")
game_ids = ["1", "5"]
game_time = GameTime("1", 10, 1549550502)
plugin.get_game_times.coro.return_value = [
game_time
] ]
await plugin.start_game_times_import(game_ids)
with pytest.raises(ImportInProgress):
await plugin.start_game_times_import(["4", "8"])
# wait until all tasks are finished
for _ in range(4):
await asyncio.sleep(0)
plugin.get_game_times.coro.assert_called_once_with()
game_time_import_success.assert_called_once_with(game_time)
game_time_import_failure.assert_called_once_with("5", UnknownError())
game_times_import_finished.assert_called_once_with()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_start_game_times_import_failure(plugin, write, mocker): async def test_update_game(plugin, write):
game_time_import_failure = mocker.patch.object(plugin, "game_time_import_failure") game_time = GameTime("3", 60, 1549550504)
game_times_import_finished = mocker.patch.object(plugin, "game_times_import_finished") plugin.update_game_time(game_time)
game_ids = ["1", "5"] assert get_messages(write) == [
error = BackendError() {
plugin.get_game_times.coro.side_effect = error "jsonrpc": "2.0",
"method": "game_time_updated",
await plugin.start_game_times_import(game_ids) "params": {
"game_time": {
# wait until all tasks are finished "game_id": "3",
for _ in range(4): "time_played": 60,
await asyncio.sleep(0) "last_played_time": 1549550504
}
plugin.get_game_times.coro.assert_called_once_with() }
}
assert game_time_import_failure.mock_calls == [call("1", error), call("5", error)] ]
game_times_import_finished.assert_called_once_with()

View File

@@ -1,7 +1,12 @@
import asyncio import pytest
import json
def test_success(plugin, read): from galaxy.unittest.mock import async_return_value
from tests import create_message
@pytest.mark.asyncio
async def test_success(plugin, read):
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": "install_game", "method": "install_game",
@@ -10,7 +15,6 @@ def test_success(plugin, read):
} }
} }
read.side_effect = [json.dumps(request).encode() + b"\n", b""] read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
plugin.get_owned_games.return_value = None await plugin.run()
asyncio.run(plugin.run())
plugin.install_game.assert_called_with(game_id="3") plugin.install_game.assert_called_with(game_id="3")

View File

@@ -1,10 +1,14 @@
import asyncio import pytest
import json
from galaxy.api.plugin import Plugin from galaxy.api.plugin import Plugin
from galaxy.api.consts import Platform from galaxy.api.consts import Platform
from galaxy.unittest.mock import async_return_value
def test_get_capabilites(reader, writer, read, write): from tests import create_message, get_messages
@pytest.mark.asyncio
async def test_get_capabilities(reader, writer, read, write):
class PluginImpl(Plugin): #pylint: disable=abstract-method class PluginImpl(Plugin): #pylint: disable=abstract-method
async def get_owned_games(self): async def get_owned_games(self):
pass pass
@@ -16,64 +20,75 @@ def test_get_capabilites(reader, writer, read, write):
} }
token = "token" token = "token"
plugin = PluginImpl(Platform.Generic, "0.1", reader, writer, token) plugin = PluginImpl(Platform.Generic, "0.1", reader, writer, token)
read.side_effect = [json.dumps(request).encode() + b"\n", b""] read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
asyncio.run(plugin.run()) await plugin.run()
response = json.loads(write.call_args[0][0]) assert get_messages(write) == [
assert response == { {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": "3", "id": "3",
"result": { "result": {
"platform_name": "generic", "platform_name": "generic",
"features": [ "features": [
"ImportOwnedGames" "ImportOwnedGames"
], ],
"token": token "token": token
}
} }
} ]
def test_shutdown(plugin, read, write):
@pytest.mark.asyncio
async def test_shutdown(plugin, read, write):
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": "5", "id": "5",
"method": "shutdown" "method": "shutdown"
} }
read.side_effect = [json.dumps(request).encode() + b"\n", b""] read.side_effect = [async_return_value(create_message(request))]
asyncio.run(plugin.run()) await plugin.run()
plugin.shutdown.assert_called_with() plugin.shutdown.assert_called_with()
response = json.loads(write.call_args[0][0]) assert get_messages(write) == [
assert response == { {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": "5", "id": "5",
"result": None "result": None
} }
]
def test_ping(plugin, read, write):
@pytest.mark.asyncio
async def test_ping(plugin, read, write):
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": "7", "id": "7",
"method": "ping" "method": "ping"
} }
read.side_effect = [json.dumps(request).encode() + b"\n", b""] read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
asyncio.run(plugin.run()) await plugin.run()
response = json.loads(write.call_args[0][0]) assert get_messages(write) == [
assert response == { {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": "7", "id": "7",
"result": None "result": None
} }
]
def test_tick_before_handshake(plugin, read):
read.side_effect = [b""] @pytest.mark.asyncio
asyncio.run(plugin.run()) async def test_tick_before_handshake(plugin, read):
read.side_effect = [async_return_value(b"")]
await plugin.run()
plugin.tick.assert_not_called() plugin.tick.assert_not_called()
def test_tick_after_handshake(plugin, read):
@pytest.mark.asyncio
async def test_tick_after_handshake(plugin, read):
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": "6", "id": "6",
"method": "initialize_cache", "method": "initialize_cache",
"params": {"data": {}} "params": {"data": {}}
} }
read.side_effect = [json.dumps(request).encode() + b"\n", b""] read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
asyncio.run(plugin.run()) await plugin.run()
plugin.tick.assert_called_with() plugin.tick.assert_called_with()

View File

@@ -1,7 +1,12 @@
import asyncio import pytest
import json
def test_success(plugin, read): from galaxy.unittest.mock import async_return_value
from tests import create_message
@pytest.mark.asyncio
async def test_success(plugin, read):
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": "launch_game", "method": "launch_game",
@@ -10,7 +15,6 @@ def test_success(plugin, read):
} }
} }
read.side_effect = [json.dumps(request).encode() + b"\n", b""] read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
plugin.get_owned_games.return_value = None await plugin.run()
asyncio.run(plugin.run())
plugin.launch_game.assert_called_with(game_id="3") plugin.launch_game.assert_called_with(game_id="3")

View File

@@ -1,51 +1,55 @@
import asyncio
import json
import pytest import pytest
from galaxy.api.types import LocalGame from galaxy.api.types import LocalGame
from galaxy.api.consts import LocalGameState from galaxy.api.consts import LocalGameState
from galaxy.api.errors import UnknownError, FailedParsingManifest from galaxy.api.errors import UnknownError, FailedParsingManifest
from galaxy.unittest.mock import async_return_value
def test_success(plugin, read, write): from tests import create_message, get_messages
@pytest.mark.asyncio
async def test_success(plugin, read, write):
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": "3", "id": "3",
"method": "import_local_games" "method": "import_local_games"
} }
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
read.side_effect = [json.dumps(request).encode() + b"\n", b""] plugin.get_local_games.return_value = async_return_value([
plugin.get_local_games.coro.return_value = [
LocalGame("1", LocalGameState.Running), LocalGame("1", LocalGameState.Running),
LocalGame("2", LocalGameState.Installed), LocalGame("2", LocalGameState.Installed),
LocalGame("3", LocalGameState.Installed | LocalGameState.Running) LocalGame("3", LocalGameState.Installed | LocalGameState.Running)
] ])
asyncio.run(plugin.run()) await plugin.run()
plugin.get_local_games.assert_called_with() plugin.get_local_games.assert_called_with()
response = json.loads(write.call_args[0][0]) assert get_messages(write) == [
assert response == { {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": "3", "id": "3",
"result": { "result": {
"local_games" : [ "local_games" : [
{ {
"game_id": "1", "game_id": "1",
"local_game_state": LocalGameState.Running.value "local_game_state": LocalGameState.Running.value
}, },
{ {
"game_id": "2", "game_id": "2",
"local_game_state": LocalGameState.Installed.value "local_game_state": LocalGameState.Installed.value
}, },
{ {
"game_id": "3", "game_id": "3",
"local_game_state": (LocalGameState.Installed | LocalGameState.Running).value "local_game_state": (LocalGameState.Installed | LocalGameState.Running).value
} }
] ]
}
} }
} ]
@pytest.mark.asyncio
@pytest.mark.parametrize( @pytest.mark.parametrize(
"error,code,message", "error,code,message",
[ [
@@ -53,44 +57,42 @@ def test_success(plugin, read, write):
pytest.param(FailedParsingManifest, 200, "Failed parsing manifest", id="failed_parsing") pytest.param(FailedParsingManifest, 200, "Failed parsing manifest", id="failed_parsing")
], ],
) )
def test_failure(plugin, read, write, error, code, message): async def test_failure(plugin, read, write, error, code, message):
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": "3", "id": "3",
"method": "import_local_games" "method": "import_local_games"
} }
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
read.side_effect = [json.dumps(request).encode() + b"\n", b""] plugin.get_local_games.side_effect = error()
plugin.get_local_games.coro.side_effect = error() await plugin.run()
asyncio.run(plugin.run())
plugin.get_local_games.assert_called_with() plugin.get_local_games.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == { assert get_messages(write) == [
"jsonrpc": "2.0", {
"id": "3", "jsonrpc": "2.0",
"error": { "id": "3",
"code": code, "error": {
"message": message "code": code,
} "message": message
}
def test_local_game_state_update(plugin, write):
game = LocalGame("1", LocalGameState.Running)
async def couritine():
plugin.update_local_game_status(game)
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "local_game_status_changed",
"params": {
"local_game": {
"game_id": "1",
"local_game_state": LocalGameState.Running.value
} }
} }
} ]
@pytest.mark.asyncio
async def test_local_game_state_update(plugin, write):
game = LocalGame("1", LocalGameState.Running)
plugin.update_local_game_status(game)
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"method": "local_game_status_changed",
"params": {
"local_game": {
"game_id": "1",
"local_game_state": LocalGameState.Running.value
}
}
}
]

View File

@@ -1,19 +1,23 @@
import asyncio import pytest
import json
from galaxy.api.types import Game, Dlc, LicenseInfo from galaxy.api.types import Game, Dlc, LicenseInfo
from galaxy.api.consts import LicenseType from galaxy.api.consts import LicenseType
from galaxy.api.errors import UnknownError from galaxy.api.errors import UnknownError
from galaxy.unittest.mock import async_return_value
def test_success(plugin, read, write): from tests import create_message, get_messages
@pytest.mark.asyncio
async def test_success(plugin, read, write):
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": "3", "id": "3",
"method": "import_owned_games" "method": "import_owned_games"
} }
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
read.side_effect = [json.dumps(request).encode() + b"\n", b""] plugin.get_owned_games.return_value = async_return_value([
plugin.get_owned_games.coro.return_value = [
Game("3", "Doom", None, LicenseInfo(LicenseType.SinglePurchase, None)), Game("3", "Doom", None, LicenseInfo(LicenseType.SinglePurchase, None)),
Game( Game(
"5", "5",
@@ -23,129 +27,126 @@ def test_success(plugin, read, write):
Dlc("8", "Temerian Armor Set", LicenseInfo(LicenseType.FreeToPlay, None)), Dlc("8", "Temerian Armor Set", LicenseInfo(LicenseType.FreeToPlay, None)),
], ],
LicenseInfo(LicenseType.SinglePurchase, None)) LicenseInfo(LicenseType.SinglePurchase, None))
] ])
asyncio.run(plugin.run()) await plugin.run()
plugin.get_owned_games.assert_called_with() plugin.get_owned_games.assert_called_with()
response = json.loads(write.call_args[0][0]) assert get_messages(write) == [
{
assert response == { "jsonrpc": "2.0",
"jsonrpc": "2.0", "id": "3",
"id": "3", "result": {
"result": { "owned_games": [
"owned_games": [ {
{ "game_id": "3",
"game_id": "3", "game_title": "Doom",
"game_title": "Doom", "license_info": {
"license_info": { "license_type": "SinglePurchase"
"license_type": "SinglePurchase" }
} },
}, {
{ "game_id": "5",
"game_id": "5", "game_title": "Witcher 3",
"game_title": "Witcher 3", "dlcs": [
"dlcs": [ {
{ "dlc_id": "7",
"dlc_id": "7", "dlc_title": "Hearts of Stone",
"dlc_title": "Hearts of Stone", "license_info": {
"license_info": { "license_type": "SinglePurchase"
"license_type": "SinglePurchase" }
} },
}, {
{ "dlc_id": "8",
"dlc_id": "8", "dlc_title": "Temerian Armor Set",
"dlc_title": "Temerian Armor Set", "license_info": {
"license_info": { "license_type": "FreeToPlay"
"license_type": "FreeToPlay" }
} }
],
"license_info": {
"license_type": "SinglePurchase"
} }
],
"license_info": {
"license_type": "SinglePurchase"
} }
} ]
] }
} }
} ]
def test_failure(plugin, read, write):
@pytest.mark.asyncio
async def test_failure(plugin, read, write):
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": "3", "id": "3",
"method": "import_owned_games" "method": "import_owned_games"
} }
read.side_effect = [json.dumps(request).encode() + b"\n", b""] read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
plugin.get_owned_games.coro.side_effect = UnknownError() plugin.get_owned_games.side_effect = UnknownError()
asyncio.run(plugin.run()) await plugin.run()
plugin.get_owned_games.assert_called_with() plugin.get_owned_games.assert_called_with()
response = json.loads(write.call_args[0][0]) assert get_messages(write) == [
{
assert response == { "jsonrpc": "2.0",
"jsonrpc": "2.0", "id": "3",
"id": "3", "error": {
"error": { "code": 0,
"code": 0, "message": "Unknown error"
"message": "Unknown error" }
} }
} ]
def test_add_game(plugin, write):
@pytest.mark.asyncio
async def test_add_game(plugin, write):
game = Game("3", "Doom", None, LicenseInfo(LicenseType.SinglePurchase, None)) game = Game("3", "Doom", None, LicenseInfo(LicenseType.SinglePurchase, None))
plugin.add_game(game)
async def couritine(): assert get_messages(write) == [
plugin.add_game(game) {
"jsonrpc": "2.0",
asyncio.run(couritine()) "method": "owned_game_added",
response = json.loads(write.call_args[0][0]) "params": {
"owned_game": {
assert response == { "game_id": "3",
"jsonrpc": "2.0", "game_title": "Doom",
"method": "owned_game_added", "license_info": {
"params": { "license_type": "SinglePurchase"
"owned_game": { }
"game_id": "3",
"game_title": "Doom",
"license_info": {
"license_type": "SinglePurchase"
} }
} }
} }
} ]
def test_remove_game(plugin, write):
async def couritine():
plugin.remove_game("5")
asyncio.run(couritine()) @pytest.mark.asyncio
response = json.loads(write.call_args[0][0]) async def test_remove_game(plugin, write):
plugin.remove_game("5")
assert response == { assert get_messages(write) == [
"jsonrpc": "2.0", {
"method": "owned_game_removed", "jsonrpc": "2.0",
"params": { "method": "owned_game_removed",
"game_id": "5" "params": {
"game_id": "5"
}
} }
} ]
def test_update_game(plugin, write):
@pytest.mark.asyncio
async def test_update_game(plugin, write):
game = Game("3", "Doom", None, LicenseInfo(LicenseType.SinglePurchase, None)) game = Game("3", "Doom", None, LicenseInfo(LicenseType.SinglePurchase, None))
plugin.update_game(game)
async def couritine(): assert get_messages(write) == [
plugin.update_game(game) {
"jsonrpc": "2.0",
asyncio.run(couritine()) "method": "owned_game_updated",
response = json.loads(write.call_args[0][0]) "params": {
"owned_game": {
assert response == { "game_id": "3",
"jsonrpc": "2.0", "game_title": "Doom",
"method": "owned_game_updated", "license_info": {
"params": { "license_type": "SinglePurchase"
"owned_game": { }
"game_id": "3",
"game_title": "Doom",
"license_info": {
"license_type": "SinglePurchase"
} }
} }
} }
} ]

View File

@@ -1,23 +1,28 @@
import asyncio
import json
import pytest import pytest
from galaxy.unittest.mock import async_return_value
from tests import create_message, get_messages
def assert_rpc_response(write, response_id, result=None): def assert_rpc_response(write, response_id, result=None):
assert json.loads(write.call_args[0][0]) == { assert get_messages(write) == [
"jsonrpc": "2.0", {
"id": str(response_id), "jsonrpc": "2.0",
"result": result "id": str(response_id),
} "result": result
}
]
def assert_rpc_request(write, method, params=None): def assert_rpc_request(write, method, params=None):
assert json.loads(write.call_args[0][0]) == { assert get_messages(write) == [
"jsonrpc": "2.0", {
"method": method, "jsonrpc": "2.0",
"params": {"data": params} "method": method,
} "params": {"data": params}
}
]
@pytest.fixture @pytest.fixture
@@ -28,7 +33,8 @@ def cache_data():
} }
def test_initialize_cache(plugin, read, write, cache_data): @pytest.mark.asyncio
async def test_initialize_cache(plugin, read, write, cache_data):
request_id = 3 request_id = 3
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
@@ -36,36 +42,32 @@ def test_initialize_cache(plugin, read, write, cache_data):
"method": "initialize_cache", "method": "initialize_cache",
"params": {"data": cache_data} "params": {"data": cache_data}
} }
read.side_effect = [json.dumps(request).encode() + b"\n"] read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
assert {} == plugin.persistent_cache assert {} == plugin.persistent_cache
asyncio.run(plugin.run()) await plugin.run()
plugin.handshake_complete.assert_called_once_with() plugin.handshake_complete.assert_called_once_with()
assert cache_data == plugin.persistent_cache assert cache_data == plugin.persistent_cache
assert_rpc_response(write, response_id=request_id) assert_rpc_response(write, response_id=request_id)
def test_set_cache(plugin, write, cache_data): @pytest.mark.asyncio
async def runner(): async def test_set_cache(plugin, write, cache_data):
assert {} == plugin.persistent_cache assert {} == plugin.persistent_cache
plugin.persistent_cache.update(cache_data) plugin.persistent_cache.update(cache_data)
plugin.push_cache() plugin.push_cache()
assert_rpc_request(write, "push_cache", cache_data) assert_rpc_request(write, "push_cache", cache_data)
assert cache_data == plugin.persistent_cache assert cache_data == plugin.persistent_cache
asyncio.run(runner())
def test_clear_cache(plugin, write, cache_data): @pytest.mark.asyncio
async def runner(): async def test_clear_cache(plugin, write, cache_data):
plugin._persistent_cache = cache_data plugin._persistent_cache = cache_data
plugin.persistent_cache.clear() plugin.persistent_cache.clear()
plugin.push_cache() plugin.push_cache()
assert_rpc_request(write, "push_cache", {}) assert_rpc_request(write, "push_cache", {})
assert {} == plugin.persistent_cache assert {} == plugin.persistent_cache
asyncio.run(runner())

View File

@@ -1,7 +1,9 @@
import json
import pytest import pytest
from galaxy.unittest.mock import async_return_value
from tests import create_message
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_success(plugin, read): async def test_success(plugin, read):
request = { request = {
@@ -9,7 +11,7 @@ async def test_success(plugin, read):
"method": "shutdown_platform_client" "method": "shutdown_platform_client"
} }
read.side_effect = [json.dumps(request).encode() + b"\n", b""] read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
plugin.shutdown_platform_client.return_value = None plugin.shutdown_platform_client.return_value = async_return_value(None)
await plugin.run() await plugin.run()
plugin.shutdown_platform_client.assert_called_with() plugin.shutdown_platform_client.assert_called_with()

View File

@@ -1,52 +1,46 @@
from unittest.mock import MagicMock
import pytest import pytest
from galaxy.reader import StreamLineReader from galaxy.reader import StreamLineReader
from galaxy.unittest.mock import AsyncMock from galaxy.unittest.mock import async_return_value
@pytest.fixture() @pytest.fixture()
def stream_reader(): def stream_line_reader(reader):
reader = MagicMock() return StreamLineReader(reader)
reader.read = AsyncMock()
return reader
@pytest.fixture()
def read(stream_reader):
return stream_reader.read
@pytest.fixture()
def reader(stream_reader):
return StreamLineReader(stream_reader)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_message(reader, read): async def test_message(stream_line_reader, read):
read.return_value = b"a\n" read.return_value = async_return_value(b"a\n")
assert await reader.readline() == b"a" assert await stream_line_reader.readline() == b"a"
read.assert_called_once() read.assert_called_once()
@pytest.mark.asyncio
async def test_separate_messages(reader, read):
read.side_effect = [b"a\n", b"b\n"]
assert await reader.readline() == b"a"
assert await reader.readline() == b"b"
assert read.call_count == 2
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_connected_messages(reader, read): async def test_separate_messages(stream_line_reader, read):
read.return_value = b"a\nb\n" read.side_effect = [async_return_value(b"a\n"), async_return_value(b"b\n")]
assert await reader.readline() == b"a" assert await stream_line_reader.readline() == b"a"
assert await reader.readline() == b"b" assert await stream_line_reader.readline() == b"b"
assert read.call_count == 2
@pytest.mark.asyncio
async def test_connected_messages(stream_line_reader, read):
read.return_value = async_return_value(b"a\nb\n")
assert await stream_line_reader.readline() == b"a"
assert await stream_line_reader.readline() == b"b"
read.assert_called_once() read.assert_called_once()
@pytest.mark.asyncio
async def test_cut_message(reader, read):
read.side_effect = [b"a", b"b\n"]
assert await reader.readline() == b"ab"
assert read.call_count == 2
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_half_message(reader, read): async def test_cut_message(stream_line_reader, read):
read.side_effect = [b"a", b""] read.side_effect = [async_return_value(b"a"), async_return_value(b"b\n")]
assert await reader.readline() == b"" assert await stream_line_reader.readline() == b"ab"
assert read.call_count == 2
@pytest.mark.asyncio
async def test_half_message(stream_line_reader, read):
read.side_effect = [async_return_value(b"a"), async_return_value(b"")]
assert await stream_line_reader.readline() == b""
assert read.call_count == 2 assert read.call_count == 2

View File

@@ -1,7 +1,11 @@
import asyncio import pytest
import json
def test_success(plugin, read): from galaxy.unittest.mock import async_return_value
from tests import create_message
@pytest.mark.asyncio
async def test_success(plugin, read):
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"method": "uninstall_game", "method": "uninstall_game",
@@ -9,8 +13,7 @@ def test_success(plugin, read):
"game_id": "3" "game_id": "3"
} }
} }
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_owned_games.return_value = None plugin.get_owned_games.return_value = None
asyncio.run(plugin.run()) await plugin.run()
plugin.uninstall_game.assert_called_with(game_id="3") plugin.uninstall_game.assert_called_with(game_id="3")