Compare commits

..

17 Commits
0.45 ... 0.48

Author SHA1 Message Date
Romuald Juchnowicz-Bierbasz
bab0be9994 Increment version 2019-09-02 17:31:47 +02:00
Romuald Juchnowicz-Bierbasz
0294e2a1f1 SDK-3023: Introduce task managers 2019-09-02 17:28:13 +02:00
Mieszko Banczerowski
0ab00e4119 Fix typing in persistent cache 2019-08-23 17:48:42 +02:00
Mieszko Banczerowski
b20fce057b GPI-661: Add docs to http module 2019-08-23 17:46:08 +02:00
Romuald Juchnowicz-Bierbasz
dec59f47dd Add more detials to launch_platform_client method 2019-08-22 09:42:10 +02:00
Mieszko Banczerowski
ca85b2428b GPI-517: Mark coroutines in docs 2019-08-20 14:35:54 +02:00
Pawel Czoppa
d8a00d58a6 Merge branch 'master' of https://gitlab.gog.com/galaxy-client/galaxy-plugin-api into platform_id_update
# Conflicts:
#	PLATFORM_IDs.md
2019-08-20 14:22:36 +02:00
Romuald Juchnowicz-Bierbasz
d4cd1cedfd Install wheel in deploy 2019-08-14 16:07:01 +02:00
Romuald Juchnowicz-Bierbasz
161122b94d SDK-2988: Upload to official pypi server 2019-08-14 13:48:35 +02:00
Romuald Juchnowicz-Bierbasz
cec36695b6 Increment version 2019-08-13 16:08:59 +02:00
Romuald Juchnowicz-Bierbasz
4cc8be8f5d SDK-2997: Add launch_platform_client method 2019-08-13 15:23:20 +02:00
Vadim Suharnikov
f5eb32aa19 Fix a comment typo 2019-08-04 01:06:22 +02:00
Romuald Bierbasz
a76345ff6b Docs fixes 2019-08-02 17:23:56 +02:00
Romuald Bierbasz
c3bbeee54d Turn on type checking 2019-08-02 15:15:29 +02:00
Romuald Juchnowicz-Bierbasz
13a3f7577b Increment version 2019-08-02 15:13:11 +02:00
Romuald Bierbasz
f5b9adfbd5 Add achievements_import_complete and game_times_import_complete 2019-08-02 15:11:05 +02:00
Piotr Marzec
8d210e7f3e platform name fixing 2019-06-19 20:43:25 +02:00
27 changed files with 386 additions and 180 deletions

View File

@@ -14,13 +14,19 @@ test_package:
deploy_package:
stage: deploy
variables:
TWINE_USERNAME: $PYPI_USERNAME
TWINE_PASSWORD: $PYPI_PASSWORD
script:
- pip install twine wheel
- rm -rf dist
- export VERSION=$(python setup.py --version)
- python setup.py sdist --formats=gztar upload -r gog-pypi
- python setup.py sdist --formats=gztar bdist_wheel
- twine upload dist/*
- curl -X POST --silent --show-error --fail
"https://gitlab.gog.com/api/v4/projects/${CI_PROJECT_ID}/repository/tags?tag_name=${VERSION}&ref=${CI_COMMIT_REF_NAME}&private_token=${PACKAGE_DEPLOYER_API_TOKEN}"
when: manual
only:
- master
except:
- tags
- tags

View File

@@ -20,7 +20,7 @@ Platform ID list for GOG Galaxy 2.0 Integrations
| nswitch | Nintendo Switch |
| nwiiu | Nintendo Wii U |
| nwii | Nintendo Wii |
| ncube | Nintendo Game Cube |
| ncube | Nintendo GameCube |
| riot | Riot |
| wargaming | Wargaming |
| ngameboy | Nintendo Game Boy |
@@ -58,25 +58,25 @@ Platform ID list for GOG Galaxy 2.0 Integrations
| bb | BestBuy |
| gameuk | Game UK |
| fanatical | Fanatical store |
| playasia | PlayAsia |
| playasia | Play-Asia |
| stadia | Google Stadia |
| arc | ARC |
| eso | ESO |
| glyph | Trion World |
| aionl | Aion: Legions of War |
| aion | Aion |
| blade | Blade and Soul |
| blade | Blade & Soul |
| gw | Guild Wars |
| gw2 | Guild Wars 2 |
| lin2 | Lineage 2 |
| ffxi | Final Fantasy XI |
| ffxiv | Final Fantasy XIV |
| totalwar | TotalWar |
| ffxiv | Final Fantasy XIV |
| totalwar | Total War |
| winstore | Windows Store |
| elites | Elite Dangerous |
| star | Star Citizen |
| psp | Playstation Portable |
| psvita | Playstation Vita |
| psp | PlayStation Portable |
| psvita | PlayStation Vita |
| nds | Nintendo DS |
| 3ds | Nintendo 3DS |
| pathofexile | Path of Exile |

View File

@@ -1,4 +1,5 @@
Sphinx==2.0.1
sphinx-rtd-theme==0.4.3
sphinx-autodoc-typehints==1.6.0
sphinxcontrib-asyncio==0.2.0
m2r==0.2.1

View File

@@ -32,12 +32,13 @@ release = _version
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinxcontrib.asyncio',
'sphinx_autodoc_typehints',
'm2r' # mdinclude directive for makrdown files
]
autodoc_member_order = 'bysource'
autodoc_inherit_docstrings = False
autodoc_mock_imports = ["galaxy.http"]
autodoc_mock_imports = ["aiohttp"]
set_type_checking_flag = True
@@ -47,7 +48,7 @@ templates_path = ['_templates']
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path.
exclude_patterns = []
exclude_patterns = [] # type: ignore
# -- Options for HTML output -------------------------------------------------

View File

@@ -0,0 +1,8 @@
galaxy.http
=================
.. automodule:: galaxy.http
:members:
:special-members: __init__
:undoc-members:
:show-inheritance:

View File

@@ -6,7 +6,8 @@ GOG Galaxy Integrations Python API
:includehidden:
Overview <overview>
API <galaxy.api>
galaxy.api
galaxy.http
Platform ID's <platforms>
Index

2
mypy.ini Normal file
View File

@@ -0,0 +1,2 @@
[mypy]
ignore_missing_imports = True

View File

@@ -1,2 +1,2 @@
[pytest]
addopts = --flakes
addopts = --flakes --mypy

View File

@@ -2,6 +2,7 @@
pytest==4.2.0
pytest-asyncio==0.10.0
pytest-mock==1.10.3
pytest-mypy==0.3.2
pytest-flakes==4.0.0
# because of pip bug https://github.com/pypa/pip/issues/4780
aiohttp==3.5.4

View File

@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name="galaxy.plugin.api",
version="0.45",
version="0.48",
description="GOG Galaxy Integrations Python API",
author='Galaxy team',
author_email='galaxy@gog.com',

View File

@@ -1 +1 @@
__path__ = __import__('pkgutil').extend_path(__path__, __name__)
__path__: str = __import__('pkgutil').extend_path(__path__, __name__)

View File

@@ -99,6 +99,7 @@ class Feature(Enum):
VerifyGame = "VerifyGame"
ImportFriends = "ImportFriends"
ShutdownPlatformClient = "ShutdownPlatformClient"
LaunchPlatformClient = "LaunchPlatformClient"
class LicenseType(Enum):

View File

@@ -1,6 +1,6 @@
from galaxy.api.jsonrpc import ApplicationError, UnknownError
UnknownError = UnknownError
assert UnknownError
class AuthenticationRequired(ApplicationError):
def __init__(self, data=None):

View File

@@ -6,6 +6,7 @@ import inspect
import json
from galaxy.reader import StreamLineReader
from galaxy.task_manager import TaskManager
class JsonRpcError(Exception):
def __init__(self, code, message, data=None):
@@ -52,7 +53,8 @@ class UnknownError(ApplicationError):
super().__init__(0, "Unknown error", data)
Request = namedtuple("Request", ["method", "params", "id"], defaults=[{}, None])
Method = namedtuple("Method", ["callback", "signature", "internal", "sensitive_params"])
Method = namedtuple("Method", ["callback", "signature", "immediate", "sensitive_params"])
def anonymise_sensitive_params(params, sensitive_params):
anomized_data = "****"
@@ -74,9 +76,9 @@ class Server():
self._encoder = encoder
self._methods = {}
self._notifications = {}
self._eof_listeners = []
self._task_manager = TaskManager("jsonrpc server")
def register_method(self, name, callback, internal, sensitive_params=False):
def register_method(self, name, callback, immediate, sensitive_params=False):
"""
Register method
@@ -86,9 +88,9 @@ class Server():
:param sensitive_params: list of parameters that are anonymized before logging; \
if False - no params are considered sensitive, if True - all params are considered sensitive
"""
self._methods[name] = Method(callback, inspect.signature(callback), internal, sensitive_params)
self._methods[name] = Method(callback, inspect.signature(callback), immediate, sensitive_params)
def register_notification(self, name, callback, internal, sensitive_params=False):
def register_notification(self, name, callback, immediate, sensitive_params=False):
"""
Register notification
@@ -98,10 +100,7 @@ class Server():
:param sensitive_params: list of parameters that are anonymized before logging; \
if False - no params are considered sensitive, if True - all params are considered sensitive
"""
self._notifications[name] = Method(callback, inspect.signature(callback), internal, sensitive_params)
def register_eof(self, callback):
self._eof_listeners.append(callback)
self._notifications[name] = Method(callback, inspect.signature(callback), immediate, sensitive_params)
async def run(self):
while self._active:
@@ -118,14 +117,16 @@ class Server():
self._handle_input(data)
await asyncio.sleep(0) # To not starve task queue
def stop(self):
def close(self):
logging.info("Closing JSON-RPC server - not more messages will be read")
self._active = False
async def wait_closed(self):
await self._task_manager.wait()
def _eof(self):
logging.info("Received EOF")
self.stop()
for listener in self._eof_listeners:
listener()
self.close()
def _handle_input(self, data):
try:
@@ -145,7 +146,7 @@ class Server():
logging.error("Received unknown notification: %s", request.method)
return
callback, signature, internal, sensitive_params = method
callback, signature, immediate, sensitive_params = method
self._log_request(request, sensitive_params)
try:
@@ -153,12 +154,11 @@ class Server():
except TypeError:
self._send_error(request.id, InvalidParams())
if internal:
# internal requests are handled immediately
if immediate:
callback(*bound_args.args, **bound_args.kwargs)
else:
try:
asyncio.create_task(callback(*bound_args.args, **bound_args.kwargs))
self._task_manager.create_task(callback(*bound_args.args, **bound_args.kwargs), request.method)
except Exception:
logging.exception("Unexpected exception raised in notification handler")
@@ -169,7 +169,7 @@ class Server():
self._send_error(request.id, MethodNotFound())
return
callback, signature, internal, sensitive_params = method
callback, signature, immediate, sensitive_params = method
self._log_request(request, sensitive_params)
try:
@@ -177,8 +177,7 @@ class Server():
except TypeError:
self._send_error(request.id, InvalidParams())
if internal:
# internal requests are handled immediately
if immediate:
response = callback(*bound_args.args, **bound_args.kwargs)
self._send_response(request.id, response)
else:
@@ -190,11 +189,13 @@ class Server():
self._send_error(request.id, MethodNotFound())
except JsonRpcError as error:
self._send_error(request.id, error)
except asyncio.CancelledError:
self._send_error(request.id, Aborted())
except Exception as e: #pylint: disable=broad-except
logging.exception("Unexpected exception raised in plugin handler")
self._send_error(request.id, UnknownError(str(e)))
asyncio.create_task(handle())
self._task_manager.create_task(handle(), request.method)
@staticmethod
def _parse_request(data):
@@ -215,7 +216,7 @@ class Server():
logging.debug("Sending data: %s", line)
data = (line + "\n").encode("utf-8")
self._writer.write(data)
asyncio.create_task(self._writer.drain())
self._task_manager.create_task(self._writer.drain(), "drain")
except TypeError as error:
logging.error(str(error))
@@ -255,6 +256,7 @@ class NotificationClient():
self._writer = writer
self._encoder = encoder
self._methods = {}
self._task_manager = TaskManager("notification client")
def notify(self, method, params, sensitive_params=False):
"""
@@ -273,13 +275,16 @@ class NotificationClient():
self._log(method, params, sensitive_params)
self._send(notification)
async def close(self):
await self._task_manager.wait()
def _send(self, data):
try:
line = self._encoder.encode(data)
data = (line + "\n").encode("utf-8")
logging.debug("Sending %d byte of data", len(data))
self._writer.write(data)
asyncio.create_task(self._writer.drain())
self._task_manager.create_task(self._writer.drain(), "drain")
except TypeError as error:
logging.error("Failed to parse outgoing message: %s", str(error))

View File

@@ -4,16 +4,14 @@ import json
import logging
import logging.handlers
import sys
from collections import OrderedDict
from enum import Enum
from itertools import count
from typing import Any, Dict, List, Optional, Set, Union
from galaxy.api.consts import Feature
from galaxy.api.errors import ImportInProgress, UnknownError
from galaxy.api.jsonrpc import ApplicationError, NotificationClient, Server
from galaxy.api.types import Achievement, Authentication, FriendInfo, Game, GameTime, LocalGame, NextStep
from galaxy.task_manager import TaskManager
class JSONEncoder(json.JSONEncoder):
def default(self, o): # pylint: disable=method-hidden
@@ -38,7 +36,6 @@ class Plugin:
self._features: Set[Feature] = set()
self._active = True
self._pass_control_task = None
self._reader, self._writer = reader, writer
self._handshake_token = handshake_token
@@ -47,29 +44,25 @@ class Plugin:
self._server = Server(self._reader, self._writer, encoder)
self._notification_client = NotificationClient(self._writer, encoder)
def eof_handler():
self._shutdown()
self._server.register_eof(eof_handler)
self._achievements_import_in_progress = False
self._game_times_import_in_progress = False
self._persistent_cache = dict()
self._tasks = OrderedDict()
self._task_counter = count()
self._internal_task_manager = TaskManager("plugin internal")
self._external_task_manager = TaskManager("plugin external")
# internal
self._register_method("shutdown", self._shutdown, internal=True)
self._register_method("get_capabilities", self._get_capabilities, internal=True)
self._register_method("get_capabilities", self._get_capabilities, internal=True, immediate=True)
self._register_method(
"initialize_cache",
self._initialize_cache,
internal=True,
immediate=True,
sensitive_params="data"
)
self._register_method("ping", self._ping, internal=True)
self._register_method("ping", self._ping, internal=True, immediate=True)
# implemented by developer
self._register_method(
@@ -107,18 +100,28 @@ class Plugin:
self._register_notification("shutdown_platform_client", self.shutdown_platform_client)
self._detect_feature(Feature.ShutdownPlatformClient, ["shutdown_platform_client"])
self._register_notification("launch_platform_client", self.launch_platform_client)
self._detect_feature(Feature.LaunchPlatformClient, ["launch_platform_client"])
self._register_method("import_friends", self.get_friends, result_name="friend_info_list")
self._detect_feature(Feature.ImportFriends, ["get_friends"])
self._register_method("start_game_times_import", self._start_game_times_import)
self._detect_feature(Feature.ImportGameTime, ["get_game_time"])
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
self.close()
await self.wait_closed()
@property
def features(self) -> List[Feature]:
return list(self._features)
@property
def persistent_cache(self) -> Dict:
def persistent_cache(self) -> Dict[str, str]:
"""The cache is only available after the :meth:`~.handshake_complete()` is called.
"""
return self._persistent_cache
@@ -133,55 +136,65 @@ class Plugin:
if self._implements(methods):
self._features.add(feature)
def _register_method(self, name, handler, result_name=None, internal=False, sensitive_params=False):
if internal:
def _register_method(self, name, handler, result_name=None, internal=False, immediate=False, sensitive_params=False):
def wrap_result(result):
if result_name:
result = {
result_name: result
}
return result
if immediate:
def method(*args, **kwargs):
result = handler(*args, **kwargs)
if result_name:
result = {
result_name: result
}
return result
return wrap_result(result)
self._server.register_method(name, method, True, sensitive_params)
else:
async def method(*args, **kwargs):
result = await handler(*args, **kwargs)
if result_name:
result = {
result_name: result
}
return result
if not internal:
handler_ = self._wrap_external_method(handler, name)
else:
handler_ = handler
result = await handler_(*args, **kwargs)
return wrap_result(result)
self._server.register_method(name, method, False, sensitive_params)
def _register_notification(self, name, handler, internal=False, sensitive_params=False):
self._server.register_notification(name, handler, internal, sensitive_params)
def _register_notification(self, name, handler, internal=False, immediate=False, sensitive_params=False):
if not internal and not immediate:
handler = self._wrap_external_method(handler, name)
self._server.register_notification(name, handler, immediate, sensitive_params)
def _wrap_external_method(self, handler, name: str):
async def wrapper(*args, **kwargs):
return await self._external_task_manager.create_task(handler(*args, **kwargs), name, False)
return wrapper
async def run(self):
"""Plugin's main coroutine."""
await self._server.run()
if self._pass_control_task is not None:
await self._pass_control_task
await self._external_task_manager.wait()
def close(self) -> None:
if not self._active:
return
logging.info("Closing plugin")
self._server.close()
self._external_task_manager.cancel()
self._internal_task_manager.create_task(self.shutdown(), "shutdown")
self._active = False
async def wait_closed(self) -> None:
await self._external_task_manager.wait()
await self._internal_task_manager.wait()
await self._server.wait_closed()
await self._notification_client.close()
def create_task(self, coro, description):
"""Wrapper around asyncio.create_task - takes care of canceling tasks on shutdown"""
async def task_wrapper(task_id):
try:
return await coro
except asyncio.CancelledError:
logging.debug("Canceled task %d (%s)", task_id, description)
except Exception:
logging.exception("Exception raised in task %d (%s)", task_id, description)
finally:
del self._tasks[task_id]
task_id = next(self._task_counter)
logging.debug("Creating task %d (%s)", task_id, description)
task = asyncio.create_task(task_wrapper(task_id))
self._tasks[task_id] = task
return task
return self._external_task_manager.create_task(coro, description)
async def _pass_control(self):
while self._active:
@@ -191,13 +204,11 @@ class Plugin:
logging.exception("Unexpected exception raised in plugin tick")
await asyncio.sleep(1)
def _shutdown(self):
async def _shutdown(self):
logging.info("Shutting down")
self._server.stop()
self._active = False
self.shutdown()
for task in self._tasks.values():
task.cancel()
self.close()
await self._external_task_manager.wait()
await self._internal_task_manager.wait()
def _get_capabilities(self):
return {
@@ -212,7 +223,7 @@ class Plugin:
self.handshake_complete()
except Exception:
logging.exception("Unhandled exception during `handshake_complete` step")
self._pass_control_task = asyncio.create_task(self._pass_control())
self._internal_task_manager.create_task(self._pass_control(), "tick")
@staticmethod
def _ping():
@@ -240,8 +251,10 @@ class Plugin:
self.store_credentials(user_data['credentials'])
return Authentication(user_data['userId'], user_data['username'])
"""
self.persistent_cache['credentials'] = credentials
"""
# temporary solution for persistent_cache vs credentials issue
self.persistent_cache['credentials'] = credentials # type: ignore
self._notification_client.notify("store_credentials", credentials, sensitive_params=True)
def add_game(self, game: Game) -> None:
@@ -270,7 +283,7 @@ class Plugin:
"""Notify the client to remove game from the list of owned games
of the currently authenticated user.
:param game_id: game id of the game to remove from the list of owned games
:param game_id: the id of the game to remove from the list of owned games
Example use case of remove_game:
@@ -300,7 +313,7 @@ class Plugin:
def unlock_achievement(self, game_id: str, achievement: Achievement) -> None:
"""Notify the client to unlock an achievement for a specific game.
:param game_id: game_id of the game for which to unlock an achievement.
:param game_id: the id of the game for which to unlock an achievement.
:param achievement: achievement to unlock.
"""
params = {
@@ -439,7 +452,7 @@ class Plugin:
"""
def shutdown(self) -> None:
async def shutdown(self) -> None:
"""This method is called on integration shutdown.
Override it to implement tear down.
This method is called by the GOG Galaxy Client."""
@@ -545,15 +558,22 @@ class Plugin:
finally:
self._achievements_import_finished()
self._achievements_import_in_progress = False
self.achievements_import_complete()
self.create_task(import_games_achievements(game_ids, context), "Games unlocked achievements import")
self._external_task_manager.create_task(
import_games_achievements(game_ids, context),
"unlocked achievements import",
handle_exceptions=False
)
self._achievements_import_in_progress = True
async def prepare_achievements_context(self, game_ids: List[str]) -> Any:
"""Override this method to prepare context for get_unlocked_achievements.
This allows for optimizations like batch requests to platform API.
Default implementation returns None.
:param game_ids: the ids of the games for which achievements are imported
:return: context
"""
return None
@@ -562,12 +582,17 @@ class Plugin:
for the game identified by the provided game_id.
This method is called by import task initialized by GOG Galaxy Client.
:param game_id:
:param context: Value return from :meth:`prepare_achievements_context`
:return:
:param game_id: the id of the game for which the achievements are returned
:param context: the value returned from :meth:`prepare_achievements_context`
:return: list of Achievement objects
"""
raise NotImplementedError()
def achievements_import_complete(self):
"""Override this method to handle operations after achievements import is finished
(like updating cache).
"""
async def get_local_games(self) -> List[LocalGame]:
"""Override this method to return the list of
games present locally on the users pc.
@@ -595,7 +620,7 @@ class Plugin:
identified by the provided game_id.
This method is called by the GOG Galaxy Client.
:param str game_id: id of the game to launch
:param str game_id: the id of the game to launch
Example of possible override of the method:
@@ -613,7 +638,7 @@ class Plugin:
identified by the provided game_id.
This method is called by the GOG Galaxy Client.
:param str game_id: id of the game to install
:param str game_id: the id of the game to install
Example of possible override of the method:
@@ -631,7 +656,7 @@ class Plugin:
identified by the provided game_id.
This method is called by the GOG Galaxy Client.
:param str game_id: id of the game to uninstall
:param str game_id: the id of the game to uninstall
Example of possible override of the method:
@@ -649,6 +674,11 @@ class Plugin:
This method is called by the GOG Galaxy Client."""
raise NotImplementedError()
async def launch_platform_client(self) -> None:
"""Override this method to launch platform client. Preferably minimized to tray.
This method is called by the GOG Galaxy Client."""
raise NotImplementedError()
async def get_friends(self) -> List[FriendInfo]:
"""Override this method to return the friends list
of the currently authenticated user.
@@ -692,14 +722,22 @@ class Plugin:
finally:
self._game_times_import_finished()
self._game_times_import_in_progress = False
self.game_times_import_complete()
self.create_task(import_game_times(game_ids, context), "Game times import")
self._external_task_manager.create_task(
import_game_times(game_ids, context),
"game times import",
handle_exceptions=False
)
self._game_times_import_in_progress = True
async def prepare_game_times_context(self, game_ids: List[str]) -> Any:
"""Override this method to prepare context for get_game_time.
This allows for optimizations like batch requests to platform API.
Default implementation returns None.
:param game_ids: the ids of the games for which game time are imported
:return: context
"""
return None
@@ -708,12 +746,17 @@ class Plugin:
identified by the provided game_id.
This method is called by import task initialized by GOG Galaxy Client.
:param game_id:
:param context: Value return from :meth:`prepare_game_times_context`
:return:
:param game_id: the id of the game for which the game time is returned
:param context: the value returned from :meth:`prepare_game_times_context`
:return: GameTime object
"""
raise NotImplementedError()
def game_times_import_complete(self) -> None:
"""Override this method to handle operations after game times import is finished
(like updating cache).
"""
def create_and_run_plugin(plugin_class, argv):
"""Call this method as an entry point for the implemented integration.
@@ -756,8 +799,8 @@ def create_and_run_plugin(plugin_class, argv):
reader, writer = await asyncio.open_connection("127.0.0.1", port)
extra_info = writer.get_extra_info("sockname")
logging.info("Using local address: %s:%u", *extra_info)
plugin = plugin_class(reader, writer, token)
await plugin.run()
async with plugin_class(reader, writer, token) as plugin:
await plugin.run()
try:
if sys.platform == "win32":

View File

@@ -1,3 +1,34 @@
"""
This module standarize http traffic and the error handling for further communication with the GOG Galaxy 2.0.
It is recommended to use provided convenient methods for HTTP requests, especially when dealing with authorized sessions.
Examplary simple web service could looks like:
.. code-block:: python
import logging
from galaxy.http import create_client_session, handle_exception
class BackendClient:
AUTH_URL = 'my-integration.com/auth'
HEADERS = {
"My-Custom-Header": "true",
}
def __init__(self):
self._session = create_client_session(headers=self.HEADERS)
async def authenticate(self):
await self._session.request('POST', self.AUTH_URL)
async def close(self):
# to be called on plugin shutdown
await self._session.close()
async def _authorized_request(self, method, url, *args, **kwargs):
with handle_exceptions():
return await self._session.request(method, url, *args, **kwargs)
"""
import asyncio
import ssl
from contextlib import contextmanager
@@ -13,17 +44,23 @@ from galaxy.api.errors import (
)
#: Default limit of the simultaneous connections for ssl connector.
DEFAULT_LIMIT = 20
DEFAULT_TIMEOUT = 60 # seconds
#: Default timeout in seconds used for client session.
DEFAULT_TIMEOUT = 60
class HttpClient:
"""Deprecated"""
"""
.. deprecated:: 0.41
Use http module functions instead
"""
def __init__(self, limit=DEFAULT_LIMIT, timeout=aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT), cookie_jar=None):
connector = create_tcp_connector(limit=limit)
self._session = create_client_session(connector=connector, timeout=timeout, cookie_jar=cookie_jar)
async def close(self):
"""Closes connection. Should be called in :meth:`~galaxy.api.plugin.Plugin.shutdown`"""
await self._session.close()
async def request(self, method, url, *args, **kwargs):
@@ -31,23 +68,50 @@ class HttpClient:
return await self._session.request(method, url, *args, **kwargs)
def create_tcp_connector(*args, **kwargs):
def create_tcp_connector(*args, **kwargs) -> aiohttp.TCPConnector:
"""
Creates TCP connector with resonable defaults.
For details about available parameters refer to
`aiohttp.TCPConnector <https://docs.aiohttp.org/en/stable/client_reference.html#tcpconnector>`_
"""
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.load_verify_locations(certifi.where())
kwargs.setdefault("ssl", ssl_context)
kwargs.setdefault("limit", DEFAULT_LIMIT)
return aiohttp.TCPConnector(*args, **kwargs)
return aiohttp.TCPConnector(*args, **kwargs) # type: ignore due to https://github.com/python/mypy/issues/4001
def create_client_session(*args, **kwargs):
def create_client_session(*args, **kwargs) -> aiohttp.ClientSession:
"""
Creates client session with resonable defaults.
For details about available parameters refer to
`aiohttp.ClientSession <https://docs.aiohttp.org/en/stable/client_reference.html>`_
Examplary customization:
.. code-block:: python
from galaxy.http import create_client_session, create_tcp_connector
session = create_client_session(
headers={
"Keep-Alive": "true"
},
connector=create_tcp_connector(limit=40),
timeout=100)
"""
kwargs.setdefault("connector", create_tcp_connector())
kwargs.setdefault("timeout", aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT))
kwargs.setdefault("raise_for_status", True)
return aiohttp.ClientSession(*args, **kwargs)
return aiohttp.ClientSession(*args, **kwargs) # type: ignore due to https://github.com/python/mypy/issues/4001
@contextmanager
def handle_exception():
"""
Context manager translating network related exceptions
to custom :mod:`~galaxy.api.errors`.
"""
try:
yield
except asyncio.TimeoutError:
@@ -78,4 +142,3 @@ def handle_exception():
except aiohttp.ClientError:
logging.exception("Caught exception while performing request")
raise UnknownError()

View File

@@ -1,11 +1,8 @@
import platform
import sys
from dataclasses import dataclass
from typing import Iterable, NewType, Optional, Set
from typing import Iterable, NewType, Optional, List, cast
def is_windows():
return platform.system() == "Windows"
ProcessId = NewType("ProcessId", int)
@@ -16,7 +13,7 @@ class ProcessInfo:
binary_path: Optional[str]
if is_windows():
if sys.platform == "win32":
from ctypes import byref, sizeof, windll, create_unicode_buffer, FormatError, WinError
from ctypes.wintypes import DWORD
@@ -25,14 +22,14 @@ if is_windows():
_PROC_ID_T = DWORD
list_size = 4096
def try_get_pids(list_size: int) -> Set[ProcessId]:
def try_get_pids(list_size: int) -> List[ProcessId]:
result_size = DWORD()
proc_id_list = (_PROC_ID_T * list_size)()
if not windll.psapi.EnumProcesses(byref(proc_id_list), sizeof(proc_id_list), byref(result_size)):
raise WinError(descr="Failed to get process ID list: %s" % FormatError())
raise WinError(descr="Failed to get process ID list: %s" % FormatError()) # type: ignore
return proc_id_list[:int(result_size.value / sizeof(_PROC_ID_T()))]
return cast(List[ProcessId], proc_id_list[:int(result_size.value / sizeof(_PROC_ID_T()))])
while True:
proc_ids = try_get_pids(list_size)
@@ -59,7 +56,7 @@ if is_windows():
exe_path_buffer = create_unicode_buffer(_MAX_PATH)
exe_path_len = DWORD(len(exe_path_buffer))
return exe_path_buffer[:exe_path_len.value] if windll.kernel32.QueryFullProcessImageNameW(
return cast(str, exe_path_buffer[:exe_path_len.value]) if windll.kernel32.QueryFullProcessImageNameW(
h_process, _WIN32_PATH_FORMAT, exe_path_buffer, byref(exe_path_len)
) else None
@@ -86,6 +83,6 @@ else:
return process_info
def process_iter() -> Iterable[ProcessInfo]:
def process_iter() -> Iterable[Optional[ProcessInfo]]:
for pid in pids():
yield get_process_info(pid)

View File

@@ -0,0 +1,49 @@
import asyncio
import logging
from collections import OrderedDict
from itertools import count
class TaskManager:
def __init__(self, name):
self._name = name
self._tasks = OrderedDict()
self._task_counter = count()
def create_task(self, coro, description, handle_exceptions=True):
"""Wrapper around asyncio.create_task - takes care of canceling tasks on shutdown"""
async def task_wrapper(task_id):
try:
result = await coro
logging.debug("Task manager %s: finished task %d (%s)", self._name, task_id, description)
return result
except asyncio.CancelledError:
if handle_exceptions:
logging.debug("Task manager %s: canceled task %d (%s)", self._name, task_id, description)
else:
raise
except Exception:
if handle_exceptions:
logging.exception("Task manager %s: exception raised in task %d (%s)", self._name, task_id, description)
else:
raise
finally:
del self._tasks[task_id]
task_id = next(self._task_counter)
logging.debug("Task manager %s: creating task %d (%s)", self._name, task_id, description)
task = asyncio.create_task(task_wrapper(task_id))
self._tasks[task_id] = task
return task
def cancel(self):
for task in self._tasks.values():
task.cancel()
async def wait(self):
# Tasks can spawn other tasks
while True:
tasks = self._tasks.values()
if not tasks:
return
await asyncio.gather(*tasks, return_exceptions=True)

View File

@@ -4,8 +4,8 @@ from unittest.mock import MagicMock
class AsyncMock(MagicMock):
"""
..deprecated:: 0.45
Use: :class:`MagicMock` with meth:`~.async_return_value`.
.. deprecated:: 0.45
Use: :class:`MagicMock` with meth:`~.async_return_value`.
"""
async def __call__(self, *args, **kwargs):
return super(AsyncMock, self).__call__(*args, **kwargs)
@@ -13,8 +13,8 @@ class AsyncMock(MagicMock):
def coroutine_mock():
"""
..deprecated:: 0.45
Use: :class:`MagicMock` with meth:`~.async_return_value`.
.. deprecated:: 0.45
Use: :class:`MagicMock` with meth:`~.async_return_value`.
"""
coro = MagicMock(name="CoroutineResult")
corofunc = MagicMock(name="CoroutineFunction", side_effect=asyncio.coroutine(coro))

View File

@@ -7,11 +7,8 @@ def create_message(request):
def get_messages(write_mock):
messages = []
print("call_args_list", write_mock.call_args_list)
for call_args in write_mock.call_args_list:
print("call_args", call_args)
data = call_args[0][0]
print("data", data)
for line in data.splitlines():
message = json.loads(line)
messages.append(message)

View File

@@ -6,6 +6,7 @@ import pytest
from galaxy.api.plugin import Plugin
from galaxy.api.consts import Platform
from galaxy.unittest.mock import async_return_value
@pytest.fixture()
def reader():
@@ -16,8 +17,7 @@ def reader():
@pytest.fixture()
async def writer():
stream = MagicMock(name="stream_writer")
stream.write = MagicMock()
stream.drain = MagicMock()
stream.drain.side_effect = lambda: async_return_value(None)
yield stream
@pytest.fixture()
@@ -29,7 +29,7 @@ def write(writer):
yield writer.write
@pytest.fixture()
def plugin(reader, writer):
async def plugin(reader, writer):
"""Return plugin instance with all feature methods mocked"""
methods = (
"handshake_complete",
@@ -37,13 +37,16 @@ def plugin(reader, writer):
"get_owned_games",
"prepare_achievements_context",
"get_unlocked_achievements",
"achievements_import_complete",
"get_local_games",
"launch_game",
"launch_platform_client",
"install_game",
"uninstall_game",
"get_friends",
"get_game_time",
"prepare_game_times_context",
"game_times_import_complete",
"shutdown_platform_client",
"shutdown",
"tick"
@@ -52,7 +55,10 @@ def plugin(reader, writer):
with ExitStack() as stack:
for method in methods:
stack.enter_context(patch.object(Plugin, method))
yield Plugin(Platform.Generic, "0.1", reader, writer, "token")
async with Plugin(Platform.Generic, "0.1", reader, writer, "token") as plugin:
plugin.shutdown.return_value = async_return_value(None)
yield plugin
@pytest.fixture(autouse=True)

View File

@@ -40,6 +40,7 @@ async def test_get_unlocked_achievements_success(plugin, read, write):
await plugin.run()
plugin.prepare_achievements_context.assert_called_with(["14"])
plugin.get_unlocked_achievements.assert_called_with("14", 5)
plugin.achievements_import_complete.asert_called_with()
assert get_messages(write) == [
{
@@ -97,6 +98,7 @@ async def test_get_unlocked_achievements_error(exception, code, message, plugin,
plugin.get_unlocked_achievements.side_effect = exception
await plugin.run()
plugin.get_unlocked_achievements.assert_called()
plugin.achievements_import_complete.asert_called_with()
assert get_messages(write) == [
{
@@ -134,6 +136,7 @@ async def test_prepare_get_unlocked_achievements_context_error(plugin, read, wri
}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
await plugin.run()
assert get_messages(write) == [
@@ -151,6 +154,7 @@ async def test_prepare_get_unlocked_achievements_context_error(plugin, read, wri
@pytest.mark.asyncio
async def test_import_in_progress(plugin, read, write):
plugin.prepare_achievements_context.return_value = async_return_value(None)
plugin.get_unlocked_achievements.return_value = async_return_value([])
requests = [
{
"jsonrpc": "2.0",
@@ -177,21 +181,20 @@ async def test_import_in_progress(plugin, read, write):
await plugin.run()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"result": None
},
{
"jsonrpc": "2.0",
"id": "4",
"error": {
"code": 600,
"message": "Import already in progress"
}
messages = get_messages(write)
assert {
"jsonrpc": "2.0",
"id": "3",
"result": None
} in messages
assert {
"jsonrpc": "2.0",
"id": "4",
"error": {
"code": 600,
"message": "Import already in progress"
}
]
} in messages
@pytest.mark.asyncio

View File

@@ -13,7 +13,8 @@ def test_base_class():
Feature.ImportAchievements,
Feature.ImportGameTime,
Feature.ImportFriends,
Feature.ShutdownPlatformClient
Feature.ShutdownPlatformClient,
Feature.LaunchPlatformClient
}

View File

@@ -31,6 +31,7 @@ async def test_get_game_time_success(plugin, read, write):
call("5", "abc"),
call("7", "abc"),
])
plugin.game_times_import_complete.assert_called_once_with()
assert get_messages(write) == [
{
@@ -96,6 +97,7 @@ async def test_get_game_time_error(exception, code, message, plugin, read, write
plugin.get_game_time.side_effect = exception
await plugin.run()
plugin.get_game_time.assert_called()
plugin.game_times_import_complete.assert_called_once_with()
assert get_messages(write) == [
{
@@ -177,21 +179,20 @@ async def test_import_in_progress(plugin, read, write):
await plugin.run()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"result": None
},
{
"jsonrpc": "2.0",
"id": "4",
"error": {
"code": 600,
"message": "Import already in progress"
}
messages = get_messages(write)
assert {
"jsonrpc": "2.0",
"id": "3",
"result": None
} in messages
assert {
"jsonrpc": "2.0",
"id": "4",
"error": {
"code": 600,
"message": "Import already in progress"
}
]
} in messages
@pytest.mark.asyncio

View File

@@ -3,6 +3,8 @@ from http import HTTPStatus
import aiohttp
import pytest
from multidict import CIMultiDict, CIMultiDictProxy
from yarl import URL
from galaxy.api.errors import (
AccessDenied, AuthenticationRequired, BackendTimeout, BackendNotAvailable, BackendError, NetworkError,
@@ -10,7 +12,7 @@ from galaxy.api.errors import (
)
from galaxy.http import handle_exception
request_info = aiohttp.RequestInfo("http://o.pl", "GET", {})
request_info = aiohttp.RequestInfo(URL("http://o.pl"), "GET", CIMultiDictProxy(CIMultiDict()))
@pytest.mark.parametrize(
"aiohttp_exception,expected_exception_type",
@@ -18,15 +20,15 @@ request_info = aiohttp.RequestInfo("http://o.pl", "GET", {})
(asyncio.TimeoutError(), BackendTimeout),
(aiohttp.ServerDisconnectedError(), BackendNotAvailable),
(aiohttp.ClientConnectionError(), NetworkError),
(aiohttp.ContentTypeError(request_info, []), UnknownBackendResponse),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.UNAUTHORIZED), AuthenticationRequired),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.FORBIDDEN), AccessDenied),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.SERVICE_UNAVAILABLE), BackendNotAvailable),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.TOO_MANY_REQUESTS), TooManyRequests),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.INTERNAL_SERVER_ERROR), BackendError),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.NOT_IMPLEMENTED), BackendError),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.BAD_REQUEST), UnknownError),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.NOT_FOUND), UnknownError),
(aiohttp.ContentTypeError(request_info, ()), UnknownBackendResponse),
(aiohttp.ClientResponseError(request_info, (), status=HTTPStatus.UNAUTHORIZED), AuthenticationRequired),
(aiohttp.ClientResponseError(request_info, (), status=HTTPStatus.FORBIDDEN), AccessDenied),
(aiohttp.ClientResponseError(request_info, (), status=HTTPStatus.SERVICE_UNAVAILABLE), BackendNotAvailable),
(aiohttp.ClientResponseError(request_info, (), status=HTTPStatus.TOO_MANY_REQUESTS), TooManyRequests),
(aiohttp.ClientResponseError(request_info, (), status=HTTPStatus.INTERNAL_SERVER_ERROR), BackendError),
(aiohttp.ClientResponseError(request_info, (), status=HTTPStatus.NOT_IMPLEMENTED), BackendError),
(aiohttp.ClientResponseError(request_info, (), status=HTTPStatus.BAD_REQUEST), UnknownError),
(aiohttp.ClientResponseError(request_info, (), status=HTTPStatus.NOT_FOUND), UnknownError),
(aiohttp.ClientError(), UnknownError)
]
)

View File

@@ -46,6 +46,7 @@ async def test_shutdown(plugin, read, write):
}
read.side_effect = [async_return_value(create_message(request))]
await plugin.run()
await plugin.wait_closed()
plugin.shutdown.assert_called_with()
assert get_messages(write) == [
{

View File

@@ -0,0 +1,17 @@
import pytest
from galaxy.unittest.mock import async_return_value
from tests import create_message
@pytest.mark.asyncio
async def test_success(plugin, read):
request = {
"jsonrpc": "2.0",
"method": "launch_platform_client"
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
plugin.launch_platform_client.return_value = async_return_value(None)
await plugin.run()
plugin.launch_platform_client.assert_called_with()