version 0.31.1

This commit is contained in:
GOG Galaxy SDK Team
2019-05-29 13:09:13 +02:00
parent 6f717a1e31
commit 60fab25a55
30 changed files with 2598 additions and 1 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
# pytest
__pycache__/

View File

@@ -1 +1,63 @@
# galaxy-integrations-python-api
# GOG Galaxy - Community Integration - Python API
This document is still work in progress.
## Basic Usage
Basic implementation:
```python
import sys
from galaxy.api.plugin import Plugin, create_and_run_plugin
from galaxy.api.consts import Platform
class PluginExample(Plugin):
def __init__(self, reader, writer, token):
super().__init__(
Platform.Generic, # Choose platform from available list
"0.1", # Version
reader,
writer,
token
)
# implement methods
async def authenticate(self, stored_credentials=None):
pass
def main():
create_and_run_plugin(PluginExample, sys.argv)
# run plugin event loop
if __name__ == "__main__":
main()
```
Plugin should be deployed with manifest:
```json
{
"name": "Example plugin",
"platform": "generic",
"guid": "UNIQUE-GUID",
"version": "0.1",
"description": "Example plugin",
"author": "Name",
"email": "author@email.com",
"url": "https://github.com/user/galaxy-plugin-example",
"script": "plugin.py"
}
```
## Development
Install required packages:
```bash
pip install -r requirements.txt
```
Run tests:
```bash
pytest
```
## Methods Documentation
TODO

2
pytest.ini Normal file
View File

@@ -0,0 +1,2 @@
[pytest]
addopts = --flakes

8
requirements.txt Normal file
View File

@@ -0,0 +1,8 @@
-e .
pytest==4.2.0
pytest-asyncio==0.10.0
pytest-mock==1.10.3
pytest-flakes==4.0.0
# because of pip bug https://github.com/pypa/pip/issues/4780
aiohttp==3.5.4
certifi==2019.3.9

1
src/galaxy/__init__.py Normal file
View File

@@ -0,0 +1 @@
__path__ = __import__('pkgutil').extend_path(__path__, __name__)

View File

44
src/galaxy/api/consts.py Normal file
View File

@@ -0,0 +1,44 @@
from enum import Enum, Flag
class Platform(Enum):
Unknown = "unknown"
Gog = "gog"
Steam = "steam"
Psn = "psn"
XBoxOne = "xboxone"
Generic = "generic"
Origin = "origin"
Uplay = "uplay"
Battlenet = "battlenet"
Epic = "epic"
class Feature(Enum):
Unknown = "Unknown"
ImportInstalledGames = "ImportInstalledGames"
ImportOwnedGames = "ImportOwnedGames"
LaunchGame = "LaunchGame"
InstallGame = "InstallGame"
UninstallGame = "UninstallGame"
ImportAchievements = "ImportAchievements"
ImportGameTime = "ImportGameTime"
Chat = "Chat"
ImportUsers = "ImportUsers"
VerifyGame = "VerifyGame"
ImportFriends = "ImportFriends"
class LicenseType(Enum):
Unknown = "Unknown"
SinglePurchase = "SinglePurchase"
FreeToPlay = "FreeToPlay"
OtherUserLicense = "OtherUserLicense"
class LocalGameState(Flag):
None_ = 0
Installed = 1
Running = 2
class PresenceState(Enum):
Unknown = "Unknown"
Online = "online"
Offline = "offline"
Away = "away"

83
src/galaxy/api/errors.py Normal file
View File

@@ -0,0 +1,83 @@
from galaxy.api.jsonrpc import ApplicationError, UnknownError
UnknownError = UnknownError
class AuthenticationRequired(ApplicationError):
def __init__(self, data=None):
super().__init__(1, "Authentication required", data)
class BackendNotAvailable(ApplicationError):
def __init__(self, data=None):
super().__init__(2, "Backend not available", data)
class BackendTimeout(ApplicationError):
def __init__(self, data=None):
super().__init__(3, "Backend timed out", data)
class BackendError(ApplicationError):
def __init__(self, data=None):
super().__init__(4, "Backend error", data)
class UnknownBackendResponse(ApplicationError):
def __init__(self, data=None):
super().__init__(4, "Backend responded in uknown way", data)
class InvalidCredentials(ApplicationError):
def __init__(self, data=None):
super().__init__(100, "Invalid credentials", data)
class NetworkError(ApplicationError):
def __init__(self, data=None):
super().__init__(101, "Network error", data)
class LoggedInElsewhere(ApplicationError):
def __init__(self, data=None):
super().__init__(102, "Logged in elsewhere", data)
class ProtocolError(ApplicationError):
def __init__(self, data=None):
super().__init__(103, "Protocol error", data)
class TemporaryBlocked(ApplicationError):
def __init__(self, data=None):
super().__init__(104, "Temporary blocked", data)
class Banned(ApplicationError):
def __init__(self, data=None):
super().__init__(105, "Banned", data)
class AccessDenied(ApplicationError):
def __init__(self, data=None):
super().__init__(106, "Access denied", data)
class ParentalControlBlock(ApplicationError):
def __init__(self, data=None):
super().__init__(107, "Parental control block", data)
class DeviceBlocked(ApplicationError):
def __init__(self, data=None):
super().__init__(108, "Device blocked", data)
class RegionBlocked(ApplicationError):
def __init__(self, data=None):
super().__init__(109, "Region blocked", data)
class FailedParsingManifest(ApplicationError):
def __init__(self, data=None):
super().__init__(200, "Failed parsing manifest", data)
class TooManyMessagesSent(ApplicationError):
def __init__(self, data=None):
super().__init__(300, "Too many messages sent", data)
class IncoherentLastMessage(ApplicationError):
def __init__(self, data=None):
super().__init__(400, "Different last message id on backend", data)
class MessageNotFound(ApplicationError):
def __init__(self, data=None):
super().__init__(500, "Message not found", data)
class ImportInProgress(ApplicationError):
def __init__(self, data=None):
super().__init__(600, "Import already in progress", data)

285
src/galaxy/api/jsonrpc.py Normal file
View File

@@ -0,0 +1,285 @@
import asyncio
from collections import namedtuple
from collections.abc import Iterable
import logging
import inspect
import json
class JsonRpcError(Exception):
def __init__(self, code, message, data=None):
self.code = code
self.message = message
self.data = data
super().__init__()
def __eq__(self, other):
return self.code == other.code and self.message == other.message and self.data == other.data
class ParseError(JsonRpcError):
def __init__(self):
super().__init__(-32700, "Parse error")
class InvalidRequest(JsonRpcError):
def __init__(self):
super().__init__(-32600, "Invalid Request")
class MethodNotFound(JsonRpcError):
def __init__(self):
super().__init__(-32601, "Method not found")
class InvalidParams(JsonRpcError):
def __init__(self):
super().__init__(-32602, "Invalid params")
class Timeout(JsonRpcError):
def __init__(self):
super().__init__(-32000, "Method timed out")
class Aborted(JsonRpcError):
def __init__(self):
super().__init__(-32001, "Method aborted")
class ApplicationError(JsonRpcError):
def __init__(self, code, message, data):
if code >= -32768 and code <= -32000:
raise ValueError("The error code in reserved range")
super().__init__(code, message, data)
class UnknownError(ApplicationError):
def __init__(self, data=None):
super().__init__(0, "Unknown error", data)
Request = namedtuple("Request", ["method", "params", "id"], defaults=[{}, None])
Method = namedtuple("Method", ["callback", "signature", "internal", "sensitive_params"])
def anonymise_sensitive_params(params, sensitive_params):
anomized_data = "****"
if not sensitive_params:
return params
if isinstance(sensitive_params, Iterable):
anomized_params = params.copy()
for key in anomized_params.keys():
if key in sensitive_params:
anomized_params[key] = anomized_data
return anomized_params
return anomized_data
class Server():
def __init__(self, reader, writer, encoder=json.JSONEncoder()):
self._active = True
self._reader = reader
self._writer = writer
self._encoder = encoder
self._methods = {}
self._notifications = {}
self._eof_listeners = []
def register_method(self, name, callback, internal, sensitive_params=False):
"""
Register method
:param name:
:param callback:
:param internal: if True the callback will be processed immediately (synchronously)
:param sensitive_params: list of parameters that will by anonymized before logging; if False - no params
are considered sensitive, if True - all params are considered sensitive
"""
self._methods[name] = Method(callback, inspect.signature(callback), internal, sensitive_params)
def register_notification(self, name, callback, internal, sensitive_params=False):
"""
Register notification
:param name:
:param callback:
:param internal: if True the callback will be processed immediately (synchronously)
:param sensitive_params: list of parameters that will by anonymized before logging; if False - no params
are considered sensitive, if True - all params are considered sensitive
"""
self._notifications[name] = Method(callback, inspect.signature(callback), internal, sensitive_params)
def register_eof(self, callback):
self._eof_listeners.append(callback)
async def run(self):
while self._active:
try:
data = await self._reader.readline()
if not data:
self._eof()
continue
except:
self._eof()
continue
data = data.strip()
logging.debug("Received %d bytes of data", len(data))
self._handle_input(data)
def stop(self):
self._active = False
def _eof(self):
logging.info("Received EOF")
self.stop()
for listener in self._eof_listeners:
listener()
def _handle_input(self, data):
try:
request = self._parse_request(data)
except JsonRpcError as error:
self._send_error(None, error)
return
if request.id is not None:
self._handle_request(request)
else:
self._handle_notification(request)
def _handle_notification(self, request):
method = self._notifications.get(request.method)
if not method:
logging.error("Received unknown notification: %s", request.method)
return
callback, signature, internal, sensitive_params = method
self._log_request(request, sensitive_params)
try:
bound_args = signature.bind(**request.params)
except TypeError:
self._send_error(request.id, InvalidParams())
if internal:
# internal requests are handled immediately
callback(*bound_args.args, **bound_args.kwargs)
else:
try:
asyncio.create_task(callback(*bound_args.args, **bound_args.kwargs))
except Exception:
logging.exception("Unexpected exception raised in notification handler")
def _handle_request(self, request):
method = self._methods.get(request.method)
if not method:
logging.error("Received unknown request: %s", request.method)
self._send_error(request.id, MethodNotFound())
return
callback, signature, internal, sensitive_params = method
self._log_request(request, sensitive_params)
try:
bound_args = signature.bind(**request.params)
except TypeError:
self._send_error(request.id, InvalidParams())
if internal:
# internal requests are handled immediately
response = callback(*bound_args.args, **bound_args.kwargs)
self._send_response(request.id, response)
else:
async def handle():
try:
result = await callback(*bound_args.args, **bound_args.kwargs)
self._send_response(request.id, result)
except NotImplementedError:
self._send_error(request.id, MethodNotFound())
except JsonRpcError as error:
self._send_error(request.id, error)
except Exception as e: #pylint: disable=broad-except
logging.exception("Unexpected exception raised in plugin handler")
self._send_error(request.id, UnknownError(str(e)))
asyncio.create_task(handle())
@staticmethod
def _parse_request(data):
try:
jsonrpc_request = json.loads(data, encoding="utf-8")
if jsonrpc_request.get("jsonrpc") != "2.0":
raise InvalidRequest()
del jsonrpc_request["jsonrpc"]
return Request(**jsonrpc_request)
except json.JSONDecodeError:
raise ParseError()
except TypeError:
raise InvalidRequest()
def _send(self, data):
try:
line = self._encoder.encode(data)
logging.debug("Sending data: %s", line)
data = (line + "\n").encode("utf-8")
self._writer.write(data)
asyncio.create_task(self._writer.drain())
except TypeError as error:
logging.error(str(error))
def _send_response(self, request_id, result):
response = {
"jsonrpc": "2.0",
"id": request_id,
"result": result
}
self._send(response)
def _send_error(self, request_id, error):
response = {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": error.code,
"message": error.message
}
}
if error.data is not None:
response["error"]["data"] = error.data
self._send(response)
@staticmethod
def _log_request(request, sensitive_params):
params = anonymise_sensitive_params(request.params, sensitive_params)
if request.id is not None:
logging.info("Handling request: id=%s, method=%s, params=%s", request.id, request.method, params)
else:
logging.info("Handling notification: method=%s, params=%s", request.method, params)
class NotificationClient():
def __init__(self, writer, encoder=json.JSONEncoder()):
self._writer = writer
self._encoder = encoder
self._methods = {}
def notify(self, method, params, sensitive_params=False):
"""
Send notification
:param method:
:param params:
:param sensitive_params: list of parameters that will by anonymized before logging; if False - no params
are considered sensitive, if True - all params are considered sensitive
"""
notification = {
"jsonrpc": "2.0",
"method": method,
"params": params
}
self._log(method, params, sensitive_params)
self._send(notification)
def _send(self, data):
try:
line = self._encoder.encode(data)
data = (line + "\n").encode("utf-8")
logging.debug("Sending %d byte of data", len(data))
self._writer.write(data)
asyncio.create_task(self._writer.drain())
except TypeError as error:
logging.error("Failed to parse outgoing message: %s", str(error))
@staticmethod
def _log(method, params, sensitive_params):
params = anonymise_sensitive_params(params, sensitive_params)
logging.info("Sending notification: method=%s, params=%s", method, params)

462
src/galaxy/api/plugin.py Normal file
View File

@@ -0,0 +1,462 @@
import asyncio
import json
import logging
import logging.handlers
import dataclasses
from enum import Enum
from collections import OrderedDict
import sys
from galaxy.api.jsonrpc import Server, NotificationClient
from galaxy.api.consts import Feature
from galaxy.api.errors import UnknownError, ImportInProgress
class JSONEncoder(json.JSONEncoder):
def default(self, o): # pylint: disable=method-hidden
if dataclasses.is_dataclass(o):
# filter None values
def dict_factory(elements):
return {k: v for k, v in elements if v is not None}
return dataclasses.asdict(o, dict_factory=dict_factory)
if isinstance(o, Enum):
return o.value
return super().default(o)
class Plugin():
def __init__(self, platform, version, reader, writer, handshake_token):
logging.info("Creating plugin for platform %s, version %s", platform.value, version)
self._platform = platform
self._version = version
self._feature_methods = OrderedDict()
self._active = True
self._reader, self._writer = reader, writer
self._handshake_token = handshake_token
encoder = JSONEncoder()
self._server = Server(self._reader, self._writer, encoder)
self._notification_client = NotificationClient(self._writer, encoder)
def eof_handler():
self._shutdown()
self._server.register_eof(eof_handler)
self._achievements_import_in_progress = False
self._game_times_import_in_progress = False
# internal
self._register_method("shutdown", self._shutdown, internal=True)
self._register_method("get_capabilities", self._get_capabilities, internal=True)
self._register_method("ping", self._ping, internal=True)
# implemented by developer
self._register_method(
"init_authentication",
self.authenticate,
sensitive_params=["stored_credentials"]
)
self._register_method(
"pass_login_credentials",
self.pass_login_credentials,
sensitive_params=["cookies", "credentials"]
)
self._register_method(
"import_owned_games",
self.get_owned_games,
result_name="owned_games",
feature=Feature.ImportOwnedGames
)
self._register_method(
"import_unlocked_achievements",
self.get_unlocked_achievements,
result_name="unlocked_achievements",
feature=Feature.ImportAchievements
)
self._register_method(
"start_achievements_import",
self.start_achievements_import,
)
self._register_method(
"import_local_games",
self.get_local_games,
result_name="local_games",
feature=Feature.ImportInstalledGames
)
self._register_notification("launch_game", self.launch_game, feature=Feature.LaunchGame)
self._register_notification("install_game", self.install_game, feature=Feature.InstallGame)
self._register_notification(
"uninstall_game",
self.uninstall_game,
feature=Feature.UninstallGame
)
self._register_method(
"import_friends",
self.get_friends,
result_name="friend_info_list",
feature=Feature.ImportFriends
)
self._register_method(
"import_user_infos",
self.get_users,
result_name="user_info_list",
feature=Feature.ImportUsers
)
self._register_method(
"send_message",
self.send_message,
feature=Feature.Chat
)
self._register_method(
"mark_as_read",
self.mark_as_read,
feature=Feature.Chat
)
self._register_method(
"import_rooms",
self.get_rooms,
result_name="rooms",
feature=Feature.Chat
)
self._register_method(
"import_room_history_from_message",
self.get_room_history_from_message,
result_name="messages",
feature=Feature.Chat
)
self._register_method(
"import_room_history_from_timestamp",
self.get_room_history_from_timestamp,
result_name="messages",
feature=Feature.Chat
)
self._register_method(
"import_game_times",
self.get_game_times,
result_name="game_times",
feature=Feature.ImportGameTime
)
self._register_method(
"start_game_times_import",
self.start_game_times_import,
)
@property
def features(self):
features = []
if self.__class__ != Plugin:
for feature, handlers in self._feature_methods.items():
if self._implements(handlers):
features.append(feature)
return features
def _implements(self, handlers):
for handler in handlers:
if handler.__name__ not in self.__class__.__dict__:
return False
return True
def _register_method(self, name, handler, result_name=None, internal=False, sensitive_params=False, feature=None):
if internal:
def method(*args, **kwargs):
result = handler(*args, **kwargs)
if result_name:
result = {
result_name: result
}
return result
self._server.register_method(name, method, True, sensitive_params)
else:
async def method(*args, **kwargs):
result = await handler(*args, **kwargs)
if result_name:
result = {
result_name: result
}
return result
self._server.register_method(name, method, False, sensitive_params)
if feature is not None:
self._feature_methods.setdefault(feature, []).append(handler)
def _register_notification(self, name, handler, internal=False, sensitive_params=False, feature=None):
self._server.register_notification(name, handler, internal, sensitive_params)
if feature is not None:
self._feature_methods.setdefault(feature, []).append(handler)
async def run(self):
"""Plugin main coorutine"""
async def pass_control():
while self._active:
try:
self.tick()
except Exception:
logging.exception("Unexpected exception raised in plugin tick")
await asyncio.sleep(1)
await asyncio.gather(pass_control(), self._server.run())
def _shutdown(self):
logging.info("Shuting down")
self._server.stop()
self._active = False
self.shutdown()
def _get_capabilities(self):
return {
"platform_name": self._platform,
"features": self.features,
"token": self._handshake_token
}
@staticmethod
def _ping():
pass
# notifications
def store_credentials(self, credentials):
"""Notify client to store plugin credentials.
They will be pass to next authencicate calls.
"""
self._notification_client.notify("store_credentials", credentials, sensitive_params=True)
def add_game(self, game):
params = {"owned_game" : game}
self._notification_client.notify("owned_game_added", params)
def remove_game(self, game_id):
params = {"game_id" : game_id}
self._notification_client.notify("owned_game_removed", params)
def update_game(self, game):
params = {"owned_game" : game}
self._notification_client.notify("owned_game_updated", params)
def unlock_achievement(self, game_id, achievement):
params = {
"game_id": game_id,
"achievement": achievement
}
self._notification_client.notify("achievement_unlocked", params)
def game_achievements_import_success(self, game_id, achievements):
params = {
"game_id": game_id,
"unlocked_achievements": achievements
}
self._notification_client.notify("game_achievements_import_success", params)
def game_achievements_import_failure(self, game_id, error):
params = {
"game_id": game_id,
"error": {
"code": error.code,
"message": error.message
}
}
self._notification_client.notify("game_achievements_import_failure", params)
def achievements_import_finished(self):
self._notification_client.notify("achievements_import_finished", None)
def update_local_game_status(self, local_game):
params = {"local_game" : local_game}
self._notification_client.notify("local_game_status_changed", params)
def add_friend(self, user):
params = {"friend_info" : user}
self._notification_client.notify("friend_added", params)
def remove_friend(self, user_id):
params = {"user_id" : user_id}
self._notification_client.notify("friend_removed", params)
def update_room(self, room_id, unread_message_count=None, new_messages=None):
params = {"room_id": room_id}
if unread_message_count is not None:
params["unread_message_count"] = unread_message_count
if new_messages is not None:
params["messages"] = new_messages
self._notification_client.notify("chat_room_updated", params)
def update_game_time(self, game_time):
params = {"game_time" : game_time}
self._notification_client.notify("game_time_updated", params)
def game_time_import_success(self, game_time):
params = {"game_time" : game_time}
self._notification_client.notify("game_time_import_success", params)
def game_time_import_failure(self, game_id, error):
params = {
"game_id": game_id,
"error": {
"code": error.code,
"message": error.message
}
}
self._notification_client.notify("game_time_import_failure", params)
def game_times_import_finished(self):
self._notification_client.notify("game_times_import_finished", None)
def lost_authentication(self):
self._notification_client.notify("authentication_lost", None)
# handlers
def tick(self):
"""This method is called periodicaly.
Override it to implement periodical tasks like refreshing cache.
This method should not be blocking - any longer actions should be
handled by asycio tasks.
"""
def shutdown(self):
"""This method is called on plugin shutdown.
Override it to implement tear down.
"""
# methods
async def authenticate(self, stored_credentials=None):
"""Overide this method to handle plugin authentication.
The method should return galaxy.api.types.Authentication
or raise galaxy.api.types.LoginError on authentication failure.
"""
raise NotImplementedError()
async def pass_login_credentials(self, step, credentials, cookies):
raise NotImplementedError()
async def get_owned_games(self):
raise NotImplementedError()
async def get_unlocked_achievements(self, game_id):
raise NotImplementedError()
async def start_achievements_import(self, game_ids):
if self._achievements_import_in_progress:
raise ImportInProgress()
async def import_games_achievements_task(game_ids):
try:
await self.import_games_achievements(game_ids)
finally:
self.achievements_import_finished()
self._achievements_import_in_progress = False
asyncio.create_task(import_games_achievements_task(game_ids))
self._achievements_import_in_progress = True
async def import_games_achievements(self, game_ids):
"""Call game_achievements_import_success/game_achievements_import_failure for each game_id on the list"""
async def import_game_achievements(game_id):
try:
achievements = await self.get_unlocked_achievements(game_id)
self.game_achievements_import_success(game_id, achievements)
except Exception as error:
self.game_achievements_import_failure(game_id, error)
imports = [import_game_achievements(game_id) for game_id in game_ids]
await asyncio.gather(*imports)
async def get_local_games(self):
raise NotImplementedError()
async def launch_game(self, game_id):
raise NotImplementedError()
async def install_game(self, game_id):
raise NotImplementedError()
async def uninstall_game(self, game_id):
raise NotImplementedError()
async def get_friends(self):
raise NotImplementedError()
async def get_users(self, user_id_list):
raise NotImplementedError()
async def send_message(self, room_id, message_text):
raise NotImplementedError()
async def mark_as_read(self, room_id, last_message_id):
raise NotImplementedError()
async def get_rooms(self):
raise NotImplementedError()
async def get_room_history_from_message(self, room_id, message_id):
raise NotImplementedError()
async def get_room_history_from_timestamp(self, room_id, from_timestamp):
raise NotImplementedError()
async def get_game_times(self):
raise NotImplementedError()
async def start_game_times_import(self, game_ids):
if self._game_times_import_in_progress:
raise ImportInProgress()
async def import_game_times_task(game_ids):
try:
await self.import_game_times(game_ids)
finally:
self.game_times_import_finished()
self._game_times_import_in_progress = False
asyncio.create_task(import_game_times_task(game_ids))
self._game_times_import_in_progress = True
async def import_game_times(self, game_ids):
"""Call game_time_import_success/game_time_import_failure for each game_id on the list"""
try:
game_times = await self.get_game_times()
game_ids_set = set(game_ids)
for game_time in game_times:
if game_time.game_id not in game_ids_set:
continue
self.game_time_import_success(game_time)
game_ids_set.discard(game_time.game_id)
for game_id in game_ids_set:
self.game_time_import_failure(game_id, UnknownError())
except Exception as error:
for game_id in game_ids:
self.game_time_import_failure(game_id, error)
def create_and_run_plugin(plugin_class, argv):
if len(argv) < 3:
logging.critical("Not enough parameters, required: token, port")
sys.exit(1)
token = argv[1]
try:
port = int(argv[2])
except ValueError:
logging.critical("Failed to parse port value: %s", argv[2])
sys.exit(2)
if not (1 <= port <= 65535):
logging.critical("Port value out of range (1, 65535)")
sys.exit(3)
if not issubclass(plugin_class, Plugin):
logging.critical("plugin_class must be subclass of Plugin")
sys.exit(4)
async def coroutine():
reader, writer = await asyncio.open_connection("127.0.0.1", port)
extra_info = writer.get_extra_info('sockname')
logging.info("Using local address: %s:%u", *extra_info)
plugin = plugin_class(reader, writer, token)
await plugin.run()
try:
asyncio.run(coroutine())
except Exception:
logging.exception("Error while running plugin")
sys.exit(5)

94
src/galaxy/api/types.py Normal file
View File

@@ -0,0 +1,94 @@
from dataclasses import dataclass
from typing import List, Dict, Optional
from galaxy.api.consts import LicenseType, LocalGameState, PresenceState
@dataclass
class Authentication():
user_id: str
user_name: str
@dataclass
class Cookie():
name: str
value: str
domain: Optional[str] = None
path: Optional[str] = None
@dataclass
class NextStep():
next_step: str
auth_params: Dict[str, str]
cookies: Optional[List[Cookie]] = None
js: Optional[Dict[str, List[str]]] = None
@dataclass
class LicenseInfo():
license_type: LicenseType
owner: Optional[str] = None
@dataclass
class Dlc():
dlc_id: str
dlc_title: str
license_info: LicenseInfo
@dataclass
class Game():
game_id: str
game_title: str
dlcs: Optional[List[Dlc]]
license_info: LicenseInfo
@dataclass
class Achievement():
unlock_time: int
achievement_id: Optional[str] = None
achievement_name: Optional[str] = None
def __post_init__(self):
assert self.achievement_id or self.achievement_name, \
"One of achievement_id or achievement_name is required"
@dataclass
class LocalGame():
game_id: str
local_game_state: LocalGameState
@dataclass
class Presence():
presence_state: PresenceState
game_id: Optional[str] = None
presence_status: Optional[str] = None
@dataclass
class UserInfo():
user_id: str
is_friend: bool
user_name: str
avatar_url: str
presence: Presence
@dataclass
class FriendInfo():
user_id: str
user_name: str
@dataclass
class Room():
room_id: str
unread_message_count: int
last_message_id: str
@dataclass
class Message():
message_id: str
sender_id: str
sent_time: int
message_text: str
@dataclass
class GameTime():
game_id: str
time_played: int
last_played_time: int

47
src/galaxy/http.py Normal file
View File

@@ -0,0 +1,47 @@
import asyncio
import ssl
from http import HTTPStatus
import aiohttp
import certifi
from galaxy.api.errors import (
AccessDenied, AuthenticationRequired,
BackendTimeout, BackendNotAvailable, BackendError, NetworkError, UnknownBackendResponse, UnknownError
)
class HttpClient:
def __init__(self, limit=20, timeout=aiohttp.ClientTimeout(total=60), cookie_jar=None):
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.load_verify_locations(certifi.where())
connector = aiohttp.TCPConnector(limit=limit, ssl=ssl_context)
self._session = aiohttp.ClientSession(connector=connector, timeout=timeout, cookie_jar=cookie_jar)
async def close(self):
await self._session.close()
async def request(self, method, *args, **kwargs):
try:
response = await self._session.request(method, *args, **kwargs)
except asyncio.TimeoutError:
raise BackendTimeout()
except aiohttp.ServerDisconnectedError:
raise BackendNotAvailable()
except aiohttp.ClientConnectionError:
raise NetworkError()
except aiohttp.ContentTypeError:
raise UnknownBackendResponse()
except aiohttp.ClientError:
raise UnknownError()
if response.status == HTTPStatus.UNAUTHORIZED:
raise AuthenticationRequired()
if response.status == HTTPStatus.FORBIDDEN:
raise AccessDenied()
if response.status == HTTPStatus.SERVICE_UNAVAILABLE:
raise BackendNotAvailable()
if response.status >= 500:
raise BackendError()
if response.status >= 400:
raise UnknownError()
return response

20
src/galaxy/tools.py Normal file
View File

@@ -0,0 +1,20 @@
import io
import os
import zipfile
from glob import glob
def zip_folder(folder):
files = glob(os.path.join(folder, "**"), recursive=True)
files = [file.replace(folder + os.sep, "") for file in files]
files = [file for file in files if file]
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as zipf:
for file in files:
zipf.write(os.path.join(folder, file), arcname=file)
return zip_buffer
def zip_folder_to_file(folder, filename):
zip_content = zip_folder(folder).getbuffer()
with open(filename, "wb") as archive:
archive.write(zip_content)

View File

View File

@@ -0,0 +1,12 @@
from asyncio import coroutine
from unittest.mock import MagicMock
class AsyncMock(MagicMock):
async def __call__(self, *args, **kwargs):
return super(AsyncMock, self).__call__(*args, **kwargs)
def coroutine_mock():
coro = MagicMock(name="CoroutineResult")
corofunc = MagicMock(name="CoroutineFunction", side_effect=coroutine(coro))
corofunc.coro = coro
return corofunc

0
tests/__init__.py Normal file
View File

67
tests/conftest.py Normal file
View File

@@ -0,0 +1,67 @@
from contextlib import ExitStack
import logging
from unittest.mock import patch, MagicMock
import pytest
from galaxy.api.plugin import Plugin
from galaxy.api.consts import Platform
from galaxy.unittest.mock import AsyncMock, coroutine_mock
@pytest.fixture()
def reader():
stream = MagicMock(name="stream_reader")
stream.readline = AsyncMock()
yield stream
@pytest.fixture()
def writer():
stream = MagicMock(name="stream_writer")
stream.write = MagicMock()
stream.drain = AsyncMock()
yield stream
@pytest.fixture()
def readline(reader):
yield reader.readline
@pytest.fixture()
def write(writer):
yield writer.write
@pytest.fixture()
def plugin(reader, writer):
"""Return plugin instance with all feature methods mocked"""
async_methods = (
"authenticate",
"get_owned_games",
"get_unlocked_achievements",
"get_local_games",
"launch_game",
"install_game",
"uninstall_game",
"get_friends",
"get_users",
"send_message",
"mark_as_read",
"get_rooms",
"get_room_history_from_message",
"get_room_history_from_timestamp",
"get_game_times"
)
methods = (
"shutdown",
"tick"
)
with ExitStack() as stack:
for method in async_methods:
stack.enter_context(patch.object(Plugin, method, new_callable=coroutine_mock))
for method in methods:
stack.enter_context(patch.object(Plugin, method))
yield Plugin(Platform.Generic, "0.1", reader, writer, "token")
@pytest.fixture(autouse=True)
def my_caplog(caplog):
caplog.set_level(logging.DEBUG)

193
tests/test_achievements.py Normal file
View File

@@ -0,0 +1,193 @@
import asyncio
import json
from unittest.mock import call
import pytest
from pytest import raises
from galaxy.api.types import Achievement
from galaxy.api.errors import UnknownError, ImportInProgress, BackendError
def test_initialization_no_unlock_time():
with raises(Exception):
Achievement(achievement_id="lvl30", achievement_name="Got level 30")
def test_initialization_no_id_nor_name():
with raises(AssertionError):
Achievement(unlock_time=1234567890)
def test_success(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_unlocked_achievements",
"params": {
"game_id": "14"
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_unlocked_achievements.coro.return_value = [
Achievement(achievement_id="lvl10", unlock_time=1548421241),
Achievement(achievement_name="Got level 20", unlock_time=1548422395),
Achievement(achievement_id="lvl30", achievement_name="Got level 30", unlock_time=1548495633)
]
asyncio.run(plugin.run())
plugin.get_unlocked_achievements.assert_called_with(game_id="14")
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"result": {
"unlocked_achievements": [
{
"achievement_id": "lvl10",
"unlock_time": 1548421241
},
{
"achievement_name": "Got level 20",
"unlock_time": 1548422395
},
{
"achievement_id": "lvl30",
"achievement_name": "Got level 30",
"unlock_time": 1548495633
}
]
}
}
def test_failure(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_unlocked_achievements",
"params": {
"game_id": "14"
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_unlocked_achievements.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_unlocked_achievements.assert_called()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": 0,
"message": "Unknown error"
}
}
def test_unlock_achievement(plugin, write):
achievement = Achievement(achievement_id="lvl20", unlock_time=1548422395)
async def couritine():
plugin.unlock_achievement("14", achievement)
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "achievement_unlocked",
"params": {
"game_id": "14",
"achievement": {
"achievement_id": "lvl20",
"unlock_time": 1548422395
}
}
}
@pytest.mark.asyncio
async def test_game_achievements_import_success(plugin, write):
achievements = [
Achievement(achievement_id="lvl10", unlock_time=1548421241),
Achievement(achievement_name="Got level 20", unlock_time=1548422395)
]
plugin.game_achievements_import_success("134", achievements)
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "game_achievements_import_success",
"params": {
"game_id": "134",
"unlocked_achievements": [
{
"achievement_id": "lvl10",
"unlock_time": 1548421241
},
{
"achievement_name": "Got level 20",
"unlock_time": 1548422395
}
]
}
}
@pytest.mark.asyncio
async def test_game_achievements_import_failure(plugin, write):
plugin.game_achievements_import_failure("134", ImportInProgress())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "game_achievements_import_failure",
"params": {
"game_id": "134",
"error": {
"code": 600,
"message": "Import already in progress"
}
}
}
@pytest.mark.asyncio
async def test_achievements_import_finished(plugin, write):
plugin.achievements_import_finished()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "achievements_import_finished",
"params": None
}
@pytest.mark.asyncio
async def test_start_achievements_import(plugin, write, mocker):
game_achievements_import_success = mocker.patch.object(plugin, "game_achievements_import_success")
game_achievements_import_failure = mocker.patch.object(plugin, "game_achievements_import_failure")
achievements_import_finished = mocker.patch.object(plugin, "achievements_import_finished")
game_ids = ["1", "5", "9"]
error = BackendError()
achievements = [
Achievement(achievement_id="lvl10", unlock_time=1548421241),
Achievement(achievement_name="Got level 20", unlock_time=1548422395)
]
plugin.get_unlocked_achievements.coro.side_effect = [
achievements,
[],
error
]
await plugin.start_achievements_import(game_ids)
with pytest.raises(ImportInProgress):
await plugin.start_achievements_import(["4", "8"])
# wait until all tasks are finished
for _ in range(4):
await asyncio.sleep(0)
plugin.get_unlocked_achievements.coro.assert_has_calls([call("1"), call("5"), call("9")])
game_achievements_import_success.assert_has_calls([
call("1", achievements),
call("5", [])
])
game_achievements_import_failure.assert_called_once_with("9", error)
achievements_import_finished.assert_called_once_with()

119
tests/test_authenticate.py Normal file
View File

@@ -0,0 +1,119 @@
import asyncio
import json
import pytest
from galaxy.api.types import Authentication
from galaxy.api.errors import (
UnknownError, InvalidCredentials, NetworkError, LoggedInElsewhere, ProtocolError,
BackendNotAvailable, BackendTimeout, BackendError, TemporaryBlocked, Banned, AccessDenied,
ParentalControlBlock, DeviceBlocked, RegionBlocked
)
def test_success(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "init_authentication"
}
readline.side_effect = [json.dumps(request), ""]
plugin.authenticate.coro.return_value = Authentication("132", "Zenek")
asyncio.run(plugin.run())
plugin.authenticate.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"result": {
"user_id": "132",
"user_name": "Zenek"
}
}
@pytest.mark.parametrize("error,code,message", [
pytest.param(UnknownError, 0, "Unknown error", id="unknown_error"),
pytest.param(BackendNotAvailable, 2, "Backend not available", id="backend_not_available"),
pytest.param(BackendTimeout, 3, "Backend timed out", id="backend_timeout"),
pytest.param(BackendError, 4, "Backend error", id="backend_error"),
pytest.param(InvalidCredentials, 100, "Invalid credentials", id="invalid_credentials"),
pytest.param(NetworkError, 101, "Network error", id="network_error"),
pytest.param(LoggedInElsewhere, 102, "Logged in elsewhere", id="logged_elsewhere"),
pytest.param(ProtocolError, 103, "Protocol error", id="protocol_error"),
pytest.param(TemporaryBlocked, 104, "Temporary blocked", id="temporary_blocked"),
pytest.param(Banned, 105, "Banned", id="banned"),
pytest.param(AccessDenied, 106, "Access denied", id="access_denied"),
pytest.param(ParentalControlBlock, 107, "Parental control block", id="parental_control_clock"),
pytest.param(DeviceBlocked, 108, "Device blocked", id="device_blocked"),
pytest.param(RegionBlocked, 109, "Region blocked", id="region_blocked")
])
def test_failure(plugin, readline, write, error, code, message):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "init_authentication"
}
readline.side_effect = [json.dumps(request), ""]
plugin.authenticate.coro.side_effect = error()
asyncio.run(plugin.run())
plugin.authenticate.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": code,
"message": message
}
}
def test_stored_credentials(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "init_authentication",
"params": {
"stored_credentials": {
"token": "ABC"
}
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.authenticate.coro.return_value = Authentication("132", "Zenek")
asyncio.run(plugin.run())
plugin.authenticate.assert_called_with(stored_credentials={"token": "ABC"})
write.assert_called()
def test_store_credentials(plugin, write):
credentials = {
"token": "ABC"
}
async def couritine():
plugin.store_credentials(credentials)
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "store_credentials",
"params": credentials
}
def test_lost_authentication(plugin, readline, write):
async def couritine():
plugin.lost_authentication()
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "authentication_lost",
"params": None
}

354
tests/test_chat.py Normal file
View File

@@ -0,0 +1,354 @@
import asyncio
import json
import pytest
from galaxy.api.types import Room, Message
from galaxy.api.errors import (
UnknownError, AuthenticationRequired, BackendNotAvailable, BackendTimeout, BackendError,
TooManyMessagesSent, IncoherentLastMessage, MessageNotFound
)
def test_send_message_success(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "send_message",
"params": {
"room_id": "14",
"message": "Hello!"
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.send_message.coro.return_value = None
asyncio.run(plugin.run())
plugin.send_message.assert_called_with(room_id="14", message="Hello!")
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"result": None
}
@pytest.mark.parametrize("error,code,message", [
pytest.param(UnknownError, 0, "Unknown error", id="unknown_error"),
pytest.param(AuthenticationRequired, 1, "Authentication required", id="not_authenticated"),
pytest.param(BackendNotAvailable, 2, "Backend not available", id="backend_not_available"),
pytest.param(BackendTimeout, 3, "Backend timed out", id="backend_timeout"),
pytest.param(BackendError, 4, "Backend error", id="backend_error"),
pytest.param(TooManyMessagesSent, 300, "Too many messages sent", id="too_many_messages")
])
def test_send_message_failure(plugin, readline, write, error, code, message):
request = {
"jsonrpc": "2.0",
"id": "6",
"method": "send_message",
"params": {
"room_id": "15",
"message": "Bye"
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.send_message.coro.side_effect = error()
asyncio.run(plugin.run())
plugin.send_message.assert_called_with(room_id="15", message="Bye")
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "6",
"error": {
"code": code,
"message": message
}
}
def test_mark_as_read_success(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "7",
"method": "mark_as_read",
"params": {
"room_id": "14",
"last_message_id": "67"
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.mark_as_read.coro.return_value = None
asyncio.run(plugin.run())
plugin.mark_as_read.assert_called_with(room_id="14", last_message_id="67")
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "7",
"result": None
}
@pytest.mark.parametrize("error,code,message", [
pytest.param(UnknownError, 0, "Unknown error", id="unknown_error"),
pytest.param(AuthenticationRequired, 1, "Authentication required", id="not_authenticated"),
pytest.param(BackendNotAvailable, 2, "Backend not available", id="backend_not_available"),
pytest.param(BackendTimeout, 3, "Backend timed out", id="backend_timeout"),
pytest.param(BackendError, 4, "Backend error", id="backend_error"),
pytest.param(
IncoherentLastMessage,
400,
"Different last message id on backend",
id="incoherent_last_message"
)
])
def test_mark_as_read_failure(plugin, readline, write, error, code, message):
request = {
"jsonrpc": "2.0",
"id": "4",
"method": "mark_as_read",
"params": {
"room_id": "18",
"last_message_id": "7"
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.mark_as_read.coro.side_effect = error()
asyncio.run(plugin.run())
plugin.mark_as_read.assert_called_with(room_id="18", last_message_id="7")
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "4",
"error": {
"code": code,
"message": message
}
}
def test_get_rooms_success(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "2",
"method": "import_rooms"
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_rooms.coro.return_value = [
Room("13", 0, None),
Room("15", 34, "8")
]
asyncio.run(plugin.run())
plugin.get_rooms.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "2",
"result": {
"rooms": [
{
"room_id": "13",
"unread_message_count": 0,
},
{
"room_id": "15",
"unread_message_count": 34,
"last_message_id": "8"
}
]
}
}
def test_get_rooms_failure(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "9",
"method": "import_rooms"
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_rooms.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_rooms.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "9",
"error": {
"code": 0,
"message": "Unknown error"
}
}
def test_get_room_history_from_message_success(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "2",
"method": "import_room_history_from_message",
"params": {
"room_id": "34",
"message_id": "66"
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_room_history_from_message.coro.return_value = [
Message("13", "149", 1549454837, "Hello"),
Message("14", "812", 1549454899, "Hi")
]
asyncio.run(plugin.run())
plugin.get_room_history_from_message.assert_called_with(room_id="34", message_id="66")
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "2",
"result": {
"messages": [
{
"message_id": "13",
"sender_id": "149",
"sent_time": 1549454837,
"message_text": "Hello"
},
{
"message_id": "14",
"sender_id": "812",
"sent_time": 1549454899,
"message_text": "Hi"
}
]
}
}
@pytest.mark.parametrize("error,code,message", [
pytest.param(UnknownError, 0, "Unknown error", id="unknown_error"),
pytest.param(AuthenticationRequired, 1, "Authentication required", id="not_authenticated"),
pytest.param(BackendNotAvailable, 2, "Backend not available", id="backend_not_available"),
pytest.param(BackendTimeout, 3, "Backend timed out", id="backend_timeout"),
pytest.param(BackendError, 4, "Backend error", id="backend_error"),
pytest.param(MessageNotFound, 500, "Message not found", id="message_not_found")
])
def test_get_room_history_from_message_failure(plugin, readline, write, error, code, message):
request = {
"jsonrpc": "2.0",
"id": "7",
"method": "import_room_history_from_message",
"params": {
"room_id": "33",
"message_id": "88"
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_room_history_from_message.coro.side_effect = error()
asyncio.run(plugin.run())
plugin.get_room_history_from_message.assert_called_with(room_id="33", message_id="88")
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "7",
"error": {
"code": code,
"message": message
}
}
def test_get_room_history_from_timestamp_success(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "7",
"method": "import_room_history_from_timestamp",
"params": {
"room_id": "12",
"from_timestamp": 1549454835
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_room_history_from_timestamp.coro.return_value = [
Message("12", "155", 1549454836, "Bye")
]
asyncio.run(plugin.run())
plugin.get_room_history_from_timestamp.assert_called_with(
room_id="12",
from_timestamp=1549454835
)
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "7",
"result": {
"messages": [
{
"message_id": "12",
"sender_id": "155",
"sent_time": 1549454836,
"message_text": "Bye"
}
]
}
}
def test_get_room_history_from_timestamp_failure(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_room_history_from_timestamp",
"params": {
"room_id": "10",
"from_timestamp": 1549454800
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_room_history_from_timestamp.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_room_history_from_timestamp.assert_called_with(
room_id="10",
from_timestamp=1549454800
)
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": 0,
"message": "Unknown error"
}
}
def test_update_room(plugin, write):
messages = [
Message("10", "898", 1549454832, "Hi")
]
async def couritine():
plugin.update_room("14", 15, messages)
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "chat_room_updated",
"params": {
"room_id": "14",
"unread_message_count": 15,
"messages": [
{
"message_id": "10",
"sender_id": "898",
"sent_time": 1549454832,
"message_text": "Hi"
}
]
}
}

45
tests/test_features.py Normal file
View File

@@ -0,0 +1,45 @@
from galaxy.api.plugin import Plugin
from galaxy.api.consts import Platform, Feature
def test_base_class():
plugin = Plugin(Platform.Generic, "0.1", None, None, None)
assert plugin.features == []
def test_no_overloads():
class PluginImpl(Plugin): #pylint: disable=abstract-method
pass
plugin = PluginImpl(Platform.Generic, "0.1", None, None, None)
assert plugin.features == []
def test_one_method_feature():
class PluginImpl(Plugin): #pylint: disable=abstract-method
async def get_owned_games(self):
pass
plugin = PluginImpl(Platform.Generic, "0.1", None, None, None)
assert plugin.features == [Feature.ImportOwnedGames]
def test_multiple_methods_feature_all():
class PluginImpl(Plugin): #pylint: disable=abstract-method
async def send_message(self, room_id, message):
pass
async def mark_as_read(self, room_id, last_message_id):
pass
async def get_rooms(self):
pass
async def get_room_history_from_message(self, room_id, message_id):
pass
async def get_room_history_from_timestamp(self, room_id, timestamp):
pass
plugin = PluginImpl(Platform.Generic, "0.1", None, None, None)
assert plugin.features == [Feature.Chat]
def test_multiple_methods_feature_not_all():
class PluginImpl(Plugin): #pylint: disable=abstract-method
async def send_message(self, room_id, message):
pass
plugin = PluginImpl(Platform.Generic, "0.1", None, None, None)
assert plugin.features == []

90
tests/test_friends.py Normal file
View File

@@ -0,0 +1,90 @@
import asyncio
import json
from galaxy.api.types import FriendInfo
from galaxy.api.errors import UnknownError
def test_get_friends_success(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_friends"
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_friends.coro.return_value = [
FriendInfo("3", "Jan"),
FriendInfo("5", "Ola")
]
asyncio.run(plugin.run())
plugin.get_friends.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"result": {
"friend_info_list": [
{"user_id": "3", "user_name": "Jan"},
{"user_id": "5", "user_name": "Ola"}
]
}
}
def test_get_friends_failure(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_friends"
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_friends.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_friends.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": 0,
"message": "Unknown error",
}
}
def test_add_friend(plugin, write):
friend = FriendInfo("7", "Kuba")
async def couritine():
plugin.add_friend(friend)
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "friend_added",
"params": {
"friend_info": {"user_id": "7", "user_name": "Kuba"}
}
}
def test_remove_friend(plugin, write):
async def couritine():
plugin.remove_friend("5")
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "friend_removed",
"params": {
"user_id": "5"
}
}

175
tests/test_game_times.py Normal file
View File

@@ -0,0 +1,175 @@
import asyncio
import json
from unittest.mock import call
import pytest
from galaxy.api.types import GameTime
from galaxy.api.errors import UnknownError, ImportInProgress, BackendError
def test_success(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_game_times"
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_game_times.coro.return_value = [
GameTime("3", 60, 1549550504),
GameTime("5", 10, 1549550502)
]
asyncio.run(plugin.run())
plugin.get_game_times.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"result": {
"game_times": [
{
"game_id": "3",
"time_played": 60,
"last_played_time": 1549550504
},
{
"game_id": "5",
"time_played": 10,
"last_played_time": 1549550502
}
]
}
}
def test_failure(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_game_times"
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_game_times.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_game_times.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": 0,
"message": "Unknown error",
}
}
def test_update_game(plugin, write):
game_time = GameTime("3", 60, 1549550504)
async def couritine():
plugin.update_game_time(game_time)
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "game_time_updated",
"params": {
"game_time": {
"game_id": "3",
"time_played": 60,
"last_played_time": 1549550504
}
}
}
@pytest.mark.asyncio
async def test_game_time_import_success(plugin, write):
plugin.game_time_import_success(GameTime("3", 60, 1549550504))
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "game_time_import_success",
"params": {
"game_time": {
"game_id": "3",
"time_played": 60,
"last_played_time": 1549550504
}
}
}
@pytest.mark.asyncio
async def test_game_time_import_failure(plugin, write):
plugin.game_time_import_failure("134", ImportInProgress())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "game_time_import_failure",
"params": {
"game_id": "134",
"error": {
"code": 600,
"message": "Import already in progress"
}
}
}
@pytest.mark.asyncio
async def test_game_times_import_finished(plugin, write):
plugin.game_times_import_finished()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "game_times_import_finished",
"params": None
}
@pytest.mark.asyncio
async def test_start_game_times_import(plugin, write, mocker):
game_time_import_success = mocker.patch.object(plugin, "game_time_import_success")
game_time_import_failure = mocker.patch.object(plugin, "game_time_import_failure")
game_times_import_finished = mocker.patch.object(plugin, "game_times_import_finished")
game_ids = ["1", "5"]
game_time = GameTime("1", 10, 1549550502)
plugin.get_game_times.coro.return_value = [
game_time
]
await plugin.start_game_times_import(game_ids)
with pytest.raises(ImportInProgress):
await plugin.start_game_times_import(["4", "8"])
# wait until all tasks are finished
for _ in range(4):
await asyncio.sleep(0)
plugin.get_game_times.coro.assert_called_once_with()
game_time_import_success.assert_called_once_with(game_time)
game_time_import_failure.assert_called_once_with("5", UnknownError())
game_times_import_finished.assert_called_once_with()
@pytest.mark.asyncio
async def test_start_game_times_import_failure(plugin, write, mocker):
game_time_import_failure = mocker.patch.object(plugin, "game_time_import_failure")
game_times_import_finished = mocker.patch.object(plugin, "game_times_import_finished")
game_ids = ["1", "5"]
error = BackendError()
plugin.get_game_times.coro.side_effect = error
await plugin.start_game_times_import(game_ids)
# wait until all tasks are finished
for _ in range(4):
await asyncio.sleep(0)
plugin.get_game_times.coro.assert_called_once_with()
assert game_time_import_failure.mock_calls == [call("1", error), call("5", error)]
game_times_import_finished.assert_called_once_with()

View File

@@ -0,0 +1,16 @@
import asyncio
import json
def test_success(plugin, readline):
request = {
"jsonrpc": "2.0",
"method": "install_game",
"params": {
"game_id": "3"
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_owned_games.return_value = None
asyncio.run(plugin.run())
plugin.install_game.assert_called_with(game_id="3")

68
tests/test_internal.py Normal file
View File

@@ -0,0 +1,68 @@
import asyncio
import json
from galaxy.api.plugin import Plugin
from galaxy.api.consts import Platform
def test_get_capabilites(reader, writer, readline, write):
class PluginImpl(Plugin): #pylint: disable=abstract-method
async def get_owned_games(self):
pass
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "get_capabilities"
}
token = "token"
plugin = PluginImpl(Platform.Generic, "0.1", reader, writer, token)
readline.side_effect = [json.dumps(request), ""]
asyncio.run(plugin.run())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"result": {
"platform_name": "generic",
"features": [
"ImportOwnedGames"
],
"token": token
}
}
def test_shutdown(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "5",
"method": "shutdown"
}
readline.side_effect = [json.dumps(request)]
asyncio.run(plugin.run())
plugin.shutdown.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "5",
"result": None
}
def test_ping(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "7",
"method": "ping"
}
readline.side_effect = [json.dumps(request), ""]
asyncio.run(plugin.run())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "7",
"result": None
}
def test_tick(plugin, readline):
readline.side_effect = [""]
asyncio.run(plugin.run())
plugin.tick.assert_called_with()

16
tests/test_launch_game.py Normal file
View File

@@ -0,0 +1,16 @@
import asyncio
import json
def test_success(plugin, readline):
request = {
"jsonrpc": "2.0",
"method": "launch_game",
"params": {
"game_id": "3"
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_owned_games.return_value = None
asyncio.run(plugin.run())
plugin.launch_game.assert_called_with(game_id="3")

96
tests/test_local_games.py Normal file
View File

@@ -0,0 +1,96 @@
import asyncio
import json
import pytest
from galaxy.api.types import LocalGame
from galaxy.api.consts import LocalGameState
from galaxy.api.errors import UnknownError, FailedParsingManifest
def test_success(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_local_games"
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_local_games.coro.return_value = [
LocalGame("1", LocalGameState.Running),
LocalGame("2", LocalGameState.Installed),
LocalGame("3", LocalGameState.Installed | LocalGameState.Running)
]
asyncio.run(plugin.run())
plugin.get_local_games.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"result": {
"local_games" : [
{
"game_id": "1",
"local_game_state": LocalGameState.Running.value
},
{
"game_id": "2",
"local_game_state": LocalGameState.Installed.value
},
{
"game_id": "3",
"local_game_state": (LocalGameState.Installed | LocalGameState.Running).value
}
]
}
}
@pytest.mark.parametrize(
"error,code,message",
[
pytest.param(UnknownError, 0, "Unknown error", id="unknown_error"),
pytest.param(FailedParsingManifest, 200, "Failed parsing manifest", id="failed_parsing")
],
)
def test_failure(plugin, readline, write, error, code, message):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_local_games"
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_local_games.coro.side_effect = error()
asyncio.run(plugin.run())
plugin.get_local_games.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": code,
"message": message
}
}
def test_local_game_state_update(plugin, write):
game = LocalGame("1", LocalGameState.Running)
async def couritine():
plugin.update_local_game_status(game)
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "local_game_status_changed",
"params": {
"local_game": {
"game_id": "1",
"local_game_state": LocalGameState.Running.value
}
}
}

151
tests/test_owned_games.py Normal file
View File

@@ -0,0 +1,151 @@
import asyncio
import json
from galaxy.api.types import Game, Dlc, LicenseInfo
from galaxy.api.consts import LicenseType
from galaxy.api.errors import UnknownError
def test_success(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_owned_games"
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_owned_games.coro.return_value = [
Game("3", "Doom", None, LicenseInfo(LicenseType.SinglePurchase, None)),
Game(
"5",
"Witcher 3",
[
Dlc("7", "Hearts of Stone", LicenseInfo(LicenseType.SinglePurchase, None)),
Dlc("8", "Temerian Armor Set", LicenseInfo(LicenseType.FreeToPlay, None)),
],
LicenseInfo(LicenseType.SinglePurchase, None))
]
asyncio.run(plugin.run())
plugin.get_owned_games.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"result": {
"owned_games": [
{
"game_id": "3",
"game_title": "Doom",
"license_info": {
"license_type": "SinglePurchase"
}
},
{
"game_id": "5",
"game_title": "Witcher 3",
"dlcs": [
{
"dlc_id": "7",
"dlc_title": "Hearts of Stone",
"license_info": {
"license_type": "SinglePurchase"
}
},
{
"dlc_id": "8",
"dlc_title": "Temerian Armor Set",
"license_info": {
"license_type": "FreeToPlay"
}
}
],
"license_info": {
"license_type": "SinglePurchase"
}
}
]
}
}
def test_failure(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_owned_games"
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_owned_games.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_owned_games.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": 0,
"message": "Unknown error"
}
}
def test_add_game(plugin, write):
game = Game("3", "Doom", None, LicenseInfo(LicenseType.SinglePurchase, None))
async def couritine():
plugin.add_game(game)
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "owned_game_added",
"params": {
"owned_game": {
"game_id": "3",
"game_title": "Doom",
"license_info": {
"license_type": "SinglePurchase"
}
}
}
}
def test_remove_game(plugin, write):
async def couritine():
plugin.remove_game("5")
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "owned_game_removed",
"params": {
"game_id": "5"
}
}
def test_update_game(plugin, write):
game = Game("3", "Doom", None, LicenseInfo(LicenseType.SinglePurchase, None))
async def couritine():
plugin.update_game(game)
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "owned_game_updated",
"params": {
"owned_game": {
"game_id": "3",
"game_title": "Doom",
"license_info": {
"license_type": "SinglePurchase"
}
}
}
}

View File

@@ -0,0 +1,16 @@
import asyncio
import json
def test_success(plugin, readline):
request = {
"jsonrpc": "2.0",
"method": "uninstall_game",
"params": {
"game_id": "3"
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_owned_games.return_value = None
asyncio.run(plugin.run())
plugin.uninstall_game.assert_called_with(game_id="3")

69
tests/test_users.py Normal file
View File

@@ -0,0 +1,69 @@
import asyncio
import json
from galaxy.api.types import UserInfo, Presence
from galaxy.api.errors import UnknownError
from galaxy.api.consts import PresenceState
def test_get_users_success(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "8",
"method": "import_user_infos",
"params": {
"user_id_list": ["13"]
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_users.coro.return_value = [
UserInfo("5", False, "Ula", "http://avatar.png", Presence(PresenceState.Offline))
]
asyncio.run(plugin.run())
plugin.get_users.assert_called_with(user_id_list=["13"])
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "8",
"result": {
"user_info_list": [
{
"user_id": "5",
"is_friend": False,
"user_name": "Ula",
"avatar_url": "http://avatar.png",
"presence": {
"presence_state": "offline"
}
}
]
}
}
def test_get_users_failure(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "12",
"method": "import_user_infos",
"params": {
"user_id_list": ["10", "11", "12"]
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_users.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_users.assert_called_with(user_id_list=["10", "11", "12"])
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "12",
"error": {
"code": 0,
"message": "Unknown error"
}
}