Compare commits

...

10 Commits

Author SHA1 Message Date
Romuald Juchnowicz-Bierbasz
2db9d0f383 Increment version 2019-07-01 14:35:05 +02:00
Mieszko Banczerowski
9d93762867 Workaround for removing creds on push_cache 2019-07-01 14:32:23 +02:00
Romuald Juchnowicz-Bierbasz
c364b716f4 Increment version 2019-07-01 13:16:08 +02:00
Romuald Juchnowicz-Bierbasz
48e1782484 SDK-2893: Optional game time and last played 2019-07-01 13:14:07 +02:00
Romuald Juchnowicz-Bierbasz
ff30675a25 Do not invoke tick before handshake 2019-07-01 12:26:05 +02:00
Aliaksei Paulouski
7b3965ff4b Add poe platform 2019-06-28 15:09:46 +02:00
Piotr Marzec
2ebdfabd9b Path of Exile added 2019-06-28 14:49:56 +02:00
Romuald Juchnowicz-Bierbasz
4e1ea8056d Add StreamLineReader with unit tests 2019-06-28 14:00:44 +02:00
Romuald Juchnowicz-Bierbasz
67e8681de6 Increment version 2019-06-28 11:59:01 +02:00
Romuald Juchnowicz-Bierbasz
77d742ce18 SDK-2910: Fix readline 2019-06-28 11:58:32 +02:00
10 changed files with 121 additions and 36 deletions

View File

@@ -79,4 +79,4 @@ Platform ID list for GOG Galaxy 2.0 Integrations
| psvita | Playstation Vita |
| nds | Nintendo DS |
| 3ds | Nintendo 3DS |
| pathofexile | Path of Exile |

View File

@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name="galaxy.plugin.api",
version="0.38",
version="0.40.1",
description="GOG Galaxy Integrations Python API",
author='Galaxy team',
author_email='galaxy@gog.com',

View File

@@ -80,6 +80,7 @@ class Platform(Enum):
PlayStationVita = "psvita"
NintendoDs = "nds"
Nintendo3Ds = "3ds"
PathOfExile = "pathofexile"
class Feature(Enum):
"""Possible features that can be implemented by an integration.

View File

@@ -5,6 +5,8 @@ import logging
import inspect
import json
from galaxy.reader import StreamLineReader
class JsonRpcError(Exception):
def __init__(self, code, message, data=None):
self.code = code
@@ -67,14 +69,12 @@ def anonymise_sensitive_params(params, sensitive_params):
class Server():
def __init__(self, reader, writer, encoder=json.JSONEncoder()):
self._active = True
self._reader = reader
self._reader = StreamLineReader(reader)
self._writer = writer
self._encoder = encoder
self._methods = {}
self._notifications = {}
self._eof_listeners = []
self._input_buffer = bytes()
self._input_buffer_it = 0
def register_method(self, name, callback, internal, sensitive_params=False):
"""
@@ -106,7 +106,7 @@ class Server():
async def run(self):
while self._active:
try:
data = await self._readline()
data = await self._reader.readline()
if not data:
self._eof()
continue
@@ -116,23 +116,7 @@ class Server():
data = data.strip()
logging.debug("Received %d bytes of data", len(data))
self._handle_input(data)
async def _readline(self):
"""Like StreamReader.readline but without limit"""
while True:
chunk = await self._reader.read(1024)
self._input_buffer += chunk
it = self._input_buffer.find(b"\n", self._input_buffer_it)
if it < 0:
if not chunk:
return bytes() # EOF
else:
self._input_buffer_it = len(self._input_buffer)
continue
line = self._input_buffer[:it]
self._input_buffer = self._input_buffer[it+1:]
self._input_buffer_it = 0
return line
await asyncio.sleep(0) # To not starve task queue
def stop(self):
self._active = False

View File

@@ -38,6 +38,7 @@ class Plugin:
self._feature_methods = OrderedDict()
self._active = True
self._pass_control_task = None
self._reader, self._writer = reader, writer
self._handshake_token = handshake_token
@@ -210,15 +211,17 @@ class Plugin:
async def run(self):
"""Plugin's main coroutine."""
async def pass_control():
while self._active:
try:
self.tick()
except Exception:
logging.exception("Unexpected exception raised in plugin tick")
await asyncio.sleep(1)
await self._server.run()
if self._pass_control_task is not None:
await self._pass_control_task
await asyncio.gather(pass_control(), self._server.run())
async def _pass_control(self):
while self._active:
try:
self.tick()
except Exception:
logging.exception("Unexpected exception raised in plugin tick")
await asyncio.sleep(1)
def _shutdown(self):
logging.info("Shutting down")
@@ -236,6 +239,7 @@ class Plugin:
def _initialize_cache(self, data: Dict):
self._persistent_cache = data
self.handshake_complete()
self._pass_control_task = asyncio.create_task(self._pass_control())
@staticmethod
def _ping():
@@ -264,6 +268,7 @@ class Plugin:
return Authentication(user_data['userId'], user_data['username'])
"""
self.persistent_cache['credentials'] = credentials
self._notification_client.notify("store_credentials", credentials, sensitive_params=True)
def add_game(self, game: Game) -> None:

View File

@@ -204,5 +204,5 @@ class GameTime():
:param last_time_played: last time the game was played (**unix timestamp**)
"""
game_id: str
time_played: int
last_played_time: int
time_played: Optional[int]
last_played_time: Optional[int]

28
src/galaxy/reader.py Normal file
View File

@@ -0,0 +1,28 @@
from asyncio import StreamReader
class StreamLineReader:
"""Handles StreamReader readline without buffer limit"""
def __init__(self, reader: StreamReader):
self._reader = reader
self._buffer = bytes()
self._processed_buffer_it = 0
async def readline(self):
while True:
# check if there is no unprocessed data in the buffer
if not self._buffer or self._processed_buffer_it != 0:
chunk = await self._reader.read(1024)
if not chunk:
return bytes() # EOF
self._buffer += chunk
it = self._buffer.find(b"\n", self._processed_buffer_it)
if it < 0:
self._processed_buffer_it = len(self._buffer)
continue
line = self._buffer[:it]
self._buffer = self._buffer[it+1:]
self._processed_buffer_it = 0
return line

View File

@@ -16,7 +16,8 @@ def test_success(plugin, read, write):
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_game_times.coro.return_value = [
GameTime("3", 60, 1549550504),
GameTime("5", 10, 1549550502)
GameTime("5", 10, None),
GameTime("7", None, 1549550502),
]
asyncio.run(plugin.run())
plugin.get_game_times.assert_called_with()
@@ -35,7 +36,10 @@ def test_success(plugin, read, write):
{
"game_id": "5",
"time_played": 10,
"last_played_time": 1549550502
},
{
"game_id": "7",
"last_played_time": 1549550502
}
]
}

View File

@@ -62,7 +62,18 @@ def test_ping(plugin, read, write):
"result": None
}
def test_tick(plugin, read):
def test_tick_before_handshake(plugin, read):
read.side_effect = [b""]
asyncio.run(plugin.run())
plugin.tick.assert_not_called()
def test_tick_after_handshake(plugin, read):
request = {
"jsonrpc": "2.0",
"id": "6",
"method": "initialize_cache",
"params": {"data": {}}
}
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
asyncio.run(plugin.run())
plugin.tick.assert_called_with()

View File

@@ -0,0 +1,52 @@
from unittest.mock import MagicMock
import pytest
from galaxy.reader import StreamLineReader
from galaxy.unittest.mock import AsyncMock
@pytest.fixture()
def stream_reader():
reader = MagicMock()
reader.read = AsyncMock()
return reader
@pytest.fixture()
def read(stream_reader):
return stream_reader.read
@pytest.fixture()
def reader(stream_reader):
return StreamLineReader(stream_reader)
@pytest.mark.asyncio
async def test_message(reader, read):
read.return_value = b"a\n"
assert await reader.readline() == b"a"
read.assert_called_once()
@pytest.mark.asyncio
async def test_separate_messages(reader, read):
read.side_effect = [b"a\n", b"b\n"]
assert await reader.readline() == b"a"
assert await reader.readline() == b"b"
assert read.call_count == 2
@pytest.mark.asyncio
async def test_connected_messages(reader, read):
read.return_value = b"a\nb\n"
assert await reader.readline() == b"a"
assert await reader.readline() == b"b"
read.assert_called_once()
@pytest.mark.asyncio
async def test_cut_message(reader, read):
read.side_effect = [b"a", b"b\n"]
assert await reader.readline() == b"ab"
assert read.call_count == 2
@pytest.mark.asyncio
async def test_half_message(reader, read):
read.side_effect = [b"a", b""]
assert await reader.readline() == b""
assert read.call_count == 2