Files
galaxy-integrations-python-api/tests/test_chunk_messages.py

64 lines
1.6 KiB
Python

import json
import pytest
from galaxy.unittest.mock import async_return_value
@pytest.mark.asyncio
async def test_chunked_messages(plugin, read):
request = {
"jsonrpc": "2.0",
"method": "install_game",
"params": {
"game_id": "3"
}
}
message = json.dumps(request).encode() + b"\n"
read.side_effect = [async_return_value(message[:5]), async_return_value(message[5:]), async_return_value(b"")]
await plugin.run()
plugin.install_game.assert_called_with(game_id="3")
@pytest.mark.asyncio
async def test_joined_messages(plugin, read):
requests = [
{
"jsonrpc": "2.0",
"method": "install_game",
"params": {
"game_id": "3"
}
},
{
"jsonrpc": "2.0",
"method": "launch_game",
"params": {
"game_id": "3"
}
}
]
data = b"".join([json.dumps(request).encode() + b"\n" for request in requests])
read.side_effect = [async_return_value(data), async_return_value(b"")]
await plugin.run()
plugin.install_game.assert_called_with(game_id="3")
plugin.launch_game.assert_called_with(game_id="3")
@pytest.mark.asyncio
async def test_not_finished(plugin, read):
request = {
"jsonrpc": "2.0",
"method": "install_game",
"params": {
"game_id": "3"
}
}
message = json.dumps(request).encode() # no new line
read.side_effect = [async_return_value(message), async_return_value(b"")]
await plugin.run()
plugin.install_game.assert_not_called()