Compare commits

..

3 Commits

Author SHA1 Message Date
Romuald Juchnowicz-Bierbasz
4e1ea8056d Add StreamLineReader with unit tests 2019-06-28 14:00:44 +02:00
Romuald Juchnowicz-Bierbasz
67e8681de6 Increment version 2019-06-28 11:59:01 +02:00
Romuald Juchnowicz-Bierbasz
77d742ce18 SDK-2910: Fix readline 2019-06-28 11:58:32 +02:00
4 changed files with 85 additions and 22 deletions

View File

@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name="galaxy.plugin.api",
version="0.38",
version="0.39",
description="GOG Galaxy Integrations Python API",
author='Galaxy team',
author_email='galaxy@gog.com',

View File

@@ -5,6 +5,8 @@ import logging
import inspect
import json
from galaxy.reader import StreamLineReader
class JsonRpcError(Exception):
def __init__(self, code, message, data=None):
self.code = code
@@ -67,14 +69,12 @@ def anonymise_sensitive_params(params, sensitive_params):
class Server():
def __init__(self, reader, writer, encoder=json.JSONEncoder()):
self._active = True
self._reader = reader
self._reader = StreamLineReader(reader)
self._writer = writer
self._encoder = encoder
self._methods = {}
self._notifications = {}
self._eof_listeners = []
self._input_buffer = bytes()
self._input_buffer_it = 0
def register_method(self, name, callback, internal, sensitive_params=False):
"""
@@ -106,7 +106,7 @@ class Server():
async def run(self):
while self._active:
try:
data = await self._readline()
data = await self._reader.readline()
if not data:
self._eof()
continue
@@ -117,23 +117,6 @@ class Server():
logging.debug("Received %d bytes of data", len(data))
self._handle_input(data)
async def _readline(self):
"""Like StreamReader.readline but without limit"""
while True:
chunk = await self._reader.read(1024)
self._input_buffer += chunk
it = self._input_buffer.find(b"\n", self._input_buffer_it)
if it < 0:
if not chunk:
return bytes() # EOF
else:
self._input_buffer_it = len(self._input_buffer)
continue
line = self._input_buffer[:it]
self._input_buffer = self._input_buffer[it+1:]
self._input_buffer_it = 0
return line
def stop(self):
self._active = False

28
src/galaxy/reader.py Normal file
View File

@@ -0,0 +1,28 @@
from asyncio import StreamReader
class StreamLineReader:
"""Handles StreamReader readline without buffer limit"""
def __init__(self, reader: StreamReader):
self._reader = reader
self._buffer = bytes()
self._processed_buffer_it = 0
async def readline(self):
while True:
# check if there is no unprocessed data in the buffer
if not self._buffer or self._processed_buffer_it != 0:
chunk = await self._reader.read(1024)
if not chunk:
return bytes() # EOF
self._buffer += chunk
it = self._buffer.find(b"\n", self._processed_buffer_it)
if it < 0:
self._processed_buffer_it = len(self._buffer)
continue
line = self._buffer[:it]
self._buffer = self._buffer[it+1:]
self._processed_buffer_it = 0
return line

View File

@@ -0,0 +1,52 @@
from unittest.mock import MagicMock
import pytest
from galaxy.reader import StreamLineReader
from galaxy.unittest.mock import AsyncMock
@pytest.fixture()
def stream_reader():
reader = MagicMock()
reader.read = AsyncMock()
return reader
@pytest.fixture()
def read(stream_reader):
return stream_reader.read
@pytest.fixture()
def reader(stream_reader):
return StreamLineReader(stream_reader)
@pytest.mark.asyncio
async def test_message(reader, read):
read.return_value = b"a\n"
assert await reader.readline() == b"a"
read.assert_called_once()
@pytest.mark.asyncio
async def test_separate_messages(reader, read):
read.side_effect = [b"a\n", b"b\n"]
assert await reader.readline() == b"a"
assert await reader.readline() == b"b"
assert read.call_count == 2
@pytest.mark.asyncio
async def test_connected_messages(reader, read):
read.return_value = b"a\nb\n"
assert await reader.readline() == b"a"
assert await reader.readline() == b"b"
read.assert_called_once()
@pytest.mark.asyncio
async def test_cut_message(reader, read):
read.side_effect = [b"a", b"b\n"]
assert await reader.readline() == b"ab"
assert read.call_count == 2
@pytest.mark.asyncio
async def test_half_message(reader, read):
read.side_effect = [b"a", b""]
assert await reader.readline() == b""
assert read.call_count == 2