Compare commits

..

106 Commits
0.22 ... 0.41

Author SHA1 Message Date
Romuald Juchnowicz-Bierbasz
630d878a3c Use constants 2019-07-12 11:46:48 +02:00
Romuald Juchnowicz-Bierbasz
cc63c24bde Increment version 2019-07-12 11:46:48 +02:00
Romuald Juchnowicz-Bierbasz
0d0f657240 SDK-2930: Refactor http module 2019-07-12 11:46:48 +02:00
Steven M. Vascellaro
33c630225d README.md cleanup (#17) 2019-07-12 10:39:37 +02:00
apaulouski
f4bd18a8ab Merge pull request #2 from rogersachan/rogersachan-patch-1
Fix spelling errors in the README
2019-07-10 12:12:07 +02:00
Roger
fa4541434f Merge branch 'master' into rogersachan-patch-1 2019-07-09 13:30:35 -04:00
rbierbasz-gog
c083a3089a Create .travis.yml 2019-07-08 15:43:12 +02:00
Romuald Juchnowicz-Bierbasz
f6b5a12b24 SDK-2932: Remove github deployment (use mirroring) 2019-07-03 13:42:06 +02:00
Romuald Juchnowicz-Bierbasz
8a67747df5 Merge remote-tracking branch 'github/master'
* github/master:
  version 0.38
  version 0.35.2
  version 0.35.1
  version 0.34
  version 0.33.1
  version 0.33
  version 0.32.1
  version 0.32.0
  version 0.31.3
  version 0.31.2
  version 0.31.1
  Initial commit
2019-07-03 13:35:34 +02:00
Romuald Juchnowicz-Bierbasz
2db9d0f383 Increment version 2019-07-01 14:35:05 +02:00
Mieszko Banczerowski
9d93762867 Workaround for removing creds on push_cache 2019-07-01 14:32:23 +02:00
Romuald Juchnowicz-Bierbasz
c364b716f4 Increment version 2019-07-01 13:16:08 +02:00
Romuald Juchnowicz-Bierbasz
48e1782484 SDK-2893: Optional game time and last played 2019-07-01 13:14:07 +02:00
Romuald Juchnowicz-Bierbasz
ff30675a25 Do not invoke tick before handshake 2019-07-01 12:26:05 +02:00
Aliaksei Paulouski
7b3965ff4b Add poe platform 2019-06-28 15:09:46 +02:00
Piotr Marzec
2ebdfabd9b Path of Exile added 2019-06-28 14:49:56 +02:00
Romuald Juchnowicz-Bierbasz
4e1ea8056d Add StreamLineReader with unit tests 2019-06-28 14:00:44 +02:00
Romuald Juchnowicz-Bierbasz
67e8681de6 Increment version 2019-06-28 11:59:01 +02:00
Romuald Juchnowicz-Bierbasz
77d742ce18 SDK-2910: Fix readline 2019-06-28 11:58:32 +02:00
GOG Galaxy SDK Team
f1fd00fcd3 version 0.38 2019-06-26 12:46:23 +02:00
Romuald Juchnowicz-Bierbasz
692bdbf370 Increment version 2019-06-26 12:07:28 +02:00
Romuald Juchnowicz-Bierbasz
207b1e1313 SDK-2880: Fix readline 2019-06-26 12:02:25 +02:00
Romuald Juchnowicz-Bierbasz
05042fe430 Increment version 2019-06-25 17:53:35 +02:00
Romuald Juchnowicz-Bierbasz
58b17d94fa SDK-2880: Read in chunks 2019-06-25 17:53:16 +02:00
Romuald Juchnowicz-Bierbasz
be03c83d45 SDK-2873: Add typing to API 2019-06-24 10:20:04 +02:00
Roger
1edf4ff5ba Fix spelling errors 2019-06-19 15:39:22 -04:00
Rafal Makagon
c07c7a2c2a Platform list 2019-06-18 15:56:45 +02:00
GOG Galaxy SDK Team
9d5d48032e version 0.35.2 2019-06-17 18:11:24 +02:00
Pawel Czoppa
cb1a5fa5e4 fix for merge request 2019-06-17 18:01:09 +02:00
GOG Galaxy SDK Team
179fd147c1 version 0.35.1 2019-06-17 17:41:44 +02:00
Pawel Czoppa
4790238638 Update setup.py 2019-06-17 17:38:43 +02:00
Aliaksei Paulouski
5d90ba0c09 Increment version 2019-06-17 10:39:36 +02:00
Aliaksei Paulouski
d74ed3a4b5 Hide persistent cache data 2019-06-17 10:39:30 +02:00
Pawel Czoppa
d6f2d00fb9 typo 2019-06-15 15:11:05 +02:00
Piotr Marzec
ce9f33f5d0 platform names clieanup 2019-06-15 15:05:12 +02:00
Pawel Czoppa
b28fc60088 include Platform id's in sphinx generated documentation + some fixed 2019-06-15 14:57:20 +02:00
Piotr Marzec
be3d3bb7e5 text cleanup 2019-06-15 10:16:04 +02:00
Piotr Marzec
6dec4a99d3 add platforms list 2019-06-14 19:30:40 +02:00
Piotr Marzec
69ffef2fde create platform id list 2019-06-14 19:25:08 +02:00
GOG Galaxy SDK Team
7789927ed9 version 0.34 2019-06-14 16:54:11 +02:00
Rafal Makagon
da59670d8e Increment version 2019-06-14 16:35:20 +02:00
Rafal Makagon
ed1049b543 Write down unexpected responses from http client 2019-06-14 16:32:07 +02:00
GOG Galaxy SDK Team
e2f26271cb version 0.33.1 2019-06-14 14:49:25 +02:00
Rafal Makagon
9e8748b032 Increment version 2019-06-14 14:41:34 +02:00
Mieszko Banczerowski
bb482d4ed6 SDK-2872 update documentation about plugin deployment 2019-06-14 11:51:30 +02:00
GOG Galaxy SDK Team
3bd0b71ab3 version 0.33 2019-06-13 12:30:12 +02:00
Aliaksei Paulouski
909cc10762 Increment version 2019-06-13 12:26:32 +02:00
Rafal Makagon
9b4537c54f Fix anonimize params method 2019-06-13 12:25:57 +02:00
Rafal Makagon
2e90c66390 Remove not used errors 2019-06-13 12:21:34 +02:00
Aliaksei Paulouski
8647b06ca2 Add TooManyRequests error 2019-06-13 08:52:40 +02:00
Aliaksei Paulouski
f6522be74d SDK-2874: Persistent cache 2019-06-12 17:13:41 +02:00
GOG Galaxy SDK Team
192d655d51 version 0.32.1 2019-06-10 19:04:08 +02:00
Mieszko Banczerowski
5ca9254d2a GPI-438: Add link to docs in readme.md; fix make.py 2019-06-10 18:53:02 +02:00
GOG Galaxy SDK Team
6c6dc42cd6 version 0.32.0 2019-06-07 15:08:47 +02:00
Mieszko Banczerowski
79808e49f7 Bump version after adding documentation 2019-06-07 14:02:23 +02:00
Mateusz Silaczewski
7099cf3195 Merge branch 'documentation' of https://gitlab.gog.com/galaxy-client/galaxy-plugin-api into documentation 2019-06-06 15:56:01 +02:00
GOG Galaxy SDK Team
f97b6c8971 version 0.31.3 2019-05-31 12:09:18 +02:00
Rafal Makagon
ed91fd582c Deploy setup py 2019-05-31 12:08:49 +02:00
GOG Galaxy SDK Team
0af7387342 version 0.31.2 2019-05-31 11:53:15 +02:00
Rafal Makagon
bd393a96f0 Increment version 2019-05-31 11:50:53 +02:00
Piotr Marzec
c53aab1abb Legal Notice added 2019-05-31 11:29:05 +02:00
GOG Galaxy SDK Team
60fab25a55 version 0.31.1 2019-05-29 13:09:13 +02:00
GOG Galaxy SDK Team
6f717a1e31 Initial commit 2019-05-29 13:08:20 +02:00
Romuald Juchnowicz-Bierbasz
80f40b1971 Increment version 2019-05-29 13:04:35 +02:00
Romuald Juchnowicz-Bierbasz
0da0296154 Release on gogcom 2019-05-28 17:59:11 +02:00
Romuald Juchnowicz-Bierbasz
9a115557b3 Fix username 2019-05-28 12:03:56 +02:00
Romuald Juchnowicz-Bierbasz
14c2d7d9e8 Update README 2019-05-28 11:28:32 +02:00
Romuald Juchnowicz-Bierbasz
4a7a759cea Update release scripts 2019-05-28 11:22:38 +02:00
Romuald Juchnowicz-Bierbasz
da8da24b01 Remove changelog 2019-05-28 11:19:48 +02:00
Rafal Makagon
ccbb13e685 Update README.md 2019-05-27 17:48:42 +02:00
Piotr Marzec
a3ca815975 cosmetic changes to readme.md 2019-05-27 17:41:28 +02:00
Romuald Juchnowicz-Bierbasz
f2d4127a31 Increment version 2019-05-24 14:04:17 +02:00
Romuald Juchnowicz-Bierbasz
07b6edce12 SDK-2840: Refactor import methods 2019-05-24 14:04:05 +02:00
Romuald Bierbasz
ef7f9ccca1 Fix pipeline 2019-05-21 14:27:02 +02:00
Romuald Juchnowicz-Bierbasz
3b296cbcc9 Fix pipeline 2019-05-21 14:18:45 +02:00
Romuald Juchnowicz-Bierbasz
f5361cd5ab Fix pipeline 2019-05-21 14:09:57 +02:00
Romuald Juchnowicz-Bierbasz
758909efba Fix release.py 2019-05-21 11:20:38 +02:00
Romuald Bierbasz
0bc8000f14 Fix release pipeline 2019-05-21 11:10:52 +02:00
Romuald Juchnowicz-Bierbasz
e62e7e0e6e Add pipeline for releasing to github 2019-05-20 15:52:35 +02:00
Romuald Juchnowicz-Bierbasz
be6c0eb03e Increment version 2019-05-20 15:38:24 +02:00
Aliaksei Paulouski
0ee56193de Fix aiohttp exception hierarchy 2019-05-20 11:20:27 +02:00
Romuald Juchnowicz-Bierbasz
6bc91a12fa Register new requests 2019-05-17 16:42:13 +02:00
Romuald Juchnowicz-Bierbasz
6d513d86bf Increment version 2019-05-17 16:42:13 +02:00
Romuald Juchnowicz-Bierbasz
bdd2225262 Add new interface methods for game time and achievements 2019-05-17 16:42:12 +02:00
Rafal Makagon
68fdc4d188 Printing own port 2019-05-17 15:32:24 +02:00
Romuald Juchnowicz-Bierbasz
f283c10a95 Anonymise params in pass_login_credentials 2019-05-15 19:49:50 +02:00
Romuald Juchnowicz-Bierbasz
453734cefe Increment version 2019-05-10 17:21:31 +02:00
Romuald Juchnowicz-Bierbasz
85f1d83c28 Fix parameters 2019-05-10 17:21:05 +02:00
Romuald Juchnowicz-Bierbasz
701d3cf522 Increment version 2019-05-10 17:06:01 +02:00
Romuald Juchnowicz-Bierbasz
c8083b9006 Add cookie_jar param to HttpClient 2019-05-10 17:05:40 +02:00
Romuald Juchnowicz-Bierbasz
0608ade6d3 Fix name 2019-05-10 14:14:33 +02:00
Romuald Juchnowicz-Bierbasz
c349a3df8e Add timeout 2019-05-10 13:56:53 +02:00
Romuald Juchnowicz-Bierbasz
1fd959a665 Handle ServerDisconnectedError 2019-05-10 13:50:46 +02:00
Romuald Juchnowicz-Bierbasz
234a21d085 Add HttpClient 2019-05-10 13:36:32 +02:00
Romuald Juchnowicz-Bierbasz
90835ece58 Change project layout 2019-05-10 13:16:28 +02:00
Aliaksei Paulouski
9e1c8cfddd Add JS to NextStep params 2019-05-03 15:56:40 +02:00
Aliaksei Paulouski
f7f170b9ca Increment version 2019-05-03 15:56:40 +02:00
Romuald Juchnowicz-Bierbasz
8ad5ed76b7 Increment version 2019-04-30 16:59:13 +02:00
Romuald Juchnowicz-Bierbasz
7727098c6f SDK-2762: Fix error handling 2019-04-30 16:58:54 +02:00
Romuald Juchnowicz-Bierbasz
e53dc8f2c6 Merge branch 'master' into parameter-checking
* master:
  Add friends features
2019-04-30 14:50:23 +02:00
Romuald Juchnowicz-Bierbasz
527fd034bf Increment version 2019-04-29 15:45:18 +02:00
Romuald Juchnowicz-Bierbasz
6e251c6eb9 SDK-2762: Bind method params before calling 2019-04-29 15:45:03 +02:00
Romuald Juchnowicz-Bierbasz
dc9fc2cc5d SDK-2762: Standarize parameter binding 2019-04-29 14:51:12 +02:00
Aliaksei Paulouski
1fb79eb21a Add friends features 2019-04-26 11:08:49 +02:00
Romuald Juchnowicz-Bierbasz
7b9bcf86a1 Increment version 2019-04-16 14:53:57 +02:00
Romuald Juchnowicz-Bierbasz
30b3533e1d Old style namespace package 2019-04-16 14:53:28 +02:00
47 changed files with 2393 additions and 800 deletions

7
.gitignore vendored
View File

@@ -1,2 +1,9 @@
# pytest
__pycache__/
.vscode/
.venv/
src/galaxy.plugin.api.egg-info/
docs/build/
Pipfile
.idea
docs/source/_build

8
.travis.yml Normal file
View File

@@ -0,0 +1,8 @@
dist: xenial # required for Python >= 3.7
language: python
python:
- "3.7"
install:
- pip install -r requirements.txt
script:
- pytest

82
PLATFORM_IDs.md Normal file
View File

@@ -0,0 +1,82 @@
### PLATFORM ID LIST
Platform ID list for GOG Galaxy 2.0 Integrations
| ID | Name |
| --- | --- |
| steam | Steam |
| psn | PlayStation Network |
| xboxone | Xbox Live |
| generic | Manually added games |
| origin | Origin |
| uplay | Uplay |
| battlenet | Battle.net |
| epic | Epic Games Store |
| bethesda | Bethesda.net |
| paradox | Paradox Plaza |
| humble | Humble Bundle |
| kartridge | Kartridge |
| itch | Itch.io |
| nswitch | Nintendo Switch |
| nwiiu | Nintendo Wii U |
| nwii | Nintendo Wii |
| ncube | Nintendo Game Cube |
| riot | Riot |
| wargaming | Wargaming |
| ngameboy | Nintendo Game Boy |
| atari | Atari |
| amiga | Amiga |
| snes | SNES |
| beamdog | Beamdog |
| d2d | Direct2Drive |
| discord | Discord |
| dotemu | DotEmu |
| gamehouse | GameHouse |
| gmg | Green Man Gaming |
| weplay | WePlay |
| zx | Zx Spectrum PC |
| vision | ColecoVision |
| nes | NES |
| sms | Sega Master System |
| c64 | Commodore 64 |
| pce | PC Engine |
| segag | Sega Genesis |
| neo | NeoGeo |
| sega32 | Sega 32X |
| segacd | Sega CD |
| 3do | 3DO Interactive |
| saturn | SegaSaturn |
| psx | Sony PlayStation |
| ps2 | Sony PlayStation 2 |
| n64 | Nintendo64 |
| jaguar | Atari Jaguar |
| dc | Sega Dreamcast |
| xboxog | Original Xbox games |
| amazon | Amazon |
| gg | GamersGate |
| egg | Newegg |
| bb | BestBuy |
| gameuk | Game UK |
| fanatical | Fanatical store |
| playasia | PlayAsia |
| stadia | Google Stadia |
| arc | ARC |
| eso | ESO |
| glyph | Trion World |
| aionl | Aion: Legions of War |
| aion | Aion |
| blade | Blade and Soul |
| gw | Guild Wars |
| gw2 | Guild Wars 2 |
| lin2 | Lineage 2 |
| ffxi | Final Fantasy XI |
| ffxiv | Final Fantasy XIV |
| totalwar | TotalWar |
| winstore | Windows Store |
| elites | Elite Dangerous |
| star | Star Citizen |
| psp | Playstation Portable |
| psvita | Playstation Vita |
| nds | Nintendo DS |
| 3ds | Nintendo 3DS |
| pathofexile | Path of Exile |

133
README.md
View File

@@ -1,45 +1,128 @@
# Galaxy python plugin API
# GOG Galaxy Integrations Python API
## Usage
This Python library allows developers to easily build community integrations for various gaming platforms with GOG Galaxy 2.0.
Implement plugin:
- refer to our <a href='https://galaxy-integrations-python-api.readthedocs.io'>documentation</a>
## Features
Each integration in GOG Galaxy 2.0 comes as a separate Python script and is launched as a separate process that needs to communicate with the main instance of GOG Galaxy 2.0.
The provided features are:
- multistep authorization using a browser built into GOG Galaxy 2.0
- support for GOG Galaxy 2.0 features:
- importing owned and detecting installed games
- installing and launching games
- importing achievements and game time
- importing friends lists and statuses
- importing friends recommendations list
- receiving and sending chat messages
- cache storage
## Platform Id's
Each integration can implement only one platform. Each integration must declare which platform it's integrating.
[List of possible Platform IDs](PLATFORM_IDs.md)
## Basic usage
Each integration should inherit from the :class:`~galaxy.api.plugin.Plugin` class. Supported methods like :meth:`~galaxy.api.plugin.Plugin.get_owned_games` should be overwritten - they are called from the GOG Galaxy client at the appropriate times.
Each of those methods can raise exceptions inherited from the :exc:`~galaxy.api.jsonrpc.ApplicationError`.
Communication between an integration and the client is also possible with the use of notifications, for example: :meth:`~galaxy.api.plugin.Plugin.update_local_game_status`.
```python
import asyncio
from galaxy.api.plugin import Plugin
import sys
from galaxy.api.plugin import Plugin, create_and_run_plugin
from galaxy.api.consts import Platform
class PluginExample(Plugin):
def __init__(self, reader, writer, token):
super().__init__(
Platform.Generic, # Choose platform from available list
"0.1", # Version
reader,
writer,
token
)
# implement methods
async def authenticate(self, stored_credentials=None):
pass
def main():
create_and_run_plugin(PluginExample, sys.argv)
# run plugin event loop
if __name__ == "__main__":
asyncio.run(MockPlugin().run())
main()
```
Use [pyinstaller](https://www.pyinstaller.org/) to create plugin executbale.
## Deployment
## Development
The client has a built-in Python 3.7 interpreter, so integrations are delivered as Python modules.
In order to be found by GOG Galaxy 2.0 an integration folder should be placed in [lookup directory](#deploy-location). Beside all the Python files, the integration folder must contain [manifest.json](#deploy-manifest) and all third-party dependencies. See an [exemplary structure](#deploy-structure-example).
### Lookup directory
<a name="deploy-location"></a>
- Windows:
`%localappdata%\GOG.com\Galaxy\plugins\installed`
- macOS:
`~/Library/Application Support/GOG.com/Galaxy/plugins/installed`
### Manifest
<a name="deploy-manifest"></a>
Obligatory JSON file to be placed in an integration folder.
```json
{
"name": "Example plugin",
"platform": "generic",
"guid": "UNIQUE-GUID",
"version": "0.1",
"description": "Example plugin",
"author": "Name",
"email": "author@email.com",
"url": "https://github.com/user/galaxy-plugin-example",
"script": "plugin.py"
}
```
| property | description |
|---------------|---|
| `guid` | |
| `description` | |
| `url` | |
| `script` | path of the entry point module, relative to the integration folder |
### Dependencies
All third-party packages (packages not included in the Python 3.7 standard library) should be deployed along with plugin files. Use the following command structure:
```pip install DEP --target DIR --implementation cp --python-version 37```
For example, a plugin that uses *requests* could have the following structure:
<a name="deploy-structure-example"></a>
Install required packages:
```bash
pip install -r requirements.txt
installed
└── my_integration
   ├── galaxy
   │   └── api
   ├── requests
   │   └── ...
   ├── plugin.py
└── manifest.json
```
Run tests:
```bash
pytest
```
## Legal Notice
## Changelog
### 0.21
* Add `Epic` platform.
### 0.16
* Do not log sensitive data.
* Return `LocalGameState` as int (possible combination of flags).
### 0.15
* `shutdown()` is called on socket disconnection.
### 0.14
* Added required version parameter to Plugin constructor.
By integrating or attempting to integrate any applications or content with or into GOG Galaxy 2.0 you represent that such application or content is your original creation (other than any software made available by GOG) and/or that you have all necessary rights to grant such applicable rights to the relevant community integration to GOG and to GOG Galaxy 2.0 end users for the purpose of use of such community integration and that such community integration comply with any third party license and other requirements including compliance with applicable laws.

16
docs/make.py Normal file
View File

@@ -0,0 +1,16 @@
"""Builds documentation locally. Use for preview only"""
import pathlib
import subprocess
import webbrowser
source = pathlib.Path("docs", "source")
build = pathlib.Path("docs", "build")
master_doc = 'index.html'
subprocess.run(['sphinx-build', '-M', 'clean', str(source), str(build)])
subprocess.run(['sphinx-build', '-M', 'html', str(source), str(build)])
master_path = build / 'html' / master_doc
webbrowser.open(f'file://{master_path.resolve()}')

4
docs/requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
Sphinx==2.0.1
sphinx-rtd-theme==0.4.3
sphinx-autodoc-typehints==1.6.0
m2r==0.2.1

74
docs/source/conf.py Normal file
View File

@@ -0,0 +1,74 @@
# Configuration file for the Sphinx documentation builder.
# Documentation:
# http://www.sphinx-doc.org/en/master/config
import os
import sys
import subprocess
# -- Path setup --------------------------------------------------------------
_ROOT = os.path.join('..', '..')
sys.path.append(os.path.abspath(os.path.join(_ROOT, 'src')))
# -- Project information -----------------------------------------------------
project = 'GOG Galaxy Integrations API'
copyright = '2019, GOG.com'
_author, _version = subprocess.check_output(
['python', os.path.join(_ROOT, 'setup.py'), '--author', '--version'],
universal_newlines=True).strip().split('\n')
author = _author
version = _version
release = _version
# -- General configuration ---------------------------------------------------
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx_autodoc_typehints',
'm2r' # mdinclude directive for makrdown files
]
autodoc_member_order = 'bysource'
autodoc_inherit_docstrings = False
autodoc_mock_imports = ["galaxy.http"]
set_type_checking_flag = True
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path.
exclude_patterns = []
# -- Options for HTML output -------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = "sphinx_rtd_theme"
html_theme_options = {
# 'canonical_url': '', # main page to be serach in google with trailing slash
'display_version': True,
'style_external_links': True,
# Toc options
'collapse_navigation': False,
'sticky_navigation': True,
'navigation_depth': 4,
}
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
master_doc = 'index'

View File

@@ -0,0 +1,40 @@
galaxy.api
=================
plugin
------------------------
.. automodule:: galaxy.api.plugin
:members:
:undoc-members:
:exclude-members: JSONEncoder, features, achievements_import_finished, game_times_import_finished, start_achievements_import, start_game_times_import, get_game_times, get_unlocked_achievements
types
-----------------------
.. automodule:: galaxy.api.types
:members:
:undoc-members:
consts
------------------------
.. automodule:: galaxy.api.consts
:members:
:undoc-members:
:show-inheritance:
:exclude-members: Feature
errors
------------------------
.. autoexception:: galaxy.api.jsonrpc.ApplicationError
:show-inheritance:
.. autoexception:: galaxy.api.jsonrpc.UnknownError
:show-inheritance:
.. automodule:: galaxy.api.errors
:members:
:undoc-members:
:show-inheritance:

15
docs/source/index.rst Normal file
View File

@@ -0,0 +1,15 @@
GOG Galaxy Integrations Python API
=================================================
.. toctree::
:maxdepth: 2
:includehidden:
Overview <overview>
API <galaxy.api>
Platform ID's <platforms>
Index
-------------------
* :ref:`genindex`

15
docs/source/overview.rst Normal file
View File

@@ -0,0 +1,15 @@
.. mdinclude:: ../../README.md
:end-line: 4
.. excluding self-pointing documentation link
.. mdinclude:: ../../README.md
:start-line: 6
:end-line: 26
.. excluding Platforms Id's link
:ref:`platforms-link`
.. mdinclude:: ../../README.md
:start-line: 28

View File

@@ -0,0 +1,2 @@
.. _platforms-link:
.. mdinclude:: ../../PLATFORM_IDs.md

View File

@@ -1,43 +0,0 @@
from enum import Enum, Flag
class Platform(Enum):
Unknown = "unknown"
Gog = "gog"
Steam = "steam"
Psn = "psn"
XBoxOne = "xboxone"
Generic = "generic"
Origin = "origin"
Uplay = "uplay"
Battlenet = "battlenet"
Epic = "epic"
class Feature(Enum):
Unknown = "Unknown"
ImportInstalledGames = "ImportInstalledGames"
ImportOwnedGames = "ImportOwnedGames"
LaunchGame = "LaunchGame"
InstallGame = "InstallGame"
UninstallGame = "UninstallGame"
ImportAchievements = "ImportAchievements"
ImportGameTime = "ImportGameTime"
Chat = "Chat"
ImportUsers = "ImportUsers"
VerifyGame = "VerifyGame"
class LicenseType(Enum):
Unknown = "Unknown"
SinglePurchase = "SinglePurchase"
FreeToPlay = "FreeToPlay"
OtherUserLicense = "OtherUserLicense"
class LocalGameState(Flag):
None_ = 0
Installed = 1
Running = 2
class PresenceState(Enum):
Unknown = "Unknown"
Online = "online"
Offline = "offline"
Away = "away"

View File

@@ -1,352 +0,0 @@
import asyncio
import json
import logging
import logging.handlers
import dataclasses
from enum import Enum
from collections import OrderedDict
import sys
from galaxy.api.jsonrpc import Server, NotificationClient
from galaxy.api.consts import Feature
class JSONEncoder(json.JSONEncoder):
def default(self, o): # pylint: disable=method-hidden
if dataclasses.is_dataclass(o):
# filter None values
def dict_factory(elements):
return {k: v for k, v in elements if v is not None}
return dataclasses.asdict(o, dict_factory=dict_factory)
if isinstance(o, Enum):
return o.value
return super().default(o)
class Plugin():
def __init__(self, platform, version, reader, writer, handshake_token):
logging.info("Creating plugin for platform %s, version %s", platform.value, version)
self._platform = platform
self._version = version
self._feature_methods = OrderedDict()
self._active = True
self._reader, self._writer = reader, writer
self._handshake_token = handshake_token
encoder = JSONEncoder()
self._server = Server(self._reader, self._writer, encoder)
self._notification_client = NotificationClient(self._writer, encoder)
def eof_handler():
self._shutdown()
self._server.register_eof(eof_handler)
# internal
self._register_method("shutdown", self._shutdown, internal=True)
self._register_method("get_capabilities", self._get_capabilities, internal=True)
self._register_method("ping", self._ping, internal=True)
# implemented by developer
self._register_method("init_authentication", self.authenticate, sensitive_params=["stored_credentials"])
self._register_method("pass_login_credentials", self.pass_login_credentials)
self._register_method(
"import_owned_games",
self.get_owned_games,
result_name="owned_games",
feature=Feature.ImportOwnedGames
)
self._register_method(
"import_unlocked_achievements",
self.get_unlocked_achievements,
result_name="unlocked_achievements",
feature=Feature.ImportAchievements
)
self._register_method(
"import_local_games",
self.get_local_games,
result_name="local_games",
feature=Feature.ImportInstalledGames
)
self._register_notification("launch_game", self.launch_game, feature=Feature.LaunchGame)
self._register_notification("install_game", self.install_game, feature=Feature.InstallGame)
self._register_notification(
"uninstall_game",
self.uninstall_game,
feature=Feature.UninstallGame
)
self._register_method(
"import_friends",
self.get_friends,
result_name="user_info_list",
feature=Feature.ImportUsers
)
self._register_method(
"import_user_infos",
self.get_users,
result_name="user_info_list",
feature=Feature.ImportUsers
)
self._register_method(
"send_message",
self.send_message,
feature=Feature.Chat
)
self._register_method(
"mark_as_read",
self.mark_as_read,
feature=Feature.Chat
)
self._register_method(
"import_rooms",
self.get_rooms,
result_name="rooms",
feature=Feature.Chat
)
self._register_method(
"import_room_history_from_message",
self.get_room_history_from_message,
result_name="messages",
feature=Feature.Chat
)
self._register_method(
"import_room_history_from_timestamp",
self.get_room_history_from_timestamp,
result_name="messages",
feature=Feature.Chat
)
self._register_method(
"import_game_times",
self.get_game_times,
result_name="game_times",
feature=Feature.ImportGameTime
)
@property
def features(self):
features = []
if self.__class__ != Plugin:
for feature, handlers in self._feature_methods.items():
if self._implements(handlers):
features.append(feature)
return features
def _implements(self, handlers):
for handler in handlers:
if handler.__name__ not in self.__class__.__dict__:
return False
return True
def _register_method(self, name, handler, result_name=None, internal=False, sensitive_params=False, feature=None):
if internal:
def method(params):
result = handler(**params)
if result_name:
result = {
result_name: result
}
return result
self._server.register_method(name, method, True, sensitive_params)
else:
async def method(params):
result = await handler(**params)
if result_name:
result = {
result_name: result
}
return result
self._server.register_method(name, method, False, sensitive_params)
if feature is not None:
self._feature_methods.setdefault(feature, []).append(handler)
def _register_notification(self, name, handler, internal=False, sensitive_params=False, feature=None):
self._server.register_notification(name, handler, internal, sensitive_params)
if feature is not None:
self._feature_methods.setdefault(feature, []).append(handler)
async def run(self):
"""Plugin main coorutine"""
async def pass_control():
while self._active:
try:
self.tick()
except Exception:
logging.exception("Unexpected exception raised in plugin tick")
await asyncio.sleep(1)
await asyncio.gather(pass_control(), self._server.run())
def _shutdown(self):
logging.info("Shuting down")
self._server.stop()
self._active = False
self.shutdown()
def _get_capabilities(self):
return {
"platform_name": self._platform,
"features": self.features,
"token": self._handshake_token
}
@staticmethod
def _ping():
pass
# notifications
def store_credentials(self, credentials):
"""Notify client to store plugin credentials.
They will be pass to next authencicate calls.
"""
self._notification_client.notify("store_credentials", credentials, sensitive_params=True)
def add_game(self, game):
params = {"owned_game" : game}
self._notification_client.notify("owned_game_added", params)
def remove_game(self, game_id):
params = {"game_id" : game_id}
self._notification_client.notify("owned_game_removed", params)
def update_game(self, game):
params = {"owned_game" : game}
self._notification_client.notify("owned_game_updated", params)
def unlock_achievement(self, game_id, achievement):
params = {
"game_id": game_id,
"achievement": achievement
}
self._notification_client.notify("achievement_unlocked", params)
def update_local_game_status(self, local_game):
params = {"local_game" : local_game}
self._notification_client.notify("local_game_status_changed", params)
def add_friend(self, user):
params = {"user_info" : user}
self._notification_client.notify("friend_added", params)
def remove_friend(self, user_id):
params = {"user_id" : user_id}
self._notification_client.notify("friend_removed", params)
def update_friend(self, user):
params = {"user_info" : user}
self._notification_client.notify("friend_updated", params)
def update_room(self, room_id, unread_message_count=None, new_messages=None):
params = {"room_id": room_id}
if unread_message_count is not None:
params["unread_message_count"] = unread_message_count
if new_messages is not None:
params["messages"] = new_messages
self._notification_client.notify("chat_room_updated", params)
def update_game_time(self, game_time):
params = {"game_time" : game_time}
self._notification_client.notify("game_time_updated", params)
def lost_authentication(self):
self._notification_client.notify("authentication_lost", None)
# handlers
def tick(self):
"""This method is called periodicaly.
Override it to implement periodical tasks like refreshing cache.
This method should not be blocking - any longer actions should be
handled by asycio tasks.
"""
def shutdown(self):
"""This method is called on plugin shutdown.
Override it to implement tear down.
"""
# methods
async def authenticate(self, stored_credentials=None):
"""Overide this method to handle plugin authentication.
The method should return galaxy.api.types.Authentication
or raise galaxy.api.types.LoginError on authentication failure.
"""
raise NotImplementedError()
async def pass_login_credentials(self, step, credentials, cookies):
raise NotImplementedError()
async def get_owned_games(self):
raise NotImplementedError()
async def get_unlocked_achievements(self, game_id):
raise NotImplementedError()
async def get_local_games(self):
raise NotImplementedError()
async def launch_game(self, game_id):
raise NotImplementedError()
async def install_game(self, game_id):
raise NotImplementedError()
async def uninstall_game(self, game_id):
raise NotImplementedError()
async def get_friends(self):
raise NotImplementedError()
async def get_users(self, user_id_list):
raise NotImplementedError()
async def send_message(self, room_id, message_text):
raise NotImplementedError()
async def mark_as_read(self, room_id, last_message_id):
raise NotImplementedError()
async def get_rooms(self):
raise NotImplementedError()
async def get_room_history_from_message(self, room_id, message_id):
raise NotImplementedError()
async def get_room_history_from_timestamp(self, room_id, from_timestamp):
raise NotImplementedError()
async def get_game_times(self):
raise NotImplementedError()
def create_and_run_plugin(plugin_class, argv):
if len(argv) < 3:
logging.critical("Not enough parameters, required: token, port")
sys.exit(1)
token = argv[1]
try:
port = int(argv[2])
except ValueError:
logging.critical("Failed to parse port value: %s", argv[2])
sys.exit(2)
if not (1 <= port <= 65535):
logging.critical("Port value out of range (1, 65535)")
sys.exit(3)
if not issubclass(plugin_class, Plugin):
logging.critical("plugin_class must be subclass of Plugin")
sys.exit(4)
async def coroutine():
reader, writer = await asyncio.open_connection("127.0.0.1", port)
plugin = plugin_class(reader, writer, token)
await plugin.run()
try:
asyncio.run(coroutine())
except Exception:
logging.exception("Error while running plugin")
sys.exit(5)

View File

@@ -1,88 +0,0 @@
from dataclasses import dataclass
from typing import List, Dict, Optional
from galaxy.api.consts import LicenseType, LocalGameState, PresenceState
@dataclass
class Authentication():
user_id: str
user_name: str
@dataclass
class Cookie():
name: str
value: str
domain: Optional[str] = None
path: Optional[str] = None
@dataclass
class NextStep():
next_step: str
auth_params: Dict[str, str]
cookies: Optional[List[Cookie]] = None
@dataclass
class LicenseInfo():
license_type: LicenseType
owner: Optional[str] = None
@dataclass
class Dlc():
dlc_id: str
dlc_title: str
license_info: LicenseInfo
@dataclass
class Game():
game_id: str
game_title: str
dlcs: Optional[List[Dlc]]
license_info: LicenseInfo
@dataclass
class Achievement():
unlock_time: int
achievement_id: Optional[str] = None
achievement_name: Optional[str] = None
def __post_init__(self):
assert self.achievement_id or self.achievement_name, \
"One of achievement_id or achievement_name is required"
@dataclass
class LocalGame():
game_id: str
local_game_state: LocalGameState
@dataclass
class Presence():
presence_state: PresenceState
game_id: Optional[str] = None
presence_status: Optional[str] = None
@dataclass
class UserInfo():
user_id: str
is_friend: bool
user_name: str
avatar_url: str
presence: Presence
@dataclass
class Room():
room_id: str
unread_message_count: int
last_message_id: str
@dataclass
class Message():
message_id: str
sender_id: str
sent_time: int
message_text: str
@dataclass
class GameTime():
game_id: str
time_played: int
last_played_time: int

View File

@@ -1,5 +0,0 @@
from unittest.mock import MagicMock
class AsyncMock(MagicMock):
async def __call__(self, *args, **kwargs):
return super(AsyncMock, self).__call__(*args, **kwargs)

View File

@@ -1,2 +1,8 @@
-e .
pytest==4.2.0
pytest-asyncio==0.10.0
pytest-mock==1.10.3
pytest-flakes==4.0.0
# because of pip bug https://github.com/pypa/pip/issues/4780
aiohttp==3.5.4
certifi==2019.3.9

View File

@@ -2,9 +2,14 @@ from setuptools import setup, find_packages
setup(
name="galaxy.plugin.api",
version="0.22",
description="Galaxy python plugin API",
version="0.41",
description="GOG Galaxy Integrations Python API",
author='Galaxy team',
author_email='galaxy@gog.com',
packages=find_packages(exclude=["tests"])
packages=find_packages("src"),
package_dir={'': 'src'},
install_requires=[
"aiohttp==3.5.4",
"certifi==2019.3.9"
]
)

12
src/.readthedocs.yml Normal file
View File

@@ -0,0 +1,12 @@
version: 2
sphinx:
configuration: docs/source/conf.py
formats: all
python:
version: 3.7
install:
- requirements: requirements.txt
- requirements: docs/requirements.txt

1
src/galaxy/__init__.py Normal file
View File

@@ -0,0 +1 @@
__path__ = __import__('pkgutil').extend_path(__path__, __name__)

126
src/galaxy/api/consts.py Normal file
View File

@@ -0,0 +1,126 @@
from enum import Enum, Flag
class Platform(Enum):
"""Supported gaming platforms"""
Unknown = "unknown"
Gog = "gog"
Steam = "steam"
Psn = "psn"
XBoxOne = "xboxone"
Generic = "generic"
Origin = "origin"
Uplay = "uplay"
Battlenet = "battlenet"
Epic = "epic"
Bethesda = "bethesda"
ParadoxPlaza = "paradox"
HumbleBundle = "humble"
Kartridge = "kartridge"
ItchIo = "itch"
NintendoSwitch = "nswitch"
NintendoWiiU = "nwiiu"
NintendoWii = "nwii"
NintendoGameCube = "ncube"
RiotGames = "riot"
Wargaming = "wargaming"
NintendoGameBoy = "ngameboy"
Atari = "atari"
Amiga = "amiga"
SuperNintendoEntertainmentSystem = "snes"
Beamdog = "beamdog"
Direct2Drive = "d2d"
Discord = "discord"
DotEmu = "dotemu"
GameHouse = "gamehouse"
GreenManGaming = "gmg"
WePlay = "weplay"
ZxSpectrum = "zx"
ColecoVision = "vision"
NintendoEntertainmentSystem = "nes"
SegaMasterSystem = "sms"
Commodore64 = "c64"
PcEngine = "pce"
SegaGenesis = "segag"
NeoGeo = "neo"
Sega32X = "sega32"
SegaCd = "segacd"
_3Do = "3do"
SegaSaturn = "saturn"
PlayStation = "psx"
PlayStation2 = "ps2"
Nintendo64 = "n64"
AtariJaguar = "jaguar"
SegaDreamcast = "dc"
Xbox = "xboxog"
Amazon = "amazon"
GamersGate = "gg"
Newegg = "egg"
BestBuy = "bb"
GameUk = "gameuk"
Fanatical = "fanatical"
PlayAsia = "playasia"
Stadia = "stadia"
Arc = "arc"
ElderScrollsOnline = "eso"
Glyph = "glyph"
AionLegionsOfWar = "aionl"
Aion = "aion"
BladeAndSoul = "blade"
GuildWars = "gw"
GuildWars2 = "gw2"
Lineage2 = "lin2"
FinalFantasy11 = "ffxi"
FinalFantasy14 = "ffxiv"
TotalWar = "totalwar"
WindowsStore = "winstore"
EliteDangerous = "elites"
StarCitizen = "star"
PlayStationPortable = "psp"
PlayStationVita = "psvita"
NintendoDs = "nds"
Nintendo3Ds = "3ds"
PathOfExile = "pathofexile"
class Feature(Enum):
"""Possible features that can be implemented by an integration.
It does not have to support all or any specific features from the list.
"""
Unknown = "Unknown"
ImportInstalledGames = "ImportInstalledGames"
ImportOwnedGames = "ImportOwnedGames"
LaunchGame = "LaunchGame"
InstallGame = "InstallGame"
UninstallGame = "UninstallGame"
ImportAchievements = "ImportAchievements"
ImportGameTime = "ImportGameTime"
Chat = "Chat"
ImportUsers = "ImportUsers"
VerifyGame = "VerifyGame"
ImportFriends = "ImportFriends"
class LicenseType(Enum):
"""Possible game license types, understandable for the GOG Galaxy client."""
Unknown = "Unknown"
SinglePurchase = "SinglePurchase"
FreeToPlay = "FreeToPlay"
OtherUserLicense = "OtherUserLicense"
class LocalGameState(Flag):
"""Possible states that a local game can be in.
For example a game which is both installed and currently running should have its state set as a "bitwise or" of Running and Installed flags:
``local_game_state=<LocalGameState.Running|Installed: 3>``
"""
None_ = 0
Installed = 1
Running = 2
class PresenceState(Enum):
""""Possible states that a user can be in."""
Unknown = "Unknown"
Online = "online"
Offline = "offline"
Away = "away"

View File

@@ -22,6 +22,10 @@ class UnknownBackendResponse(ApplicationError):
def __init__(self, data=None):
super().__init__(4, "Backend responded in uknown way", data)
class TooManyRequests(ApplicationError):
def __init__(self, data=None):
super().__init__(5, "Too many requests. Try again later", data)
class InvalidCredentials(ApplicationError):
def __init__(self, data=None):
super().__init__(100, "Invalid credentials", data)
@@ -50,18 +54,6 @@ class AccessDenied(ApplicationError):
def __init__(self, data=None):
super().__init__(106, "Access denied", data)
class ParentalControlBlock(ApplicationError):
def __init__(self, data=None):
super().__init__(107, "Parental control block", data)
class DeviceBlocked(ApplicationError):
def __init__(self, data=None):
super().__init__(108, "Device blocked", data)
class RegionBlocked(ApplicationError):
def __init__(self, data=None):
super().__init__(109, "Region blocked", data)
class FailedParsingManifest(ApplicationError):
def __init__(self, data=None):
super().__init__(200, "Failed parsing manifest", data)
@@ -77,3 +69,7 @@ class IncoherentLastMessage(ApplicationError):
class MessageNotFound(ApplicationError):
def __init__(self, data=None):
super().__init__(500, "Message not found", data)
class ImportInProgress(ApplicationError):
def __init__(self, data=None):
super().__init__(600, "Import already in progress", data)

View File

@@ -2,8 +2,11 @@ import asyncio
from collections import namedtuple
from collections.abc import Iterable
import logging
import inspect
import json
from galaxy.reader import StreamLineReader
class JsonRpcError(Exception):
def __init__(self, code, message, data=None):
self.code = code
@@ -11,6 +14,9 @@ class JsonRpcError(Exception):
self.data = data
super().__init__()
def __eq__(self, other):
return self.code == other.code and self.message == other.message and self.data == other.data
class ParseError(JsonRpcError):
def __init__(self):
super().__init__(-32700, "Parse error")
@@ -46,26 +52,24 @@ class UnknownError(ApplicationError):
super().__init__(0, "Unknown error", data)
Request = namedtuple("Request", ["method", "params", "id"], defaults=[{}, None])
Method = namedtuple("Method", ["callback", "internal", "sensitive_params"])
Method = namedtuple("Method", ["callback", "signature", "internal", "sensitive_params"])
def anonymise_sensitive_params(params, sensitive_params):
anomized_data = "****"
if not sensitive_params:
return params
if isinstance(sensitive_params, bool):
if sensitive_params:
return {k:anomized_data for k,v in params.items()}
if isinstance(sensitive_params, Iterable):
anomized_params = params.copy()
for key in anomized_params.keys():
if key in sensitive_params:
anomized_params[key] = anomized_data
return anomized_params
return {k: anomized_data if k in sensitive_params else v for k, v in params.items()}
return anomized_data
return params
class Server():
def __init__(self, reader, writer, encoder=json.JSONEncoder()):
self._active = True
self._reader = reader
self._reader = StreamLineReader(reader)
self._writer = writer
self._encoder = encoder
self._methods = {}
@@ -75,24 +79,26 @@ class Server():
def register_method(self, name, callback, internal, sensitive_params=False):
"""
Register method
:param name:
:param callback:
:param internal: if True the callback will be processed immediately (synchronously)
:param sensitive_params: list of parameters that will by anonymized before logging; if False - no params
are considered sensitive, if True - all params are considered sensitive
:param sensitive_params: list of parameters that are anonymized before logging; \
if False - no params are considered sensitive, if True - all params are considered sensitive
"""
self._methods[name] = Method(callback, internal, sensitive_params)
self._methods[name] = Method(callback, inspect.signature(callback), internal, sensitive_params)
def register_notification(self, name, callback, internal, sensitive_params=False):
"""
Register notification
:param name:
:param callback:
:param internal: if True the callback will be processed immediately (synchronously)
:param sensitive_params: list of parameters that will by anonymized before logging; if False - no params
are considered sensitive, if True - all params are considered sensitive
:param sensitive_params: list of parameters that are anonymized before logging; \
if False - no params are considered sensitive, if True - all params are considered sensitive
"""
self._notifications[name] = Method(callback, internal, sensitive_params)
self._notifications[name] = Method(callback, inspect.signature(callback), internal, sensitive_params)
def register_eof(self, callback):
self._eof_listeners.append(callback)
@@ -110,6 +116,7 @@ class Server():
data = data.strip()
logging.debug("Received %d bytes of data", len(data))
self._handle_input(data)
await asyncio.sleep(0) # To not starve task queue
def stop(self):
self._active = False
@@ -138,15 +145,20 @@ class Server():
logging.error("Received unknown notification: %s", request.method)
return
callback, internal, sensitive_params = method
callback, signature, internal, sensitive_params = method
self._log_request(request, sensitive_params)
try:
bound_args = signature.bind(**request.params)
except TypeError:
self._send_error(request.id, InvalidParams())
if internal:
# internal requests are handled immediately
callback(**request.params)
callback(*bound_args.args, **bound_args.kwargs)
else:
try:
asyncio.create_task(callback(**request.params))
asyncio.create_task(callback(*bound_args.args, **bound_args.kwargs))
except Exception:
logging.exception("Unexpected exception raised in notification handler")
@@ -157,25 +169,28 @@ class Server():
self._send_error(request.id, MethodNotFound())
return
callback, internal, sensitive_params = method
callback, signature, internal, sensitive_params = method
self._log_request(request, sensitive_params)
try:
bound_args = signature.bind(**request.params)
except TypeError:
self._send_error(request.id, InvalidParams())
if internal:
# internal requests are handled immediately
response = callback(request.params)
response = callback(*bound_args.args, **bound_args.kwargs)
self._send_response(request.id, response)
else:
async def handle():
try:
result = await callback(request.params)
result = await callback(*bound_args.args, **bound_args.kwargs)
self._send_response(request.id, result)
except TypeError:
self._send_error(request.id, InvalidParams())
except NotImplementedError:
self._send_error(request.id, MethodNotFound())
except JsonRpcError as error:
self._send_error(request.id, error)
except Exception as e: #pylint: disable=broad-except
except Exception as e: #pylint: disable=broad-except
logging.exception("Unexpected exception raised in plugin handler")
self._send_error(request.id, UnknownError(str(e)))
@@ -244,10 +259,11 @@ class NotificationClient():
def notify(self, method, params, sensitive_params=False):
"""
Send notification
:param method:
:param params:
:param sensitive_params: list of parameters that will by anonymized before logging; if False - no params
are considered sensitive, if True - all params are considered sensitive
:param sensitive_params: list of parameters that are anonymized before logging; \
if False - no params are considered sensitive, if True - all params are considered sensitive
"""
notification = {
"jsonrpc": "2.0",

890
src/galaxy/api/plugin.py Normal file
View File

@@ -0,0 +1,890 @@
import asyncio
import json
import logging
import logging.handlers
import dataclasses
from enum import Enum
from collections import OrderedDict
import sys
from typing import Any, List, Dict, Optional, Union
from galaxy.api.types import Achievement, Game, LocalGame, FriendInfo, GameTime, UserInfo, Room
from galaxy.api.jsonrpc import Server, NotificationClient, ApplicationError
from galaxy.api.consts import Feature
from galaxy.api.errors import UnknownError, ImportInProgress
from galaxy.api.types import Authentication, NextStep, Message
class JSONEncoder(json.JSONEncoder):
def default(self, o): # pylint: disable=method-hidden
if dataclasses.is_dataclass(o):
# filter None values
def dict_factory(elements):
return {k: v for k, v in elements if v is not None}
return dataclasses.asdict(o, dict_factory=dict_factory)
if isinstance(o, Enum):
return o.value
return super().default(o)
class Plugin:
"""Use and override methods of this class to create a new platform integration."""
def __init__(self, platform, version, reader, writer, handshake_token):
logging.info("Creating plugin for platform %s, version %s", platform.value, version)
self._platform = platform
self._version = version
self._feature_methods = OrderedDict()
self._active = True
self._pass_control_task = None
self._reader, self._writer = reader, writer
self._handshake_token = handshake_token
encoder = JSONEncoder()
self._server = Server(self._reader, self._writer, encoder)
self._notification_client = NotificationClient(self._writer, encoder)
def eof_handler():
self._shutdown()
self._server.register_eof(eof_handler)
self._achievements_import_in_progress = False
self._game_times_import_in_progress = False
self._persistent_cache = dict()
# internal
self._register_method("shutdown", self._shutdown, internal=True)
self._register_method("get_capabilities", self._get_capabilities, internal=True)
self._register_method(
"initialize_cache",
self._initialize_cache,
internal=True,
sensitive_params="data"
)
self._register_method("ping", self._ping, internal=True)
# implemented by developer
self._register_method(
"init_authentication",
self.authenticate,
sensitive_params=["stored_credentials"]
)
self._register_method(
"pass_login_credentials",
self.pass_login_credentials,
sensitive_params=["cookies", "credentials"]
)
self._register_method(
"import_owned_games",
self.get_owned_games,
result_name="owned_games",
feature=Feature.ImportOwnedGames
)
self._register_method(
"import_unlocked_achievements",
self.get_unlocked_achievements,
result_name="unlocked_achievements",
feature=Feature.ImportAchievements
)
self._register_method(
"start_achievements_import",
self.start_achievements_import,
)
self._register_method(
"import_local_games",
self.get_local_games,
result_name="local_games",
feature=Feature.ImportInstalledGames
)
self._register_notification("launch_game", self.launch_game, feature=Feature.LaunchGame)
self._register_notification("install_game", self.install_game, feature=Feature.InstallGame)
self._register_notification(
"uninstall_game",
self.uninstall_game,
feature=Feature.UninstallGame
)
self._register_method(
"import_friends",
self.get_friends,
result_name="friend_info_list",
feature=Feature.ImportFriends
)
self._register_method(
"import_user_infos",
self.get_users,
result_name="user_info_list",
feature=Feature.ImportUsers
)
self._register_method(
"send_message",
self.send_message,
feature=Feature.Chat
)
self._register_method(
"mark_as_read",
self.mark_as_read,
feature=Feature.Chat
)
self._register_method(
"import_rooms",
self.get_rooms,
result_name="rooms",
feature=Feature.Chat
)
self._register_method(
"import_room_history_from_message",
self.get_room_history_from_message,
result_name="messages",
feature=Feature.Chat
)
self._register_method(
"import_room_history_from_timestamp",
self.get_room_history_from_timestamp,
result_name="messages",
feature=Feature.Chat
)
self._register_method(
"import_game_times",
self.get_game_times,
result_name="game_times",
feature=Feature.ImportGameTime
)
self._register_method(
"start_game_times_import",
self.start_game_times_import,
)
@property
def features(self):
features = []
if self.__class__ != Plugin:
for feature, handlers in self._feature_methods.items():
if self._implements(handlers):
features.append(feature)
return features
@property
def persistent_cache(self) -> Dict:
"""The cache is only available after the :meth:`~.handshake_complete()` is called.
"""
return self._persistent_cache
def _implements(self, handlers):
for handler in handlers:
if handler.__name__ not in self.__class__.__dict__:
return False
return True
def _register_method(self, name, handler, result_name=None, internal=False, sensitive_params=False, feature=None):
if internal:
def method(*args, **kwargs):
result = handler(*args, **kwargs)
if result_name:
result = {
result_name: result
}
return result
self._server.register_method(name, method, True, sensitive_params)
else:
async def method(*args, **kwargs):
result = await handler(*args, **kwargs)
if result_name:
result = {
result_name: result
}
return result
self._server.register_method(name, method, False, sensitive_params)
if feature is not None:
self._feature_methods.setdefault(feature, []).append(handler)
def _register_notification(self, name, handler, internal=False, sensitive_params=False, feature=None):
self._server.register_notification(name, handler, internal, sensitive_params)
if feature is not None:
self._feature_methods.setdefault(feature, []).append(handler)
async def run(self):
"""Plugin's main coroutine."""
await self._server.run()
if self._pass_control_task is not None:
await self._pass_control_task
async def _pass_control(self):
while self._active:
try:
self.tick()
except Exception:
logging.exception("Unexpected exception raised in plugin tick")
await asyncio.sleep(1)
def _shutdown(self):
logging.info("Shutting down")
self._server.stop()
self._active = False
self.shutdown()
def _get_capabilities(self):
return {
"platform_name": self._platform,
"features": self.features,
"token": self._handshake_token
}
def _initialize_cache(self, data: Dict):
self._persistent_cache = data
self.handshake_complete()
self._pass_control_task = asyncio.create_task(self._pass_control())
@staticmethod
def _ping():
pass
# notifications
def store_credentials(self, credentials: Dict[str, Any]) -> None:
"""Notify the client to store authentication credentials.
Credentials are passed on the next authenticate call.
:param credentials: credentials that client will store; they are stored locally on a user pc
Example use case of store_credentials:
.. code-block:: python
:linenos:
async def pass_login_credentials(self, step, credentials, cookies):
if self.got_everything(credentials,cookies):
user_data = await self.parse_credentials(credentials,cookies)
else:
next_params = self.get_next_params(credentials,cookies)
next_cookies = self.get_next_cookies(credentials,cookies)
return NextStep("web_session", next_params, cookies=next_cookies)
self.store_credentials(user_data['credentials'])
return Authentication(user_data['userId'], user_data['username'])
"""
self.persistent_cache['credentials'] = credentials
self._notification_client.notify("store_credentials", credentials, sensitive_params=True)
def add_game(self, game: Game) -> None:
"""Notify the client to add game to the list of owned games
of the currently authenticated user.
:param game: Game to add to the list of owned games
Example use case of add_game:
.. code-block:: python
:linenos:
async def check_for_new_games(self):
games = await self.get_owned_games()
for game in games:
if game not in self.owned_games_cache:
self.owned_games_cache.append(game)
self.add_game(game)
"""
params = {"owned_game": game}
self._notification_client.notify("owned_game_added", params)
def remove_game(self, game_id: str) -> None:
"""Notify the client to remove game from the list of owned games
of the currently authenticated user.
:param game_id: game id of the game to remove from the list of owned games
Example use case of remove_game:
.. code-block:: python
:linenos:
async def check_for_removed_games(self):
games = await self.get_owned_games()
for game in self.owned_games_cache:
if game not in games:
self.owned_games_cache.remove(game)
self.remove_game(game.game_id)
"""
params = {"game_id": game_id}
self._notification_client.notify("owned_game_removed", params)
def update_game(self, game: Game) -> None:
"""Notify the client to update the status of a game
owned by the currently authenticated user.
:param game: Game to update
"""
params = {"owned_game": game}
self._notification_client.notify("owned_game_updated", params)
def unlock_achievement(self, game_id: str, achievement: Achievement) -> None:
"""Notify the client to unlock an achievement for a specific game.
:param game_id: game_id of the game for which to unlock an achievement.
:param achievement: achievement to unlock.
"""
params = {
"game_id": game_id,
"achievement": achievement
}
self._notification_client.notify("achievement_unlocked", params)
def game_achievements_import_success(self, game_id: str, achievements: List[Achievement]) -> None:
"""Notify the client that import of achievements for a given game has succeeded.
This method is called by import_games_achievements.
:param game_id: id of the game for which the achievements were imported
:param achievements: list of imported achievements
"""
params = {
"game_id": game_id,
"unlocked_achievements": achievements
}
self._notification_client.notify("game_achievements_import_success", params)
def game_achievements_import_failure(self, game_id: str, error: ApplicationError) -> None:
"""Notify the client that import of achievements for a given game has failed.
This method is called by import_games_achievements.
:param game_id: id of the game for which the achievements import failed
:param error: error which prevented the achievements import
"""
params = {
"game_id": game_id,
"error": {
"code": error.code,
"message": error.message
}
}
self._notification_client.notify("game_achievements_import_failure", params)
def achievements_import_finished(self) -> None:
"""Notify the client that importing achievements has finished.
This method is called by import_games_achievements_task"""
self._notification_client.notify("achievements_import_finished", None)
def update_local_game_status(self, local_game: LocalGame) -> None:
"""Notify the client to update the status of a local game.
:param local_game: the LocalGame to update
Example use case triggered by the :meth:`.tick` method:
.. code-block:: python
:linenos:
:emphasize-lines: 5
async def _check_statuses(self):
for game in await self._get_local_games():
if game.status == self._cached_game_statuses.get(game.id):
continue
self.update_local_game_status(LocalGame(game.id, game.status))
self._cached_games_statuses[game.id] = game.status
asyncio.sleep(5) # interval
def tick(self):
if self._check_statuses_task is None or self._check_statuses_task.done():
self._check_statuses_task = asyncio.create_task(self._check_statuses())
"""
params = {"local_game": local_game}
self._notification_client.notify("local_game_status_changed", params)
def add_friend(self, user: FriendInfo) -> None:
"""Notify the client to add a user to friends list of the currently authenticated user.
:param user: FriendInfo of a user that the client will add to friends list
"""
params = {"friend_info": user}
self._notification_client.notify("friend_added", params)
def remove_friend(self, user_id: str) -> None:
"""Notify the client to remove a user from friends list of the currently authenticated user.
:param user_id: id of the user to remove from friends list
"""
params = {"user_id": user_id}
self._notification_client.notify("friend_removed", params)
def update_room(
self,
room_id: str,
unread_message_count: Optional[int]=None,
new_messages: Optional[List[Message]]=None
) -> None:
"""WIP, Notify the client to update the information regarding
a chat room that the currently authenticated user is in.
:param room_id: id of the room to update
:param unread_message_count: information about the new unread message count in the room
:param new_messages: list of new messages that the user received
"""
params = {"room_id": room_id}
if unread_message_count is not None:
params["unread_message_count"] = unread_message_count
if new_messages is not None:
params["messages"] = new_messages
self._notification_client.notify("chat_room_updated", params)
def update_game_time(self, game_time: GameTime) -> None:
"""Notify the client to update game time for a game.
:param game_time: game time to update
"""
params = {"game_time": game_time}
self._notification_client.notify("game_time_updated", params)
def game_time_import_success(self, game_time: GameTime) -> None:
"""Notify the client that import of a given game_time has succeeded.
This method is called by import_game_times.
:param game_time: game_time which was imported
"""
params = {"game_time": game_time}
self._notification_client.notify("game_time_import_success", params)
def game_time_import_failure(self, game_id: str, error: ApplicationError) -> None:
"""Notify the client that import of a game time for a given game has failed.
This method is called by import_game_times.
:param game_id: id of the game for which the game time could not be imported
:param error: error which prevented the game time import
"""
params = {
"game_id": game_id,
"error": {
"code": error.code,
"message": error.message
}
}
self._notification_client.notify("game_time_import_failure", params)
def game_times_import_finished(self) -> None:
"""Notify the client that importing game times has finished.
This method is called by :meth:`~.import_game_times_task`.
"""
self._notification_client.notify("game_times_import_finished", None)
def lost_authentication(self) -> None:
"""Notify the client that integration has lost authentication for the
current user and is unable to perform actions which would require it.
"""
self._notification_client.notify("authentication_lost", None)
def push_cache(self) -> None:
"""Push local copy of the persistent cache to the GOG Galaxy Client replacing existing one.
"""
self._notification_client.notify(
"push_cache",
params={"data": self._persistent_cache},
sensitive_params="data"
)
# handlers
def handshake_complete(self) -> None:
"""This method is called right after the handshake with the GOG Galaxy Client is complete and
before any other operations are called by the GOG Galaxy Client.
Persistent cache is available when this method is called.
Override it if you need to do additional plugin initializations.
This method is called internally."""
def tick(self) -> None:
"""This method is called periodically.
Override it to implement periodical non-blocking tasks.
This method is called internally.
Example of possible override of the method:
.. code-block:: python
:linenos:
def tick(self):
if not self.checking_for_new_games:
asyncio.create_task(self.check_for_new_games())
if not self.checking_for_removed_games:
asyncio.create_task(self.check_for_removed_games())
if not self.updating_game_statuses:
asyncio.create_task(self.update_game_statuses())
"""
def shutdown(self) -> None:
"""This method is called on integration shutdown.
Override it to implement tear down.
This method is called by the GOG Galaxy Client."""
# methods
async def authenticate(self, stored_credentials: Optional[Dict] = None) -> Union[NextStep, Authentication]:
"""Override this method to handle user authentication.
This method should either return :class:`~galaxy.api.types.Authentication` if the authentication is finished
or :class:`~galaxy.api.types.NextStep` if it requires going to another url.
This method is called by the GOG Galaxy Client.
:param stored_credentials: If the client received any credentials to store locally
in the previous session they will be passed here as a parameter.
Example of possible override of the method:
.. code-block:: python
:linenos:
async def authenticate(self, stored_credentials=None):
if not stored_credentials:
return NextStep("web_session", PARAMS, cookies=COOKIES)
else:
try:
user_data = self._authenticate(stored_credentials)
except AccessDenied:
raise InvalidCredentials()
return Authentication(user_data['userId'], user_data['username'])
"""
raise NotImplementedError()
async def pass_login_credentials(self, step: str, credentials: Dict[str, str], cookies: List[Dict[str, str]]) \
-> Union[NextStep, Authentication]:
"""This method is called if we return galaxy.api.types.NextStep from authenticate or from pass_login_credentials.
This method's parameters provide the data extracted from the web page navigation that previous NextStep finished on.
This method should either return galaxy.api.types.Authentication if the authentication is finished
or galaxy.api.types.NextStep if it requires going to another cef url.
This method is called by the GOG Galaxy Client.
:param step: deprecated.
:param credentials: end_uri previous NextStep finished on.
:param cookies: cookies extracted from the end_uri site.
Example of possible override of the method:
.. code-block:: python
:linenos:
async def pass_login_credentials(self, step, credentials, cookies):
if self.got_everything(credentials,cookies):
user_data = await self.parse_credentials(credentials,cookies)
else:
next_params = self.get_next_params(credentials,cookies)
next_cookies = self.get_next_cookies(credentials,cookies)
return NextStep("web_session", next_params, cookies=next_cookies)
self.store_credentials(user_data['credentials'])
return Authentication(user_data['userId'], user_data['username'])
"""
raise NotImplementedError()
async def get_owned_games(self) -> List[Game]:
"""Override this method to return owned games for currently logged in user.
This method is called by the GOG Galaxy Client.
Example of possible override of the method:
.. code-block:: python
:linenos:
async def get_owned_games(self):
if not self.authenticated():
raise AuthenticationRequired()
games = self.retrieve_owned_games()
return games
"""
raise NotImplementedError()
async def get_unlocked_achievements(self, game_id: str) -> List[Achievement]:
"""
.. deprecated:: 0.33
Use :meth:`~.import_games_achievements`.
"""
raise NotImplementedError()
async def start_achievements_import(self, game_ids: List[str]) -> None:
"""Starts the task of importing achievements.
This method is called by the GOG Galaxy Client.
:param game_ids: ids of the games for which the achievements are imported
"""
if self._achievements_import_in_progress:
raise ImportInProgress()
async def import_games_achievements_task(game_ids):
try:
await self.import_games_achievements(game_ids)
finally:
self.achievements_import_finished()
self._achievements_import_in_progress = False
asyncio.create_task(import_games_achievements_task(game_ids))
self._achievements_import_in_progress = True
async def import_games_achievements(self, game_ids: List[str]) -> None:
"""
Override this method to return the unlocked achievements
of the user that is currently logged in to the plugin.
Call game_achievements_import_success/game_achievements_import_failure for each game_id on the list.
This method is called by the GOG Galaxy Client.
:param game_ids: ids of the games for which to import unlocked achievements
"""
async def import_game_achievements(game_id):
try:
achievements = await self.get_unlocked_achievements(game_id)
self.game_achievements_import_success(game_id, achievements)
except Exception as error:
self.game_achievements_import_failure(game_id, error)
imports = [import_game_achievements(game_id) for game_id in game_ids]
await asyncio.gather(*imports)
async def get_local_games(self) -> List[LocalGame]:
"""Override this method to return the list of
games present locally on the users pc.
This method is called by the GOG Galaxy Client.
Example of possible override of the method:
.. code-block:: python
:linenos:
async def get_local_games(self):
local_games = []
for game in self.games_present_on_user_pc:
local_game = LocalGame()
local_game.game_id = game.id
local_game.local_game_state = game.get_installation_status()
local_games.append(local_game)
return local_games
"""
raise NotImplementedError()
async def launch_game(self, game_id: str) -> None:
"""Override this method to launch the game
identified by the provided game_id.
This method is called by the GOG Galaxy Client.
:param str game_id: id of the game to launch
Example of possible override of the method:
.. code-block:: python
:linenos:
async def launch_game(self, game_id):
await self.open_uri(f"start client://launchgame/{game_id}")
"""
raise NotImplementedError()
async def install_game(self, game_id: str) -> None:
"""Override this method to install the game
identified by the provided game_id.
This method is called by the GOG Galaxy Client.
:param str game_id: id of the game to install
Example of possible override of the method:
.. code-block:: python
:linenos:
async def install_game(self, game_id):
await self.open_uri(f"start client://installgame/{game_id}")
"""
raise NotImplementedError()
async def uninstall_game(self, game_id: str) -> None:
"""Override this method to uninstall the game
identified by the provided game_id.
This method is called by the GOG Galaxy Client.
:param str game_id: id of the game to uninstall
Example of possible override of the method:
.. code-block:: python
:linenos:
async def uninstall_game(self, game_id):
await self.open_uri(f"start client://uninstallgame/{game_id}")
"""
raise NotImplementedError()
async def get_friends(self) -> List[FriendInfo]:
"""Override this method to return the friends list
of the currently authenticated user.
This method is called by the GOG Galaxy Client.
Example of possible override of the method:
.. code-block:: python
:linenos:
async def get_friends(self):
if not self._http_client.is_authenticated():
raise AuthenticationRequired()
friends = self.retrieve_friends()
return friends
"""
raise NotImplementedError()
async def get_users(self, user_id_list: List[str]) -> List[UserInfo]:
"""WIP, Override this method to return the list of users matching the provided ids.
This method is called by the GOG Galaxy Client.
:param user_id_list: list of user ids
"""
raise NotImplementedError()
async def send_message(self, room_id: str, message_text: str) -> None:
"""WIP, Override this method to send message to a chat room.
This method is called by the GOG Galaxy Client.
:param room_id: id of the room to which the message should be sent
:param message_text: text which should be sent in the message
"""
raise NotImplementedError()
async def mark_as_read(self, room_id: str, last_message_id: str) -> None:
"""WIP, Override this method to mark messages in a chat room as read up to the id provided in the parameter.
This method is called by the GOG Galaxy Client.
:param room_id: id of the room
:param last_message_id: id of the last message; room is marked as read only if this id matches
the last message id known to the client
"""
raise NotImplementedError()
async def get_rooms(self) -> List[Room]:
"""WIP, Override this method to return the chat rooms in which the user is currently in.
This method is called by the GOG Galaxy Client
"""
raise NotImplementedError()
async def get_room_history_from_message(self, room_id: str, message_id: str) -> List[Message]:
"""WIP, Override this method to return the chat room history since the message provided in parameter.
This method is called by the GOG Galaxy Client.
:param room_id: id of the room
:param message_id: id of the message since which the history should be retrieved
"""
raise NotImplementedError()
async def get_room_history_from_timestamp(self, room_id: str, from_timestamp: int) -> List[Message]:
"""WIP, Override this method to return the chat room history since the timestamp provided in parameter.
This method is called by the GOG Galaxy Client.
:param room_id: id of the room
:param from_timestamp: timestamp since which the history should be retrieved
"""
raise NotImplementedError()
async def get_game_times(self) -> List[GameTime]:
"""
.. deprecated:: 0.33
Use :meth:`~.import_game_times`.
"""
raise NotImplementedError()
async def start_game_times_import(self, game_ids: List[str]) -> None:
"""Starts the task of importing game times
This method is called by the GOG Galaxy Client.
:param game_ids: ids of the games for which the game time is imported
"""
if self._game_times_import_in_progress:
raise ImportInProgress()
async def import_game_times_task(game_ids):
try:
await self.import_game_times(game_ids)
finally:
self.game_times_import_finished()
self._game_times_import_in_progress = False
asyncio.create_task(import_game_times_task(game_ids))
self._game_times_import_in_progress = True
async def import_game_times(self, game_ids: List[str]) -> None:
"""
Override this method to return game times for
games owned by the currently authenticated user.
Call game_time_import_success/game_time_import_failure for each game_id on the list.
This method is called by GOG Galaxy Client.
:param game_ids: ids of the games for which the game time is imported
"""
try:
game_times = await self.get_game_times()
game_ids_set = set(game_ids)
for game_time in game_times:
if game_time.game_id not in game_ids_set:
continue
self.game_time_import_success(game_time)
game_ids_set.discard(game_time.game_id)
for game_id in game_ids_set:
self.game_time_import_failure(game_id, UnknownError())
except Exception as error:
for game_id in game_ids:
self.game_time_import_failure(game_id, error)
def create_and_run_plugin(plugin_class, argv):
"""Call this method as an entry point for the implemented integration.
:param plugin_class: your plugin class.
:param argv: command line arguments with which the script was started.
Example of possible use of the method:
.. code-block:: python
:linenos:
def main():
create_and_run_plugin(PlatformPlugin, sys.argv)
if __name__ == "__main__":
main()
"""
if len(argv) < 3:
logging.critical("Not enough parameters, required: token, port")
sys.exit(1)
token = argv[1]
try:
port = int(argv[2])
except ValueError:
logging.critical("Failed to parse port value: %s", argv[2])
sys.exit(2)
if not (1 <= port <= 65535):
logging.critical("Port value out of range (1, 65535)")
sys.exit(3)
if not issubclass(plugin_class, Plugin):
logging.critical("plugin_class must be subclass of Plugin")
sys.exit(4)
async def coroutine():
reader, writer = await asyncio.open_connection("127.0.0.1", port)
extra_info = writer.get_extra_info("sockname")
logging.info("Using local address: %s:%u", *extra_info)
plugin = plugin_class(reader, writer, token)
await plugin.run()
try:
asyncio.run(coroutine())
except Exception:
logging.exception("Error while running plugin")
sys.exit(5)

208
src/galaxy/api/types.py Normal file
View File

@@ -0,0 +1,208 @@
from dataclasses import dataclass
from typing import List, Dict, Optional
from galaxy.api.consts import LicenseType, LocalGameState, PresenceState
@dataclass
class Authentication():
"""Return this from :meth:`.authenticate` or :meth:`.pass_login_credentials`
to inform the client that authentication has successfully finished.
:param user_id: id of the authenticated user
:param user_name: username of the authenticated user
"""
user_id: str
user_name: str
@dataclass
class Cookie():
"""Cookie
:param name: name of the cookie
:param value: value of the cookie
:param domain: optional domain of the cookie
:param path: optional path of the cookie
"""
name: str
value: str
domain: Optional[str] = None
path: Optional[str] = None
@dataclass
class NextStep():
"""Return this from :meth:`.authenticate` or :meth:`.pass_login_credentials` to open client built-in browser with given url.
For example:
.. code-block:: python
:linenos:
PARAMS = {
"window_title": "Login to platform",
"window_width": 800,
"window_height": 600,
"start_uri": URL,
"end_uri_regex": r"^https://platform_website\.com/.*"
}
JS = {r"^https://platform_website\.com/.*": [
r'''
location.reload();
'''
]}
COOKIES = [Cookie("Cookie1", "ok", ".platform.com"),
Cookie("Cookie2", "ok", ".platform.com")
]
async def authenticate(self, stored_credentials=None):
if not stored_credentials:
return NextStep("web_session", PARAMS, cookies=COOKIES, js=JS)
:param auth_params: configuration options: {"window_title": :class:`str`, "window_width": :class:`str`, "window_height": :class:`int`, "start_uri": :class:`int`, "end_uri_regex": :class:`str`}
:param cookies: browser initial set of cookies
:param js: a map of the url regex patterns into the list of *js* scripts that should be executed on every document at given step of internal browser authentication.
"""
next_step: str
auth_params: Dict[str, str]
cookies: Optional[List[Cookie]] = None
js: Optional[Dict[str, List[str]]] = None
@dataclass
class LicenseInfo():
"""Information about the license of related product.
:param license_type: type of license
:param owner: optional owner of the related product, defaults to currently authenticated user
"""
license_type: LicenseType
owner: Optional[str] = None
@dataclass
class Dlc():
"""Downloadable content object.
:param dlc_id: id of the dlc
:param dlc_title: title of the dlc
:param license_info: information about the license attached to the dlc
"""
dlc_id: str
dlc_title: str
license_info: LicenseInfo
@dataclass
class Game():
"""Game object.
:param game_id: unique identifier of the game, this will be passed as parameter for methods such as launch_game
:param game_title: title of the game
:param dlcs: list of dlcs available for the game
:param license_info: information about the license attached to the game
"""
game_id: str
game_title: str
dlcs: Optional[List[Dlc]]
license_info: LicenseInfo
@dataclass
class Achievement():
"""Achievement, has to be initialized with either id or name.
:param unlock_time: unlock time of the achievement
:param achievement_id: optional id of the achievement
:param achievement_name: optional name of the achievement
"""
unlock_time: int
achievement_id: Optional[str] = None
achievement_name: Optional[str] = None
def __post_init__(self):
assert self.achievement_id or self.achievement_name, \
"One of achievement_id or achievement_name is required"
@dataclass
class LocalGame():
"""Game locally present on the authenticated user's computer.
:param game_id: id of the game
:param local_game_state: state of the game
"""
game_id: str
local_game_state: LocalGameState
@dataclass
class Presence():
"""Information about a presence of a user.
:param presence_state: the state in which the user's presence is
:param game_id: id of the game which the user is currently playing
:param presence_status: optional attached string with the detailed description of the user's presence
"""
presence_state: PresenceState
game_id: Optional[str] = None
presence_status: Optional[str] = None
@dataclass
class UserInfo():
"""Detailed information about a user.
:param user_id: of the user
:param is_friend: whether the user is a friend of the currently authenticated user
:param user_name: of the user
:param avatar_url: to the avatar of the user
:param presence: about the users presence
"""
user_id: str
is_friend: bool
user_name: str
avatar_url: str
presence: Presence
@dataclass
class FriendInfo():
"""Information about a friend of the currently authenticated user.
:param user_id: id of the user
:param user_name: username of the user
"""
user_id: str
user_name: str
@dataclass
class Room():
"""WIP, Chatroom.
:param room_id: id of the room
:param unread_message_count: number of unread messages in the room
:param last_message_id: id of the last message in the room
"""
room_id: str
unread_message_count: int
last_message_id: str
@dataclass
class Message():
"""WIP, A chatroom message.
:param message_id: id of the message
:param sender_id: id of the sender of the message
:param sent_time: time at which the message was sent
:param message_text: text attached to the message
"""
message_id: str
sender_id: str
sent_time: int
message_text: str
@dataclass
class GameTime():
"""Game time of a game, defines the total time spent in the game
and the last time the game was played.
:param game_id: id of the related game
:param time_played: the total time spent in the game in **minutes**
:param last_time_played: last time the game was played (**unix timestamp**)
"""
game_id: str
time_played: Optional[int]
last_played_time: Optional[int]

81
src/galaxy/http.py Normal file
View File

@@ -0,0 +1,81 @@
import asyncio
import ssl
from contextlib import contextmanager
from http import HTTPStatus
import aiohttp
import certifi
import logging
from galaxy.api.errors import (
AccessDenied, AuthenticationRequired, BackendTimeout, BackendNotAvailable, BackendError, NetworkError,
TooManyRequests, UnknownBackendResponse, UnknownError
)
DEFAULT_LIMIT = 20
DEFAULT_TIMEOUT = 60 # seconds
class HttpClient:
"""Deprecated"""
def __init__(self, limit=DEFAULT_LIMIT, timeout=aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT), cookie_jar=None):
connector = create_tcp_connector(limit=limit)
self._session = create_client_session(connector=connector, timeout=timeout, cookie_jar=cookie_jar)
async def close(self):
await self._session.close()
async def request(self, method, url, *args, **kwargs):
with handle_exception():
return await self._session.request(method, url, *args, **kwargs)
def create_tcp_connector(*args, **kwargs):
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.load_verify_locations(certifi.where())
kwargs.setdefault("ssl", ssl_context)
kwargs.setdefault("limit", DEFAULT_LIMIT)
return aiohttp.TCPConnector(*args, **kwargs)
def create_client_session(*args, **kwargs):
kwargs.setdefault("connector", create_tcp_connector())
kwargs.setdefault("timeout", aiohttp.ClientTimeout(total=DEFAULT_TIMEOUT))
kwargs.setdefault("raise_for_status", True)
return aiohttp.ClientSession(*args, **kwargs)
@contextmanager
def handle_exception():
try:
yield
except asyncio.TimeoutError:
raise BackendTimeout()
except aiohttp.ServerDisconnectedError:
raise BackendNotAvailable()
except aiohttp.ClientConnectionError:
raise NetworkError()
except aiohttp.ContentTypeError:
raise UnknownBackendResponse()
except aiohttp.ClientResponseError as error:
if error.status == HTTPStatus.UNAUTHORIZED:
raise AuthenticationRequired()
if error.status == HTTPStatus.FORBIDDEN:
raise AccessDenied()
if error.status == HTTPStatus.SERVICE_UNAVAILABLE:
raise BackendNotAvailable()
if error.status == HTTPStatus.TOO_MANY_REQUESTS:
raise TooManyRequests()
if error.status >= 500:
raise BackendError()
if error.status >= 400:
logging.warning(
"Got status %d while performing %s request for %s",
error.status, error.request_info.method, str(error.request_info.url)
)
raise UnknownError()
except aiohttp.ClientError:
logging.exception("Caught exception while performing request")
raise UnknownError()

28
src/galaxy/reader.py Normal file
View File

@@ -0,0 +1,28 @@
from asyncio import StreamReader
class StreamLineReader:
"""Handles StreamReader readline without buffer limit"""
def __init__(self, reader: StreamReader):
self._reader = reader
self._buffer = bytes()
self._processed_buffer_it = 0
async def readline(self):
while True:
# check if there is no unprocessed data in the buffer
if not self._buffer or self._processed_buffer_it != 0:
chunk = await self._reader.read(1024)
if not chunk:
return bytes() # EOF
self._buffer += chunk
it = self._buffer.find(b"\n", self._processed_buffer_it)
if it < 0:
self._processed_buffer_it = len(self._buffer)
continue
line = self._buffer[:it]
self._buffer = self._buffer[it+1:]
self._processed_buffer_it = 0
return line

View File

View File

@@ -0,0 +1,12 @@
from asyncio import coroutine
from unittest.mock import MagicMock
class AsyncMock(MagicMock):
async def __call__(self, *args, **kwargs):
return super(AsyncMock, self).__call__(*args, **kwargs)
def coroutine_mock():
coro = MagicMock(name="CoroutineResult")
corofunc = MagicMock(name="CoroutineFunction", side_effect=coroutine(coro))
corofunc.coro = coro
return corofunc

View File

@@ -6,12 +6,12 @@ import pytest
from galaxy.api.plugin import Plugin
from galaxy.api.consts import Platform
from galaxy.unittest.mock import AsyncMock
from galaxy.unittest.mock import AsyncMock, coroutine_mock
@pytest.fixture()
def reader():
stream = MagicMock(name="stream_reader")
stream.readline = AsyncMock()
stream.read = AsyncMock()
yield stream
@pytest.fixture()
@@ -22,8 +22,8 @@ def writer():
yield stream
@pytest.fixture()
def readline(reader):
yield reader.readline
def read(reader):
yield reader.read
@pytest.fixture()
def write(writer):
@@ -33,6 +33,7 @@ def write(writer):
def plugin(reader, writer):
"""Return plugin instance with all feature methods mocked"""
async_methods = (
"handshake_complete",
"authenticate",
"get_owned_games",
"get_unlocked_achievements",
@@ -57,7 +58,7 @@ def plugin(reader, writer):
with ExitStack() as stack:
for method in async_methods:
stack.enter_context(patch.object(Plugin, method, new_callable=AsyncMock))
stack.enter_context(patch.object(Plugin, method, new_callable=coroutine_mock))
for method in methods:
stack.enter_context(patch.object(Plugin, method))
yield Plugin(Platform.Generic, "0.1", reader, writer, "token")

View File

@@ -1,9 +1,12 @@
import asyncio
import json
from unittest.mock import call
import pytest
from pytest import raises
from galaxy.api.types import Achievement
from galaxy.api.errors import UnknownError
from galaxy.api.errors import UnknownError, ImportInProgress, BackendError
def test_initialization_no_unlock_time():
with raises(Exception):
@@ -13,7 +16,7 @@ def test_initialization_no_id_nor_name():
with raises(AssertionError):
Achievement(unlock_time=1234567890)
def test_success(plugin, readline, write):
def test_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
@@ -22,8 +25,8 @@ def test_success(plugin, readline, write):
"game_id": "14"
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_unlocked_achievements.return_value = [
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_unlocked_achievements.coro.return_value = [
Achievement(achievement_id="lvl10", unlock_time=1548421241),
Achievement(achievement_name="Got level 20", unlock_time=1548422395),
Achievement(achievement_id="lvl30", achievement_name="Got level 30", unlock_time=1548495633)
@@ -54,7 +57,7 @@ def test_success(plugin, readline, write):
}
}
def test_failure(plugin, readline, write):
def test_failure(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
@@ -64,8 +67,8 @@ def test_failure(plugin, readline, write):
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_unlocked_achievements.side_effect = UnknownError()
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_unlocked_achievements.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_unlocked_achievements.assert_called()
response = json.loads(write.call_args[0][0])
@@ -99,3 +102,92 @@ def test_unlock_achievement(plugin, write):
}
}
}
@pytest.mark.asyncio
async def test_game_achievements_import_success(plugin, write):
achievements = [
Achievement(achievement_id="lvl10", unlock_time=1548421241),
Achievement(achievement_name="Got level 20", unlock_time=1548422395)
]
plugin.game_achievements_import_success("134", achievements)
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "game_achievements_import_success",
"params": {
"game_id": "134",
"unlocked_achievements": [
{
"achievement_id": "lvl10",
"unlock_time": 1548421241
},
{
"achievement_name": "Got level 20",
"unlock_time": 1548422395
}
]
}
}
@pytest.mark.asyncio
async def test_game_achievements_import_failure(plugin, write):
plugin.game_achievements_import_failure("134", ImportInProgress())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "game_achievements_import_failure",
"params": {
"game_id": "134",
"error": {
"code": 600,
"message": "Import already in progress"
}
}
}
@pytest.mark.asyncio
async def test_achievements_import_finished(plugin, write):
plugin.achievements_import_finished()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "achievements_import_finished",
"params": None
}
@pytest.mark.asyncio
async def test_start_achievements_import(plugin, write, mocker):
game_achievements_import_success = mocker.patch.object(plugin, "game_achievements_import_success")
game_achievements_import_failure = mocker.patch.object(plugin, "game_achievements_import_failure")
achievements_import_finished = mocker.patch.object(plugin, "achievements_import_finished")
game_ids = ["1", "5", "9"]
error = BackendError()
achievements = [
Achievement(achievement_id="lvl10", unlock_time=1548421241),
Achievement(achievement_name="Got level 20", unlock_time=1548422395)
]
plugin.get_unlocked_achievements.coro.side_effect = [
achievements,
[],
error
]
await plugin.start_achievements_import(game_ids)
with pytest.raises(ImportInProgress):
await plugin.start_achievements_import(["4", "8"])
# wait until all tasks are finished
for _ in range(4):
await asyncio.sleep(0)
plugin.get_unlocked_achievements.coro.assert_has_calls([call("1"), call("5"), call("9")])
game_achievements_import_success.assert_has_calls([
call("1", achievements),
call("5", [])
])
game_achievements_import_failure.assert_called_once_with("9", error)
achievements_import_finished.assert_called_once_with()

View File

@@ -6,19 +6,18 @@ import pytest
from galaxy.api.types import Authentication
from galaxy.api.errors import (
UnknownError, InvalidCredentials, NetworkError, LoggedInElsewhere, ProtocolError,
BackendNotAvailable, BackendTimeout, BackendError, TemporaryBlocked, Banned, AccessDenied,
ParentalControlBlock, DeviceBlocked, RegionBlocked
BackendNotAvailable, BackendTimeout, BackendError, TemporaryBlocked, Banned, AccessDenied
)
def test_success(plugin, readline, write):
def test_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "init_authentication"
}
readline.side_effect = [json.dumps(request), ""]
plugin.authenticate.return_value = Authentication("132", "Zenek")
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.authenticate.coro.return_value = Authentication("132", "Zenek")
asyncio.run(plugin.run())
plugin.authenticate.assert_called_with()
response = json.loads(write.call_args[0][0])
@@ -44,19 +43,16 @@ def test_success(plugin, readline, write):
pytest.param(TemporaryBlocked, 104, "Temporary blocked", id="temporary_blocked"),
pytest.param(Banned, 105, "Banned", id="banned"),
pytest.param(AccessDenied, 106, "Access denied", id="access_denied"),
pytest.param(ParentalControlBlock, 107, "Parental control block", id="parental_control_clock"),
pytest.param(DeviceBlocked, 108, "Device blocked", id="device_blocked"),
pytest.param(RegionBlocked, 109, "Region blocked", id="region_blocked")
])
def test_failure(plugin, readline, write, error, code, message):
def test_failure(plugin, read, write, error, code, message):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "init_authentication"
}
readline.side_effect = [json.dumps(request), ""]
plugin.authenticate.side_effect = error()
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.authenticate.coro.side_effect = error()
asyncio.run(plugin.run())
plugin.authenticate.assert_called_with()
response = json.loads(write.call_args[0][0])
@@ -70,7 +66,7 @@ def test_failure(plugin, readline, write, error, code, message):
}
}
def test_stored_credentials(plugin, readline, write):
def test_stored_credentials(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
@@ -81,8 +77,8 @@ def test_stored_credentials(plugin, readline, write):
}
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.authenticate.return_value = Authentication("132", "Zenek")
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.authenticate.coro.return_value = Authentication("132", "Zenek")
asyncio.run(plugin.run())
plugin.authenticate.assert_called_with(stored_credentials={"token": "ABC"})
write.assert_called()
@@ -104,7 +100,7 @@ def test_store_credentials(plugin, write):
"params": credentials
}
def test_lost_authentication(plugin, readline, write):
def test_lost_authentication(plugin, write):
async def couritine():
plugin.lost_authentication()

View File

@@ -9,7 +9,7 @@ from galaxy.api.errors import (
TooManyMessagesSent, IncoherentLastMessage, MessageNotFound
)
def test_send_message_success(plugin, readline, write):
def test_send_message_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
@@ -20,8 +20,8 @@ def test_send_message_success(plugin, readline, write):
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.send_message.return_value = None
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.send_message.coro.return_value = None
asyncio.run(plugin.run())
plugin.send_message.assert_called_with(room_id="14", message="Hello!")
response = json.loads(write.call_args[0][0])
@@ -40,7 +40,7 @@ def test_send_message_success(plugin, readline, write):
pytest.param(BackendError, 4, "Backend error", id="backend_error"),
pytest.param(TooManyMessagesSent, 300, "Too many messages sent", id="too_many_messages")
])
def test_send_message_failure(plugin, readline, write, error, code, message):
def test_send_message_failure(plugin, read, write, error, code, message):
request = {
"jsonrpc": "2.0",
"id": "6",
@@ -51,8 +51,8 @@ def test_send_message_failure(plugin, readline, write, error, code, message):
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.send_message.side_effect = error()
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.send_message.coro.side_effect = error()
asyncio.run(plugin.run())
plugin.send_message.assert_called_with(room_id="15", message="Bye")
response = json.loads(write.call_args[0][0])
@@ -66,7 +66,7 @@ def test_send_message_failure(plugin, readline, write, error, code, message):
}
}
def test_mark_as_read_success(plugin, readline, write):
def test_mark_as_read_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "7",
@@ -77,8 +77,8 @@ def test_mark_as_read_success(plugin, readline, write):
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.mark_as_read.return_value = None
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.mark_as_read.coro.return_value = None
asyncio.run(plugin.run())
plugin.mark_as_read.assert_called_with(room_id="14", last_message_id="67")
response = json.loads(write.call_args[0][0])
@@ -102,7 +102,7 @@ def test_mark_as_read_success(plugin, readline, write):
id="incoherent_last_message"
)
])
def test_mark_as_read_failure(plugin, readline, write, error, code, message):
def test_mark_as_read_failure(plugin, read, write, error, code, message):
request = {
"jsonrpc": "2.0",
"id": "4",
@@ -113,8 +113,8 @@ def test_mark_as_read_failure(plugin, readline, write, error, code, message):
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.mark_as_read.side_effect = error()
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.mark_as_read.coro.side_effect = error()
asyncio.run(plugin.run())
plugin.mark_as_read.assert_called_with(room_id="18", last_message_id="7")
response = json.loads(write.call_args[0][0])
@@ -128,15 +128,15 @@ def test_mark_as_read_failure(plugin, readline, write, error, code, message):
}
}
def test_get_rooms_success(plugin, readline, write):
def test_get_rooms_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "2",
"method": "import_rooms"
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_rooms.return_value = [
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_rooms.coro.return_value = [
Room("13", 0, None),
Room("15", 34, "8")
]
@@ -162,15 +162,15 @@ def test_get_rooms_success(plugin, readline, write):
}
}
def test_get_rooms_failure(plugin, readline, write):
def test_get_rooms_failure(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "9",
"method": "import_rooms"
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_rooms.side_effect = UnknownError()
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_rooms.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_rooms.assert_called_with()
response = json.loads(write.call_args[0][0])
@@ -184,7 +184,7 @@ def test_get_rooms_failure(plugin, readline, write):
}
}
def test_get_room_history_from_message_success(plugin, readline, write):
def test_get_room_history_from_message_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "2",
@@ -195,8 +195,8 @@ def test_get_room_history_from_message_success(plugin, readline, write):
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_room_history_from_message.return_value = [
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_room_history_from_message.coro.return_value = [
Message("13", "149", 1549454837, "Hello"),
Message("14", "812", 1549454899, "Hi")
]
@@ -233,7 +233,7 @@ def test_get_room_history_from_message_success(plugin, readline, write):
pytest.param(BackendError, 4, "Backend error", id="backend_error"),
pytest.param(MessageNotFound, 500, "Message not found", id="message_not_found")
])
def test_get_room_history_from_message_failure(plugin, readline, write, error, code, message):
def test_get_room_history_from_message_failure(plugin, read, write, error, code, message):
request = {
"jsonrpc": "2.0",
"id": "7",
@@ -244,8 +244,8 @@ def test_get_room_history_from_message_failure(plugin, readline, write, error, c
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_room_history_from_message.side_effect = error()
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_room_history_from_message.coro.side_effect = error()
asyncio.run(plugin.run())
plugin.get_room_history_from_message.assert_called_with(room_id="33", message_id="88")
response = json.loads(write.call_args[0][0])
@@ -259,7 +259,7 @@ def test_get_room_history_from_message_failure(plugin, readline, write, error, c
}
}
def test_get_room_history_from_timestamp_success(plugin, readline, write):
def test_get_room_history_from_timestamp_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "7",
@@ -270,8 +270,8 @@ def test_get_room_history_from_timestamp_success(plugin, readline, write):
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_room_history_from_timestamp.return_value = [
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_room_history_from_timestamp.coro.return_value = [
Message("12", "155", 1549454836, "Bye")
]
asyncio.run(plugin.run())
@@ -296,7 +296,7 @@ def test_get_room_history_from_timestamp_success(plugin, readline, write):
}
}
def test_get_room_history_from_timestamp_failure(plugin, readline, write):
def test_get_room_history_from_timestamp_failure(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
@@ -307,8 +307,8 @@ def test_get_room_history_from_timestamp_failure(plugin, readline, write):
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_room_history_from_timestamp.side_effect = UnknownError()
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_room_history_from_timestamp.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_room_history_from_timestamp.assert_called_with(
room_id="10",

View File

@@ -0,0 +1,54 @@
import asyncio
import json
def test_chunked_messages(plugin, read):
request = {
"jsonrpc": "2.0",
"method": "install_game",
"params": {
"game_id": "3"
}
}
message = json.dumps(request).encode() + b"\n"
read.side_effect = [message[:5], message[5:], b""]
asyncio.run(plugin.run())
plugin.install_game.assert_called_with(game_id="3")
def test_joined_messages(plugin, read):
requests = [
{
"jsonrpc": "2.0",
"method": "install_game",
"params": {
"game_id": "3"
}
},
{
"jsonrpc": "2.0",
"method": "launch_game",
"params": {
"game_id": "3"
}
}
]
data = b"".join([json.dumps(request).encode() + b"\n" for request in requests])
read.side_effect = [data, b""]
asyncio.run(plugin.run())
plugin.install_game.assert_called_with(game_id="3")
plugin.launch_game.assert_called_with(game_id="3")
def test_not_finished(plugin, read):
request = {
"jsonrpc": "2.0",
"method": "install_game",
"params": {
"game_id": "3"
}
}
message = json.dumps(request).encode() # no new line
read.side_effect = [message, b""]
asyncio.run(plugin.run())
plugin.install_game.assert_not_called()

90
tests/test_friends.py Normal file
View File

@@ -0,0 +1,90 @@
import asyncio
import json
from galaxy.api.types import FriendInfo
from galaxy.api.errors import UnknownError
def test_get_friends_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_friends"
}
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_friends.coro.return_value = [
FriendInfo("3", "Jan"),
FriendInfo("5", "Ola")
]
asyncio.run(plugin.run())
plugin.get_friends.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"result": {
"friend_info_list": [
{"user_id": "3", "user_name": "Jan"},
{"user_id": "5", "user_name": "Ola"}
]
}
}
def test_get_friends_failure(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_friends"
}
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_friends.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_friends.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": 0,
"message": "Unknown error",
}
}
def test_add_friend(plugin, write):
friend = FriendInfo("7", "Kuba")
async def couritine():
plugin.add_friend(friend)
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "friend_added",
"params": {
"friend_info": {"user_id": "7", "user_name": "Kuba"}
}
}
def test_remove_friend(plugin, write):
async def couritine():
plugin.remove_friend("5")
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "friend_removed",
"params": {
"user_id": "5"
}
}

View File

@@ -1,20 +1,23 @@
import asyncio
import json
from unittest.mock import call
import pytest
from galaxy.api.types import GameTime
from galaxy.api.errors import UnknownError
from galaxy.api.errors import UnknownError, ImportInProgress, BackendError
def test_success(plugin, readline, write):
def test_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_game_times"
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_game_times.return_value = [
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_game_times.coro.return_value = [
GameTime("3", 60, 1549550504),
GameTime("5", 10, 1549550502)
GameTime("5", 10, None),
GameTime("7", None, 1549550502),
]
asyncio.run(plugin.run())
plugin.get_game_times.assert_called_with()
@@ -33,21 +36,24 @@ def test_success(plugin, readline, write):
{
"game_id": "5",
"time_played": 10,
"last_played_time": 1549550502
},
{
"game_id": "7",
"last_played_time": 1549550502
}
]
}
}
def test_failure(plugin, readline, write):
def test_failure(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_game_times"
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_game_times.side_effect = UnknownError()
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_game_times.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_game_times.assert_called_with()
response = json.loads(write.call_args[0][0])
@@ -81,3 +87,93 @@ def test_update_game(plugin, write):
}
}
}
@pytest.mark.asyncio
async def test_game_time_import_success(plugin, write):
plugin.game_time_import_success(GameTime("3", 60, 1549550504))
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "game_time_import_success",
"params": {
"game_time": {
"game_id": "3",
"time_played": 60,
"last_played_time": 1549550504
}
}
}
@pytest.mark.asyncio
async def test_game_time_import_failure(plugin, write):
plugin.game_time_import_failure("134", ImportInProgress())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "game_time_import_failure",
"params": {
"game_id": "134",
"error": {
"code": 600,
"message": "Import already in progress"
}
}
}
@pytest.mark.asyncio
async def test_game_times_import_finished(plugin, write):
plugin.game_times_import_finished()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "game_times_import_finished",
"params": None
}
@pytest.mark.asyncio
async def test_start_game_times_import(plugin, write, mocker):
game_time_import_success = mocker.patch.object(plugin, "game_time_import_success")
game_time_import_failure = mocker.patch.object(plugin, "game_time_import_failure")
game_times_import_finished = mocker.patch.object(plugin, "game_times_import_finished")
game_ids = ["1", "5"]
game_time = GameTime("1", 10, 1549550502)
plugin.get_game_times.coro.return_value = [
game_time
]
await plugin.start_game_times_import(game_ids)
with pytest.raises(ImportInProgress):
await plugin.start_game_times_import(["4", "8"])
# wait until all tasks are finished
for _ in range(4):
await asyncio.sleep(0)
plugin.get_game_times.coro.assert_called_once_with()
game_time_import_success.assert_called_once_with(game_time)
game_time_import_failure.assert_called_once_with("5", UnknownError())
game_times_import_finished.assert_called_once_with()
@pytest.mark.asyncio
async def test_start_game_times_import_failure(plugin, write, mocker):
game_time_import_failure = mocker.patch.object(plugin, "game_time_import_failure")
game_times_import_finished = mocker.patch.object(plugin, "game_times_import_finished")
game_ids = ["1", "5"]
error = BackendError()
plugin.get_game_times.coro.side_effect = error
await plugin.start_game_times_import(game_ids)
# wait until all tasks are finished
for _ in range(4):
await asyncio.sleep(0)
plugin.get_game_times.coro.assert_called_once_with()
assert game_time_import_failure.mock_calls == [call("1", error), call("5", error)]
game_times_import_finished.assert_called_once_with()

37
tests/test_http.py Normal file
View File

@@ -0,0 +1,37 @@
import asyncio
from http import HTTPStatus
import aiohttp
import pytest
from galaxy.api.errors import (
AccessDenied, AuthenticationRequired, BackendTimeout, BackendNotAvailable, BackendError, NetworkError,
TooManyRequests, UnknownBackendResponse, UnknownError
)
from galaxy.http import handle_exception
request_info = aiohttp.RequestInfo("http://o.pl", "GET", {})
@pytest.mark.parametrize(
"aiohttp_exception,expected_exception_type",
[
(asyncio.TimeoutError(), BackendTimeout),
(aiohttp.ServerDisconnectedError(), BackendNotAvailable),
(aiohttp.ClientConnectionError(), NetworkError),
(aiohttp.ContentTypeError(request_info, []), UnknownBackendResponse),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.UNAUTHORIZED), AuthenticationRequired),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.FORBIDDEN), AccessDenied),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.SERVICE_UNAVAILABLE), BackendNotAvailable),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.TOO_MANY_REQUESTS), TooManyRequests),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.INTERNAL_SERVER_ERROR), BackendError),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.NOT_IMPLEMENTED), BackendError),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.BAD_REQUEST), UnknownError),
(aiohttp.ClientResponseError(request_info, [], status=HTTPStatus.NOT_FOUND), UnknownError),
(aiohttp.ClientError(), UnknownError)
]
)
def test_handle_exception(aiohttp_exception, expected_exception_type):
with pytest.raises(expected_exception_type):
with handle_exception():
raise aiohttp_exception

View File

@@ -1,7 +1,7 @@
import asyncio
import json
def test_success(plugin, readline):
def test_success(plugin, read):
request = {
"jsonrpc": "2.0",
"method": "install_game",
@@ -10,7 +10,7 @@ def test_success(plugin, readline):
}
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_owned_games.return_value = None
asyncio.run(plugin.run())
plugin.install_game.assert_called_with(game_id="3")

View File

@@ -4,7 +4,7 @@ import json
from galaxy.api.plugin import Plugin
from galaxy.api.consts import Platform
def test_get_capabilites(reader, writer, readline, write):
def test_get_capabilites(reader, writer, read, write):
class PluginImpl(Plugin): #pylint: disable=abstract-method
async def get_owned_games(self):
pass
@@ -16,7 +16,7 @@ def test_get_capabilites(reader, writer, readline, write):
}
token = "token"
plugin = PluginImpl(Platform.Generic, "0.1", reader, writer, token)
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
asyncio.run(plugin.run())
response = json.loads(write.call_args[0][0])
assert response == {
@@ -31,13 +31,13 @@ def test_get_capabilites(reader, writer, readline, write):
}
}
def test_shutdown(plugin, readline, write):
def test_shutdown(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "5",
"method": "shutdown"
}
readline.side_effect = [json.dumps(request)]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
asyncio.run(plugin.run())
plugin.shutdown.assert_called_with()
response = json.loads(write.call_args[0][0])
@@ -47,13 +47,13 @@ def test_shutdown(plugin, readline, write):
"result": None
}
def test_ping(plugin, readline, write):
def test_ping(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "7",
"method": "ping"
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
asyncio.run(plugin.run())
response = json.loads(write.call_args[0][0])
assert response == {
@@ -62,7 +62,18 @@ def test_ping(plugin, readline, write):
"result": None
}
def test_tick(plugin, readline):
readline.side_effect = [""]
def test_tick_before_handshake(plugin, read):
read.side_effect = [b""]
asyncio.run(plugin.run())
plugin.tick.assert_not_called()
def test_tick_after_handshake(plugin, read):
request = {
"jsonrpc": "2.0",
"id": "6",
"method": "initialize_cache",
"params": {"data": {}}
}
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
asyncio.run(plugin.run())
plugin.tick.assert_called_with()

View File

@@ -1,7 +1,7 @@
import asyncio
import json
def test_success(plugin, readline):
def test_success(plugin, read):
request = {
"jsonrpc": "2.0",
"method": "launch_game",
@@ -10,7 +10,7 @@ def test_success(plugin, readline):
}
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_owned_games.return_value = None
asyncio.run(plugin.run())
plugin.launch_game.assert_called_with(game_id="3")

View File

@@ -7,16 +7,16 @@ from galaxy.api.types import LocalGame
from galaxy.api.consts import LocalGameState
from galaxy.api.errors import UnknownError, FailedParsingManifest
def test_success(plugin, readline, write):
def test_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_local_games"
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_local_games.return_value = [
plugin.get_local_games.coro.return_value = [
LocalGame("1", LocalGameState.Running),
LocalGame("2", LocalGameState.Installed),
LocalGame("3", LocalGameState.Installed | LocalGameState.Running)
@@ -53,15 +53,15 @@ def test_success(plugin, readline, write):
pytest.param(FailedParsingManifest, 200, "Failed parsing manifest", id="failed_parsing")
],
)
def test_failure(plugin, readline, write, error, code, message):
def test_failure(plugin, read, write, error, code, message):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_local_games"
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_local_games.side_effect = error()
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_local_games.coro.side_effect = error()
asyncio.run(plugin.run())
plugin.get_local_games.assert_called_with()
response = json.loads(write.call_args[0][0])

View File

@@ -5,15 +5,15 @@ from galaxy.api.types import Game, Dlc, LicenseInfo
from galaxy.api.consts import LicenseType
from galaxy.api.errors import UnknownError
def test_success(plugin, readline, write):
def test_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_owned_games"
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_owned_games.return_value = [
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_owned_games.coro.return_value = [
Game("3", "Doom", None, LicenseInfo(LicenseType.SinglePurchase, None)),
Game(
"5",
@@ -67,15 +67,15 @@ def test_success(plugin, readline, write):
}
}
def test_failure(plugin, readline, write):
def test_failure(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_owned_games"
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_owned_games.side_effect = UnknownError()
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_owned_games.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_owned_games.assert_called_with()
response = json.loads(write.call_args[0][0])

View File

@@ -0,0 +1,71 @@
import asyncio
import json
import pytest
def assert_rpc_response(write, response_id, result=None):
assert json.loads(write.call_args[0][0]) == {
"jsonrpc": "2.0",
"id": str(response_id),
"result": result
}
def assert_rpc_request(write, method, params=None):
assert json.loads(write.call_args[0][0]) == {
"jsonrpc": "2.0",
"method": method,
"params": {"data": params}
}
@pytest.fixture
def cache_data():
return {
"persistent key": "persistent value",
"persistent object": {"answer to everything": 42}
}
def test_initialize_cache(plugin, read, write, cache_data):
request_id = 3
request = {
"jsonrpc": "2.0",
"id": str(request_id),
"method": "initialize_cache",
"params": {"data": cache_data}
}
read.side_effect = [json.dumps(request).encode() + b"\n"]
assert {} == plugin.persistent_cache
asyncio.run(plugin.run())
plugin.handshake_complete.assert_called_once_with()
assert cache_data == plugin.persistent_cache
assert_rpc_response(write, response_id=request_id)
def test_set_cache(plugin, write, cache_data):
async def runner():
assert {} == plugin.persistent_cache
plugin.persistent_cache.update(cache_data)
plugin.push_cache()
assert_rpc_request(write, "push_cache", cache_data)
assert cache_data == plugin.persistent_cache
asyncio.run(runner())
def test_clear_cache(plugin, write, cache_data):
async def runner():
plugin._persistent_cache = cache_data
plugin.persistent_cache.clear()
plugin.push_cache()
assert_rpc_request(write, "push_cache", {})
assert {} == plugin.persistent_cache
asyncio.run(runner())

View File

@@ -0,0 +1,52 @@
from unittest.mock import MagicMock
import pytest
from galaxy.reader import StreamLineReader
from galaxy.unittest.mock import AsyncMock
@pytest.fixture()
def stream_reader():
reader = MagicMock()
reader.read = AsyncMock()
return reader
@pytest.fixture()
def read(stream_reader):
return stream_reader.read
@pytest.fixture()
def reader(stream_reader):
return StreamLineReader(stream_reader)
@pytest.mark.asyncio
async def test_message(reader, read):
read.return_value = b"a\n"
assert await reader.readline() == b"a"
read.assert_called_once()
@pytest.mark.asyncio
async def test_separate_messages(reader, read):
read.side_effect = [b"a\n", b"b\n"]
assert await reader.readline() == b"a"
assert await reader.readline() == b"b"
assert read.call_count == 2
@pytest.mark.asyncio
async def test_connected_messages(reader, read):
read.return_value = b"a\nb\n"
assert await reader.readline() == b"a"
assert await reader.readline() == b"b"
read.assert_called_once()
@pytest.mark.asyncio
async def test_cut_message(reader, read):
read.side_effect = [b"a", b"b\n"]
assert await reader.readline() == b"ab"
assert read.call_count == 2
@pytest.mark.asyncio
async def test_half_message(reader, read):
read.side_effect = [b"a", b""]
assert await reader.readline() == b""
assert read.call_count == 2

View File

@@ -1,7 +1,7 @@
import asyncio
import json
def test_success(plugin, readline):
def test_success(plugin, read):
request = {
"jsonrpc": "2.0",
"method": "uninstall_game",
@@ -10,7 +10,7 @@ def test_success(plugin, readline):
}
}
readline.side_effect = [json.dumps(request), ""]
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_owned_games.return_value = None
asyncio.run(plugin.run())
plugin.uninstall_game.assert_called_with(game_id="3")

View File

@@ -5,155 +5,8 @@ from galaxy.api.types import UserInfo, Presence
from galaxy.api.errors import UnknownError
from galaxy.api.consts import PresenceState
def test_get_friends_success(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_friends"
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_friends.return_value = [
UserInfo(
"3",
True,
"Jan",
"http://avatar1.png",
Presence(
PresenceState.Online,
"123",
"Main menu"
)
),
UserInfo(
"5",
True,
"Ola",
"http://avatar2.png",
Presence(PresenceState.Offline)
)
]
asyncio.run(plugin.run())
plugin.get_friends.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"result": {
"user_info_list": [
{
"user_id": "3",
"is_friend": True,
"user_name": "Jan",
"avatar_url": "http://avatar1.png",
"presence": {
"presence_state": "online",
"game_id": "123",
"presence_status": "Main menu"
}
},
{
"user_id": "5",
"is_friend": True,
"user_name": "Ola",
"avatar_url": "http://avatar2.png",
"presence": {
"presence_state": "offline"
}
}
]
}
}
def test_get_friends_failure(plugin, readline, write):
request = {
"jsonrpc": "2.0",
"id": "3",
"method": "import_friends"
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_friends.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_friends.assert_called_with()
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"id": "3",
"error": {
"code": 0,
"message": "Unknown error",
}
}
def test_add_friend(plugin, write):
friend = UserInfo("7", True, "Kuba", "http://avatar.png", Presence(PresenceState.Offline))
async def couritine():
plugin.add_friend(friend)
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "friend_added",
"params": {
"user_info": {
"user_id": "7",
"is_friend": True,
"user_name": "Kuba",
"avatar_url": "http://avatar.png",
"presence": {
"presence_state": "offline"
}
}
}
}
def test_remove_friend(plugin, write):
async def couritine():
plugin.remove_friend("5")
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "friend_removed",
"params": {
"user_id": "5"
}
}
def test_update_friend(plugin, write):
friend = UserInfo("9", True, "Anna", "http://avatar.png", Presence(PresenceState.Offline))
async def couritine():
plugin.update_friend(friend)
asyncio.run(couritine())
response = json.loads(write.call_args[0][0])
assert response == {
"jsonrpc": "2.0",
"method": "friend_updated",
"params": {
"user_info": {
"user_id": "9",
"is_friend": True,
"user_name": "Anna",
"avatar_url": "http://avatar.png",
"presence": {
"presence_state": "offline"
}
}
}
}
def test_get_users_success(plugin, readline, write):
def test_get_users_success(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "8",
@@ -163,8 +16,8 @@ def test_get_users_success(plugin, readline, write):
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_users.return_value = [
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_users.coro.return_value = [
UserInfo("5", False, "Ula", "http://avatar.png", Presence(PresenceState.Offline))
]
asyncio.run(plugin.run())
@@ -189,7 +42,8 @@ def test_get_users_success(plugin, readline, write):
}
}
def test_get_users_failure(plugin, readline, write):
def test_get_users_failure(plugin, read, write):
request = {
"jsonrpc": "2.0",
"id": "12",
@@ -199,8 +53,8 @@ def test_get_users_failure(plugin, readline, write):
}
}
readline.side_effect = [json.dumps(request), ""]
plugin.get_users.side_effect = UnknownError()
read.side_effect = [json.dumps(request).encode() + b"\n", b""]
plugin.get_users.coro.side_effect = UnknownError()
asyncio.run(plugin.run())
plugin.get_users.assert_called_with(user_id_list=["10", "11", "12"])
response = json.loads(write.call_args[0][0])