SDK-2880: Read in chunks

This commit is contained in:
Romuald Juchnowicz-Bierbasz
2019-06-25 17:53:16 +02:00
parent be03c83d45
commit 58b17d94fa
16 changed files with 104 additions and 71 deletions

View File

@@ -73,6 +73,7 @@ class Server():
self._methods = {}
self._notifications = {}
self._eof_listeners = []
self._input_buffer = bytes()
def register_method(self, name, callback, internal, sensitive_params=False):
"""
@@ -104,7 +105,7 @@ class Server():
async def run(self):
while self._active:
try:
data = await self._reader.readline()
data = await self._readline()
if not data:
self._eof()
continue
@@ -115,6 +116,21 @@ class Server():
logging.debug("Received %d bytes of data", len(data))
self._handle_input(data)
async def _readline(self):
"""Like StreamReader.readline but without limit"""
while True:
chunk = await self._reader.read(1024)
if not chunk:
return chunk
previous_size = len(self._input_buffer)
self._input_buffer += chunk
it = self._input_buffer.find(b"\n", previous_size)
if it < 0:
continue
line = self._input_buffer[:it]
self._input_buffer = self._input_buffer[it+1:]
return line
def stop(self):
self._active = False

View File

@@ -11,7 +11,7 @@ from galaxy.unittest.mock import AsyncMock, coroutine_mock
@pytest.fixture()
def reader():
stream = MagicMock(name="stream_reader")
stream.readline = AsyncMock()
stream.read = AsyncMock()
yield stream
@pytest.fixture()
@@ -22,8 +22,8 @@ def writer():
yield stream
@pytest.fixture()
def readline(reader):
yield reader.readline
def read(reader):
yield reader.read
@pytest.fixture()
def write(writer):

View File

@@ -16,7 +16,7 @@ def test_initialization_no_id_nor_name():
with raises(AssertionError):
Achievement(unlock_time=1234567890)
def test_success(plugin, readline, write):
def test_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
@@ -25,7 +25,7 @@ def test_success(plugin, readline, write):
"game_id": "14"
}
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_unlocked_achievements.coro.return_value = [
Achievement(achievement_id="lvl10", unlock_time=1548421241),
Achievement(achievement_name="Got level 20", unlock_time=1548422395),
@@ -57,7 +57,7 @@ def test_success(plugin, readline, write):
}
}
def test_failure(plugin, readline, write):
def test_failure(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
@@ -67,7 +67,7 @@ def test_failure(plugin, readline, write):
}
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_unlocked_achievements.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_unlocked_achievements.assert_called()

View File

@@ -9,14 +9,14 @@ from galaxy.api.errors import (
BackendNotAvailable, BackendTimeout, BackendError, TemporaryBlocked, Banned, AccessDenied
)
def test_success(plugin, readline, write):
def test_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "init_authentication"
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.authenticate.coro.return_value = Authentication("132", "Zenek")
asyncio.run(plugin.run())
plugin.authenticate.assert_called_with()
@@ -44,14 +44,14 @@ def test_success(plugin, readline, write):
pytest.param(Banned, 105, "Banned", id="banned"),
pytest.param(AccessDenied, 106, "Access denied", id="access_denied"),
])
def test_failure(plugin, readline, write, error, code, message):
def test_failure(plugin, read, write, error, code, message):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "init_authentication"
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.authenticate.coro.side_effect = error()
asyncio.run(plugin.run())
plugin.authenticate.assert_called_with()
@@ -66,7 +66,7 @@ def test_failure(plugin, readline, write, error, code, message):
}
}
def test_stored_credentials(plugin, readline, write):
def test_stored_credentials(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
@@ -77,7 +77,7 @@ def test_stored_credentials(plugin, readline, write):
}
}
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.authenticate.coro.return_value = Authentication("132", "Zenek")
asyncio.run(plugin.run())
plugin.authenticate.assert_called_with(stored_credentials={"token": "ABC"})
@@ -100,7 +100,7 @@ def test_store_credentials(plugin, write):
"params": credentials
}
def test_lost_authentication(plugin, readline, write):
def test_lost_authentication(plugin, write):
async def couritine():
plugin.lost_authentication()

View File

@@ -9,7 +9,7 @@ from galaxy.api.errors import (
TooManyMessagesSent, IncoherentLastMessage, MessageNotFound
)
def test_send_message_success(plugin, readline, write):
def test_send_message_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
@@ -20,7 +20,7 @@ def test_send_message_success(plugin, readline, write):
}
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.send_message.coro.return_value = None
asyncio.run(plugin.run())
plugin.send_message.assert_called_with(room_id="14", message="Hello!")
@@ -40,7 +40,7 @@ def test_send_message_success(plugin, readline, write):
pytest.param(BackendError, 4, "Backend error", id="backend_error"),
pytest.param(TooManyMessagesSent, 300, "Too many messages sent", id="too_many_messages")
])
def test_send_message_failure(plugin, readline, write, error, code, message):
def test_send_message_failure(plugin, read, write, error, code, message):
request = {
"jsonrpc": "2.0",
"id": "6",
@@ -51,7 +51,7 @@ def test_send_message_failure(plugin, readline, write, error, code, message):
}
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.send_message.coro.side_effect = error()
asyncio.run(plugin.run())
plugin.send_message.assert_called_with(room_id="15", message="Bye")
@@ -66,7 +66,7 @@ def test_send_message_failure(plugin, readline, write, error, code, message):
}
}
def test_mark_as_read_success(plugin, readline, write):
def test_mark_as_read_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "7",
@@ -77,7 +77,7 @@ def test_mark_as_read_success(plugin, readline, write):
}
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.mark_as_read.coro.return_value = None
asyncio.run(plugin.run())
plugin.mark_as_read.assert_called_with(room_id="14", last_message_id="67")
@@ -102,7 +102,7 @@ def test_mark_as_read_success(plugin, readline, write):
id="incoherent_last_message"
)
])
def test_mark_as_read_failure(plugin, readline, write, error, code, message):
def test_mark_as_read_failure(plugin, read, write, error, code, message):
request = {
"jsonrpc": "2.0",
"id": "4",
@@ -113,7 +113,7 @@ def test_mark_as_read_failure(plugin, readline, write, error, code, message):
}
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.mark_as_read.coro.side_effect = error()
asyncio.run(plugin.run())
plugin.mark_as_read.assert_called_with(room_id="18", last_message_id="7")
@@ -128,14 +128,14 @@ def test_mark_as_read_failure(plugin, readline, write, error, code, message):
}
}
def test_get_rooms_success(plugin, readline, write):
def test_get_rooms_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "2",
"method": "import_rooms"
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_rooms.coro.return_value = [
Room("13", 0, None),
Room("15", 34, "8")
@@ -162,14 +162,14 @@ def test_get_rooms_success(plugin, readline, write):
}
}
def test_get_rooms_failure(plugin, readline, write):
def test_get_rooms_failure(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "9",
"method": "import_rooms"
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_rooms.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_rooms.assert_called_with()
@@ -184,7 +184,7 @@ def test_get_rooms_failure(plugin, readline, write):
}
}
def test_get_room_history_from_message_success(plugin, readline, write):
def test_get_room_history_from_message_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "2",
@@ -195,7 +195,7 @@ def test_get_room_history_from_message_success(plugin, readline, write):
}
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_room_history_from_message.coro.return_value = [
Message("13", "149", 1549454837, "Hello"),
Message("14", "812", 1549454899, "Hi")
@@ -233,7 +233,7 @@ def test_get_room_history_from_message_success(plugin, readline, write):
pytest.param(BackendError, 4, "Backend error", id="backend_error"),
pytest.param(MessageNotFound, 500, "Message not found", id="message_not_found")
])
def test_get_room_history_from_message_failure(plugin, readline, write, error, code, message):
def test_get_room_history_from_message_failure(plugin, read, write, error, code, message):
request = {
"jsonrpc": "2.0",
"id": "7",
@@ -244,7 +244,7 @@ def test_get_room_history_from_message_failure(plugin, readline, write, error, c
}
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_room_history_from_message.coro.side_effect = error()
asyncio.run(plugin.run())
plugin.get_room_history_from_message.assert_called_with(room_id="33", message_id="88")
@@ -259,7 +259,7 @@ def test_get_room_history_from_message_failure(plugin, readline, write, error, c
}
}
def test_get_room_history_from_timestamp_success(plugin, readline, write):
def test_get_room_history_from_timestamp_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "7",
@@ -270,7 +270,7 @@ def test_get_room_history_from_timestamp_success(plugin, readline, write):
}
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_room_history_from_timestamp.coro.return_value = [
Message("12", "155", 1549454836, "Bye")
]
@@ -296,7 +296,7 @@ def test_get_room_history_from_timestamp_success(plugin, readline, write):
}
}
def test_get_room_history_from_timestamp_failure(plugin, readline, write):
def test_get_room_history_from_timestamp_failure(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
@@ -307,7 +307,7 @@ def test_get_room_history_from_timestamp_failure(plugin, readline, write):
}
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_room_history_from_timestamp.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_room_history_from_timestamp.assert_called_with(

View File

@@ -0,0 +1,17 @@
import asyncio
import json
def test_chunked_messages(plugin, read):
request = {
"jsonrpc": "2.0",
"method": "install_game",
"params": {
"game_id": "3"
}
}
message = json.dumps(request).encode() + b"\n"
read.side_effect = [message[:5], message[5:], b""]
plugin.get_owned_games.return_value = None
asyncio.run(plugin.run())
plugin.install_game.assert_called_with(game_id="3")

View File

@@ -5,14 +5,14 @@ from galaxy.api.types import FriendInfo
from galaxy.api.errors import UnknownError
def test_get_friends_success(plugin, readline, write):
def test_get_friends_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_friends"
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_friends.coro.return_value = [
FriendInfo("3", "Jan"),
FriendInfo("5", "Ola")
@@ -33,14 +33,14 @@ def test_get_friends_success(plugin, readline, write):
}
def test_get_friends_failure(plugin, readline, write):
def test_get_friends_failure(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_friends"
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_friends.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_friends.assert_called_with()

View File

@@ -6,14 +6,14 @@ import pytest
from galaxy.api.types import GameTime
from galaxy.api.errors import UnknownError, ImportInProgress, BackendError
def test_success(plugin, readline, write):
def test_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_game_times"
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_game_times.coro.return_value = [
GameTime("3", 60, 1549550504),
GameTime("5", 10, 1549550502)
@@ -41,14 +41,14 @@ def test_success(plugin, readline, write):
}
}
def test_failure(plugin, readline, write):
def test_failure(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_game_times"
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_game_times.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_game_times.assert_called_with()

View File

@@ -1,7 +1,7 @@
import asyncio
import json
def test_success(plugin, readline):
def test_success(plugin, read):
request = {
"jsonrpc": "2.0",
"method": "install_game",
@@ -10,7 +10,7 @@ def test_success(plugin, readline):
}
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_owned_games.return_value = None
asyncio.run(plugin.run())
plugin.install_game.assert_called_with(game_id="3")

View File

@@ -4,7 +4,7 @@ import json
from galaxy.api.plugin import Plugin
from galaxy.api.consts import Platform
def test_get_capabilites(reader, writer, readline, write):
def test_get_capabilites(reader, writer, read, write):
class PluginImpl(Plugin): #pylint: disable=abstract-method
async def get_owned_games(self):
pass
@@ -16,7 +16,7 @@ def test_get_capabilites(reader, writer, readline, write):
}
token = "token"
plugin = PluginImpl(Platform.Generic, "0.1", reader, writer, token)
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
asyncio.run(plugin.run())
response = json.loads(write.call_args[0][0])
assert response == {
@@ -31,13 +31,13 @@ def test_get_capabilites(reader, writer, readline, write):
}
}
def test_shutdown(plugin, readline, write):
def test_shutdown(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "5",
"method": "shutdown"
}
readline.side_effect = [json.dumps(request)]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
asyncio.run(plugin.run())
plugin.shutdown.assert_called_with()
response = json.loads(write.call_args[0][0])
@@ -47,13 +47,13 @@ def test_shutdown(plugin, readline, write):
"result": None
}
def test_ping(plugin, readline, write):
def test_ping(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "7",
"method": "ping"
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
asyncio.run(plugin.run())
response = json.loads(write.call_args[0][0])
assert response == {
@@ -62,7 +62,7 @@ def test_ping(plugin, readline, write):
"result": None
}
def test_tick(plugin, readline):
readline.side_effect = [""]
def test_tick(plugin, read):
read.side_effect = [b""]
asyncio.run(plugin.run())
plugin.tick.assert_called_with()

View File

@@ -1,7 +1,7 @@
import asyncio
import json
def test_success(plugin, readline):
def test_success(plugin, read):
request = {
"jsonrpc": "2.0",
"method": "launch_game",
@@ -10,7 +10,7 @@ def test_success(plugin, readline):
}
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_owned_games.return_value = None
asyncio.run(plugin.run())
plugin.launch_game.assert_called_with(game_id="3")

View File

@@ -7,14 +7,14 @@ from galaxy.api.types import LocalGame
from galaxy.api.consts import LocalGameState
from galaxy.api.errors import UnknownError, FailedParsingManifest
def test_success(plugin, readline, write):
def test_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_local_games"
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_local_games.coro.return_value = [
LocalGame("1", LocalGameState.Running),
@@ -53,14 +53,14 @@ def test_success(plugin, readline, write):
pytest.param(FailedParsingManifest, 200, "Failed parsing manifest", id="failed_parsing")
],
)
def test_failure(plugin, readline, write, error, code, message):
def test_failure(plugin, read, write, error, code, message):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_local_games"
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_local_games.coro.side_effect = error()
asyncio.run(plugin.run())
plugin.get_local_games.assert_called_with()

View File

@@ -5,14 +5,14 @@ from galaxy.api.types import Game, Dlc, LicenseInfo
from galaxy.api.consts import LicenseType
from galaxy.api.errors import UnknownError
def test_success(plugin, readline, write):
def test_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_owned_games"
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_owned_games.coro.return_value = [
Game("3", "Doom", None, LicenseInfo(LicenseType.SinglePurchase, None)),
Game(
@@ -67,14 +67,14 @@ def test_success(plugin, readline, write):
}
}
def test_failure(plugin, readline, write):
def test_failure(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_owned_games"
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_owned_games.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_owned_games.assert_called_with()

View File

@@ -28,7 +28,7 @@ def cache_data():
}
def test_initialize_cache(plugin, readline, write, cache_data):
def test_initialize_cache(plugin, read, write, cache_data):
request_id = 3
request = {
"jsonrpc": "2.0",
@@ -36,7 +36,7 @@ def test_initialize_cache(plugin, readline, write, cache_data):
"method": "initialize_cache",
"params": {"data": cache_data}
}
readline.side_effect = [json.dumps(request)]
read.side_effect = [json.dumps(request).encode() + b"\n"]
assert {} == plugin.persistent_cache
asyncio.run(plugin.run())

View File

@@ -1,7 +1,7 @@
import asyncio
import json
def test_success(plugin, readline):
def test_success(plugin, read):
request = {
"jsonrpc": "2.0",
"method": "uninstall_game",
@@ -10,7 +10,7 @@ def test_success(plugin, readline):
}
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_owned_games.return_value = None
asyncio.run(plugin.run())
plugin.uninstall_game.assert_called_with(game_id="3")

View File

@@ -6,7 +6,7 @@ from galaxy.api.errors import UnknownError
from galaxy.api.consts import PresenceState
def test_get_users_success(plugin, readline, write):
def test_get_users_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "8",
@@ -16,7 +16,7 @@ def test_get_users_success(plugin, readline, write):
}
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_users.coro.return_value = [
UserInfo("5", False, "Ula", "http://avatar.png", Presence(PresenceState.Offline))
]
@@ -43,7 +43,7 @@ def test_get_users_success(plugin, readline, write):
}
def test_get_users_failure(plugin, readline, write):
def test_get_users_failure(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "12",
@@ -53,7 +53,7 @@ def test_get_users_failure(plugin, readline, write):
}
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_users.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_users.assert_called_with(user_id_list=["10", "11", "12"])