Compare commits

...

101 Commits
0.43 ... 0.65.1

Author SHA1 Message Date
Mieszko Banczerowski
aaeca6b47e Increment version 2020-05-15 12:17:28 +02:00
Mieszko Banczerowski
fe8f7e929a Bump psutil >5.6.6 due to CVE-2019-18874 2020-05-15 11:52:49 +02:00
Mieszko Banczerowski
49da4d4d37 GPI-1341 Fix logging error on _handle_response 2020-05-11 16:05:54 +02:00
Mieszko Banczerowski
9745dcd8ef GPI-1237 Docs clarification about platforms 2020-04-27 12:14:23 +02:00
Mateusz Silaczewski
ad758b0da9 subscription settings 2020-03-23 10:15:24 +01:00
unknown
9062944d4f adhere to comments 2020-02-18 15:09:19 +01:00
unknown
2251747281 adhere to comments 2020-02-18 15:03:58 +01:00
unknown
0245e47a74 cleanup docs, up version 2020-02-11 09:53:38 +01:00
unknown
0c51ff2cc9 adhere to comments, move importers to seperate module 2020-02-10 11:37:01 +01:00
unknown
cd452b881d include subscription_name in partial finished notification 2020-02-10 10:05:58 +01:00
unknown
19c9f14ca9 separate sub importer, notify partial finished per subscription 2020-02-10 09:26:48 +01:00
unknown
f5683d222a adhere to comments 2020-02-07 09:20:33 +01:00
unknown
44ea89ef63 adhere to comments 2020-02-07 09:17:26 +01:00
unknown
325cf66c7d cleanup 2020-02-06 13:52:07 +01:00
unknown
cd8aecac8f use python from galaxy 2020-02-06 13:49:23 +01:00
unknown
3aa37907fc use python from galaxy 2020-02-06 13:49:02 +01:00
unknown
01e844009b use python from galaxy 2020-02-06 13:45:24 +01:00
unknown
4a7febfa37 check remote tests fix 2020-02-06 12:20:30 +01:00
unknown
f9eb9ab6cb dont change line in unittest/mock.py 2020-02-06 11:49:30 +01:00
unknown
134fbe2752 adjust tests, allow for None yield 2020-02-06 11:47:34 +01:00
unknown
bd8e6703e0 async yield 2020-02-06 11:05:47 +01:00
unknown
74e3825f10 prepare interfaces for subscriptions 2020-02-05 16:28:15 +01:00
Mieszko Banczerowski
62206318bd GPI-1122 get_local_size docs clarification about unknown size and 0 value use-cases 2020-02-03 11:23:01 +01:00
Mieszko Banczerowski
083b9f869f GPI-1050 More detailed logging in http module 2020-01-28 15:03:21 +01:00
Mieszko Banczerowski
617dbdfee7 GPI-1109 Implement get_game_size 2020-01-28 10:35:54 +01:00
Denis LE
65f4334c03 Fix typo in galaxy.http doc 2019-12-25 12:42:56 +01:00
Aleksej Pawlowskij
26102dd832 Increment version 2019-12-17 15:56:37 +01:00
Aleksej Pawlowskij
cdcebda529 SDK-3136: Relax install requirements 2019-12-17 15:43:47 +01:00
Romuald Bierbasz
a83f348d7d Increment version 2019-12-10 16:02:40 +01:00
Romuald Bierbasz
1c196d60d5 SDK-3199: Log response json 2019-12-10 16:00:46 +01:00
Aleksej Pawlowskij
deb125ec48 Add missing psutil setup requirement 2019-12-05 16:22:26 +01:00
Rafal Makagon
4cc0055119 Increment version 2019-12-05 13:58:04 +01:00
Romuald Bierbasz
00164fab67 Correctly set _import_in_progress 2019-12-05 11:39:09 +01:00
Romuald Juchnowicz-Bierbasz
453cd1cc70 Do not send notificaitons when import is cancelled 2019-12-03 14:06:55 +01:00
Romuald Juchnowicz-Bierbasz
1f55253fd7 Wait until writer is closed 2019-12-03 14:04:19 +01:00
Romuald Juchnowicz-Bierbasz
7aa3b01abd Add Importer class (reuse code for importers) 2019-12-03 14:03:53 +01:00
Rafal Makagon
bd14d58bad Increment version 2019-11-28 14:37:46 +01:00
Romuald Juchnowicz-Bierbasz
274b9a2c18 Do not wait for drain 2019-11-28 13:10:58 +01:00
Rafal Makagon
75e5a66fbe Increment version 2019-11-27 13:14:11 +01:00
Mieszko Banczerowski
2a9ec3067d Fix sending Exceptions with custom data 2019-11-27 13:12:20 +01:00
Rafal Makagon
69532a5ba9 fix richpresence parameter name 2019-11-27 13:10:43 +01:00
Romuald Juchnowicz-Bierbasz
f5d47b0167 Add timeout to shutdown 2019-11-22 13:11:08 +01:00
Romuald Juchnowicz-Bierbasz
02f4faa432 Do not use root logger 2019-11-22 13:07:33 +01:00
Romuald Juchnowicz-Bierbasz
3d3922c965 Add async_raise 2019-11-20 17:57:17 +01:00
Rafal Makagon
b695cdfc78 Increment version 2019-11-20 16:23:23 +01:00
Rafal Makagon
66ab1809b8 Do not log data sent to socket 2019-11-20 15:48:00 +01:00
Rafal Makagon
8bf367d0f9 Increment vesion 2019-11-18 13:59:09 +01:00
Rafal Makagon
2cf83395fa fix parse sphinx parse error
+ other small imporvements in docs
2019-11-15 16:06:45 +01:00
Aliaksei Paulouski
4aa76b6e3d SDK-3137: friends and presence updates 2019-11-13 13:40:53 +01:00
Aleksej Pawlowskij
c03465e8f2 SDK-3145: Add optional profile and avatar url 2019-11-13 08:49:38 +01:00
mezzode
810a87718d Fix incorrect field name in GameTime docstring 2019-11-08 11:25:58 +01:00
Rafal Makagon
e32abe11b7 Increment version 2019-11-07 12:29:13 +01:00
FriendsOfGalaxy
d79f183826 Fix RegistryMonitor.is_updated method 2019-11-04 14:26:26 +01:00
Rafal Makagon
78f1d5a4cc Add refresh_credentials method to plugin 2019-10-31 15:15:14 +01:00
Rafal Makagon
9041dbd98c Increase size that is to be read at once from reader's buffer 2019-10-30 15:42:03 +01:00
Aleksej Pawlowskij
e57ecc489c SDK-3110: Deprecate FriendInfo and replace with UserInfo 2019-10-28 11:37:21 +01:00
Romuald Bierbasz
0a20629459 Use pytest 5.2.2 2019-10-28 11:35:59 +01:00
Romuald Bierbasz
1585bab203 Wait for drain before writing 2019-10-22 11:30:01 +02:00
Aleksej Pawlowskij
92caf682d8 Increment version 2019-10-21 16:13:09 +02:00
Aleksej Pawlowskij
062d6a9428 Add user presence import feature 2019-10-21 16:11:10 +02:00
Romuald Juchnowicz-Bierbasz
c874bc1d6e Increment version 2019-10-21 14:54:23 +02:00
Romuald Bierbasz
2dc56571d6 Revert "Add more logs"
This reverts commit 21ab8bf33d3c2714b8c7af2da8036fe8baae20ea.
2019-10-21 14:50:33 +02:00
Romuald Bierbasz
eb216a50a8 Fix mypy ignores 2019-10-21 14:20:21 +02:00
Rafal Makagon
c9b1c8fcae Increment version 2019-10-15 12:58:48 +02:00
Aleksej Pawlowskij
a19a6cf11f Add Rockstar platform 2019-10-07 14:12:44 +02:00
Aleksej Pawlowskij
98cff9cfb8 SDK-3069: add OS compatibility import 2019-10-02 15:41:16 +02:00
Rafal Makagon
2e2aa8c4a0 Increment version 2019-10-01 11:21:32 +02:00
Rafal Makagon
f57e03db2d Add game library settings feature 2019-09-27 16:15:57 +02:00
Rafal Makagon
66085e2239 Revert "Add ignoring not having windll to mypy"
This reverts commit 55c7fcfd61e0391287e2717117da4fca03b77dec.
2019-09-27 15:37:21 +02:00
Romuald Bierbasz
4d3c9b78c4 Add RegistryMonitor 2019-09-25 11:46:18 +02:00
Romuald Juchnowicz-Bierbasz
392e4c5f68 Print error data in import notifications 2019-09-19 14:47:50 +02:00
Romuald Juchnowicz-Bierbasz
4d6d3b8eb2 Increment version 2019-09-18 12:16:34 +02:00
Romuald Juchnowicz-Bierbasz
d5610221a9 Do not wait for external tasks in run 2019-09-18 12:15:18 +02:00
Aleksej Pawlowskij
aa7b398d3b Increment version 2019-09-05 09:14:46 +02:00
Aleksej Pawlowskij
8d6ec500f9 SDK-3041: Add new platforms 2019-09-05 09:14:37 +02:00
Romuald Juchnowicz-Bierbasz
bab0be9994 Increment version 2019-09-02 17:31:47 +02:00
Romuald Juchnowicz-Bierbasz
0294e2a1f1 SDK-3023: Introduce task managers 2019-09-02 17:28:13 +02:00
Mieszko Banczerowski
0ab00e4119 Fix typing in persistent cache 2019-08-23 17:48:42 +02:00
Mieszko Banczerowski
b20fce057b GPI-661: Add docs to http module 2019-08-23 17:46:08 +02:00
Romuald Juchnowicz-Bierbasz
dec59f47dd Add more detials to launch_platform_client method 2019-08-22 09:42:10 +02:00
Mieszko Banczerowski
ca85b2428b GPI-517: Mark coroutines in docs 2019-08-20 14:35:54 +02:00
Pawel Czoppa
d8a00d58a6 Merge branch 'master' of https://gitlab.gog.com/galaxy-client/galaxy-plugin-api into platform_id_update
# Conflicts:
#	PLATFORM_IDs.md
2019-08-20 14:22:36 +02:00
Romuald Juchnowicz-Bierbasz
d4cd1cedfd Install wheel in deploy 2019-08-14 16:07:01 +02:00
Romuald Juchnowicz-Bierbasz
161122b94d SDK-2988: Upload to official pypi server 2019-08-14 13:48:35 +02:00
Romuald Juchnowicz-Bierbasz
cec36695b6 Increment version 2019-08-13 16:08:59 +02:00
Romuald Juchnowicz-Bierbasz
4cc8be8f5d SDK-2997: Add launch_platform_client method 2019-08-13 15:23:20 +02:00
Vadim Suharnikov
f5eb32aa19 Fix a comment typo 2019-08-04 01:06:22 +02:00
Romuald Bierbasz
a76345ff6b Docs fixes 2019-08-02 17:23:56 +02:00
Romuald Bierbasz
c3bbeee54d Turn on type checking 2019-08-02 15:15:29 +02:00
Romuald Juchnowicz-Bierbasz
13a3f7577b Increment version 2019-08-02 15:13:11 +02:00
Romuald Bierbasz
f5b9adfbd5 Add achievements_import_complete and game_times_import_complete 2019-08-02 15:11:05 +02:00
Romuald Juchnowicz-Bierbasz
e33dd09a8d Increment version 2019-08-02 12:57:13 +02:00
Aleksej Pawlowskij
f4ea2af924 Make 'handshake_complete' safe 2019-08-02 12:29:48 +02:00
Romuald Bierbasz
3bcc674518 Use MagicMocks with async_return_value and pytest.mark.asyncio 2019-08-02 12:28:13 +02:00
Romuald Juchnowicz-Bierbasz
b14595bef5 Fix types 2019-08-02 12:20:27 +02:00
Romuald Bierbasz
a9acb7a0db SDK-2931: Refactor import methods 2019-08-02 10:10:58 +02:00
Mesco
d95aacb9d8 Fix typo in docstring 2019-08-01 09:53:50 +02:00
Aleksej Pawlowskij
49ae2beab9 SDK-2983: Split entry methods from feature detection 2019-07-29 12:59:45 +02:00
Aleksej Pawlowskij
c9e190772c Increment version 2019-07-26 16:28:47 +02:00
Aleksej Pawlowskij
789415d31b SDK-2974: add process info tools 2019-07-26 13:41:43 +02:00
Piotr Marzec
8d210e7f3e platform name fixing 2019-06-19 20:43:25 +02:00
51 changed files with 3719 additions and 1203 deletions

2
.gitignore vendored
View File

@@ -7,3 +7,5 @@ docs/build/
Pipfile
.idea
docs/source/_build
.mypy_cache
.pytest_cache

View File

@@ -1,4 +1,4 @@
image: registry-gitlab.gog.com/galaxy-client/gitlab-ci-tools:latest
image: registry-gitlab.gog.com/docker/python:3.7.3
stages:
- test
@@ -14,13 +14,19 @@ test_package:
deploy_package:
stage: deploy
variables:
TWINE_USERNAME: $PYPI_USERNAME
TWINE_PASSWORD: $PYPI_PASSWORD
script:
- pip install twine wheel
- rm -rf dist
- export VERSION=$(python setup.py --version)
- python setup.py sdist --formats=gztar upload -r gog-pypi
- python setup.py sdist --formats=gztar bdist_wheel
- twine upload dist/*
- curl -X POST --silent --show-error --fail
"https://gitlab.gog.com/api/v4/projects/${CI_PROJECT_ID}/repository/tags?tag_name=${VERSION}&ref=${CI_COMMIT_REF_NAME}&private_token=${PACKAGE_DEPLOYER_API_TOKEN}"
when: manual
only:
- master
except:
- tags
- tags

View File

@@ -4,10 +4,10 @@ Platform ID list for GOG Galaxy 2.0 Integrations
| ID | Name |
| --- | --- |
| test | Testing purposes |
| steam | Steam |
| psn | PlayStation Network |
| xboxone | Xbox Live |
| generic | Manually added games |
| origin | Origin |
| uplay | Uplay |
| battlenet | Battle.net |
@@ -20,7 +20,7 @@ Platform ID list for GOG Galaxy 2.0 Integrations
| nswitch | Nintendo Switch |
| nwiiu | Nintendo Wii U |
| nwii | Nintendo Wii |
| ncube | Nintendo Game Cube |
| ncube | Nintendo GameCube |
| riot | Riot |
| wargaming | Wargaming |
| ngameboy | Nintendo Game Boy |
@@ -58,25 +58,25 @@ Platform ID list for GOG Galaxy 2.0 Integrations
| bb | BestBuy |
| gameuk | Game UK |
| fanatical | Fanatical store |
| playasia | PlayAsia |
| playasia | Play-Asia |
| stadia | Google Stadia |
| arc | ARC |
| eso | ESO |
| glyph | Trion World |
| aionl | Aion: Legions of War |
| aion | Aion |
| blade | Blade and Soul |
| blade | Blade & Soul |
| gw | Guild Wars |
| gw2 | Guild Wars 2 |
| lin2 | Lineage 2 |
| ffxi | Final Fantasy XI |
| ffxiv | Final Fantasy XIV |
| totalwar | TotalWar |
| ffxiv | Final Fantasy XIV |
| totalwar | Total War |
| winstore | Windows Store |
| elites | Elite Dangerous |
| star | Star Citizen |
| psp | Playstation Portable |
| psvita | Playstation Vita |
| psp | PlayStation Portable |
| psvita | PlayStation Vita |
| nds | Nintendo DS |
| 3ds | Nintendo 3DS |
| pathofexile | Path of Exile |

View File

@@ -36,20 +36,31 @@ Communication between an integration and the client is also possible with the us
import sys
from galaxy.api.plugin import Plugin, create_and_run_plugin
from galaxy.api.consts import Platform
from galaxy.api.types import Authentication, Game, LicenseInfo, LicenseType
class PluginExample(Plugin):
def __init__(self, reader, writer, token):
super().__init__(
Platform.Generic, # Choose platform from available list
"0.1", # Version
Platform.Test, # choose platform from available list
"0.1", # version
reader,
writer,
token
)
# implement methods
# required
async def authenticate(self, stored_credentials=None):
pass
return Authentication('test_user_id', 'Test User Name')
# required
async def get_owned_games(self):
return [
Game('test', 'The Test', None, LicenseInfo(LicenseType.SinglePurchase))
]
def main():
create_and_run_plugin(PluginExample, sys.argv)
@@ -76,6 +87,20 @@ In order to be found by GOG Galaxy 2.0 an integration folder should be placed in
`~/Library/Application Support/GOG.com/Galaxy/plugins/installed`
### Logging
<a href='https://docs.python.org/3.7/howto/logging.html'>Root logger</a> is already setup by GOG Galaxy to store rotated log files in:
- Windows:
`%programdata%\GOG.com\Galaxy\logs`
- macOS:
`/Users/Shared/GOG.com/Galaxy/Logs`
Plugin logs are kept in `plugin-<platform>-<guid>.log`.
When debugging, inspecting the other side of communication in the `GalaxyClient.log` can be helpful as well.
### Manifest
<a name="deploy-manifest"></a>
@@ -84,8 +109,8 @@ Obligatory JSON file to be placed in an integration folder.
```json
{
"name": "Example plugin",
"platform": "generic",
"guid": "UNIQUE-GUID",
"platform": "test",
"guid": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"version": "0.1",
"description": "Example plugin",
"author": "Name",
@@ -97,9 +122,8 @@ Obligatory JSON file to be placed in an integration folder.
| property | description |
|---------------|---|
| `guid` | |
| `description` | |
| `url` | |
| `guid` | custom Globally Unique Identifier |
| `version` | the same string as `version` in `Plugin` constructor |
| `script` | path of the entry point module, relative to the integration folder |
### Dependencies

View File

@@ -1,4 +1,5 @@
Sphinx==2.0.1
sphinx-rtd-theme==0.4.3
sphinx-autodoc-typehints==1.6.0
sphinxcontrib-asyncio==0.2.0
m2r==0.2.1

View File

@@ -32,12 +32,13 @@ release = _version
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinxcontrib.asyncio',
'sphinx_autodoc_typehints',
'm2r' # mdinclude directive for makrdown files
]
autodoc_member_order = 'bysource'
autodoc_inherit_docstrings = False
autodoc_mock_imports = ["galaxy.http"]
autodoc_mock_imports = ["aiohttp"]
set_type_checking_flag = True
@@ -47,7 +48,7 @@ templates_path = ['_templates']
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path.
exclude_patterns = []
exclude_patterns = [] # type: ignore
# -- Options for HTML output -------------------------------------------------

View File

@@ -0,0 +1,8 @@
galaxy.http
=================
.. automodule:: galaxy.http
:members:
:special-members: __init__
:undoc-members:
:show-inheritance:

View File

@@ -6,7 +6,8 @@ GOG Galaxy Integrations Python API
:includehidden:
Overview <overview>
API <galaxy.api>
galaxy.api
galaxy.http
Platform ID's <platforms>
Index

2
mypy.ini Normal file
View File

@@ -0,0 +1,2 @@
[mypy]
ignore_missing_imports = True

View File

@@ -1,2 +1,2 @@
[pytest]
addopts = --flakes
addopts = --flakes --mypy

View File

@@ -1,8 +1,10 @@
-e .
pytest==4.2.0
pytest==5.2.2
pytest-asyncio==0.10.0
pytest-mock==1.10.3
pytest-mypy==0.4.1
pytest-flakes==4.0.0
# because of pip bug https://github.com/pypa/pip/issues/4780
aiohttp==3.5.4
certifi==2019.3.9
certifi==2019.3.9
psutil==5.6.6; sys_platform == 'darwin'

View File

@@ -2,14 +2,15 @@ from setuptools import setup, find_packages
setup(
name="galaxy.plugin.api",
version="0.43",
version="0.65.1",
description="GOG Galaxy Integrations Python API",
author='Galaxy team',
author_email='galaxy@gog.com',
packages=find_packages("src"),
package_dir={'': 'src'},
install_requires=[
"aiohttp==3.5.4",
"certifi==2019.3.9"
"aiohttp>=3.5.4",
"certifi>=2019.3.9",
"psutil>=5.6.6; sys_platform == 'darwin'"
]
)

View File

@@ -1 +1 @@
__path__ = __import__('pkgutil').extend_path(__path__, __name__)
__path__: str = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore

View File

@@ -81,6 +81,17 @@ class Platform(Enum):
NintendoDs = "nds"
Nintendo3Ds = "3ds"
PathOfExile = "pathofexile"
Twitch = "twitch"
Minecraft = "minecraft"
GameSessions = "gamesessions"
Nuuvem = "nuuvem"
FXStore = "fxstore"
IndieGala = "indiegala"
Playfire = "playfire"
Oculus = "oculus"
Test = "test"
Rockstar = "rockstar"
class Feature(Enum):
"""Possible features that can be implemented by an integration.
@@ -99,6 +110,13 @@ class Feature(Enum):
VerifyGame = "VerifyGame"
ImportFriends = "ImportFriends"
ShutdownPlatformClient = "ShutdownPlatformClient"
LaunchPlatformClient = "LaunchPlatformClient"
ImportGameLibrarySettings = "ImportGameLibrarySettings"
ImportOSCompatibility = "ImportOSCompatibility"
ImportUserPresence = "ImportUserPresence"
ImportLocalSize = "ImportLocalSize"
ImportSubscriptions = "ImportSubscriptions"
ImportSubscriptionGames = "ImportSubscriptionGames"
class LicenseType(Enum):
@@ -117,3 +135,30 @@ class LocalGameState(Flag):
None_ = 0
Installed = 1
Running = 2
class OSCompatibility(Flag):
"""Possible game OS compatibility.
Use "bitwise or" to express multiple OSs compatibility, e.g. ``os=OSCompatibility.Windows|OSCompatibility.MacOS``
"""
Windows = 0b001
MacOS = 0b010
Linux = 0b100
class PresenceState(Enum):
""""Possible states of a user."""
Unknown = "unknown"
Online = "online"
Offline = "offline"
Away = "away"
class SubscriptionDiscovery(Flag):
"""Possible capabilities which inform what methods of subscriptions ownership detection are supported.
:param AUTOMATIC: integration can retrieve the proper status of subscription ownership.
:param USER_ENABLED: integration can handle override of ~class::`Subscription.owned` value to True
"""
AUTOMATIC = 1
USER_ENABLED = 2

View File

@@ -1,6 +1,6 @@
from galaxy.api.jsonrpc import ApplicationError, UnknownError
UnknownError = UnknownError
assert UnknownError
class AuthenticationRequired(ApplicationError):
def __init__(self, data=None):
@@ -20,7 +20,7 @@ class BackendError(ApplicationError):
class UnknownBackendResponse(ApplicationError):
def __init__(self, data=None):
super().__init__(4, "Backend responded in uknown way", data)
super().__init__(4, "Backend responded in unknown way", data)
class TooManyRequests(ApplicationError):
def __init__(self, data=None):

View File

@@ -0,0 +1,89 @@
import asyncio
import logging
from galaxy.api.jsonrpc import ApplicationError
from galaxy.api.errors import ImportInProgress, UnknownError
logger = logging.getLogger(__name__)
class Importer:
def __init__(
self,
task_manger,
name,
get,
prepare_context,
notification_success,
notification_failure,
notification_finished,
complete,
):
self._task_manager = task_manger
self._name = name
self._get = get
self._prepare_context = prepare_context
self._notification_success = notification_success
self._notification_failure = notification_failure
self._notification_finished = notification_finished
self._complete = complete
self._import_in_progress = False
async def _import_element(self, id_, context_):
try:
element = await self._get(id_, context_)
self._notification_success(id_, element)
except ApplicationError as error:
self._notification_failure(id_, error)
except asyncio.CancelledError:
pass
except Exception:
logger.exception("Unexpected exception raised in %s importer", self._name)
self._notification_failure(id_, UnknownError())
async def _import_elements(self, ids_, context_):
try:
imports = [self._import_element(id_, context_) for id_ in ids_]
await asyncio.gather(*imports)
self._notification_finished()
self._complete()
except asyncio.CancelledError:
logger.debug("Importing %s cancelled", self._name)
finally:
self._import_in_progress = False
async def start(self, ids):
if self._import_in_progress:
raise ImportInProgress()
self._import_in_progress = True
try:
context = await self._prepare_context(ids)
self._task_manager.create_task(
self._import_elements(ids, context),
"{} import".format(self._name),
handle_exceptions=False
)
except:
self._import_in_progress = False
raise
class CollectionImporter(Importer):
def __init__(self, notification_partially_finished, *args):
super().__init__(*args)
self._notification_partially_finished = notification_partially_finished
async def _import_element(self, id_, context_):
try:
async for element in self._get(id_, context_):
self._notification_success(id_, element)
except ApplicationError as error:
self._notification_failure(id_, error)
except asyncio.CancelledError:
pass
except Exception:
logger.exception("Unexpected exception raised in %s importer", self._name)
self._notification_failure(id_, UnknownError())
finally:
self._notification_partially_finished(id_)

View File

@@ -6,6 +6,11 @@ import inspect
import json
from galaxy.reader import StreamLineReader
from galaxy.task_manager import TaskManager
logger = logging.getLogger(__name__)
class JsonRpcError(Exception):
def __init__(self, code, message, data=None):
@@ -17,6 +22,17 @@ class JsonRpcError(Exception):
def __eq__(self, other):
return self.code == other.code and self.message == other.message and self.data == other.data
def json(self):
obj = {
"code": self.code,
"message": self.message
}
if self.data is not None:
obj["data"] = self.data
return obj
class ParseError(JsonRpcError):
def __init__(self):
super().__init__(-32700, "Parse error")
@@ -52,7 +68,9 @@ class UnknownError(ApplicationError):
super().__init__(0, "Unknown error", data)
Request = namedtuple("Request", ["method", "params", "id"], defaults=[{}, None])
Method = namedtuple("Method", ["callback", "signature", "internal", "sensitive_params"])
Response = namedtuple("Response", ["id", "result", "error"], defaults=[None, {}, {}])
Method = namedtuple("Method", ["callback", "signature", "immediate", "sensitive_params"])
def anonymise_sensitive_params(params, sensitive_params):
anomized_data = "****"
@@ -66,7 +84,7 @@ def anonymise_sensitive_params(params, sensitive_params):
return params
class Server():
class Connection():
def __init__(self, reader, writer, encoder=json.JSONEncoder()):
self._active = True
self._reader = StreamLineReader(reader)
@@ -74,9 +92,11 @@ class Server():
self._encoder = encoder
self._methods = {}
self._notifications = {}
self._eof_listeners = []
self._task_manager = TaskManager("jsonrpc server")
self._last_request_id = 0
self._requests_futures = {}
def register_method(self, name, callback, internal, sensitive_params=False):
def register_method(self, name, callback, immediate, sensitive_params=False):
"""
Register method
@@ -86,9 +106,9 @@ class Server():
:param sensitive_params: list of parameters that are anonymized before logging; \
if False - no params are considered sensitive, if True - all params are considered sensitive
"""
self._methods[name] = Method(callback, inspect.signature(callback), internal, sensitive_params)
self._methods[name] = Method(callback, inspect.signature(callback), immediate, sensitive_params)
def register_notification(self, name, callback, internal, sensitive_params=False):
def register_notification(self, name, callback, immediate, sensitive_params=False):
"""
Register notification
@@ -98,10 +118,48 @@ class Server():
:param sensitive_params: list of parameters that are anonymized before logging; \
if False - no params are considered sensitive, if True - all params are considered sensitive
"""
self._notifications[name] = Method(callback, inspect.signature(callback), internal, sensitive_params)
self._notifications[name] = Method(callback, inspect.signature(callback), immediate, sensitive_params)
def register_eof(self, callback):
self._eof_listeners.append(callback)
async def send_request(self, method, params, sensitive_params):
"""
Send request
:param method:
:param params:
:param sensitive_params: list of parameters that are anonymized before logging; \
if False - no params are considered sensitive, if True - all params are considered sensitive
"""
self._last_request_id += 1
request_id = str(self._last_request_id)
loop = asyncio.get_running_loop()
future = loop.create_future()
self._requests_futures[self._last_request_id] = (future, sensitive_params)
logger.info(
"Sending request: id=%s, method=%s, params=%s",
request_id, method, anonymise_sensitive_params(params, sensitive_params)
)
self._send_request(request_id, method, params)
return await future
def send_notification(self, method, params, sensitive_params=False):
"""
Send notification
:param method:
:param params:
:param sensitive_params: list of parameters that are anonymized before logging; \
if False - no params are considered sensitive, if True - all params are considered sensitive
"""
logger.info(
"Sending notification: method=%s, params=%s",
method, anonymise_sensitive_params(params, sensitive_params)
)
self._send_notification(method, params)
async def run(self):
while self._active:
@@ -114,38 +172,66 @@ class Server():
self._eof()
continue
data = data.strip()
logging.debug("Received %d bytes of data", len(data))
logger.debug("Received %d bytes of data", len(data))
self._handle_input(data)
await asyncio.sleep(0) # To not starve task queue
def stop(self):
self._active = False
def close(self):
if self._active:
logger.info("Closing JSON-RPC server - not more messages will be read")
self._active = False
async def wait_closed(self):
await self._task_manager.wait()
def _eof(self):
logging.info("Received EOF")
self.stop()
for listener in self._eof_listeners:
listener()
logger.info("Received EOF")
self.close()
def _handle_input(self, data):
try:
request = self._parse_request(data)
message = self._parse_message(data)
except JsonRpcError as error:
self._send_error(None, error)
return
if request.id is not None:
self._handle_request(request)
else:
self._handle_notification(request)
if isinstance(message, Request):
if message.id is not None:
self._handle_request(message)
else:
self._handle_notification(message)
elif isinstance(message, Response):
self._handle_response(message)
def _handle_response(self, response):
request_future = self._requests_futures.get(int(response.id))
if request_future is None:
response_type = "response" if response.result is not None else "error"
logger.warning("Received %s for unknown request: %s", response_type, response.id)
return
future, sensitive_params = request_future
if response.error:
error = JsonRpcError(
response.error.setdefault("code", 0),
response.error.setdefault("message", ""),
response.error.setdefault("data", None)
)
self._log_error(response, error, sensitive_params)
future.set_exception(error)
return
self._log_response(response, sensitive_params)
future.set_result(response.result)
def _handle_notification(self, request):
method = self._notifications.get(request.method)
if not method:
logging.error("Received unknown notification: %s", request.method)
logger.error("Received unknown notification: %s", request.method)
return
callback, signature, internal, sensitive_params = method
callback, signature, immediate, sensitive_params = method
self._log_request(request, sensitive_params)
try:
@@ -153,23 +239,22 @@ class Server():
except TypeError:
self._send_error(request.id, InvalidParams())
if internal:
# internal requests are handled immediately
if immediate:
callback(*bound_args.args, **bound_args.kwargs)
else:
try:
asyncio.create_task(callback(*bound_args.args, **bound_args.kwargs))
self._task_manager.create_task(callback(*bound_args.args, **bound_args.kwargs), request.method)
except Exception:
logging.exception("Unexpected exception raised in notification handler")
logger.exception("Unexpected exception raised in notification handler")
def _handle_request(self, request):
method = self._methods.get(request.method)
if not method:
logging.error("Received unknown request: %s", request.method)
logger.error("Received unknown request: %s", request.method)
self._send_error(request.id, MethodNotFound())
return
callback, signature, internal, sensitive_params = method
callback, signature, immediate, sensitive_params = method
self._log_request(request, sensitive_params)
try:
@@ -177,8 +262,7 @@ class Server():
except TypeError:
self._send_error(request.id, InvalidParams())
if internal:
# internal requests are handled immediately
if immediate:
response = callback(*bound_args.args, **bound_args.kwargs)
self._send_response(request.id, response)
else:
@@ -190,34 +274,42 @@ class Server():
self._send_error(request.id, MethodNotFound())
except JsonRpcError as error:
self._send_error(request.id, error)
except asyncio.CancelledError:
self._send_error(request.id, Aborted())
except Exception as e: #pylint: disable=broad-except
logging.exception("Unexpected exception raised in plugin handler")
logger.exception("Unexpected exception raised in plugin handler")
self._send_error(request.id, UnknownError(str(e)))
asyncio.create_task(handle())
self._task_manager.create_task(handle(), request.method)
@staticmethod
def _parse_request(data):
def _parse_message(data):
try:
jsonrpc_request = json.loads(data, encoding="utf-8")
if jsonrpc_request.get("jsonrpc") != "2.0":
jsonrpc_message = json.loads(data, encoding="utf-8")
if jsonrpc_message.get("jsonrpc") != "2.0":
raise InvalidRequest()
del jsonrpc_request["jsonrpc"]
return Request(**jsonrpc_request)
del jsonrpc_message["jsonrpc"]
if "result" in jsonrpc_message.keys() or "error" in jsonrpc_message.keys():
return Response(**jsonrpc_message)
else:
return Request(**jsonrpc_message)
except json.JSONDecodeError:
raise ParseError()
except TypeError:
raise InvalidRequest()
def _send(self, data):
def _send(self, data, sensitive=True):
try:
line = self._encoder.encode(data)
logging.debug("Sending data: %s", line)
data = (line + "\n").encode("utf-8")
if sensitive:
logger.debug("Sending %d bytes of data", len(data))
else:
logging.debug("Sending data: %s", line)
self._writer.write(data)
asyncio.create_task(self._writer.drain())
except TypeError as error:
logging.error(str(error))
logger.error(str(error))
def _send_response(self, request_id, result):
response = {
@@ -225,65 +317,51 @@ class Server():
"id": request_id,
"result": result
}
self._send(response)
self._send(response, sensitive=False)
def _send_error(self, request_id, error):
response = {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": error.code,
"message": error.message
}
"error": error.json()
}
if error.data is not None:
response["error"]["data"] = error.data
self._send(response, sensitive=False)
self._send(response)
def _send_request(self, request_id, method, params):
request = {
"jsonrpc": "2.0",
"method": method,
"id": request_id,
"params": params
}
self._send(request, sensitive=True)
@staticmethod
def _log_request(request, sensitive_params):
params = anonymise_sensitive_params(request.params, sensitive_params)
if request.id is not None:
logging.info("Handling request: id=%s, method=%s, params=%s", request.id, request.method, params)
else:
logging.info("Handling notification: method=%s, params=%s", request.method, params)
class NotificationClient():
def __init__(self, writer, encoder=json.JSONEncoder()):
self._writer = writer
self._encoder = encoder
self._methods = {}
def notify(self, method, params, sensitive_params=False):
"""
Send notification
:param method:
:param params:
:param sensitive_params: list of parameters that are anonymized before logging; \
if False - no params are considered sensitive, if True - all params are considered sensitive
"""
def _send_notification(self, method, params):
notification = {
"jsonrpc": "2.0",
"method": method,
"params": params
}
self._log(method, params, sensitive_params)
self._send(notification)
def _send(self, data):
try:
line = self._encoder.encode(data)
data = (line + "\n").encode("utf-8")
logging.debug("Sending %d byte of data", len(data))
self._writer.write(data)
asyncio.create_task(self._writer.drain())
except TypeError as error:
logging.error("Failed to parse outgoing message: %s", str(error))
self._send(notification, sensitive=True)
@staticmethod
def _log(method, params, sensitive_params):
params = anonymise_sensitive_params(params, sensitive_params)
logging.info("Sending notification: method=%s, params=%s", method, params)
def _log_request(request, sensitive_params):
params = anonymise_sensitive_params(request.params, sensitive_params)
if request.id is not None:
logger.info("Handling request: id=%s, method=%s, params=%s", request.id, request.method, params)
else:
logger.info("Handling notification: method=%s, params=%s", request.method, params)
@staticmethod
def _log_response(response, sensitive_params):
result = anonymise_sensitive_params(response.result, sensitive_params)
logger.info("Handling response: id=%s, result=%s", response.id, result)
@staticmethod
def _log_error(response, error, sensitive_params):
params = error.data if error.data is not None else {}
data = anonymise_sensitive_params(params, sensitive_params)
logger.info("Handling error: id=%s, code=%s, description=%s, data=%s",
response.id, error.code, error.message, data
)

View File

File diff suppressed because it is too large Load Diff

View File

@@ -1,10 +1,11 @@
from dataclasses import dataclass
from typing import List, Dict, Optional
from typing import Dict, List, Optional
from galaxy.api.consts import LicenseType, LocalGameState, PresenceState, SubscriptionDiscovery
from galaxy.api.consts import LicenseType, LocalGameState
@dataclass
class Authentication():
class Authentication:
"""Return this from :meth:`.authenticate` or :meth:`.pass_login_credentials`
to inform the client that authentication has successfully finished.
@@ -14,8 +15,9 @@ class Authentication():
user_id: str
user_name: str
@dataclass
class Cookie():
class Cookie:
"""Cookie
:param name: name of the cookie
@@ -28,8 +30,9 @@ class Cookie():
domain: Optional[str] = None
path: Optional[str] = None
@dataclass
class NextStep():
class NextStep:
"""Return this from :meth:`.authenticate` or :meth:`.pass_login_credentials` to open client built-in browser with given url.
For example:
@@ -58,18 +61,20 @@ class NextStep():
if not stored_credentials:
return NextStep("web_session", PARAMS, cookies=COOKIES, js=JS)
:param auth_params: configuration options: {"window_title": :class:`str`, "window_width": :class:`str`, "window_height": :class:`int`, "start_uri": :class:`int`, "end_uri_regex": :class:`str`}
:param auth_params: configuration options: {"window_title": :class:`str`, "window_width": :class:`str`,
"window_height": :class:`int`, "start_uri": :class:`int`, "end_uri_regex": :class:`str`}
:param cookies: browser initial set of cookies
:param js: a map of the url regex patterns into the list of *js* scripts that should be executed on every document at given step of internal browser authentication.
:param js: a map of the url regex patterns into the list of *js* scripts that should be executed
on every document at given step of internal browser authentication.
"""
next_step: str
auth_params: Dict[str, str]
cookies: Optional[List[Cookie]] = None
js: Optional[Dict[str, List[str]]] = None
@dataclass
class LicenseInfo():
class LicenseInfo:
"""Information about the license of related product.
:param license_type: type of license
@@ -78,8 +83,9 @@ class LicenseInfo():
license_type: LicenseType
owner: Optional[str] = None
@dataclass
class Dlc():
class Dlc:
"""Downloadable content object.
:param dlc_id: id of the dlc
@@ -90,8 +96,9 @@ class Dlc():
dlc_title: str
license_info: LicenseInfo
@dataclass
class Game():
class Game:
"""Game object.
:param game_id: unique identifier of the game, this will be passed as parameter for methods such as launch_game
@@ -104,8 +111,9 @@ class Game():
dlcs: Optional[List[Dlc]]
license_info: LicenseInfo
@dataclass
class Achievement():
class Achievement:
"""Achievement, has to be initialized with either id or name.
:param unlock_time: unlock time of the achievement
@@ -120,8 +128,9 @@ class Achievement():
assert self.achievement_id or self.achievement_name, \
"One of achievement_id or achievement_name is required"
@dataclass
class LocalGame():
class LocalGame:
"""Game locally present on the authenticated user's computer.
:param game_id: id of the game
@@ -130,9 +139,14 @@ class LocalGame():
game_id: str
local_game_state: LocalGameState
@dataclass
class FriendInfo():
"""Information about a friend of the currently authenticated user.
class FriendInfo:
"""
.. deprecated:: 0.56
Use :class:`UserInfo`.
Information about a friend of the currently authenticated user.
:param user_id: id of the user
:param user_name: username of the user
@@ -140,15 +154,104 @@ class FriendInfo():
user_id: str
user_name: str
@dataclass
class GameTime():
class UserInfo:
"""Information about a user of related user.
:param user_id: id of the user
:param user_name: username of the user
:param avatar_url: the URL of the user avatar
:param profile_url: the URL of the user profile
"""
user_id: str
user_name: str
avatar_url: Optional[str]
profile_url: Optional[str]
@dataclass
class GameTime:
"""Game time of a game, defines the total time spent in the game
and the last time the game was played.
:param game_id: id of the related game
:param time_played: the total time spent in the game in **minutes**
:param last_time_played: last time the game was played (**unix timestamp**)
:param last_played_time: last time the game was played (**unix timestamp**)
"""
game_id: str
time_played: Optional[int]
last_played_time: Optional[int]
@dataclass
class GameLibrarySettings:
"""Library settings of a game, defines assigned tags and visibility flag.
:param game_id: id of the related game
:param tags: collection of tags assigned to the game
:param hidden: indicates if the game should be hidden in GOG Galaxy client
"""
game_id: str
tags: Optional[List[str]]
hidden: Optional[bool]
@dataclass
class UserPresence:
"""Presence information of a user.
The GOG Galaxy client will prefer to generate user status basing on `game_id` (or `game_title`)
and `in_game_status` fields but if plugin is not capable of delivering it then the `full_status` will be used if
available
:param presence_state: the state of the user
:param game_id: id of the game a user is currently in
:param game_title: name of the game a user is currently in
:param in_game_status: status set by the game itself e.x. "In Main Menu"
:param full_status: full user status e.x. "Playing <title_name>: <in_game_status>"
"""
presence_state: PresenceState
game_id: Optional[str] = None
game_title: Optional[str] = None
in_game_status: Optional[str] = None
full_status: Optional[str] = None
@dataclass
class Subscription:
"""Information about a subscription.
:param subscription_name: name of the subscription, will also be used as its identifier.
:param owned: whether the subscription is owned or not, None if unknown.
:param end_time: unix timestamp of when the subscription ends, None if unknown.
:param subscription_discovery: combination of settings that can be manually
chosen by user to determine subscription handling behaviour. For example, if the integration cannot retrieve games
for subscription when user doesn't own it, then USER_ENABLED should not be used.
If the integration cannot determine subscription ownership for a user then AUTOMATIC should not be used.
"""
subscription_name: str
owned: Optional[bool] = None
end_time: Optional[int] = None
subscription_discovery: SubscriptionDiscovery = SubscriptionDiscovery.AUTOMATIC | \
SubscriptionDiscovery.USER_ENABLED
def __post_init__(self):
assert self.subscription_discovery in [SubscriptionDiscovery.AUTOMATIC, SubscriptionDiscovery.USER_ENABLED,
SubscriptionDiscovery.AUTOMATIC | SubscriptionDiscovery.USER_ENABLED]
@dataclass
class SubscriptionGame:
"""Information about a game from a subscription.
:param game_title: title of the game
:param game_id: id of the game
:param start_time: unix timestamp of when the game has been added to subscription
:param end_time: unix timestamp of when the game will be removed from subscription.
"""
game_title: str
game_id: str
start_time: Optional[int] = None
end_time: Optional[int] = None

View File

@@ -1,3 +1,33 @@
"""
This module standardizes http traffic and the error handling for further communication with the GOG Galaxy 2.0.
It is recommended to use provided convenient methods for HTTP requests, especially when dealing with authorized sessions.
Exemplary simple web service could looks like:
.. code-block:: python
from galaxy.http import create_client_session, handle_exception
class BackendClient:
AUTH_URL = 'my-integration.com/auth'
HEADERS = {
"My-Custom-Header": "true",
}
def __init__(self):
self._session = create_client_session(headers=self.HEADERS)
async def authenticate(self):
await self._session.request('POST', self.AUTH_URL)
async def close(self):
# to be called on plugin shutdown
await self._session.close()
async def _authorized_request(self, method, url, *args, **kwargs):
with handle_exceptions():
return await self._session.request(method, url, *args, **kwargs)
"""
import asyncio
import ssl
from contextlib import contextmanager
@@ -13,17 +43,25 @@ from galaxy.api.errors import (
)
logger = logging.getLogger(__name__)
#: Default limit of the simultaneous connections for ssl connector.
DEFAULT_LIMIT = 20
DEFAULT_TIMEOUT = 60 # seconds
#: Default timeout in seconds used for client session.
DEFAULT_TIMEOUT = 60
class HttpClient:
"""Deprecated"""
"""
.. deprecated:: 0.41
Use http module functions instead
"""
def __init__(self, limit=DEFAULT_LIMIT, timeout=aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT), cookie_jar=None):
connector = create_tcp_connector(limit=limit)
self._session = create_client_session(connector=connector, timeout=timeout, cookie_jar=cookie_jar)
async def close(self):
"""Closes connection. Should be called in :meth:`~galaxy.api.plugin.Plugin.shutdown`"""
await self._session.close()
async def request(self, method, url, *args, **kwargs):
@@ -31,23 +69,52 @@ class HttpClient:
return await self._session.request(method, url, *args, **kwargs)
def create_tcp_connector(*args, **kwargs):
def create_tcp_connector(*args, **kwargs) -> aiohttp.TCPConnector:
"""
Creates TCP connector with reasonable defaults.
For details about available parameters refer to
`aiohttp.TCPConnector <https://docs.aiohttp.org/en/stable/client_reference.html#tcpconnector>`_
"""
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.load_verify_locations(certifi.where())
kwargs.setdefault("ssl", ssl_context)
kwargs.setdefault("limit", DEFAULT_LIMIT)
return aiohttp.TCPConnector(*args, **kwargs)
# due to https://github.com/python/mypy/issues/4001
return aiohttp.TCPConnector(*args, **kwargs) # type: ignore
def create_client_session(*args, **kwargs):
def create_client_session(*args, **kwargs) -> aiohttp.ClientSession:
"""
Creates client session with reasonable defaults.
For details about available parameters refer to
`aiohttp.ClientSession <https://docs.aiohttp.org/en/stable/client_reference.html>`_
Exemplary customization:
.. code-block:: python
from galaxy.http import create_client_session, create_tcp_connector
session = create_client_session(
headers={
"Keep-Alive": "true"
},
connector=create_tcp_connector(limit=40),
timeout=100)
"""
kwargs.setdefault("connector", create_tcp_connector())
kwargs.setdefault("timeout", aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT))
kwargs.setdefault("raise_for_status", True)
return aiohttp.ClientSession(*args, **kwargs)
# due to https://github.com/python/mypy/issues/4001
return aiohttp.ClientSession(*args, **kwargs) # type: ignore
@contextmanager
def handle_exception():
"""
Context manager translating network related exceptions
to custom :mod:`~galaxy.api.errors`.
"""
try:
yield
except asyncio.TimeoutError:
@@ -56,26 +123,25 @@ def handle_exception():
raise BackendNotAvailable()
except aiohttp.ClientConnectionError:
raise NetworkError()
except aiohttp.ContentTypeError:
raise UnknownBackendResponse()
except aiohttp.ContentTypeError as error:
raise UnknownBackendResponse(error.message)
except aiohttp.ClientResponseError as error:
if error.status == HTTPStatus.UNAUTHORIZED:
raise AuthenticationRequired()
raise AuthenticationRequired(error.message)
if error.status == HTTPStatus.FORBIDDEN:
raise AccessDenied()
raise AccessDenied(error.message)
if error.status == HTTPStatus.SERVICE_UNAVAILABLE:
raise BackendNotAvailable()
raise BackendNotAvailable(error.message)
if error.status == HTTPStatus.TOO_MANY_REQUESTS:
raise TooManyRequests()
raise TooManyRequests(error.message)
if error.status >= 500:
raise BackendError()
raise BackendError(error.message)
if error.status >= 400:
logging.warning(
logger.warning(
"Got status %d while performing %s request for %s",
error.status, error.request_info.method, str(error.request_info.url)
)
raise UnknownError()
except aiohttp.ClientError:
logging.exception("Caught exception while performing request")
raise UnknownError()
raise UnknownError(error.message)
except aiohttp.ClientError as e:
logger.exception("Caught exception while performing request")
raise UnknownError(repr(e))

87
src/galaxy/proc_tools.py Normal file
View File

@@ -0,0 +1,87 @@
import sys
from dataclasses import dataclass
from typing import Iterable, NewType, Optional, List, cast
ProcessId = NewType("ProcessId", int)
@dataclass
class ProcessInfo:
pid: ProcessId
binary_path: Optional[str]
if sys.platform == "win32":
from ctypes import byref, sizeof, windll, create_unicode_buffer, FormatError, WinError
from ctypes.wintypes import DWORD
def pids() -> Iterable[ProcessId]:
_PROC_ID_T = DWORD
list_size = 4096
def try_get_pids(list_size: int) -> List[ProcessId]:
result_size = DWORD()
proc_id_list = (_PROC_ID_T * list_size)()
if not windll.psapi.EnumProcesses(byref(proc_id_list), sizeof(proc_id_list), byref(result_size)):
raise WinError(descr="Failed to get process ID list: %s" % FormatError()) # type: ignore
return cast(List[ProcessId], proc_id_list[:int(result_size.value / sizeof(_PROC_ID_T()))])
while True:
proc_ids = try_get_pids(list_size)
if len(proc_ids) < list_size:
return proc_ids
list_size *= 2
def get_process_info(pid: ProcessId) -> Optional[ProcessInfo]:
_PROC_QUERY_LIMITED_INFORMATION = 0x1000
process_info = ProcessInfo(pid=pid, binary_path=None)
h_process = windll.kernel32.OpenProcess(_PROC_QUERY_LIMITED_INFORMATION, False, pid)
if not h_process:
return process_info
try:
def get_exe_path() -> Optional[str]:
_MAX_PATH = 260
_WIN32_PATH_FORMAT = 0x0000
exe_path_buffer = create_unicode_buffer(_MAX_PATH)
exe_path_len = DWORD(len(exe_path_buffer))
return cast(str, exe_path_buffer[:exe_path_len.value]) if windll.kernel32.QueryFullProcessImageNameW(
h_process, _WIN32_PATH_FORMAT, exe_path_buffer, byref(exe_path_len)
) else None
process_info.binary_path = get_exe_path()
finally:
windll.kernel32.CloseHandle(h_process)
return process_info
else:
import psutil
def pids() -> Iterable[ProcessId]:
for pid in psutil.pids():
yield pid
def get_process_info(pid: ProcessId) -> Optional[ProcessInfo]:
process_info = ProcessInfo(pid=pid, binary_path=None)
try:
process_info.binary_path = psutil.Process(pid=pid).as_dict(attrs=["exe"])["exe"]
except psutil.NoSuchProcess:
pass
finally:
return process_info
def process_iter() -> Iterable[Optional[ProcessInfo]]:
for pid in pids():
yield get_process_info(pid)

View File

@@ -12,7 +12,7 @@ class StreamLineReader:
while True:
# check if there is no unprocessed data in the buffer
if not self._buffer or self._processed_buffer_it != 0:
chunk = await self._reader.read(1024)
chunk = await self._reader.read(1024*1024)
if not chunk:
return bytes() # EOF
self._buffer += chunk

View File

@@ -0,0 +1,99 @@
import sys
if sys.platform == "win32":
import logging
import ctypes
from ctypes.wintypes import LONG, HKEY, LPCWSTR, DWORD, BOOL, HANDLE, LPVOID
LPSECURITY_ATTRIBUTES = LPVOID
RegOpenKeyEx = ctypes.windll.advapi32.RegOpenKeyExW
RegOpenKeyEx.restype = LONG
RegOpenKeyEx.argtypes = [HKEY, LPCWSTR, DWORD, DWORD, ctypes.POINTER(HKEY)]
RegCloseKey = ctypes.windll.advapi32.RegCloseKey
RegCloseKey.restype = LONG
RegCloseKey.argtypes = [HKEY]
RegNotifyChangeKeyValue = ctypes.windll.advapi32.RegNotifyChangeKeyValue
RegNotifyChangeKeyValue.restype = LONG
RegNotifyChangeKeyValue.argtypes = [HKEY, BOOL, DWORD, HANDLE, BOOL]
CloseHandle = ctypes.windll.kernel32.CloseHandle
CloseHandle.restype = BOOL
CloseHandle.argtypes = [HANDLE]
CreateEvent = ctypes.windll.kernel32.CreateEventW
CreateEvent.restype = BOOL
CreateEvent.argtypes = [LPSECURITY_ATTRIBUTES, BOOL, BOOL, LPCWSTR]
WaitForSingleObject = ctypes.windll.kernel32.WaitForSingleObject
WaitForSingleObject.restype = DWORD
WaitForSingleObject.argtypes = [HANDLE, DWORD]
ERROR_SUCCESS = 0x00000000
KEY_READ = 0x00020019
KEY_QUERY_VALUE = 0x00000001
REG_NOTIFY_CHANGE_NAME = 0x00000001
REG_NOTIFY_CHANGE_LAST_SET = 0x00000004
WAIT_OBJECT_0 = 0x00000000
WAIT_TIMEOUT = 0x00000102
class RegistryMonitor:
def __init__(self, root, subkey):
self._root = root
self._subkey = subkey
self._event = CreateEvent(None, False, False, None)
self._key = None
self._open_key()
if self._key:
self._set_key_update_notification()
def close(self):
CloseHandle(self._event)
if self._key:
RegCloseKey(self._key)
self._key = None
def is_updated(self):
wait_result = WaitForSingleObject(self._event, 0)
# previously watched
if wait_result == WAIT_OBJECT_0:
self._set_key_update_notification()
return True
# no changes or no key before
if wait_result != WAIT_TIMEOUT:
# unexpected error
logging.warning("Unexpected WaitForSingleObject result %s", wait_result)
return False
if self._key is None:
self._open_key()
if self._key is not None:
self._set_key_update_notification()
return False
def _set_key_update_notification(self):
filter_ = REG_NOTIFY_CHANGE_NAME | REG_NOTIFY_CHANGE_LAST_SET
status = RegNotifyChangeKeyValue(self._key, True, filter_, self._event, True)
if status != ERROR_SUCCESS:
# key was deleted
RegCloseKey(self._key)
self._key = None
def _open_key(self):
access = KEY_QUERY_VALUE | KEY_READ
self._key = HKEY()
rc = RegOpenKeyEx(self._root, self._subkey, 0, access, ctypes.byref(self._key))
if rc != ERROR_SUCCESS:
self._key = None

View File

@@ -0,0 +1,53 @@
import asyncio
import logging
from collections import OrderedDict
from itertools import count
logger = logging.getLogger(__name__)
class TaskManager:
def __init__(self, name):
self._name = name
self._tasks = OrderedDict()
self._task_counter = count()
def create_task(self, coro, description, handle_exceptions=True):
"""Wrapper around asyncio.create_task - takes care of canceling tasks on shutdown"""
async def task_wrapper(task_id):
try:
result = await coro
logger.debug("Task manager %s: finished task %d (%s)", self._name, task_id, description)
return result
except asyncio.CancelledError:
if handle_exceptions:
logger.debug("Task manager %s: canceled task %d (%s)", self._name, task_id, description)
else:
raise
except Exception:
if handle_exceptions:
logger.exception("Task manager %s: exception raised in task %d (%s)", self._name, task_id, description)
else:
raise
finally:
del self._tasks[task_id]
task_id = next(self._task_counter)
logger.debug("Task manager %s: creating task %d (%s)", self._name, task_id, description)
task = asyncio.create_task(task_wrapper(task_id))
self._tasks[task_id] = task
return task
def cancel(self):
for task in self._tasks.values():
task.cancel()
async def wait(self):
# Tasks can spawn other tasks
while True:
tasks = self._tasks.values()
if not tasks:
return
await asyncio.gather(*tasks, return_exceptions=True)

View File

@@ -3,6 +3,7 @@ import os
import zipfile
from glob import glob
def zip_folder(folder):
files = glob(os.path.join(folder, "**"), recursive=True)
files = [file.replace(folder + os.sep, "") for file in files]
@@ -14,6 +15,7 @@ def zip_folder(folder):
zipf.write(os.path.join(folder, file), arcname=file)
return zip_buffer
def zip_folder_to_file(folder, filename):
zip_content = zip_folder(folder).getbuffer()
with open(filename, "wb") as archive:

View File

@@ -1,12 +1,39 @@
from asyncio import coroutine
import asyncio
from unittest.mock import MagicMock
class AsyncMock(MagicMock):
"""
.. deprecated:: 0.45
Use: :class:`MagicMock` with meth:`~.async_return_value`.
"""
async def __call__(self, *args, **kwargs):
return super(AsyncMock, self).__call__(*args, **kwargs)
def coroutine_mock():
"""
.. deprecated:: 0.45
Use: :class:`MagicMock` with meth:`~.async_return_value`.
"""
coro = MagicMock(name="CoroutineResult")
corofunc = MagicMock(name="CoroutineFunction", side_effect=coroutine(coro))
corofunc = MagicMock(name="CoroutineFunction", side_effect=asyncio.coroutine(coro))
corofunc.coro = coro
return corofunc
return corofunc
async def skip_loop(iterations=1):
for _ in range(iterations):
await asyncio.sleep(0)
async def async_return_value(return_value, loop_iterations_delay=0):
if loop_iterations_delay > 0:
await skip_loop(loop_iterations_delay)
return return_value
async def async_raise(error, loop_iterations_delay=0):
if loop_iterations_delay > 0:
await skip_loop(loop_iterations_delay)
raise error

View File

@@ -0,0 +1,16 @@
import json
def create_message(request):
return json.dumps(request).encode() + b"\n"
def get_messages(write_mock):
messages = []
for call_args in write_mock.call_args_list:
data = call_args[0][0]
for line in data.splitlines():
message = json.loads(line)
messages.append(message)
return messages

View File

@@ -1,62 +1,86 @@
from contextlib import ExitStack
import logging
from unittest.mock import patch, MagicMock
from contextlib import ExitStack
from unittest.mock import MagicMock, patch
import pytest
from galaxy.api.plugin import Plugin
from galaxy.api.consts import Platform
from galaxy.unittest.mock import AsyncMock, coroutine_mock
from galaxy.api.plugin import Plugin
from galaxy.unittest.mock import async_return_value
@pytest.fixture()
def reader():
stream = MagicMock(name="stream_reader")
stream.read = AsyncMock()
stream.read = MagicMock()
yield stream
@pytest.fixture()
def writer():
async def writer():
stream = MagicMock(name="stream_writer")
stream.write = MagicMock()
stream.drain = AsyncMock()
stream.drain.side_effect = lambda: async_return_value(None)
yield stream
@pytest.fixture()
def read(reader):
yield reader.read
@pytest.fixture()
def write(writer):
yield writer.write
@pytest.fixture()
def plugin(reader, writer):
async def plugin(reader, writer):
"""Return plugin instance with all feature methods mocked"""
async_methods = (
methods = (
"handshake_complete",
"authenticate",
"get_owned_games",
"prepare_achievements_context",
"get_unlocked_achievements",
"achievements_import_complete",
"get_local_games",
"launch_game",
"launch_platform_client",
"install_game",
"uninstall_game",
"get_friends",
"get_game_times",
"shutdown_platform_client"
)
methods = (
"get_game_time",
"prepare_game_times_context",
"game_times_import_complete",
"shutdown_platform_client",
"shutdown",
"tick"
"tick",
"get_game_library_settings",
"prepare_game_library_settings_context",
"game_library_settings_import_complete",
"get_os_compatibility",
"prepare_os_compatibility_context",
"os_compatibility_import_complete",
"get_user_presence",
"prepare_user_presence_context",
"user_presence_import_complete",
"get_local_size",
"prepare_local_size_context",
"local_size_import_complete",
"get_subscriptions",
"get_subscription_games",
"prepare_subscription_games_context",
"subscription_games_import_complete"
)
with ExitStack() as stack:
for method in async_methods:
stack.enter_context(patch.object(Plugin, method, new_callable=coroutine_mock))
for method in methods:
stack.enter_context(patch.object(Plugin, method))
yield Plugin(Platform.Generic, "0.1", reader, writer, "token")
async with Plugin(Platform.Generic, "0.1", reader, writer, "token") as plugin:
plugin.shutdown.return_value = async_return_value(None)
yield plugin
@pytest.fixture(autouse=True)
def my_caplog(caplog):

View File

@@ -1,94 +1,207 @@
import asyncio
import json
from unittest.mock import call
import pytest
from pytest import raises
from galaxy.api.types import Achievement
from galaxy.api.errors import UnknownError, ImportInProgress, BackendError
from galaxy.api.errors import BackendError
from galaxy.unittest.mock import async_return_value, skip_loop
from tests import create_message, get_messages
def test_initialization_no_unlock_time():
with raises(Exception):
Achievement(achievement_id="lvl30", achievement_name="Got level 30")
def test_initialization_no_id_nor_name():
with raises(AssertionError):
Achievement(unlock_time=1234567890)
def test_success(plugin, read, write):
@pytest.mark.asyncio
async def test_get_unlocked_achievements_success(plugin, read, write):
plugin.prepare_achievements_context.return_value = async_return_value(5)
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_unlocked_achievements",
"method": "start_achievements_import",
"params": {
"game_id": "14"
"game_ids": ["14"]
}
}
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_unlocked_achievements.coro.return_value = [
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_unlocked_achievements.return_value = async_return_value([
Achievement(achievement_id="lvl10", unlock_time=1548421241),
Achievement(achievement_name="Got level 20", unlock_time=1548422395),
Achievement(achievement_id="lvl30", achievement_name="Got level 30", unlock_time=1548495633)
]
asyncio.run(plugin.run())
plugin.get_unlocked_achievements.assert_called_with(game_id="14")
response = json.loads(write.call_args[0][0])
])
await plugin.run()
plugin.prepare_achievements_context.assert_called_with(["14"])
plugin.get_unlocked_achievements.assert_called_with("14", 5)
plugin.achievements_import_complete.asert_called_with()
assert response == {
"jsonrpc": "2.0",
"id": "3",
"result": {
"unlocked_achievements": [
{
"achievement_id": "lvl10",
"unlock_time": 1548421241
},
{
"achievement_name": "Got level 20",
"unlock_time": 1548422395
},
{
"achievement_id": "lvl30",
"achievement_name": "Got level 30",
"unlock_time": 1548495633
}
]
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"result": None
},
{
"jsonrpc": "2.0",
"method": "game_achievements_import_success",
"params": {
"game_id": "14",
"unlocked_achievements": [
{
"achievement_id": "lvl10",
"unlock_time": 1548421241
},
{
"achievement_name": "Got level 20",
"unlock_time": 1548422395
},
{
"achievement_id": "lvl30",
"achievement_name": "Got level 30",
"unlock_time": 1548495633
}
]
}
},
{
"jsonrpc": "2.0",
"method": "achievements_import_finished",
"params": None
}
}
]
def test_failure(plugin, read, write):
@pytest.mark.asyncio
@pytest.mark.parametrize("exception,code,message", [
(BackendError, 4, "Backend error"),
(KeyError, 0, "Unknown error")
])
async def test_get_unlocked_achievements_error(exception, code, message, plugin, read, write):
plugin.prepare_achievements_context.return_value = async_return_value(None)
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_unlocked_achievements",
"method": "start_achievements_import",
"params": {
"game_id": "14"
"game_ids": ["14"]
}
}
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_unlocked_achievements.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_unlocked_achievements.side_effect = exception
await plugin.run()
plugin.get_unlocked_achievements.assert_called()
response = json.loads(write.call_args[0][0])
plugin.achievements_import_complete.asert_called_with()
assert response == {
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"result": None
},
{
"jsonrpc": "2.0",
"method": "game_achievements_import_failure",
"params": {
"game_id": "14",
"error": {
"code": code,
"message": message
}
}
},
{
"jsonrpc": "2.0",
"method": "achievements_import_finished",
"params": None
}
]
@pytest.mark.asyncio
async def test_prepare_get_unlocked_achievements_context_error(plugin, read, write):
plugin.prepare_achievements_context.side_effect = BackendError()
request = {
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": 0,
"message": "Unknown error"
"method": "start_achievements_import",
"params": {
"game_ids": ["14"]
}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
def test_unlock_achievement(plugin, write):
await plugin.run()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": 4,
"message": "Backend error"
}
}
]
@pytest.mark.asyncio
async def test_import_in_progress(plugin, read, write):
plugin.prepare_achievements_context.return_value = async_return_value(None)
plugin.get_unlocked_achievements.return_value = async_return_value([])
requests = [
{
"jsonrpc": "2.0",
"id": "3",
"method": "start_achievements_import",
"params": {
"game_ids": ["14"]
}
},
{
"jsonrpc": "2.0",
"id": "4",
"method": "start_achievements_import",
"params": {
"game_ids": ["15"]
}
}
]
read.side_effect = [
async_return_value(create_message(requests[0])),
async_return_value(create_message(requests[1])),
async_return_value(b"", 10)
]
await plugin.run()
messages = get_messages(write)
assert {
"jsonrpc": "2.0",
"id": "3",
"result": None
} in messages
assert {
"jsonrpc": "2.0",
"id": "4",
"error": {
"code": 600,
"message": "Import already in progress"
}
} in messages
@pytest.mark.asyncio
async def test_unlock_achievement(plugin, write):
achievement = Achievement(achievement_id="lvl20", unlock_time=1548422395)
async def couritine():
plugin.unlock_achievement("14", achievement)
asyncio.run(couritine())
plugin.unlock_achievement("14", achievement)
await skip_loop()
response = json.loads(write.call_args[0][0])
assert response == {
@@ -102,92 +215,3 @@ def test_unlock_achievement(plugin, write):
}
}
}
@pytest.mark.asyncio
async def test_game_achievements_import_success(plugin, write):
achievements = [
Achievement(achievement_id="lvl10", unlock_time=1548421241),
Achievement(achievement_name="Got level 20", unlock_time=1548422395)
]
plugin.game_achievements_import_success("134", achievements)
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "game_achievements_import_success",
"params": {
"game_id": "134",
"unlocked_achievements": [
{
"achievement_id": "lvl10",
"unlock_time": 1548421241
},
{
"achievement_name": "Got level 20",
"unlock_time": 1548422395
}
]
}
}
@pytest.mark.asyncio
async def test_game_achievements_import_failure(plugin, write):
plugin.game_achievements_import_failure("134", ImportInProgress())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "game_achievements_import_failure",
"params": {
"game_id": "134",
"error": {
"code": 600,
"message": "Import already in progress"
}
}
}
@pytest.mark.asyncio
async def test_achievements_import_finished(plugin, write):
plugin.achievements_import_finished()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "achievements_import_finished",
"params": None
}
@pytest.mark.asyncio
async def test_start_achievements_import(plugin, write, mocker):
game_achievements_import_success = mocker.patch.object(plugin, "game_achievements_import_success")
game_achievements_import_failure = mocker.patch.object(plugin, "game_achievements_import_failure")
achievements_import_finished = mocker.patch.object(plugin, "achievements_import_finished")
game_ids = ["1", "5", "9"]
error = BackendError()
achievements = [
Achievement(achievement_id="lvl10", unlock_time=1548421241),
Achievement(achievement_name="Got level 20", unlock_time=1548422395)
]
plugin.get_unlocked_achievements.coro.side_effect = [
achievements,
[],
error
]
await plugin.start_achievements_import(game_ids)
with pytest.raises(ImportInProgress):
await plugin.start_achievements_import(["4", "8"])
# wait until all tasks are finished
for _ in range(4):
await asyncio.sleep(0)
plugin.get_unlocked_achievements.coro.assert_has_calls([call("1"), call("5"), call("9")])
game_achievements_import_success.assert_has_calls([
call("1", achievements),
call("5", [])
])
game_achievements_import_failure.assert_called_once_with("9", error)
achievements_import_finished.assert_called_once_with()

View File

@@ -1,6 +1,3 @@
import asyncio
import json
import pytest
from galaxy.api.types import Authentication
@@ -8,29 +5,36 @@ from galaxy.api.errors import (
UnknownError, InvalidCredentials, NetworkError, LoggedInElsewhere, ProtocolError,
BackendNotAvailable, BackendTimeout, BackendError, TemporaryBlocked, Banned, AccessDenied
)
from galaxy.unittest.mock import async_return_value, skip_loop
def test_success(plugin, read, write):
from tests import create_message, get_messages
@pytest.mark.asyncio
async def test_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "init_authentication"
}
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.authenticate.coro.return_value = Authentication("132", "Zenek")
asyncio.run(plugin.run())
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.authenticate.return_value = async_return_value(Authentication("132", "Zenek"))
await plugin.run()
plugin.authenticate.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"result": {
"user_id": "132",
"user_name": "Zenek"
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"result": {
"user_id": "132",
"user_name": "Zenek"
}
}
}
]
@pytest.mark.asyncio
@pytest.mark.parametrize("error,code,message", [
pytest.param(UnknownError, 0, "Unknown error", id="unknown_error"),
pytest.param(BackendNotAvailable, 2, "Backend not available", id="backend_not_available"),
@@ -44,29 +48,32 @@ def test_success(plugin, read, write):
pytest.param(Banned, 105, "Banned", id="banned"),
pytest.param(AccessDenied, 106, "Access denied", id="access_denied"),
])
def test_failure(plugin, read, write, error, code, message):
async def test_failure(plugin, read, write, error, code, message):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "init_authentication"
}
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.authenticate.coro.side_effect = error()
asyncio.run(plugin.run())
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.authenticate.side_effect = error()
await plugin.run()
plugin.authenticate.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": code,
"message": message
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": code,
"message": message
}
}
}
]
def test_stored_credentials(plugin, read, write):
@pytest.mark.asyncio
async def test_stored_credentials(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
@@ -77,39 +84,39 @@ def test_stored_credentials(plugin, read, write):
}
}
}
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.authenticate.coro.return_value = Authentication("132", "Zenek")
asyncio.run(plugin.run())
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.authenticate.return_value = async_return_value(Authentication("132", "Zenek"))
await plugin.run()
plugin.authenticate.assert_called_with(stored_credentials={"token": "ABC"})
write.assert_called()
def test_store_credentials(plugin, write):
@pytest.mark.asyncio
async def test_store_credentials(plugin, write):
credentials = {
"token": "ABC"
}
plugin.store_credentials(credentials)
await skip_loop()
async def couritine():
plugin.store_credentials(credentials)
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"method": "store_credentials",
"params": credentials
}
]
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "store_credentials",
"params": credentials
}
@pytest.mark.asyncio
async def test_lost_authentication(plugin, write):
plugin.lost_authentication()
await skip_loop()
def test_lost_authentication(plugin, write):
async def couritine():
plugin.lost_authentication()
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "authentication_lost",
"params": None
}
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"method": "authentication_lost",
"params": None
}
]

View File

@@ -1,7 +1,12 @@
import asyncio
import json
def test_chunked_messages(plugin, read):
import pytest
from galaxy.unittest.mock import async_return_value
@pytest.mark.asyncio
async def test_chunked_messages(plugin, read):
request = {
"jsonrpc": "2.0",
"method": "install_game",
@@ -11,11 +16,13 @@ def test_chunked_messages(plugin, read):
}
message = json.dumps(request).encode() + b"\n"
read.side_effect = [message[:5], message[5:], b""]
asyncio.run(plugin.run())
read.side_effect = [async_return_value(message[:5]), async_return_value(message[5:]), async_return_value(b"")]
await plugin.run()
plugin.install_game.assert_called_with(game_id="3")
def test_joined_messages(plugin, read):
@pytest.mark.asyncio
async def test_joined_messages(plugin, read):
requests = [
{
"jsonrpc": "2.0",
@@ -34,12 +41,14 @@ def test_joined_messages(plugin, read):
]
data = b"".join([json.dumps(request).encode() + b"\n" for request in requests])
read.side_effect = [data, b""]
asyncio.run(plugin.run())
read.side_effect = [async_return_value(data), async_return_value(b"")]
await plugin.run()
plugin.install_game.assert_called_with(game_id="3")
plugin.launch_game.assert_called_with(game_id="3")
def test_not_finished(plugin, read):
@pytest.mark.asyncio
async def test_not_finished(plugin, read):
request = {
"jsonrpc": "2.0",
"method": "install_game",
@@ -49,6 +58,6 @@ def test_not_finished(plugin, read):
}
message = json.dumps(request).encode() # no new line
read.side_effect = [message, b""]
asyncio.run(plugin.run())
read.side_effect = [async_return_value(message), async_return_value(b"")]
await plugin.run()
plugin.install_game.assert_not_called()

View File

@@ -1,21 +1,56 @@
from galaxy.api.consts import Feature, Platform
from galaxy.api.plugin import Plugin
from galaxy.api.consts import Platform, Feature
def test_base_class():
plugin = Plugin(Platform.Generic, "0.1", None, None, None)
assert plugin.features == []
assert set(plugin.features) == {
Feature.ImportInstalledGames,
Feature.ImportOwnedGames,
Feature.LaunchGame,
Feature.InstallGame,
Feature.UninstallGame,
Feature.ImportAchievements,
Feature.ImportGameTime,
Feature.ImportFriends,
Feature.ShutdownPlatformClient,
Feature.LaunchPlatformClient,
Feature.ImportGameLibrarySettings,
Feature.ImportOSCompatibility,
Feature.ImportUserPresence,
Feature.ImportLocalSize,
Feature.ImportSubscriptions,
Feature.ImportSubscriptionGames
}
def test_no_overloads():
class PluginImpl(Plugin): #pylint: disable=abstract-method
class PluginImpl(Plugin): # pylint: disable=abstract-method
pass
plugin = PluginImpl(Platform.Generic, "0.1", None, None, None)
assert plugin.features == []
def test_one_method_feature():
class PluginImpl(Plugin): #pylint: disable=abstract-method
class PluginImpl(Plugin): # pylint: disable=abstract-method
async def get_owned_games(self):
pass
plugin = PluginImpl(Platform.Generic, "0.1", None, None, None)
assert plugin.features == [Feature.ImportOwnedGames]
assert plugin.features == [Feature.ImportOwnedGames]
def test_multi_features():
class PluginImpl(Plugin): # pylint: disable=abstract-method
async def get_owned_games(self):
pass
async def get_unlocked_achievements(self, game_id, context):
pass
async def get_game_time(self, game_id, context):
pass
plugin = PluginImpl(Platform.Generic, "0.1", None, None, None)
assert set(plugin.features) == {Feature.ImportAchievements, Feature.ImportOwnedGames, Feature.ImportGameTime}

View File

@@ -1,90 +1,124 @@
import asyncio
import json
from galaxy.api.types import FriendInfo
from galaxy.api.types import UserInfo
from galaxy.api.errors import UnknownError
from galaxy.unittest.mock import async_return_value, skip_loop
import pytest
from tests import create_message, get_messages
def test_get_friends_success(plugin, read, write):
@pytest.mark.asyncio
async def test_get_friends_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_friends"
}
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_friends.coro.return_value = [
FriendInfo("3", "Jan"),
FriendInfo("5", "Ola")
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_friends.return_value = async_return_value([
UserInfo("3", "Jan", "https://avatar.url/u3", None),
UserInfo("5", "Ola", None, "https://profile.url/u5")
])
await plugin.run()
plugin.get_friends.assert_called_with()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"result": {
"friend_info_list": [
{"user_id": "3", "user_name": "Jan", "avatar_url": "https://avatar.url/u3"},
{"user_id": "5", "user_name": "Ola", "profile_url": "https://profile.url/u5"}
]
}
}
]
asyncio.run(plugin.run())
plugin.get_friends.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"result": {
"friend_info_list": [
{"user_id": "3", "user_name": "Jan"},
{"user_id": "5", "user_name": "Ola"}
]
}
}
def test_get_friends_failure(plugin, read, write):
@pytest.mark.asyncio
async def test_get_friends_failure(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_friends"
}
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_friends.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_friends.side_effect = UnknownError()
await plugin.run()
plugin.get_friends.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": 0,
"message": "Unknown error",
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": 0,
"message": "Unknown error",
}
}
}
]
def test_add_friend(plugin, write):
friend = FriendInfo("7", "Kuba")
@pytest.mark.asyncio
async def test_add_friend(plugin, write):
friend = UserInfo("7", "Kuba", avatar_url="https://avatar.url/kuba.jpg", profile_url="https://profile.url/kuba")
async def couritine():
plugin.add_friend(friend)
plugin.add_friend(friend)
await skip_loop()
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "friend_added",
"params": {
"friend_info": {"user_id": "7", "user_name": "Kuba"}
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"method": "friend_added",
"params": {
"friend_info": {
"user_id": "7",
"user_name": "Kuba",
"avatar_url": "https://avatar.url/kuba.jpg",
"profile_url": "https://profile.url/kuba"
}
}
}
}
]
def test_remove_friend(plugin, write):
async def couritine():
plugin.remove_friend("5")
@pytest.mark.asyncio
async def test_remove_friend(plugin, write):
plugin.remove_friend("5")
await skip_loop()
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "friend_removed",
"params": {
"user_id": "5"
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"method": "friend_removed",
"params": {
"user_id": "5"
}
}
}
]
@pytest.mark.asyncio
async def test_update_friend_info(plugin, write):
plugin.update_friend_info(
UserInfo("7", "Jakub", avatar_url="https://new-avatar.url/kuba2.jpg", profile_url="https://profile.url/kuba")
)
await skip_loop()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"method": "friend_updated",
"params": {
"friend_info": {
"user_id": "7",
"user_name": "Jakub",
"avatar_url": "https://new-avatar.url/kuba2.jpg",
"profile_url": "https://profile.url/kuba"
}
}
}
]

View File

@@ -0,0 +1,196 @@
from unittest.mock import call
import pytest
from galaxy.api.types import GameLibrarySettings
from galaxy.api.errors import BackendError
from galaxy.unittest.mock import async_return_value
from tests import create_message, get_messages
@pytest.mark.asyncio
async def test_get_library_settings_success(plugin, read, write):
plugin.prepare_game_library_settings_context.return_value = async_return_value("abc")
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "start_game_library_settings_import",
"params": {
"game_ids": ["3", "5", "7"]
}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_game_library_settings.side_effect = [
async_return_value(GameLibrarySettings("3", None, True)),
async_return_value(GameLibrarySettings("5", [], False)),
async_return_value(GameLibrarySettings("7", ["tag1", "tag2", "tag3"], None)),
]
await plugin.run()
plugin.get_game_library_settings.assert_has_calls([
call("3", "abc"),
call("5", "abc"),
call("7", "abc"),
])
plugin.game_library_settings_import_complete.assert_called_once_with()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"result": None
},
{
"jsonrpc": "2.0",
"method": "game_library_settings_import_success",
"params": {
"game_library_settings": {
"game_id": "3",
"hidden": True
}
}
},
{
"jsonrpc": "2.0",
"method": "game_library_settings_import_success",
"params": {
"game_library_settings": {
"game_id": "5",
"tags": [],
"hidden": False
}
}
},
{
"jsonrpc": "2.0",
"method": "game_library_settings_import_success",
"params": {
"game_library_settings": {
"game_id": "7",
"tags": ["tag1", "tag2", "tag3"]
}
}
},
{
"jsonrpc": "2.0",
"method": "game_library_settings_import_finished",
"params": None
}
]
@pytest.mark.asyncio
@pytest.mark.parametrize("exception,code,message", [
(BackendError, 4, "Backend error"),
(KeyError, 0, "Unknown error")
])
async def test_get_game_library_settings_error(exception, code, message, plugin, read, write):
plugin.prepare_game_library_settings_context.return_value = async_return_value(None)
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "start_game_library_settings_import",
"params": {
"game_ids": ["6"]
}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_game_library_settings.side_effect = exception
await plugin.run()
plugin.get_game_library_settings.assert_called()
plugin.game_library_settings_import_complete.assert_called_once_with()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"result": None
},
{
"jsonrpc": "2.0",
"method": "game_library_settings_import_failure",
"params": {
"game_id": "6",
"error": {
"code": code,
"message": message
}
}
},
{
"jsonrpc": "2.0",
"method": "game_library_settings_import_finished",
"params": None
}
]
@pytest.mark.asyncio
async def test_prepare_get_game_library_settings_context_error(plugin, read, write):
plugin.prepare_game_library_settings_context.side_effect = BackendError()
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "start_game_library_settings_import",
"params": {
"game_ids": ["6"]
}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
await plugin.run()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": 4,
"message": "Backend error"
}
}
]
@pytest.mark.asyncio
async def test_import_in_progress(plugin, read, write):
plugin.prepare_game_library_settings_context.return_value = async_return_value(None)
requests = [
{
"jsonrpc": "2.0",
"id": "3",
"method": "start_game_library_settings_import",
"params": {
"game_ids": ["6"]
}
},
{
"jsonrpc": "2.0",
"id": "4",
"method": "start_game_library_settings_import",
"params": {
"game_ids": ["7"]
}
}
]
read.side_effect = [
async_return_value(create_message(requests[0])),
async_return_value(create_message(requests[1])),
async_return_value(b"", 10)
]
await plugin.run()
messages = get_messages(write)
assert {
"jsonrpc": "2.0",
"id": "3",
"result": None
} in messages
assert {
"jsonrpc": "2.0",
"id": "4",
"error": {
"code": 600,
"message": "Import already in progress"
}
} in messages

View File

@@ -1,179 +1,216 @@
import asyncio
import json
from unittest.mock import call
import pytest
from galaxy.api.types import GameTime
from galaxy.api.errors import UnknownError, ImportInProgress, BackendError
from galaxy.api.errors import BackendError
from galaxy.unittest.mock import async_return_value, skip_loop
def test_success(plugin, read, write):
from tests import create_message, get_messages
@pytest.mark.asyncio
async def test_get_game_time_success(plugin, read, write):
plugin.prepare_game_times_context.return_value = async_return_value("abc")
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_game_times"
"method": "start_game_times_import",
"params": {
"game_ids": ["3", "5", "7"]
}
}
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_game_times.coro.return_value = [
GameTime("3", 60, 1549550504),
GameTime("5", 10, None),
GameTime("7", None, 1549550502),
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_game_time.side_effect = [
async_return_value(GameTime("3", 60, 1549550504)),
async_return_value(GameTime("5", 10, None)),
async_return_value(GameTime("7", None, 1549550502)),
]
asyncio.run(plugin.run())
plugin.get_game_times.assert_called_with()
response = json.loads(write.call_args[0][0])
await plugin.run()
plugin.get_game_time.assert_has_calls([
call("3", "abc"),
call("5", "abc"),
call("7", "abc"),
])
plugin.game_times_import_complete.assert_called_once_with()
assert response == {
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"result": None
},
{
"jsonrpc": "2.0",
"method": "game_time_import_success",
"params": {
"game_time": {
"game_id": "3",
"last_played_time": 1549550504,
"time_played": 60
}
}
},
{
"jsonrpc": "2.0",
"method": "game_time_import_success",
"params": {
"game_time": {
"game_id": "5",
"time_played": 10
}
}
},
{
"jsonrpc": "2.0",
"method": "game_time_import_success",
"params": {
"game_time": {
"game_id": "7",
"last_played_time": 1549550502
}
}
},
{
"jsonrpc": "2.0",
"method": "game_times_import_finished",
"params": None
}
]
@pytest.mark.asyncio
@pytest.mark.parametrize("exception,code,message", [
(BackendError, 4, "Backend error"),
(KeyError, 0, "Unknown error")
])
async def test_get_game_time_error(exception, code, message, plugin, read, write):
plugin.prepare_game_times_context.return_value = async_return_value(None)
request = {
"jsonrpc": "2.0",
"id": "3",
"result": {
"game_times": [
{
"method": "start_game_times_import",
"params": {
"game_ids": ["6"]
}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_game_time.side_effect = exception
await plugin.run()
plugin.get_game_time.assert_called()
plugin.game_times_import_complete.assert_called_once_with()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"result": None
},
{
"jsonrpc": "2.0",
"method": "game_time_import_failure",
"params": {
"game_id": "6",
"error": {
"code": code,
"message": message
}
}
},
{
"jsonrpc": "2.0",
"method": "game_times_import_finished",
"params": None
}
]
@pytest.mark.asyncio
async def test_prepare_get_game_time_context_error(plugin, read, write):
plugin.prepare_game_times_context.side_effect = BackendError()
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "start_game_times_import",
"params": {
"game_ids": ["6"]
}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
await plugin.run()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": 4,
"message": "Backend error"
}
}
]
@pytest.mark.asyncio
async def test_import_in_progress(plugin, read, write):
plugin.prepare_game_times_context.return_value = async_return_value(None)
requests = [
{
"jsonrpc": "2.0",
"id": "3",
"method": "start_game_times_import",
"params": {
"game_ids": ["6"]
}
},
{
"jsonrpc": "2.0",
"id": "4",
"method": "start_game_times_import",
"params": {
"game_ids": ["7"]
}
}
]
read.side_effect = [
async_return_value(create_message(requests[0])),
async_return_value(create_message(requests[1])),
async_return_value(b"", 10)
]
await plugin.run()
messages = get_messages(write)
assert {
"jsonrpc": "2.0",
"id": "3",
"result": None
} in messages
assert {
"jsonrpc": "2.0",
"id": "4",
"error": {
"code": 600,
"message": "Import already in progress"
}
} in messages
@pytest.mark.asyncio
async def test_update_game(plugin, write):
game_time = GameTime("3", 60, 1549550504)
plugin.update_game_time(game_time)
await skip_loop()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"method": "game_time_updated",
"params": {
"game_time": {
"game_id": "3",
"time_played": 60,
"last_played_time": 1549550504
},
{
"game_id": "5",
"time_played": 10,
},
{
"game_id": "7",
"last_played_time": 1549550502
}
]
}
}
def test_failure(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_game_times"
}
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_game_times.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_game_times.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": 0,
"message": "Unknown error",
}
}
def test_update_game(plugin, write):
game_time = GameTime("3", 60, 1549550504)
async def couritine():
plugin.update_game_time(game_time)
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "game_time_updated",
"params": {
"game_time": {
"game_id": "3",
"time_played": 60,
"last_played_time": 1549550504
}
}
}
@pytest.mark.asyncio
async def test_game_time_import_success(plugin, write):
plugin.game_time_import_success(GameTime("3", 60, 1549550504))
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "game_time_import_success",
"params": {
"game_time": {
"game_id": "3",
"time_played": 60,
"last_played_time": 1549550504
}
}
}
@pytest.mark.asyncio
async def test_game_time_import_failure(plugin, write):
plugin.game_time_import_failure("134", ImportInProgress())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "game_time_import_failure",
"params": {
"game_id": "134",
"error": {
"code": 600,
"message": "Import already in progress"
}
}
}
@pytest.mark.asyncio
async def test_game_times_import_finished(plugin, write):
plugin.game_times_import_finished()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "game_times_import_finished",
"params": None
}
@pytest.mark.asyncio
async def test_start_game_times_import(plugin, write, mocker):
game_time_import_success = mocker.patch.object(plugin, "game_time_import_success")
game_time_import_failure = mocker.patch.object(plugin, "game_time_import_failure")
game_times_import_finished = mocker.patch.object(plugin, "game_times_import_finished")
game_ids = ["1", "5"]
game_time = GameTime("1", 10, 1549550502)
plugin.get_game_times.coro.return_value = [
game_time
]
await plugin.start_game_times_import(game_ids)
with pytest.raises(ImportInProgress):
await plugin.start_game_times_import(["4", "8"])
# wait until all tasks are finished
for _ in range(4):
await asyncio.sleep(0)
plugin.get_game_times.coro.assert_called_once_with()
game_time_import_success.assert_called_once_with(game_time)
game_time_import_failure.assert_called_once_with("5", UnknownError())
game_times_import_finished.assert_called_once_with()
@pytest.mark.asyncio
async def test_start_game_times_import_failure(plugin, write, mocker):
game_time_import_failure = mocker.patch.object(plugin, "game_time_import_failure")
game_times_import_finished = mocker.patch.object(plugin, "game_times_import_finished")
game_ids = ["1", "5"]
error = BackendError()
plugin.get_game_times.coro.side_effect = error
await plugin.start_game_times_import(game_ids)
# wait until all tasks are finished
for _ in range(4):
await asyncio.sleep(0)
plugin.get_game_times.coro.assert_called_once_with()
assert game_time_import_failure.mock_calls == [call("1", error), call("5", error)]
game_times_import_finished.assert_called_once_with()

View File

@@ -3,6 +3,8 @@ from http import HTTPStatus
import aiohttp
import pytest
from multidict import CIMultiDict, CIMultiDictProxy
from yarl import URL
from galaxy.api.errors import (
AccessDenied, AuthenticationRequired, BackendTimeout, BackendNotAvailable, BackendError, NetworkError,
@@ -10,7 +12,7 @@ from galaxy.api.errors import (
)
from galaxy.http import handle_exception
request_info = aiohttp.RequestInfo("http://o.pl", "GET", {})
request_info = aiohttp.RequestInfo(URL("http://o.pl"), "GET", CIMultiDictProxy(CIMultiDict()))
@pytest.mark.parametrize(
"aiohttp_exception,expected_exception_type",
@@ -18,15 +20,15 @@ request_info = aiohttp.RequestInfo("http://o.pl", "GET", {})
(asyncio.TimeoutError(), BackendTimeout),
(aiohttp.ServerDisconnectedError(), BackendNotAvailable),
(aiohttp.ClientConnectionError(), NetworkError),
(aiohttp.ContentTypeError(request_info, []), UnknownBackendResponse),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.UNAUTHORIZED), AuthenticationRequired),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.FORBIDDEN), AccessDenied),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.SERVICE_UNAVAILABLE), BackendNotAvailable),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.TOO_MANY_REQUESTS), TooManyRequests),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.INTERNAL_SERVER_ERROR), BackendError),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.NOT_IMPLEMENTED), BackendError),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.BAD_REQUEST), UnknownError),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.NOT_FOUND), UnknownError),
(aiohttp.ContentTypeError(request_info, ()), UnknownBackendResponse),
(aiohttp.ClientResponseError(request_info, (), status=HTTPStatus.UNAUTHORIZED), AuthenticationRequired),
(aiohttp.ClientResponseError(request_info, (), status=HTTPStatus.FORBIDDEN), AccessDenied),
(aiohttp.ClientResponseError(request_info, (), status=HTTPStatus.SERVICE_UNAVAILABLE), BackendNotAvailable),
(aiohttp.ClientResponseError(request_info, (), status=HTTPStatus.TOO_MANY_REQUESTS), TooManyRequests),
(aiohttp.ClientResponseError(request_info, (), status=HTTPStatus.INTERNAL_SERVER_ERROR), BackendError),
(aiohttp.ClientResponseError(request_info, (), status=HTTPStatus.NOT_IMPLEMENTED), BackendError),
(aiohttp.ClientResponseError(request_info, (), status=HTTPStatus.BAD_REQUEST), UnknownError),
(aiohttp.ClientResponseError(request_info, (), status=HTTPStatus.NOT_FOUND), UnknownError),
(aiohttp.ClientError(), UnknownError)
]
)

View File

@@ -1,7 +1,12 @@
import asyncio
import json
import pytest
def test_success(plugin, read):
from galaxy.unittest.mock import async_return_value
from tests import create_message
@pytest.mark.asyncio
async def test_success(plugin, read):
request = {
"jsonrpc": "2.0",
"method": "install_game",
@@ -10,7 +15,6 @@ def test_success(plugin, read):
}
}
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_owned_games.return_value = None
asyncio.run(plugin.run())
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
await plugin.run()
plugin.install_game.assert_called_with(game_id="3")

View File

@@ -1,10 +1,14 @@
import asyncio
import json
import pytest
from galaxy.api.plugin import Plugin
from galaxy.api.consts import Platform
from galaxy.unittest.mock import async_return_value
def test_get_capabilites(reader, writer, read, write):
from tests import create_message, get_messages
@pytest.mark.asyncio
async def test_get_capabilities(reader, writer, read, write):
class PluginImpl(Plugin): #pylint: disable=abstract-method
async def get_owned_games(self):
pass
@@ -16,64 +20,76 @@ def test_get_capabilites(reader, writer, read, write):
}
token = "token"
plugin = PluginImpl(Platform.Generic, "0.1", reader, writer, token)
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
asyncio.run(plugin.run())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"result": {
"platform_name": "generic",
"features": [
"ImportOwnedGames"
],
"token": token
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
await plugin.run()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"result": {
"platform_name": "generic",
"features": [
"ImportOwnedGames"
],
"token": token
}
}
}
]
def test_shutdown(plugin, read, write):
@pytest.mark.asyncio
async def test_shutdown(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "5",
"method": "shutdown"
}
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
asyncio.run(plugin.run())
read.side_effect = [async_return_value(create_message(request))]
await plugin.run()
await plugin.wait_closed()
plugin.shutdown.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "5",
"result": None
}
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "5",
"result": None
}
]
def test_ping(plugin, read, write):
@pytest.mark.asyncio
async def test_ping(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "7",
"method": "ping"
}
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
asyncio.run(plugin.run())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "7",
"result": None
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
await plugin.run()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "7",
"result": None
}
]
def test_tick_before_handshake(plugin, read):
read.side_effect = [b""]
asyncio.run(plugin.run())
@pytest.mark.asyncio
async def test_tick_before_handshake(plugin, read):
read.side_effect = [async_return_value(b"")]
await plugin.run()
plugin.tick.assert_not_called()
def test_tick_after_handshake(plugin, read):
@pytest.mark.asyncio
async def test_tick_after_handshake(plugin, read):
request = {
"jsonrpc": "2.0",
"id": "6",
"method": "initialize_cache",
"params": {"data": {}}
}
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
asyncio.run(plugin.run())
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
await plugin.run()
plugin.tick.assert_called_with()

View File

@@ -1,7 +1,12 @@
import asyncio
import json
import pytest
def test_success(plugin, read):
from galaxy.unittest.mock import async_return_value
from tests import create_message
@pytest.mark.asyncio
async def test_success(plugin, read):
request = {
"jsonrpc": "2.0",
"method": "launch_game",
@@ -10,7 +15,6 @@ def test_success(plugin, read):
}
}
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_owned_games.return_value = None
asyncio.run(plugin.run())
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
await plugin.run()
plugin.launch_game.assert_called_with(game_id="3")

View File

@@ -0,0 +1,17 @@
import pytest
from galaxy.unittest.mock import async_return_value
from tests import create_message
@pytest.mark.asyncio
async def test_success(plugin, read):
request = {
"jsonrpc": "2.0",
"method": "launch_platform_client"
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
plugin.launch_platform_client.return_value = async_return_value(None)
await plugin.run()
plugin.launch_platform_client.assert_called_with()

View File

@@ -1,51 +1,55 @@
import asyncio
import json
import pytest
from galaxy.api.types import LocalGame
from galaxy.api.consts import LocalGameState
from galaxy.api.errors import UnknownError, FailedParsingManifest
from galaxy.unittest.mock import async_return_value, skip_loop
def test_success(plugin, read, write):
from tests import create_message, get_messages
@pytest.mark.asyncio
async def test_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_local_games"
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_local_games.coro.return_value = [
plugin.get_local_games.return_value = async_return_value([
LocalGame("1", LocalGameState.Running),
LocalGame("2", LocalGameState.Installed),
LocalGame("3", LocalGameState.Installed | LocalGameState.Running)
]
asyncio.run(plugin.run())
])
await plugin.run()
plugin.get_local_games.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"result": {
"local_games" : [
{
"game_id": "1",
"local_game_state": LocalGameState.Running.value
},
{
"game_id": "2",
"local_game_state": LocalGameState.Installed.value
},
{
"game_id": "3",
"local_game_state": (LocalGameState.Installed | LocalGameState.Running).value
}
]
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"result": {
"local_games" : [
{
"game_id": "1",
"local_game_state": LocalGameState.Running.value
},
{
"game_id": "2",
"local_game_state": LocalGameState.Installed.value
},
{
"game_id": "3",
"local_game_state": (LocalGameState.Installed | LocalGameState.Running).value
}
]
}
}
}
]
@pytest.mark.asyncio
@pytest.mark.parametrize(
"error,code,message",
[
@@ -53,44 +57,43 @@ def test_success(plugin, read, write):
pytest.param(FailedParsingManifest, 200, "Failed parsing manifest", id="failed_parsing")
],
)
def test_failure(plugin, read, write, error, code, message):
async def test_failure(plugin, read, write, error, code, message):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_local_games"
}
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_local_games.coro.side_effect = error()
asyncio.run(plugin.run())
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_local_games.side_effect = error()
await plugin.run()
plugin.get_local_games.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": code,
"message": message
}
}
def test_local_game_state_update(plugin, write):
game = LocalGame("1", LocalGameState.Running)
async def couritine():
plugin.update_local_game_status(game)
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "local_game_status_changed",
"params": {
"local_game": {
"game_id": "1",
"local_game_state": LocalGameState.Running.value
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": code,
"message": message
}
}
}
]
@pytest.mark.asyncio
async def test_local_game_state_update(plugin, write):
game = LocalGame("1", LocalGameState.Running)
plugin.update_local_game_status(game)
await skip_loop()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"method": "local_game_status_changed",
"params": {
"local_game": {
"game_id": "1",
"local_game_state": LocalGameState.Running.value
}
}
}
]

188
tests/test_local_size.py Normal file
View File

@@ -0,0 +1,188 @@
from unittest.mock import call
import pytest
from galaxy.api.errors import FailedParsingManifest
from galaxy.unittest.mock import async_return_value
from tests import create_message, get_messages
@pytest.mark.asyncio
async def test_get_local_size_success(plugin, read, write):
context = {'abc': 'def'}
plugin.prepare_local_size_context.return_value = async_return_value(context)
request = {
"jsonrpc": "2.0",
"id": "11",
"method": "start_local_size_import",
"params": {"game_ids": ["777", "13", "42"]}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_local_size.side_effect = [
async_return_value(100000000000),
async_return_value(None),
async_return_value(3333333)
]
await plugin.run()
plugin.get_local_size.assert_has_calls([
call("777", context),
call("13", context),
call("42", context)
])
plugin.local_size_import_complete.assert_called_once_with()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "11",
"result": None
},
{
"jsonrpc": "2.0",
"method": "local_size_import_success",
"params": {
"game_id": "777",
"local_size": 100000000000
}
},
{
"jsonrpc": "2.0",
"method": "local_size_import_success",
"params": {
"game_id": "13",
"local_size": None
}
},
{
"jsonrpc": "2.0",
"method": "local_size_import_success",
"params": {
"game_id": "42",
"local_size": 3333333
}
},
{
"jsonrpc": "2.0",
"method": "local_size_import_finished",
"params": None
}
]
@pytest.mark.asyncio
@pytest.mark.parametrize("exception,code,message", [
(FailedParsingManifest, 200, "Failed parsing manifest"),
(KeyError, 0, "Unknown error")
])
async def test_get_local_size_error(exception, code, message, plugin, read, write):
game_id = "6"
request_id = "55"
plugin.prepare_local_size_context.return_value = async_return_value(None)
request = {
"jsonrpc": "2.0",
"id": request_id,
"method": "start_local_size_import",
"params": {"game_ids": [game_id]}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_local_size.side_effect = exception
await plugin.run()
plugin.get_local_size.assert_called()
plugin.local_size_import_complete.assert_called_once_with()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": request_id,
"result": None
},
{
"jsonrpc": "2.0",
"method": "local_size_import_failure",
"params": {
"game_id": game_id,
"error": {
"code": code,
"message": message
}
}
},
{
"jsonrpc": "2.0",
"method": "local_size_import_finished",
"params": None
}
]
@pytest.mark.asyncio
async def test_prepare_get_local_size_context_error(plugin, read, write):
request_id = "31415"
error_details = "Unexpected syntax"
error_message, error_code = FailedParsingManifest().message, FailedParsingManifest().code
plugin.prepare_local_size_context.side_effect = FailedParsingManifest(error_details)
request = {
"jsonrpc": "2.0",
"id": request_id,
"method": "start_local_size_import",
"params": {"game_ids": ["6"]}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
await plugin.run()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": error_code,
"message": error_message,
"data": error_details
}
}
]
@pytest.mark.asyncio
async def test_import_already_in_progress_error(plugin, read, write):
plugin.prepare_local_size_context.return_value = async_return_value(None)
requests = [
{
"jsonrpc": "2.0",
"id": "3",
"method": "start_local_size_import",
"params": {
"game_ids": ["42"]
}
},
{
"jsonrpc": "2.0",
"id": "4",
"method": "start_local_size_import",
"params": {
"game_ids": ["13"]
}
}
]
read.side_effect = [
async_return_value(create_message(requests[0])),
async_return_value(create_message(requests[1])),
async_return_value(b"", 10)
]
await plugin.run()
responses = get_messages(write)
assert {
"jsonrpc": "2.0",
"id": "3",
"result": None
} in responses
assert {
"jsonrpc": "2.0",
"id": "4",
"error": {
"code": 600,
"message": "Import already in progress"
}
} in responses

View File

@@ -0,0 +1,187 @@
from unittest.mock import call
import pytest
from galaxy.api.consts import OSCompatibility
from galaxy.api.errors import BackendError
from galaxy.unittest.mock import async_return_value
from tests import create_message, get_messages
@pytest.mark.asyncio
async def test_get_os_compatibility_success(plugin, read, write):
context = "abc"
plugin.prepare_os_compatibility_context.return_value = async_return_value(context)
request = {
"jsonrpc": "2.0",
"id": "11",
"method": "start_os_compatibility_import",
"params": {"game_ids": ["666", "13", "42"]}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_os_compatibility.side_effect = [
async_return_value(OSCompatibility.Linux),
async_return_value(None),
async_return_value(OSCompatibility.Windows | OSCompatibility.MacOS),
]
await plugin.run()
plugin.get_os_compatibility.assert_has_calls([
call("666", context),
call("13", context),
call("42", context),
])
plugin.os_compatibility_import_complete.assert_called_once_with()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "11",
"result": None
},
{
"jsonrpc": "2.0",
"method": "os_compatibility_import_success",
"params": {
"game_id": "666",
"os_compatibility": OSCompatibility.Linux.value
}
},
{
"jsonrpc": "2.0",
"method": "os_compatibility_import_success",
"params": {
"game_id": "13",
"os_compatibility": None
}
},
{
"jsonrpc": "2.0",
"method": "os_compatibility_import_success",
"params": {
"game_id": "42",
"os_compatibility": (OSCompatibility.Windows | OSCompatibility.MacOS).value
}
},
{
"jsonrpc": "2.0",
"method": "os_compatibility_import_finished",
"params": None
}
]
@pytest.mark.asyncio
@pytest.mark.parametrize("exception,code,message", [
(BackendError, 4, "Backend error"),
(KeyError, 0, "Unknown error")
])
async def test_get_os_compatibility_error(exception, code, message, plugin, read, write):
game_id = "6"
request_id = "55"
plugin.prepare_os_compatibility_context.return_value = async_return_value(None)
request = {
"jsonrpc": "2.0",
"id": request_id,
"method": "start_os_compatibility_import",
"params": {"game_ids": [game_id]}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_os_compatibility.side_effect = exception
await plugin.run()
plugin.get_os_compatibility.assert_called()
plugin.os_compatibility_import_complete.assert_called_once_with()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": request_id,
"result": None
},
{
"jsonrpc": "2.0",
"method": "os_compatibility_import_failure",
"params": {
"game_id": game_id,
"error": {
"code": code,
"message": message
}
}
},
{
"jsonrpc": "2.0",
"method": "os_compatibility_import_finished",
"params": None
}
]
@pytest.mark.asyncio
async def test_prepare_get_os_compatibility_context_error(plugin, read, write):
request_id = "31415"
plugin.prepare_os_compatibility_context.side_effect = BackendError()
request = {
"jsonrpc": "2.0",
"id": request_id,
"method": "start_os_compatibility_import",
"params": {"game_ids": ["6"]}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
await plugin.run()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": 4,
"message": "Backend error"
}
}
]
@pytest.mark.asyncio
async def test_import_already_in_progress_error(plugin, read, write):
plugin.prepare_os_compatibility_context.return_value = async_return_value(None)
requests = [
{
"jsonrpc": "2.0",
"id": "3",
"method": "start_os_compatibility_import",
"params": {
"game_ids": ["42"]
}
},
{
"jsonrpc": "2.0",
"id": "4",
"method": "start_os_compatibility_import",
"params": {
"game_ids": ["666"]
}
}
]
read.side_effect = [
async_return_value(create_message(requests[0])),
async_return_value(create_message(requests[1])),
async_return_value(b"", 10)
]
await plugin.run()
responses = get_messages(write)
assert {
"jsonrpc": "2.0",
"id": "3",
"result": None
} in responses
assert {
"jsonrpc": "2.0",
"id": "4",
"error": {
"code": 600,
"message": "Import already in progress"
}
} in responses

View File

@@ -1,19 +1,23 @@
import asyncio
import json
import pytest
from galaxy.api.types import Game, Dlc, LicenseInfo
from galaxy.api.consts import LicenseType
from galaxy.api.errors import UnknownError
from galaxy.unittest.mock import async_return_value, skip_loop
def test_success(plugin, read, write):
from tests import create_message, get_messages
@pytest.mark.asyncio
async def test_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_owned_games"
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_owned_games.coro.return_value = [
plugin.get_owned_games.return_value = async_return_value([
Game("3", "Doom", None, LicenseInfo(LicenseType.SinglePurchase, None)),
Game(
"5",
@@ -23,129 +27,129 @@ def test_success(plugin, read, write):
Dlc("8", "Temerian Armor Set", LicenseInfo(LicenseType.FreeToPlay, None)),
],
LicenseInfo(LicenseType.SinglePurchase, None))
]
asyncio.run(plugin.run())
])
await plugin.run()
plugin.get_owned_games.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"result": {
"owned_games": [
{
"game_id": "3",
"game_title": "Doom",
"license_info": {
"license_type": "SinglePurchase"
}
},
{
"game_id": "5",
"game_title": "Witcher 3",
"dlcs": [
{
"dlc_id": "7",
"dlc_title": "Hearts of Stone",
"license_info": {
"license_type": "SinglePurchase"
}
},
{
"dlc_id": "8",
"dlc_title": "Temerian Armor Set",
"license_info": {
"license_type": "FreeToPlay"
}
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"result": {
"owned_games": [
{
"game_id": "3",
"game_title": "Doom",
"license_info": {
"license_type": "SinglePurchase"
}
},
{
"game_id": "5",
"game_title": "Witcher 3",
"dlcs": [
{
"dlc_id": "7",
"dlc_title": "Hearts of Stone",
"license_info": {
"license_type": "SinglePurchase"
}
},
{
"dlc_id": "8",
"dlc_title": "Temerian Armor Set",
"license_info": {
"license_type": "FreeToPlay"
}
}
],
"license_info": {
"license_type": "SinglePurchase"
}
],
"license_info": {
"license_type": "SinglePurchase"
}
}
]
]
}
}
}
]
def test_failure(plugin, read, write):
@pytest.mark.asyncio
async def test_failure(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_owned_games"
}
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_owned_games.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_owned_games.side_effect = UnknownError()
await plugin.run()
plugin.get_owned_games.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": 0,
"message": "Unknown error"
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": 0,
"message": "Unknown error"
}
}
}
]
def test_add_game(plugin, write):
@pytest.mark.asyncio
async def test_add_game(plugin, write):
game = Game("3", "Doom", None, LicenseInfo(LicenseType.SinglePurchase, None))
async def couritine():
plugin.add_game(game)
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "owned_game_added",
"params": {
"owned_game": {
"game_id": "3",
"game_title": "Doom",
"license_info": {
"license_type": "SinglePurchase"
plugin.add_game(game)
await skip_loop()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"method": "owned_game_added",
"params": {
"owned_game": {
"game_id": "3",
"game_title": "Doom",
"license_info": {
"license_type": "SinglePurchase"
}
}
}
}
}
]
def test_remove_game(plugin, write):
async def couritine():
plugin.remove_game("5")
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "owned_game_removed",
"params": {
"game_id": "5"
@pytest.mark.asyncio
async def test_remove_game(plugin, write):
plugin.remove_game("5")
await skip_loop()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"method": "owned_game_removed",
"params": {
"game_id": "5"
}
}
}
]
def test_update_game(plugin, write):
@pytest.mark.asyncio
async def test_update_game(plugin, write):
game = Game("3", "Doom", None, LicenseInfo(LicenseType.SinglePurchase, None))
async def couritine():
plugin.update_game(game)
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "owned_game_updated",
"params": {
"owned_game": {
"game_id": "3",
"game_title": "Doom",
"license_info": {
"license_type": "SinglePurchase"
plugin.update_game(game)
await skip_loop()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"method": "owned_game_updated",
"params": {
"owned_game": {
"game_id": "3",
"game_title": "Doom",
"license_info": {
"license_type": "SinglePurchase"
}
}
}
}
}
]

View File

@@ -1,23 +1,28 @@
import asyncio
import json
import pytest
from galaxy.unittest.mock import async_return_value, skip_loop
from tests import create_message, get_messages
def assert_rpc_response(write, response_id, result=None):
assert json.loads(write.call_args[0][0]) == {
"jsonrpc": "2.0",
"id": str(response_id),
"result": result
}
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": str(response_id),
"result": result
}
]
def assert_rpc_request(write, method, params=None):
assert json.loads(write.call_args[0][0]) == {
"jsonrpc": "2.0",
"method": method,
"params": {"data": params}
}
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"method": method,
"params": {"data": params}
}
]
@pytest.fixture
@@ -28,7 +33,8 @@ def cache_data():
}
def test_initialize_cache(plugin, read, write, cache_data):
@pytest.mark.asyncio
async def test_initialize_cache(plugin, read, write, cache_data):
request_id = 3
request = {
"jsonrpc": "2.0",
@@ -36,36 +42,34 @@ def test_initialize_cache(plugin, read, write, cache_data):
"method": "initialize_cache",
"params": {"data": cache_data}
}
read.side_effect = [json.dumps(request).encode() + b"\n"]
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
assert {} == plugin.persistent_cache
asyncio.run(plugin.run())
await plugin.run()
plugin.handshake_complete.assert_called_once_with()
assert cache_data == plugin.persistent_cache
assert_rpc_response(write, response_id=request_id)
def test_set_cache(plugin, write, cache_data):
async def runner():
assert {} == plugin.persistent_cache
@pytest.mark.asyncio
async def test_set_cache(plugin, write, cache_data):
assert {} == plugin.persistent_cache
plugin.persistent_cache.update(cache_data)
plugin.push_cache()
plugin.persistent_cache.update(cache_data)
plugin.push_cache()
await skip_loop()
assert_rpc_request(write, "push_cache", cache_data)
assert cache_data == plugin.persistent_cache
asyncio.run(runner())
assert_rpc_request(write, "push_cache", cache_data)
assert cache_data == plugin.persistent_cache
def test_clear_cache(plugin, write, cache_data):
async def runner():
plugin._persistent_cache = cache_data
@pytest.mark.asyncio
async def test_clear_cache(plugin, write, cache_data):
plugin._persistent_cache = cache_data
plugin.persistent_cache.clear()
plugin.push_cache()
plugin.persistent_cache.clear()
plugin.push_cache()
await skip_loop()
assert_rpc_request(write, "push_cache", {})
assert {} == plugin.persistent_cache
asyncio.run(runner())
assert_rpc_request(write, "push_cache", {})
assert {} == plugin.persistent_cache

View File

@@ -0,0 +1,72 @@
import pytest
import asyncio
from galaxy.unittest.mock import async_return_value
from tests import create_message, get_messages
from galaxy.api.errors import (
BackendNotAvailable, BackendTimeout, BackendError, InvalidCredentials, NetworkError, AccessDenied, UnknownError
)
from galaxy.api.jsonrpc import JsonRpcError
@pytest.mark.asyncio
async def test_refresh_credentials_success(plugin, read, write):
run_task = asyncio.create_task(plugin.run())
refreshed_credentials = {
"access_token": "new_access_token"
}
response = {
"jsonrpc": "2.0",
"id": "1",
"result": refreshed_credentials
}
# 2 loop iterations delay is to force sending response after request has been sent
read.side_effect = [async_return_value(create_message(response), loop_iterations_delay=2)]
result = await plugin.refresh_credentials({}, False)
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"method": "refresh_credentials",
"params": {
},
"id": "1"
}
]
assert result == refreshed_credentials
await run_task
@pytest.mark.asyncio
@pytest.mark.parametrize("exception", [
BackendNotAvailable, BackendTimeout, BackendError, InvalidCredentials, NetworkError, AccessDenied, UnknownError
])
async def test_refresh_credentials_failure(exception, plugin, read, write):
run_task = asyncio.create_task(plugin.run())
error = exception()
response = {
"jsonrpc": "2.0",
"id": "1",
"error": error.json()
}
# 2 loop iterations delay is to force sending response after request has been sent
read.side_effect = [async_return_value(create_message(response), loop_iterations_delay=2)]
with pytest.raises(JsonRpcError) as e:
await plugin.refresh_credentials({}, False)
assert error == e.value
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"method": "refresh_credentials",
"params": {
},
"id": "1"
}
]
await run_task

View File

@@ -1,7 +1,9 @@
import json
import pytest
from galaxy.unittest.mock import async_return_value
from tests import create_message
@pytest.mark.asyncio
async def test_success(plugin, read):
request = {
@@ -9,7 +11,7 @@ async def test_success(plugin, read):
"method": "shutdown_platform_client"
}
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.shutdown_platform_client.return_value = None
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
plugin.shutdown_platform_client.return_value = async_return_value(None)
await plugin.run()
plugin.shutdown_platform_client.assert_called_with()

View File

@@ -1,52 +1,46 @@
from unittest.mock import MagicMock
import pytest
from galaxy.reader import StreamLineReader
from galaxy.unittest.mock import AsyncMock
from galaxy.unittest.mock import async_return_value
@pytest.fixture()
def stream_reader():
reader = MagicMock()
reader.read = AsyncMock()
return reader
def stream_line_reader(reader):
return StreamLineReader(reader)
@pytest.fixture()
def read(stream_reader):
return stream_reader.read
@pytest.fixture()
def reader(stream_reader):
return StreamLineReader(stream_reader)
@pytest.mark.asyncio
async def test_message(reader, read):
read.return_value = b"a\n"
assert await reader.readline() == b"a"
async def test_message(stream_line_reader, read):
read.return_value = async_return_value(b"a\n")
assert await stream_line_reader.readline() == b"a"
read.assert_called_once()
@pytest.mark.asyncio
async def test_separate_messages(reader, read):
read.side_effect = [b"a\n", b"b\n"]
assert await reader.readline() == b"a"
assert await reader.readline() == b"b"
assert read.call_count == 2
@pytest.mark.asyncio
async def test_connected_messages(reader, read):
read.return_value = b"a\nb\n"
assert await reader.readline() == b"a"
assert await reader.readline() == b"b"
async def test_separate_messages(stream_line_reader, read):
read.side_effect = [async_return_value(b"a\n"), async_return_value(b"b\n")]
assert await stream_line_reader.readline() == b"a"
assert await stream_line_reader.readline() == b"b"
assert read.call_count == 2
@pytest.mark.asyncio
async def test_connected_messages(stream_line_reader, read):
read.return_value = async_return_value(b"a\nb\n")
assert await stream_line_reader.readline() == b"a"
assert await stream_line_reader.readline() == b"b"
read.assert_called_once()
@pytest.mark.asyncio
async def test_cut_message(reader, read):
read.side_effect = [b"a", b"b\n"]
assert await reader.readline() == b"ab"
assert read.call_count == 2
@pytest.mark.asyncio
async def test_half_message(reader, read):
read.side_effect = [b"a", b""]
assert await reader.readline() == b""
async def test_cut_message(stream_line_reader, read):
read.side_effect = [async_return_value(b"a"), async_return_value(b"b\n")]
assert await stream_line_reader.readline() == b"ab"
assert read.call_count == 2
@pytest.mark.asyncio
async def test_half_message(stream_line_reader, read):
read.side_effect = [async_return_value(b"a"), async_return_value(b"")]
assert await stream_line_reader.readline() == b""
assert read.call_count == 2

340
tests/test_subscriptions.py Normal file
View File

@@ -0,0 +1,340 @@
import pytest
from galaxy.api.types import Subscription, SubscriptionGame
from galaxy.api.consts import SubscriptionDiscovery
from galaxy.api.errors import FailedParsingManifest, BackendError, UnknownError
from galaxy.unittest.mock import async_return_value
from tests import create_message, get_messages
@pytest.mark.asyncio
async def test_get_subscriptions_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_subscriptions"
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_subscriptions.return_value = async_return_value([
Subscription("1"),
Subscription("2", False, subscription_discovery=SubscriptionDiscovery.AUTOMATIC),
Subscription("3", True, 1580899100, SubscriptionDiscovery.USER_ENABLED)
])
await plugin.run()
plugin.get_subscriptions.assert_called_with()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"result": {
"subscriptions": [
{
"subscription_name": "1",
'subscription_discovery': 3
},
{
"subscription_name": "2",
"owned": False,
'subscription_discovery': 1
},
{
"subscription_name": "3",
"owned": True,
"end_time": 1580899100,
'subscription_discovery': 2
}
]
}
}
]
@pytest.mark.asyncio
@pytest.mark.parametrize(
"error,code,message",
[
pytest.param(UnknownError, 0, "Unknown error", id="unknown_error"),
pytest.param(FailedParsingManifest, 200, "Failed parsing manifest", id="failed_parsing")
],
)
async def test_get_subscriptions_failure_generic(plugin, read, write, error, code, message):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_subscriptions"
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_subscriptions.side_effect = error()
await plugin.run()
plugin.get_subscriptions.assert_called_with()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": code,
"message": message
}
}
]
@pytest.mark.asyncio
async def test_get_subscription_games_success(plugin, read, write):
plugin.prepare_subscription_games_context.return_value = async_return_value(5)
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "start_subscription_games_import",
"params": {
"subscription_names": ["sub_a"]
}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
async def sub_games():
games = [
SubscriptionGame(game_title="game A", game_id="game_A"),
SubscriptionGame(game_title="game B", game_id="game_B", start_time=1548495632),
SubscriptionGame(game_title="game C", game_id="game_C", end_time=1548495633),
SubscriptionGame(game_title="game D", game_id="game_D", start_time=1548495632, end_time=1548495633),
]
yield [game for game in games]
plugin.get_subscription_games.return_value = sub_games()
await plugin.run()
plugin.prepare_subscription_games_context.assert_called_with(["sub_a"])
plugin.get_subscription_games.assert_called_with("sub_a", 5)
plugin.subscription_games_import_complete.asert_called_with()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"result": None
},
{
"jsonrpc": "2.0",
"method": "subscription_games_import_success",
"params": {
"subscription_name": "sub_a",
"subscription_games": [
{
"game_title": "game A",
"game_id": "game_A"
},
{
"game_title": "game B",
"game_id": "game_B",
"start_time": 1548495632
},
{
"game_title": "game C",
"game_id": "game_C",
"end_time": 1548495633
},
{
"game_title": "game D",
"game_id": "game_D",
"start_time": 1548495632,
"end_time": 1548495633
}
]
}
},
{
'jsonrpc': '2.0',
'method':
'subscription_games_partial_import_finished',
'params': {
"subscription_name": "sub_a"
}
},
{
"jsonrpc": "2.0",
"method": "subscription_games_import_finished",
"params": None
}
]
@pytest.mark.asyncio
async def test_get_subscription_games_success_empty(plugin, read, write):
plugin.prepare_subscription_games_context.return_value = async_return_value(5)
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "start_subscription_games_import",
"params": {
"subscription_names": ["sub_a"]
}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
async def sub_games():
yield None
plugin.get_subscription_games.return_value = sub_games()
await plugin.run()
plugin.prepare_subscription_games_context.assert_called_with(["sub_a"])
plugin.get_subscription_games.assert_called_with("sub_a", 5)
plugin.subscription_games_import_complete.asert_called_with()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"result": None
},
{
"jsonrpc": "2.0",
"method": "subscription_games_import_success",
"params": {
"subscription_name": "sub_a",
"subscription_games": None
}
},
{
'jsonrpc': '2.0',
'method':
'subscription_games_partial_import_finished',
'params': {
"subscription_name": "sub_a"
}
},
{
"jsonrpc": "2.0",
"method": "subscription_games_import_finished",
"params": None
}
]
@pytest.mark.asyncio
@pytest.mark.parametrize("exception,code,message", [
(BackendError, 4, "Backend error"),
(KeyError, 0, "Unknown error")
])
async def test_get_subscription_games_error(exception, code, message, plugin, read, write):
plugin.prepare_subscription_games_context.return_value = async_return_value(None)
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "start_subscription_games_import",
"params": {
"subscription_names": ["sub_a"]
}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_subscription_games.side_effect = exception
await plugin.run()
plugin.get_subscription_games.assert_called()
plugin.subscription_games_import_complete.asert_called_with()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "3",
"result": None
},
{
"jsonrpc": "2.0",
"method": "subscription_games_import_failure",
"params": {
"subscription_name": "sub_a",
"error": {
"code": code,
"message": message
}
}
},
{
'jsonrpc': '2.0',
'method':
'subscription_games_partial_import_finished',
'params': {
"subscription_name": "sub_a"
}
},
{
"jsonrpc": "2.0",
"method": "subscription_games_import_finished",
"params": None
}
]
@pytest.mark.asyncio
async def test_prepare_get_subscription_games_context_error(plugin, read, write):
request_id = "31415"
error_details = "Unexpected backend error"
error_message, error_code = BackendError().message, BackendError().code
plugin.prepare_subscription_games_context.side_effect = BackendError(error_details)
request = {
"jsonrpc": "2.0",
"id": request_id,
"method": "start_subscription_games_import",
"params": {"subscription_names": ["sub_a", "sub_b"]}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
await plugin.run()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": error_code,
"message": error_message,
"data": error_details
}
}
]
@pytest.mark.asyncio
async def test_import_already_in_progress_error(plugin, read, write):
plugin.prepare_subscription_games_context.return_value = async_return_value(None)
requests = [
{
"jsonrpc": "2.0",
"id": "3",
"method": "start_subscription_games_import",
"params": {
"subscription_names": ["sub_a"]
}
},
{
"jsonrpc": "2.0",
"id": "4",
"method": "start_subscription_games_import",
"params": {
"subscription_names": ["sub_a","sub_b"]
}
}
]
read.side_effect = [
async_return_value(create_message(requests[0])),
async_return_value(create_message(requests[1])),
async_return_value(b"", 10)
]
await plugin.run()
responses = get_messages(write)
assert {
"jsonrpc": "2.0",
"id": "3",
"result": None
} in responses
assert {
"jsonrpc": "2.0",
"id": "4",
"error": {
"code": 600,
"message": "Import already in progress"
}
} in responses

View File

@@ -1,7 +1,11 @@
import asyncio
import json
import pytest
def test_success(plugin, read):
from galaxy.unittest.mock import async_return_value
from tests import create_message
@pytest.mark.asyncio
async def test_success(plugin, read):
request = {
"jsonrpc": "2.0",
"method": "uninstall_game",
@@ -9,8 +13,7 @@ def test_success(plugin, read):
"game_id": "3"
}
}
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"")]
plugin.get_owned_games.return_value = None
asyncio.run(plugin.run())
await plugin.run()
plugin.uninstall_game.assert_called_with(game_id="3")

276
tests/test_user_presence.py Normal file
View File

@@ -0,0 +1,276 @@
from unittest.mock import call
import pytest
from galaxy.api.consts import PresenceState
from galaxy.api.errors import BackendError
from galaxy.api.types import UserPresence
from galaxy.unittest.mock import async_return_value, skip_loop
from tests import create_message, get_messages
@pytest.mark.asyncio
async def test_get_user_presence_success(plugin, read, write):
context = "abc"
user_id_list = ["666", "13", "42", "69", "22"]
plugin.prepare_user_presence_context.return_value = async_return_value(context)
request = {
"jsonrpc": "2.0",
"id": "11",
"method": "start_user_presence_import",
"params": {"user_id_list": user_id_list}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_user_presence.side_effect = [
async_return_value(UserPresence(
PresenceState.Unknown,
"game-id1",
None,
"unknown state",
None
)),
async_return_value(UserPresence(
PresenceState.Offline,
None,
None,
"Going to grandma's house",
None
)),
async_return_value(UserPresence(
PresenceState.Online,
"game-id3",
"game-title3",
"Pew pew",
None
)),
async_return_value(UserPresence(
PresenceState.Away,
None,
"game-title4",
"AFKKTHXBY",
None
)),
async_return_value(UserPresence(
PresenceState.Away,
None,
"game-title5",
None,
"Playing game-title5: In Menu"
)),
]
await plugin.run()
plugin.get_user_presence.assert_has_calls([
call(user_id, context) for user_id in user_id_list
])
plugin.user_presence_import_complete.assert_called_once_with()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": "11",
"result": None
},
{
"jsonrpc": "2.0",
"method": "user_presence_import_success",
"params": {
"user_id": "666",
"presence": {
"presence_state": PresenceState.Unknown.value,
"game_id": "game-id1",
"in_game_status": "unknown state"
}
}
},
{
"jsonrpc": "2.0",
"method": "user_presence_import_success",
"params": {
"user_id": "13",
"presence": {
"presence_state": PresenceState.Offline.value,
"in_game_status": "Going to grandma's house"
}
}
},
{
"jsonrpc": "2.0",
"method": "user_presence_import_success",
"params": {
"user_id": "42",
"presence": {
"presence_state": PresenceState.Online.value,
"game_id": "game-id3",
"game_title": "game-title3",
"in_game_status": "Pew pew"
}
}
},
{
"jsonrpc": "2.0",
"method": "user_presence_import_success",
"params": {
"user_id": "69",
"presence": {
"presence_state": PresenceState.Away.value,
"game_title": "game-title4",
"in_game_status": "AFKKTHXBY"
}
}
},
{
"jsonrpc": "2.0",
"method": "user_presence_import_success",
"params": {
"user_id": "22",
"presence": {
"presence_state": PresenceState.Away.value,
"game_title": "game-title5",
"full_status": "Playing game-title5: In Menu"
}
}
},
{
"jsonrpc": "2.0",
"method": "user_presence_import_finished",
"params": None
}
]
@pytest.mark.asyncio
@pytest.mark.parametrize("exception,code,message", [
(BackendError, 4, "Backend error"),
(KeyError, 0, "Unknown error")
])
async def test_get_user_presence_error(exception, code, message, plugin, read, write):
user_id = "69"
request_id = "55"
plugin.prepare_user_presence_context.return_value = async_return_value(None)
request = {
"jsonrpc": "2.0",
"id": request_id,
"method": "start_user_presence_import",
"params": {"user_id_list": [user_id]}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
plugin.get_user_presence.side_effect = exception
await plugin.run()
plugin.get_user_presence.assert_called()
plugin.user_presence_import_complete.assert_called_once_with()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": request_id,
"result": None
},
{
"jsonrpc": "2.0",
"method": "user_presence_import_failure",
"params": {
"user_id": user_id,
"error": {
"code": code,
"message": message
}
}
},
{
"jsonrpc": "2.0",
"method": "user_presence_import_finished",
"params": None
}
]
@pytest.mark.asyncio
async def test_prepare_get_user_presence_context_error(plugin, read, write):
request_id = "31415"
plugin.prepare_user_presence_context.side_effect = BackendError()
request = {
"jsonrpc": "2.0",
"id": request_id,
"method": "start_user_presence_import",
"params": {"user_id_list": ["6"]}
}
read.side_effect = [async_return_value(create_message(request)), async_return_value(b"", 10)]
await plugin.run()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": 4,
"message": "Backend error"
}
}
]
@pytest.mark.asyncio
async def test_import_already_in_progress_error(plugin, read, write):
plugin.prepare_user_presence_context.return_value = async_return_value(None)
requests = [
{
"jsonrpc": "2.0",
"id": "3",
"method": "start_user_presence_import",
"params": {
"user_id_list": ["42"]
}
},
{
"jsonrpc": "2.0",
"id": "4",
"method": "start_user_presence_import",
"params": {
"user_id_list": ["666"]
}
}
]
read.side_effect = [
async_return_value(create_message(requests[0])),
async_return_value(create_message(requests[1])),
async_return_value(b"", 10)
]
await plugin.run()
responses = get_messages(write)
assert {
"jsonrpc": "2.0",
"id": "3",
"result": None
} in responses
assert {
"jsonrpc": "2.0",
"id": "4",
"error": {
"code": 600,
"message": "Import already in progress"
}
} in responses
@pytest.mark.asyncio
async def test_update_user_presence(plugin, write):
plugin.update_user_presence("42", UserPresence(PresenceState.Online, "game-id", "game-title", "Pew pew"))
await skip_loop()
assert get_messages(write) == [
{
"jsonrpc": "2.0",
"method": "user_presence_updated",
"params": {
"user_id": "42",
"presence": {
"presence_state": PresenceState.Online.value,
"game_id": "game-id",
"game_title": "game-title",
"in_game_status": "Pew pew"
}
}
}
]