Compare commits

...

6 Commits
0.53 ... 0.55

Author SHA1 Message Date
Romuald Bierbasz
1585bab203 Wait for drain before writing 2019-10-22 11:30:01 +02:00
Aleksej Pawlowskij
92caf682d8 Increment version 2019-10-21 16:13:09 +02:00
Aleksej Pawlowskij
062d6a9428 Add user presence import feature 2019-10-21 16:11:10 +02:00
Romuald Juchnowicz-Bierbasz
c874bc1d6e Increment version 2019-10-21 14:54:23 +02:00
Romuald Bierbasz
2dc56571d6 Revert "Add more logs"
This reverts commit 21ab8bf33d3c2714b8c7af2da8036fe8baae20ea.
2019-10-21 14:50:33 +02:00
Romuald Bierbasz
eb216a50a8 Fix mypy ignores 2019-10-21 14:20:21 +02:00
17 changed files with 400 additions and 23 deletions

1
.gitignore vendored
View File

@@ -7,3 +7,4 @@ docs/build/
Pipfile
.idea
docs/source/_build
.mypy_cache

View File

@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name="galaxy.plugin.api",
version="0.53",
version="0.55",
description="GOG Galaxy Integrations Python API",
author='Galaxy team',
author_email='galaxy@gog.com',

View File

@@ -113,6 +113,7 @@ class Feature(Enum):
LaunchPlatformClient = "LaunchPlatformClient"
ImportGameLibrarySettings = "ImportGameLibrarySettings"
ImportOSCompatibility = "ImportOSCompatibility"
ImportUserPresence = "ImportUserPresence"
class LicenseType(Enum):
@@ -140,3 +141,11 @@ class OSCompatibility(Flag):
Windows = 0b001
MacOS = 0b010
Linux = 0b100
class PresenceState(Enum):
""""Possible states of a user."""
Unknown = "unknown"
Online = "online"
Offline = "offline"
Away = "away"

View File

@@ -88,6 +88,7 @@ class Server():
self._methods = {}
self._notifications = {}
self._task_manager = TaskManager("jsonrpc server")
self._write_lock = asyncio.Lock()
def register_method(self, name, callback, immediate, sensitive_params=False):
"""
@@ -129,8 +130,9 @@ class Server():
await asyncio.sleep(0) # To not starve task queue
def close(self):
logging.info("Closing JSON-RPC server - not more messages will be read")
self._active = False
if self._active:
logging.info("Closing JSON-RPC server - not more messages will be read")
self._active = False
async def wait_closed(self):
await self._task_manager.wait()
@@ -222,12 +224,16 @@ class Server():
raise InvalidRequest()
def _send(self, data):
async def send_task(data_):
async with self._write_lock:
self._writer.write(data_)
await self._writer.drain()
try:
line = self._encoder.encode(data)
logging.debug("Sending data: %s", line)
data = (line + "\n").encode("utf-8")
self._writer.write(data)
self._task_manager.create_task(self._writer.drain(), "drain")
self._task_manager.create_task(send_task(data), "send")
except TypeError as error:
logging.error(str(error))
@@ -262,6 +268,7 @@ class NotificationClient():
self._encoder = encoder
self._methods = {}
self._task_manager = TaskManager("notification client")
self._write_lock = asyncio.Lock()
def notify(self, method, params, sensitive_params=False):
"""
@@ -281,15 +288,20 @@ class NotificationClient():
self._send(notification)
async def close(self):
self._task_manager.cancel()
await self._task_manager.wait()
def _send(self, data):
async def send_task(data_):
async with self._write_lock:
self._writer.write(data_)
await self._writer.drain()
try:
line = self._encoder.encode(data)
data = (line + "\n").encode("utf-8")
logging.debug("Sending %d byte of data", len(data))
self._writer.write(data)
self._task_manager.create_task(self._writer.drain(), "drain")
self._task_manager.create_task(send_task(data), "send")
except TypeError as error:
logging.error("Failed to parse outgoing message: %s", str(error))

View File

@@ -10,7 +10,9 @@ from typing import Any, Dict, List, Optional, Set, Union
from galaxy.api.consts import Feature, OSCompatibility
from galaxy.api.errors import ImportInProgress, UnknownError
from galaxy.api.jsonrpc import ApplicationError, NotificationClient, Server
from galaxy.api.types import Achievement, Authentication, FriendInfo, Game, GameTime, LocalGame, NextStep, GameLibrarySettings
from galaxy.api.types import (
Achievement, Authentication, FriendInfo, Game, GameLibrarySettings, GameTime, LocalGame, NextStep, UserPresence
)
from galaxy.task_manager import TaskManager
@@ -49,6 +51,7 @@ class Plugin:
self._game_times_import_in_progress = False
self._game_library_settings_import_in_progress = False
self._os_compatibility_import_in_progress = False
self._user_presence_import_in_progress = False
self._persistent_cache = dict()
@@ -118,6 +121,9 @@ class Plugin:
self._register_method("start_os_compatibility_import", self._start_os_compatibility_import)
self._detect_feature(Feature.ImportOSCompatibility, ["get_os_compatibility"])
self._register_method("start_user_presence_import", self._start_user_presence_import)
self._detect_feature(Feature.ImportUserPresence, ["get_user_presence"])
async def __aenter__(self):
return self
@@ -184,6 +190,7 @@ class Plugin:
async def run(self):
"""Plugin's main coroutine."""
await self._server.run()
logging.debug("Plugin run loop finished")
def close(self) -> None:
if not self._active:
@@ -196,10 +203,12 @@ class Plugin:
self._active = False
async def wait_closed(self) -> None:
logging.debug("Waiting for plugin to close")
await self._external_task_manager.wait()
await self._internal_task_manager.wait()
await self._server.wait_closed()
await self._notification_client.close()
logging.debug("Plugin closed")
def create_task(self, coro, description):
"""Wrapper around asyncio.create_task - takes care of canceling tasks on shutdown"""
@@ -262,7 +271,7 @@ class Plugin:
"""
# temporary solution for persistent_cache vs credentials issue
self.persistent_cache['credentials'] = credentials # type: ignore
self.persistent_cache["credentials"] = credentials # type: ignore
self._notification_client.notify("store_credentials", credentials, sensitive_params=True)
@@ -447,6 +456,27 @@ class Plugin:
def _os_compatibility_import_finished(self) -> None:
self._notification_client.notify("os_compatibility_import_finished", None)
def _user_presence_import_success(self, user_id: str, user_presence: UserPresence) -> None:
self._notification_client.notify(
"user_presence_import_success",
{
"user_id": user_id,
"presence": user_presence
}
)
def _user_presence_import_failure(self, user_id: str, error: ApplicationError) -> None:
self._notification_client.notify(
"user_presence_import_failure",
{
"user_id": user_id,
"error": error.json()
}
)
def _user_presence_import_finished(self) -> None:
self._notification_client.notify("user_presence_import_finished", None)
def lost_authentication(self) -> None:
"""Notify the client that integration has lost authentication for the
current user and is unable to perform actions which would require it.
@@ -909,6 +939,62 @@ class Plugin:
def os_compatibility_import_complete(self) -> None:
"""Override this method to handle operations after OS compatibility import is finished (like updating cache)."""
async def _start_user_presence_import(self, user_ids: List[str]) -> None:
if self._user_presence_import_in_progress:
raise ImportInProgress()
context = await self.prepare_user_presence_context(user_ids)
async def import_user_presence(user_id, context_) -> None:
try:
self._user_presence_import_success(user_id, await self.get_user_presence(user_id, context_))
except ApplicationError as error:
self._user_presence_import_failure(user_id, error)
except Exception:
logging.exception("Unexpected exception raised in import_user_presence")
self._user_presence_import_failure(user_id, UnknownError())
async def import_user_presence_set(user_ids_, context_) -> None:
try:
await asyncio.gather(*[
import_user_presence(user_id, context_)
for user_id in user_ids_
])
finally:
self._user_presence_import_finished()
self._user_presence_import_in_progress = False
self.user_presence_import_complete()
self._external_task_manager.create_task(
import_user_presence_set(user_ids, context),
"user presence import",
handle_exceptions=False
)
self._user_presence_import_in_progress = True
async def prepare_user_presence_context(self, user_ids: List[str]) -> Any:
"""Override this method to prepare context for get_user_presence.
This allows for optimizations like batch requests to platform API.
Default implementation returns None.
:param user_ids: the ids of the users for whom presence information is imported
:return: context
"""
return None
async def get_user_presence(self, user_id: str, context: Any) -> UserPresence:
"""Override this method to return presence information for the user with the provided user_id.
This method is called by import task initialized by GOG Galaxy Client.
:param user_id: the id of the user for whom presence information is imported
:param context: the value returned from :meth:`prepare_user_presence_context`
:return: UserPresence presence information of the provided user
"""
raise NotImplementedError()
def user_presence_import_complete(self) -> None:
"""Override this method to handle operations after presence import is finished (like updating cache)."""
def create_and_run_plugin(plugin_class, argv):
"""Call this method as an entry point for the implemented integration.

View File

@@ -1,7 +1,7 @@
from dataclasses import dataclass
from typing import Dict, List, Optional
from galaxy.api.consts import LicenseType, LocalGameState
from galaxy.api.consts import LicenseType, LocalGameState, PresenceState
@dataclass
@@ -174,3 +174,18 @@ class GameLibrarySettings:
game_id: str
tags: Optional[List[str]]
hidden: Optional[bool]
@dataclass
class UserPresence:
"""Presence information of a user.
:param presence_state: the state of the user
:param game_id: id of the game a user is currently in
:param game_title: name of the game a user is currently in
:param presence_status: detailed user's presence description
"""
presence_state: PresenceState
game_id: Optional[str] = None
game_title: Optional[str] = None
presence_status: Optional[str] = None

View File

@@ -78,7 +78,8 @@ def create_tcp_connector(*args, **kwargs) -> aiohttp.TCPConnector:
ssl_context.load_verify_locations(certifi.where())
kwargs.setdefault("ssl", ssl_context)
kwargs.setdefault("limit", DEFAULT_LIMIT)
return aiohttp.TCPConnector(*args, **kwargs) # type: ignore due to https://github.com/python/mypy/issues/4001
# due to https://github.com/python/mypy/issues/4001
return aiohttp.TCPConnector(*args, **kwargs) # type: ignore
def create_client_session(*args, **kwargs) -> aiohttp.ClientSession:
@@ -103,7 +104,8 @@ def create_client_session(*args, **kwargs) -> aiohttp.ClientSession:
kwargs.setdefault("connector", create_tcp_connector())
kwargs.setdefault("timeout", aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT))
kwargs.setdefault("raise_for_status", True)
return aiohttp.ClientSession(*args, **kwargs) # type: ignore due to https://github.com/python/mypy/issues/4001
# due to https://github.com/python/mypy/issues/4001
return aiohttp.ClientSession(*args, **kwargs) # type: ignore
@contextmanager

View File

@@ -1,33 +1,38 @@
from contextlib import ExitStack
import logging
from unittest.mock import patch, MagicMock
from contextlib import ExitStack
from unittest.mock import MagicMock, patch
import pytest
from galaxy.api.plugin import Plugin
from galaxy.api.consts import Platform
from galaxy.api.plugin import Plugin
from galaxy.unittest.mock import async_return_value
@pytest.fixture()
def reader():
stream = MagicMock(name="stream_reader")
stream.read = MagicMock()
yield stream
@pytest.fixture()
async def writer():
stream = MagicMock(name="stream_writer")
stream.drain.side_effect = lambda: async_return_value(None)
yield stream
@pytest.fixture()
def read(reader):
yield reader.read
@pytest.fixture()
def write(writer):
yield writer.write
@pytest.fixture()
async def plugin(reader, writer):
"""Return plugin instance with all feature methods mocked"""
@@ -56,6 +61,9 @@ async def plugin(reader, writer):
"get_os_compatibility",
"prepare_os_compatibility_context",
"os_compatibility_import_complete",
"get_user_presence",
"prepare_user_presence_context",
"user_presence_import_complete",
)
with ExitStack() as stack:

View File

@@ -5,7 +5,7 @@ from pytest import raises
from galaxy.api.types import Achievement
from galaxy.api.errors import BackendError
from galaxy.unittest.mock import async_return_value
from galaxy.unittest.mock import async_return_value, skip_loop
from tests import create_message, get_messages
@@ -201,6 +201,7 @@ async def test_import_in_progress(plugin, read, write):
async def test_unlock_achievement(plugin, write):
achievement = Achievement(achievement_id="lvl20", unlock_time=1548422395)
plugin.unlock_achievement("14", achievement)
await skip_loop()
response = json.loads(write.call_args[0][0])
assert response == {

View File

@@ -5,7 +5,7 @@ from galaxy.api.errors import (
UnknownError, InvalidCredentials, NetworkError, LoggedInElsewhere, ProtocolError,
BackendNotAvailable, BackendTimeout, BackendError, TemporaryBlocked, Banned, AccessDenied
)
from galaxy.unittest.mock import async_return_value
from galaxy.unittest.mock import async_return_value, skip_loop
from tests import create_message, get_messages
@@ -97,6 +97,7 @@ async def test_store_credentials(plugin, write):
"token": "ABC"
}
plugin.store_credentials(credentials)
await skip_loop()
assert get_messages(write) == [
{
@@ -110,6 +111,7 @@ async def test_store_credentials(plugin, write):
@pytest.mark.asyncio
async def test_lost_authentication(plugin, write):
plugin.lost_authentication()
await skip_loop()
assert get_messages(write) == [
{

View File

@@ -16,7 +16,8 @@ def test_base_class():
Feature.ShutdownPlatformClient,
Feature.LaunchPlatformClient,
Feature.ImportGameLibrarySettings,
Feature.ImportOSCompatibility
Feature.ImportOSCompatibility,
Feature.ImportUserPresence
}

View File

@@ -1,6 +1,6 @@
from galaxy.api.types import FriendInfo
from galaxy.api.errors import UnknownError
from galaxy.unittest.mock import async_return_value
from galaxy.unittest.mock import async_return_value, skip_loop
import pytest
@@ -67,6 +67,7 @@ async def test_add_friend(plugin, write):
friend = FriendInfo("7", "Kuba")
plugin.add_friend(friend)
await skip_loop()
assert get_messages(write) == [
{
@@ -82,6 +83,7 @@ async def test_add_friend(plugin, write):
@pytest.mark.asyncio
async def test_remove_friend(plugin, write):
plugin.remove_friend("5")
await skip_loop()
assert get_messages(write) == [
{

View File

@@ -3,7 +3,7 @@ from unittest.mock import call
import pytest
from galaxy.api.types import GameTime
from galaxy.api.errors import BackendError
from galaxy.unittest.mock import async_return_value
from galaxy.unittest.mock import async_return_value, skip_loop
from tests import create_message, get_messages
@@ -199,6 +199,7 @@ async def test_import_in_progress(plugin, read, write):
async def test_update_game(plugin, write):
game_time = GameTime("3", 60, 1549550504)
plugin.update_game_time(game_time)
await skip_loop()
assert get_messages(write) == [
{

View File

@@ -3,7 +3,7 @@ import pytest
from galaxy.api.types import LocalGame
from galaxy.api.consts import LocalGameState
from galaxy.api.errors import UnknownError, FailedParsingManifest
from galaxy.unittest.mock import async_return_value
from galaxy.unittest.mock import async_return_value, skip_loop
from tests import create_message, get_messages
@@ -83,6 +83,7 @@ async def test_failure(plugin, read, write, error, code, message):
async def test_local_game_state_update(plugin, write):
game = LocalGame("1", LocalGameState.Running)
plugin.update_local_game_status(game)
await skip_loop()
assert get_messages(write) == [
{

View File

@@ -3,7 +3,7 @@ import pytest
from galaxy.api.types import Game, Dlc, LicenseInfo
from galaxy.api.consts import LicenseType
from galaxy.api.errors import UnknownError
from galaxy.unittest.mock import async_return_value
from galaxy.unittest.mock import async_return_value, skip_loop
from tests import create_message, get_messages
@@ -100,6 +100,7 @@ async def test_failure(plugin, read, write):
async def test_add_game(plugin, write):
game = Game("3", "Doom", None, LicenseInfo(LicenseType.SinglePurchase, None))
plugin.add_game(game)
await skip_loop()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
@@ -120,6 +121,7 @@ async def test_add_game(plugin, write):
@pytest.mark.asyncio
async def test_remove_game(plugin, write):
plugin.remove_game("5")
await skip_loop()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
@@ -135,6 +137,7 @@ async def test_remove_game(plugin, write):
async def test_update_game(plugin, write):
game = Game("3", "Doom", None, LicenseInfo(LicenseType.SinglePurchase, None))
plugin.update_game(game)
await skip_loop()
assert get_messages(write) == [
{
"jsonrpc": "2.0",

View File

@@ -1,6 +1,6 @@
import pytest
from galaxy.unittest.mock import async_return_value
from galaxy.unittest.mock import async_return_value, skip_loop
from tests import create_message, get_messages
@@ -57,6 +57,7 @@ async def test_set_cache(plugin, write, cache_data):
plugin.persistent_cache.update(cache_data)
plugin.push_cache()
await skip_loop()
assert_rpc_request(write, "push_cache", cache_data)
assert cache_data == plugin.persistent_cache
@@ -68,6 +69,7 @@ async def test_clear_cache(plugin, write, cache_data):
plugin.persistent_cache.clear()
plugin.push_cache()
await skip_loop()
assert_rpc_request(write, "push_cache", {})
assert {} == plugin.persistent_cache

231
tests/test_user_presence.py Normal file
View File

@@ -0,0 +1,231 @@
from unittest.mock import call
import pytest
from galaxy.api.consts import PresenceState
from galaxy.api.errors import BackendError
from galaxy.api.types import UserPresence
from galaxy.unittest.mock import async_return_value
from tests import create_message, get_messages
@pytest.mark.asyncio
async def test_get_user_presence_success(plugin, read, write):
context = "abc"
user_ids = ["666", "13", "42", "69"]
plugin.prepare_user_presence_context.return_value = async_return_value(context)
request = {
"jsonrpc": "2.0",
"id": "11",
"method": "start_user_presence_import",
"params": {"user_ids": user_ids}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_user_presence.side_effect = [
async_return_value(UserPresence(
PresenceState.Unknown,
"game-id1",
None,
"unknown state"
)),
async_return_value(UserPresence(
PresenceState.Offline,
None,
None,
"Going to grandma's house"
)),
async_return_value(UserPresence(
PresenceState.Online,
"game-id3",
"game-title3",
"Pew pew"
)),
async_return_value(UserPresence(
PresenceState.Away,
None,
"game-title4",
"AFKKTHXBY"
)),
]
await plugin.run()
plugin.get_user_presence.assert_has_calls([
call(user_id, context) for user_id in user_ids
])
plugin.user_presence_import_complete.assert_called_once_with()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "11",
"result": None
},
{
"jsonrpc": "2.0",
"method": "user_presence_import_success",
"params": {
"user_id": "666",
"presence": {
"presence_state": PresenceState.Unknown.value,
"game_id": "game-id1",
"presence_status": "unknown state"
}
}
},
{
"jsonrpc": "2.0",
"method": "user_presence_import_success",
"params": {
"user_id": "13",
"presence": {
"presence_state": PresenceState.Offline.value,
"presence_status": "Going to grandma's house"
}
}
},
{
"jsonrpc": "2.0",
"method": "user_presence_import_success",
"params": {
"user_id": "42",
"presence": {
"presence_state": PresenceState.Online.value,
"game_id": "game-id3",
"game_title": "game-title3",
"presence_status": "Pew pew"
}
}
},
{
"jsonrpc": "2.0",
"method": "user_presence_import_success",
"params": {
"user_id": "69",
"presence": {
"presence_state": PresenceState.Away.value,
"game_title": "game-title4",
"presence_status": "AFKKTHXBY"
}
}
},
{
"jsonrpc": "2.0",
"method": "user_presence_import_finished",
"params": None
}
]
@pytest.mark.asyncio
@pytest.mark.parametrize("exception,code,message", [
(BackendError, 4, "Backend error"),
(KeyError, 0, "Unknown error")
])
async def test_get_user_presence_error(exception, code, message, plugin, read, write):
user_id = "69"
request_id = "55"
plugin.prepare_user_presence_context.return_value = async_return_value(None)
request = {
"jsonrpc": "2.0",
"id": request_id,
"method": "start_user_presence_import",
"params": {"user_ids": [user_id]}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_user_presence.side_effect = exception
await plugin.run()
plugin.get_user_presence.assert_called()
plugin.user_presence_import_complete.assert_called_once_with()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": request_id,
"result": None
},
{
"jsonrpc": "2.0",
"method": "user_presence_import_failure",
"params": {
"user_id": user_id,
"error": {
"code": code,
"message": message
}
}
},
{
"jsonrpc": "2.0",
"method": "user_presence_import_finished",
"params": None
}
]
@pytest.mark.asyncio
async def test_prepare_get_user_presence_context_error(plugin, read, write):
request_id = "31415"
plugin.prepare_user_presence_context.side_effect = BackendError()
request = {
"jsonrpc": "2.0",
"id": request_id,
"method": "start_user_presence_import",
"params": {"user_ids": ["6"]}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
await plugin.run()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": 4,
"message": "Backend error"
}
}
]
@pytest.mark.asyncio
async def test_import_already_in_progress_error(plugin, read, write):
plugin.prepare_user_presence_context.return_value = async_return_value(None)
requests = [
{
"jsonrpc": "2.0",
"id": "3",
"method": "start_user_presence_import",
"params": {
"user_ids": ["42"]
}
},
{
"jsonrpc": "2.0",
"id": "4",
"method": "start_user_presence_import",
"params": {
"user_ids": ["666"]
}
}
]
read.side_effect = [
async_return_value(create_message(requests[0])),
async_return_value(create_message(requests[1])),
async_return_value(b"", 10)
]
await plugin.run()
responses = get_messages(write)
assert {
"jsonrpc": "2.0",
"id": "3",
"result": None
} in responses
assert {
"jsonrpc": "2.0",
"id": "4",
"error": {
"code": 600,
"message": "Import already in progress"
}
} in responses