mirror of
https://github.com/gogcom/galaxy-integrations-python-api.git
synced 2026-04-24 16:17:16 -04:00
version 0.33
This commit is contained in:
@@ -33,6 +33,7 @@ def write(writer):
|
||||
def plugin(reader, writer):
|
||||
"""Return plugin instance with all feature methods mocked"""
|
||||
async_methods = (
|
||||
"handshake_complete",
|
||||
"authenticate",
|
||||
"get_owned_games",
|
||||
"get_unlocked_achievements",
|
||||
|
||||
@@ -6,8 +6,7 @@ import pytest
|
||||
from galaxy.api.types import Authentication
|
||||
from galaxy.api.errors import (
|
||||
UnknownError, InvalidCredentials, NetworkError, LoggedInElsewhere, ProtocolError,
|
||||
BackendNotAvailable, BackendTimeout, BackendError, TemporaryBlocked, Banned, AccessDenied,
|
||||
ParentalControlBlock, DeviceBlocked, RegionBlocked
|
||||
BackendNotAvailable, BackendTimeout, BackendError, TemporaryBlocked, Banned, AccessDenied
|
||||
)
|
||||
|
||||
def test_success(plugin, readline, write):
|
||||
@@ -44,9 +43,6 @@ def test_success(plugin, readline, write):
|
||||
pytest.param(TemporaryBlocked, 104, "Temporary blocked", id="temporary_blocked"),
|
||||
pytest.param(Banned, 105, "Banned", id="banned"),
|
||||
pytest.param(AccessDenied, 106, "Access denied", id="access_denied"),
|
||||
pytest.param(ParentalControlBlock, 107, "Parental control block", id="parental_control_clock"),
|
||||
pytest.param(DeviceBlocked, 108, "Device blocked", id="device_blocked"),
|
||||
pytest.param(RegionBlocked, 109, "Region blocked", id="region_blocked")
|
||||
])
|
||||
def test_failure(plugin, readline, write, error, code, message):
|
||||
request = {
|
||||
|
||||
71
tests/test_persistent_cache.py
Normal file
71
tests/test_persistent_cache.py
Normal file
@@ -0,0 +1,71 @@
|
||||
import asyncio
|
||||
import json
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def assert_rpc_response(write, response_id, result=None):
|
||||
assert json.loads(write.call_args[0][0]) == {
|
||||
"jsonrpc": "2.0",
|
||||
"id": str(response_id),
|
||||
"result": result
|
||||
}
|
||||
|
||||
|
||||
def assert_rpc_request(write, method, params=None):
|
||||
assert json.loads(write.call_args[0][0]) == {
|
||||
"jsonrpc": "2.0",
|
||||
"method": method,
|
||||
"params": {"data": params}
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def cache_data():
|
||||
return {
|
||||
"persistent key": "persistent value",
|
||||
"persistent object": {"answer to everything": 42}
|
||||
}
|
||||
|
||||
|
||||
def test_initialize_cache(plugin, readline, write, cache_data):
|
||||
request_id = 3
|
||||
request = {
|
||||
"jsonrpc": "2.0",
|
||||
"id": str(request_id),
|
||||
"method": "initialize_cache",
|
||||
"params": {"data": cache_data}
|
||||
}
|
||||
readline.side_effect = [json.dumps(request)]
|
||||
|
||||
assert {} == plugin.persistent_cache
|
||||
asyncio.run(plugin.run())
|
||||
plugin.handshake_complete.assert_called_once_with()
|
||||
assert cache_data == plugin.persistent_cache
|
||||
assert_rpc_response(write, response_id=request_id)
|
||||
|
||||
|
||||
def test_set_cache(plugin, write, cache_data):
|
||||
async def runner():
|
||||
assert {} == plugin.persistent_cache
|
||||
|
||||
plugin.persistent_cache.update(cache_data)
|
||||
plugin.push_cache()
|
||||
|
||||
assert_rpc_request(write, "push_cache", cache_data)
|
||||
assert cache_data == plugin.persistent_cache
|
||||
|
||||
asyncio.run(runner())
|
||||
|
||||
|
||||
def test_clear_cache(plugin, write, cache_data):
|
||||
async def runner():
|
||||
plugin._persistent_cache = cache_data
|
||||
|
||||
plugin.persistent_cache.clear()
|
||||
plugin.push_cache()
|
||||
|
||||
assert_rpc_request(write, "push_cache", {})
|
||||
assert {} == plugin.persistent_cache
|
||||
|
||||
asyncio.run(runner())
|
||||
Reference in New Issue
Block a user