Compare commits

...

6 Commits
0.58 ... 0.59

Author SHA1 Message Date
Rafal Makagon
75e5a66fbe Increment version 2019-11-27 13:14:11 +01:00
Mieszko Banczerowski
2a9ec3067d Fix sending Exceptions with custom data 2019-11-27 13:12:20 +01:00
Rafal Makagon
69532a5ba9 fix richpresence parameter name 2019-11-27 13:10:43 +01:00
Romuald Juchnowicz-Bierbasz
f5d47b0167 Add timeout to shutdown 2019-11-22 13:11:08 +01:00
Romuald Juchnowicz-Bierbasz
02f4faa432 Do not use root logger 2019-11-22 13:07:33 +01:00
Romuald Juchnowicz-Bierbasz
3d3922c965 Add async_raise 2019-11-20 17:57:17 +01:00
8 changed files with 87 additions and 61 deletions

View File

@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name="galaxy.plugin.api",
version="0.58",
version="0.59",
description="GOG Galaxy Integrations Python API",
author='Galaxy team',
author_email='galaxy@gog.com',

View File

@@ -8,6 +8,10 @@ import json
from galaxy.reader import StreamLineReader
from galaxy.task_manager import TaskManager
logger = logging.getLogger(__name__)
class JsonRpcError(Exception):
def __init__(self, code, message, data=None):
self.code = code
@@ -25,7 +29,7 @@ class JsonRpcError(Exception):
}
if self.data is not None:
obj["error"]["data"] = self.data
obj["data"] = self.data
return obj
@@ -133,7 +137,7 @@ class Connection():
future = loop.create_future()
self._requests_futures[self._last_request_id] = (future, sensitive_params)
logging.info(
logger.info(
"Sending request: id=%s, method=%s, params=%s",
request_id, method, anonymise_sensitive_params(params, sensitive_params)
)
@@ -151,7 +155,7 @@ class Connection():
if False - no params are considered sensitive, if True - all params are considered sensitive
"""
logging.info(
logger.info(
"Sending notification: method=%s, params=%s",
method, anonymise_sensitive_params(params, sensitive_params)
)
@@ -169,20 +173,20 @@ class Connection():
self._eof()
continue
data = data.strip()
logging.debug("Received %d bytes of data", len(data))
logger.debug("Received %d bytes of data", len(data))
self._handle_input(data)
await asyncio.sleep(0) # To not starve task queue
def close(self):
if self._active:
logging.info("Closing JSON-RPC server - not more messages will be read")
logger.info("Closing JSON-RPC server - not more messages will be read")
self._active = False
async def wait_closed(self):
await self._task_manager.wait()
def _eof(self):
logging.info("Received EOF")
logger.info("Received EOF")
self.close()
def _handle_input(self, data):
@@ -204,7 +208,7 @@ class Connection():
request_future = self._requests_futures.get(int(response.id))
if request_future is None:
response_type = "response" if response.result is not None else "error"
logging.warning("Received %s for unknown request: %s", response_type, response.id)
logger.warning("Received %s for unknown request: %s", response_type, response.id)
return
future, sensitive_params = request_future
@@ -225,7 +229,7 @@ class Connection():
def _handle_notification(self, request):
method = self._notifications.get(request.method)
if not method:
logging.error("Received unknown notification: %s", request.method)
logger.error("Received unknown notification: %s", request.method)
return
callback, signature, immediate, sensitive_params = method
@@ -242,12 +246,12 @@ class Connection():
try:
self._task_manager.create_task(callback(*bound_args.args, **bound_args.kwargs), request.method)
except Exception:
logging.exception("Unexpected exception raised in notification handler")
logger.exception("Unexpected exception raised in notification handler")
def _handle_request(self, request):
method = self._methods.get(request.method)
if not method:
logging.error("Received unknown request: %s", request.method)
logger.error("Received unknown request: %s", request.method)
self._send_error(request.id, MethodNotFound())
return
@@ -274,7 +278,7 @@ class Connection():
except asyncio.CancelledError:
self._send_error(request.id, Aborted())
except Exception as e: #pylint: disable=broad-except
logging.exception("Unexpected exception raised in plugin handler")
logger.exception("Unexpected exception raised in plugin handler")
self._send_error(request.id, UnknownError(str(e)))
self._task_manager.create_task(handle(), request.method)
@@ -305,10 +309,10 @@ class Connection():
try:
line = self._encoder.encode(data)
data = (line + "\n").encode("utf-8")
logging.debug("Sending %d byte of data", len(data))
logger.debug("Sending %d byte of data", len(data))
self._task_manager.create_task(send_task(data), "send")
except TypeError as error:
logging.error(str(error))
logger.error(str(error))
def _send_response(self, request_id, result):
response = {
@@ -348,18 +352,18 @@ class Connection():
def _log_request(request, sensitive_params):
params = anonymise_sensitive_params(request.params, sensitive_params)
if request.id is not None:
logging.info("Handling request: id=%s, method=%s, params=%s", request.id, request.method, params)
logger.info("Handling request: id=%s, method=%s, params=%s", request.id, request.method, params)
else:
logging.info("Handling notification: method=%s, params=%s", request.method, params)
logger.info("Handling notification: method=%s, params=%s", request.method, params)
@staticmethod
def _log_response(response, sensitive_params):
result = anonymise_sensitive_params(response.result, sensitive_params)
logging.info("Handling response: id=%s, result=%s", response.id, result)
logger.info("Handling response: id=%s, result=%s", response.id, result)
@staticmethod
def _log_error(response, error, sensitive_params):
data = anonymise_sensitive_params(error.data, sensitive_params)
logging.info("Handling error: id=%s, code=%s, description=%s, data=%s",
logger.info("Handling error: id=%s, code=%s, description=%s, data=%s",
response.id, error.code, error.message, data
)

View File

@@ -2,7 +2,6 @@ import asyncio
import dataclasses
import json
import logging
import logging.handlers
import sys
from enum import Enum
from typing import Any, Dict, List, Optional, Set, Union
@@ -16,6 +15,9 @@ from galaxy.api.types import (
from galaxy.task_manager import TaskManager
logger = logging.getLogger(__name__)
class JSONEncoder(json.JSONEncoder):
def default(self, o): # pylint: disable=method-hidden
if dataclasses.is_dataclass(o):
@@ -33,7 +35,7 @@ class Plugin:
"""Use and override methods of this class to create a new platform integration."""
def __init__(self, platform, version, reader, writer, handshake_token):
logging.info("Creating plugin for platform %s, version %s", platform.value, version)
logger.info("Creating plugin for platform %s, version %s", platform.value, version)
self._platform = platform
self._version = version
@@ -189,24 +191,31 @@ class Plugin:
async def run(self):
"""Plugin's main coroutine."""
await self._connection.run()
logging.debug("Plugin run loop finished")
logger.debug("Plugin run loop finished")
def close(self) -> None:
if not self._active:
return
logging.info("Closing plugin")
logger.info("Closing plugin")
self._connection.close()
self._external_task_manager.cancel()
self._internal_task_manager.create_task(self.shutdown(), "shutdown")
async def shutdown():
try:
await asyncio.wait_for(self.shutdown(), 30)
except asyncio.TimeoutError:
logging.warning("Plugin shutdown timed out")
self._internal_task_manager.create_task(shutdown(), "shutdown")
self._active = False
async def wait_closed(self) -> None:
logging.debug("Waiting for plugin to close")
logger.debug("Waiting for plugin to close")
await self._external_task_manager.wait()
await self._internal_task_manager.wait()
await self._connection.wait_closed()
logging.debug("Plugin closed")
logger.debug("Plugin closed")
def create_task(self, coro, description):
"""Wrapper around asyncio.create_task - takes care of canceling tasks on shutdown"""
@@ -217,11 +226,11 @@ class Plugin:
try:
self.tick()
except Exception:
logging.exception("Unexpected exception raised in plugin tick")
logger.exception("Unexpected exception raised in plugin tick")
await asyncio.sleep(1)
async def _shutdown(self):
logging.info("Shutting down")
logger.info("Shutting down")
self.close()
await self._external_task_manager.wait()
await self._internal_task_manager.wait()
@@ -238,7 +247,7 @@ class Plugin:
try:
self.handshake_complete()
except Exception:
logging.exception("Unhandled exception during `handshake_complete` step")
logger.exception("Unhandled exception during `handshake_complete` step")
self._internal_task_manager.create_task(self._pass_control(), "tick")
@staticmethod
@@ -639,7 +648,7 @@ class Plugin:
except ApplicationError as error:
self._game_achievements_import_failure(game_id, error)
except Exception:
logging.exception("Unexpected exception raised in import_game_achievements")
logger.exception("Unexpected exception raised in import_game_achievements")
self._game_achievements_import_failure(game_id, UnknownError())
async def import_games_achievements(game_ids_, context_):
@@ -803,7 +812,7 @@ class Plugin:
except ApplicationError as error:
self._game_time_import_failure(game_id, error)
except Exception:
logging.exception("Unexpected exception raised in import_game_time")
logger.exception("Unexpected exception raised in import_game_time")
self._game_time_import_failure(game_id, UnknownError())
async def import_game_times(game_ids_, context_):
@@ -861,7 +870,7 @@ class Plugin:
except ApplicationError as error:
self._game_library_settings_import_failure(game_id, error)
except Exception:
logging.exception("Unexpected exception raised in import_game_library_settings")
logger.exception("Unexpected exception raised in import_game_library_settings")
self._game_library_settings_import_failure(game_id, UnknownError())
async def import_game_library_settings_set(game_ids_, context_):
@@ -919,7 +928,7 @@ class Plugin:
except ApplicationError as error:
self._os_compatibility_import_failure(game_id, error)
except Exception:
logging.exception("Unexpected exception raised in import_os_compatibility")
logger.exception("Unexpected exception raised in import_os_compatibility")
self._os_compatibility_import_failure(game_id, UnknownError())
async def import_os_compatibility_set(game_ids_, context_):
@@ -962,11 +971,11 @@ class Plugin:
def os_compatibility_import_complete(self) -> None:
"""Override this method to handle operations after OS compatibility import is finished (like updating cache)."""
async def _start_user_presence_import(self, user_ids: List[str]) -> None:
async def _start_user_presence_import(self, user_id_list: List[str]) -> None:
if self._user_presence_import_in_progress:
raise ImportInProgress()
context = await self.prepare_user_presence_context(user_ids)
context = await self.prepare_user_presence_context(user_id_list)
async def import_user_presence(user_id, context_) -> None:
try:
@@ -974,14 +983,14 @@ class Plugin:
except ApplicationError as error:
self._user_presence_import_failure(user_id, error)
except Exception:
logging.exception("Unexpected exception raised in import_user_presence")
logger.exception("Unexpected exception raised in import_user_presence")
self._user_presence_import_failure(user_id, UnknownError())
async def import_user_presence_set(user_ids_, context_) -> None:
async def import_user_presence_set(user_id_list_, context_) -> None:
try:
await asyncio.gather(*[
import_user_presence(user_id, context_)
for user_id in user_ids_
for user_id in user_id_list_
])
finally:
self._user_presence_import_finished()
@@ -989,18 +998,18 @@ class Plugin:
self.user_presence_import_complete()
self._external_task_manager.create_task(
import_user_presence_set(user_ids, context),
import_user_presence_set(user_id_list, context),
"user presence import",
handle_exceptions=False
)
self._user_presence_import_in_progress = True
async def prepare_user_presence_context(self, user_ids: List[str]) -> Any:
async def prepare_user_presence_context(self, user_id_list: List[str]) -> Any:
"""Override this method to prepare context for get_user_presence.
This allows for optimizations like batch requests to platform API.
Default implementation returns None.
:param user_ids: the ids of the users for whom presence information is imported
:param user_id_list: the ids of the users for whom presence information is imported
:return: context
"""
return None
@@ -1037,7 +1046,7 @@ def create_and_run_plugin(plugin_class, argv):
main()
"""
if len(argv) < 3:
logging.critical("Not enough parameters, required: token, port")
logger.critical("Not enough parameters, required: token, port")
sys.exit(1)
token = argv[1]
@@ -1045,21 +1054,21 @@ def create_and_run_plugin(plugin_class, argv):
try:
port = int(argv[2])
except ValueError:
logging.critical("Failed to parse port value: %s", argv[2])
logger.critical("Failed to parse port value: %s", argv[2])
sys.exit(2)
if not (1 <= port <= 65535):
logging.critical("Port value out of range (1, 65535)")
logger.critical("Port value out of range (1, 65535)")
sys.exit(3)
if not issubclass(plugin_class, Plugin):
logging.critical("plugin_class must be subclass of Plugin")
logger.critical("plugin_class must be subclass of Plugin")
sys.exit(4)
async def coroutine():
reader, writer = await asyncio.open_connection("127.0.0.1", port)
extra_info = writer.get_extra_info("sockname")
logging.info("Using local address: %s:%u", *extra_info)
logger.info("Using local address: %s:%u", *extra_info)
async with plugin_class(reader, writer, token) as plugin:
await plugin.run()
@@ -1069,5 +1078,5 @@ def create_and_run_plugin(plugin_class, argv):
asyncio.run(coroutine())
except Exception:
logging.exception("Error while running plugin")
logger.exception("Error while running plugin")
sys.exit(5)

View File

@@ -44,6 +44,8 @@ from galaxy.api.errors import (
)
logger = logging.getLogger(__name__)
#: Default limit of the simultaneous connections for ssl connector.
DEFAULT_LIMIT = 20
#: Default timeout in seconds used for client session.
@@ -136,11 +138,11 @@ def handle_exception():
if error.status >= 500:
raise BackendError()
if error.status >= 400:
logging.warning(
logger.warning(
"Got status %d while performing %s request for %s",
error.status, error.request_info.method, str(error.request_info.url)
)
raise UnknownError()
except aiohttp.ClientError:
logging.exception("Caught exception while performing request")
logger.exception("Caught exception while performing request")
raise UnknownError()

View File

@@ -3,7 +3,6 @@ from dataclasses import dataclass
from typing import Iterable, NewType, Optional, List, cast
ProcessId = NewType("ProcessId", int)

View File

@@ -3,6 +3,10 @@ import logging
from collections import OrderedDict
from itertools import count
logger = logging.getLogger(__name__)
class TaskManager:
def __init__(self, name):
self._name = name
@@ -15,23 +19,23 @@ class TaskManager:
async def task_wrapper(task_id):
try:
result = await coro
logging.debug("Task manager %s: finished task %d (%s)", self._name, task_id, description)
logger.debug("Task manager %s: finished task %d (%s)", self._name, task_id, description)
return result
except asyncio.CancelledError:
if handle_exceptions:
logging.debug("Task manager %s: canceled task %d (%s)", self._name, task_id, description)
logger.debug("Task manager %s: canceled task %d (%s)", self._name, task_id, description)
else:
raise
except Exception:
if handle_exceptions:
logging.exception("Task manager %s: exception raised in task %d (%s)", self._name, task_id, description)
logger.exception("Task manager %s: exception raised in task %d (%s)", self._name, task_id, description)
else:
raise
finally:
del self._tasks[task_id]
task_id = next(self._task_counter)
logging.debug("Task manager %s: creating task %d (%s)", self._name, task_id, description)
logger.debug("Task manager %s: creating task %d (%s)", self._name, task_id, description)
task = asyncio.create_task(task_wrapper(task_id))
self._tasks[task_id] = task
return task

View File

@@ -21,11 +21,19 @@ def coroutine_mock():
corofunc.coro = coro
return corofunc
async def skip_loop(iterations=1):
for _ in range(iterations):
await asyncio.sleep(0)
async def async_return_value(return_value, loop_iterations_delay=0):
await skip_loop(loop_iterations_delay)
if loop_iterations_delay > 0:
await skip_loop(loop_iterations_delay)
return return_value
async def async_raise(error, loop_iterations_delay=0):
if loop_iterations_delay > 0:
await skip_loop(loop_iterations_delay)
raise error

View File

@@ -12,13 +12,13 @@ from tests import create_message, get_messages
@pytest.mark.asyncio
async def test_get_user_presence_success(plugin, read, write):
context = "abc"
user_ids = ["666", "13", "42", "69", "22"]
user_id_list = ["666", "13", "42", "69", "22"]
plugin.prepare_user_presence_context.return_value = async_return_value(context)
request = {
"jsonrpc": "2.0",
"id": "11",
"method": "start_user_presence_import",
"params": {"user_ids": user_ids}
"params": {"user_id_list": user_id_list}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_user_presence.side_effect = [
@@ -60,7 +60,7 @@ async def test_get_user_presence_success(plugin, read, write):
]
await plugin.run()
plugin.get_user_presence.assert_has_calls([
call(user_id, context) for user_id in user_ids
call(user_id, context) for user_id in user_id_list
])
plugin.user_presence_import_complete.assert_called_once_with()
@@ -151,7 +151,7 @@ async def test_get_user_presence_error(exception, code, message, plugin, read, w
"jsonrpc": "2.0",
"id": request_id,
"method": "start_user_presence_import",
"params": {"user_ids": [user_id]}
"params": {"user_id_list": [user_id]}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_user_presence.side_effect = exception
@@ -192,7 +192,7 @@ async def test_prepare_get_user_presence_context_error(plugin, read, write):
"jsonrpc": "2.0",
"id": request_id,
"method": "start_user_presence_import",
"params": {"user_ids": ["6"]}
"params": {"user_id_list": ["6"]}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
await plugin.run()
@@ -218,7 +218,7 @@ async def test_import_already_in_progress_error(plugin, read, write):
"id": "3",
"method": "start_user_presence_import",
"params": {
"user_ids": ["42"]
"user_id_list": ["42"]
}
},
{
@@ -226,7 +226,7 @@ async def test_import_already_in_progress_error(plugin, read, write):
"id": "4",
"method": "start_user_presence_import",
"params": {
"user_ids": ["666"]
"user_id_list": ["666"]
}
}
]