mirror of
https://github.com/gogcom/galaxy-integrations-python-api.git
synced 2025-12-23 23:18:15 -05:00
64 lines
1.6 KiB
Python
64 lines
1.6 KiB
Python
import json
|
|
|
|
import pytest
|
|
|
|
from galaxy.unittest.mock import async_return_value
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_chunked_messages(plugin, read):
    """Deliver one newline-terminated request in two pieces.

    The encoded ``install_game`` request is handed to the ``read`` mock as
    two consecutive chunks followed by an empty chunk; after ``plugin.run()``
    completes, ``install_game`` must have been called with ``game_id="3"``.
    """
    payload = {
        "jsonrpc": "2.0",
        "method": "install_game",
        "params": {"game_id": "3"},
    }
    raw = json.dumps(payload).encode() + b"\n"

    # Cut after the fifth byte so the first read yields only a fragment.
    head, tail = raw[:5], raw[5:]
    read.side_effect = [
        async_return_value(head),
        async_return_value(tail),
        # NOTE(review): the empty chunk presumably marks end of input so that
        # plugin.run() returns -- confirm against the reader implementation.
        async_return_value(b""),
    ]

    await plugin.run()

    plugin.install_game.assert_called_with(game_id="3")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_joined_messages(plugin, read):
    """Deliver two newline-terminated requests in a single read.

    An ``install_game`` and a ``launch_game`` request are encoded, each
    followed by a newline, concatenated, and returned by one call of the
    ``read`` mock. After ``plugin.run()`` both handlers must have been called
    with ``game_id="3"``.
    """
    # Key order matches the JSON emitted on the wire: jsonrpc, method, params.
    calls = [
        {"jsonrpc": "2.0", "method": method_name, "params": {"game_id": "3"}}
        for method_name in ("install_game", "launch_game")
    ]
    encoded_lines = [json.dumps(call).encode() + b"\n" for call in calls]
    stream = b"".join(encoded_lines)

    # NOTE(review): the trailing empty chunk presumably marks end of input --
    # confirm against the reader implementation.
    read.side_effect = [async_return_value(stream), async_return_value(b"")]

    await plugin.run()

    plugin.install_game.assert_called_with(game_id="3")
    plugin.launch_game.assert_called_with(game_id="3")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_not_finished(plugin, read):
    """Deliver a request that lacks its terminating newline.

    The encoded ``install_game`` request is returned by the ``read`` mock
    without a trailing ``b"\\n"``, followed by an empty chunk. After
    ``plugin.run()`` the ``install_game`` handler must not have been called.
    """
    payload = {
        "jsonrpc": "2.0",
        "method": "install_game",
        "params": {"game_id": "3"},
    }
    # Deliberately unterminated: no newline is appended to the encoded request.
    unterminated = json.dumps(payload).encode()

    read.side_effect = [
        async_return_value(unterminated),
        async_return_value(b""),
    ]

    await plugin.run()

    plugin.install_game.assert_not_called()
|