Compare commits

...

3 Commits
0.43 ... 0.44

Author SHA1 Message Date
Aleksej Pawlowskij
49ae2beab9 SDK-2983: Split entry methods from feature detection 2019-07-29 12:59:45 +02:00
Aleksej Pawlowskij
c9e190772c Increment version 2019-07-26 16:28:47 +02:00
Aleksej Pawlowskij
789415d31b SDK-2974: add process info tools 2019-07-26 13:41:43 +02:00
8 changed files with 190 additions and 82 deletions

View File

@@ -5,4 +5,5 @@ pytest-mock==1.10.3
pytest-flakes==4.0.0 pytest-flakes==4.0.0
# because of pip bug https://github.com/pypa/pip/issues/4780 # because of pip bug https://github.com/pypa/pip/issues/4780
aiohttp==3.5.4 aiohttp==3.5.4
certifi==2019.3.9 certifi==2019.3.9
psutil==5.6.3; sys_platform == 'darwin'

View File

@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup( setup(
name="galaxy.plugin.api", name="galaxy.plugin.api",
version="0.43", version="0.44",
description="GOG Galaxy Integrations Python API", description="GOG Galaxy Integrations Python API",
author='Galaxy team', author='Galaxy team',
author_email='galaxy@gog.com', author_email='galaxy@gog.com',

View File

@@ -1,21 +1,18 @@
import asyncio import asyncio
import dataclasses
import json import json
import logging import logging
import logging.handlers import logging.handlers
import dataclasses
from enum import Enum
from collections import OrderedDict
from itertools import count
import sys import sys
from collections import OrderedDict
from enum import Enum
from itertools import count
from typing import Any, Dict, List, Optional, Set, Union
from typing import Any, List, Dict, Optional, Union
from galaxy.api.types import Achievement, Game, LocalGame, FriendInfo, GameTime
from galaxy.api.jsonrpc import Server, NotificationClient, ApplicationError
from galaxy.api.consts import Feature from galaxy.api.consts import Feature
from galaxy.api.errors import UnknownError, ImportInProgress from galaxy.api.errors import ImportInProgress, UnknownError
from galaxy.api.types import Authentication, NextStep from galaxy.api.jsonrpc import ApplicationError, NotificationClient, Server
from galaxy.api.types import Achievement, Authentication, FriendInfo, Game, GameTime, LocalGame, NextStep
class JSONEncoder(json.JSONEncoder): class JSONEncoder(json.JSONEncoder):
@@ -24,6 +21,7 @@ class JSONEncoder(json.JSONEncoder):
# filter None values # filter None values
def dict_factory(elements): def dict_factory(elements):
return {k: v for k, v in elements if v is not None} return {k: v for k, v in elements if v is not None}
return dataclasses.asdict(o, dict_factory=dict_factory) return dataclasses.asdict(o, dict_factory=dict_factory)
if isinstance(o, Enum): if isinstance(o, Enum):
return o.value return o.value
@@ -32,12 +30,13 @@ class JSONEncoder(json.JSONEncoder):
class Plugin: class Plugin:
"""Use and override methods of this class to create a new platform integration.""" """Use and override methods of this class to create a new platform integration."""
def __init__(self, platform, version, reader, writer, handshake_token): def __init__(self, platform, version, reader, writer, handshake_token):
logging.info("Creating plugin for platform %s, version %s", platform.value, version) logging.info("Creating plugin for platform %s, version %s", platform.value, version)
self._platform = platform self._platform = platform
self._version = version self._version = version
self._feature_methods = OrderedDict() self._features: Set[Feature] = set()
self._active = True self._active = True
self._pass_control_task = None self._pass_control_task = None
@@ -50,6 +49,7 @@ class Plugin:
def eof_handler(): def eof_handler():
self._shutdown() self._shutdown()
self._server.register_eof(eof_handler) self._server.register_eof(eof_handler)
self._achievements_import_in_progress = False self._achievements_import_in_progress = False
@@ -85,63 +85,47 @@ class Plugin:
self._register_method( self._register_method(
"import_owned_games", "import_owned_games",
self.get_owned_games, self.get_owned_games,
result_name="owned_games", result_name="owned_games"
feature=Feature.ImportOwnedGames
) )
self._detect_feature(Feature.ImportOwnedGames, ["get_owned_games"])
self._register_method( self._register_method(
"import_unlocked_achievements", "import_unlocked_achievements",
self.get_unlocked_achievements, self.get_unlocked_achievements,
result_name="unlocked_achievements", result_name="unlocked_achievements"
feature=Feature.ImportAchievements
)
self._register_method(
"start_achievements_import",
self.start_achievements_import,
)
self._register_method(
"import_local_games",
self.get_local_games,
result_name="local_games",
feature=Feature.ImportInstalledGames
)
self._register_notification("launch_game", self.launch_game, feature=Feature.LaunchGame)
self._register_notification("install_game", self.install_game, feature=Feature.InstallGame)
self._register_notification(
"uninstall_game",
self.uninstall_game,
feature=Feature.UninstallGame
)
self._register_notification(
"shutdown_platform_client",
self.shutdown_platform_client,
feature=Feature.ShutdownPlatformClient
)
self._register_method(
"import_friends",
self.get_friends,
result_name="friend_info_list",
feature=Feature.ImportFriends
)
self._register_method(
"import_game_times",
self.get_game_times,
result_name="game_times",
feature=Feature.ImportGameTime
)
self._register_method(
"start_game_times_import",
self.start_game_times_import,
) )
self._detect_feature(Feature.ImportAchievements, ["get_unlocked_achievements"])
self._register_method("start_achievements_import", self.start_achievements_import)
self._detect_feature(Feature.ImportAchievements, ["import_games_achievements"])
self._register_method("import_local_games", self.get_local_games, result_name="local_games")
self._detect_feature(Feature.ImportInstalledGames, ["get_local_games"])
self._register_notification("launch_game", self.launch_game)
self._detect_feature(Feature.LaunchGame, ["launch_game"])
self._register_notification("install_game", self.install_game)
self._detect_feature(Feature.InstallGame, ["install_game"])
self._register_notification("uninstall_game", self.uninstall_game)
self._detect_feature(Feature.UninstallGame, ["uninstall_game"])
self._register_notification("shutdown_platform_client", self.shutdown_platform_client)
self._detect_feature(Feature.ShutdownPlatformClient, ["shutdown_platform_client"])
self._register_method("import_friends", self.get_friends, result_name="friend_info_list")
self._detect_feature(Feature.ImportFriends, ["get_friends"])
self._register_method("import_game_times", self.get_game_times, result_name="game_times")
self._detect_feature(Feature.ImportGameTime, ["get_game_times"])
self._register_method("start_game_times_import", self.start_game_times_import)
self._detect_feature(Feature.ImportGameTime, ["import_game_times"])
@property @property
def features(self): def features(self) -> List[Feature]:
features = [] return list(self._features)
if self.__class__ != Plugin:
for feature, handlers in self._feature_methods.items():
if self._implements(handlers):
features.append(feature)
return features
@property @property
def persistent_cache(self) -> Dict: def persistent_cache(self) -> Dict:
@@ -149,13 +133,17 @@ class Plugin:
""" """
return self._persistent_cache return self._persistent_cache
def _implements(self, handlers): def _implements(self, methods: List[str]) -> bool:
for handler in handlers: for method in methods:
if handler.__name__ not in self.__class__.__dict__: if method not in self.__class__.__dict__:
return False return False
return True return True
def _register_method(self, name, handler, result_name=None, internal=False, sensitive_params=False, feature=None): def _detect_feature(self, feature: Feature, methods: List[str]):
if self._implements(methods):
self._features.add(feature)
def _register_method(self, name, handler, result_name=None, internal=False, sensitive_params=False):
if internal: if internal:
def method(*args, **kwargs): def method(*args, **kwargs):
result = handler(*args, **kwargs) result = handler(*args, **kwargs)
@@ -164,6 +152,7 @@ class Plugin:
result_name: result result_name: result
} }
return result return result
self._server.register_method(name, method, True, sensitive_params) self._server.register_method(name, method, True, sensitive_params)
else: else:
async def method(*args, **kwargs): async def method(*args, **kwargs):
@@ -173,17 +162,12 @@ class Plugin:
result_name: result result_name: result
} }
return result return result
self._server.register_method(name, method, False, sensitive_params) self._server.register_method(name, method, False, sensitive_params)
if feature is not None: def _register_notification(self, name, handler, internal=False, sensitive_params=False):
self._feature_methods.setdefault(feature, []).append(handler)
def _register_notification(self, name, handler, internal=False, sensitive_params=False, feature=None):
self._server.register_notification(name, handler, internal, sensitive_params) self._server.register_notification(name, handler, internal, sensitive_params)
if feature is not None:
self._feature_methods.setdefault(feature, []).append(handler)
async def run(self): async def run(self):
"""Plugin's main coroutine.""" """Plugin's main coroutine."""
await self._server.run() await self._server.run()
@@ -192,6 +176,7 @@ class Plugin:
def create_task(self, coro, description): def create_task(self, coro, description):
"""Wrapper around asyncio.create_task - takes care of canceling tasks on shutdown""" """Wrapper around asyncio.create_task - takes care of canceling tasks on shutdown"""
async def task_wrapper(task_id): async def task_wrapper(task_id):
try: try:
return await coro return await coro
@@ -524,7 +509,7 @@ class Plugin:
raise NotImplementedError() raise NotImplementedError()
async def pass_login_credentials(self, step: str, credentials: Dict[str, str], cookies: List[Dict[str, str]]) \ async def pass_login_credentials(self, step: str, credentials: Dict[str, str], cookies: List[Dict[str, str]]) \
-> Union[NextStep, Authentication]: -> Union[NextStep, Authentication]:
"""This method is called if we return galaxy.api.types.NextStep from authenticate or from pass_login_credentials. """This method is called if we return galaxy.api.types.NextStep from authenticate or from pass_login_credentials.
This method's parameters provide the data extracted from the web page navigation that previous NextStep finished on. This method's parameters provide the data extracted from the web page navigation that previous NextStep finished on.
This method should either return galaxy.api.types.Authentication if the authentication is finished This method should either return galaxy.api.types.Authentication if the authentication is finished
@@ -607,6 +592,7 @@ class Plugin:
:param game_ids: ids of the games for which to import unlocked achievements :param game_ids: ids of the games for which to import unlocked achievements
""" """
async def import_game_achievements(game_id): async def import_game_achievements(game_id):
try: try:
achievements = await self.get_unlocked_achievements(game_id) achievements = await self.get_unlocked_achievements(game_id)

View File

@@ -61,7 +61,6 @@ class NextStep():
:param auth_params: configuration options: {"window_title": :class:`str`, "window_width": :class:`str`, "window_height": :class:`int`, "start_uri": :class:`int`, "end_uri_regex": :class:`str`} :param auth_params: configuration options: {"window_title": :class:`str`, "window_width": :class:`str`, "window_height": :class:`int`, "start_uri": :class:`int`, "end_uri_regex": :class:`str`}
:param cookies: browser initial set of cookies :param cookies: browser initial set of cookies
:param js: a map of the url regex patterns into the list of *js* scripts that should be executed on every document at given step of internal browser authentication. :param js: a map of the url regex patterns into the list of *js* scripts that should be executed on every document at given step of internal browser authentication.
""" """
next_step: str next_step: str
auth_params: Dict[str, str] auth_params: Dict[str, str]

91
src/galaxy/proc_tools.py Normal file
View File

@@ -0,0 +1,91 @@
import platform
from dataclasses import dataclass
from typing import Iterable, NewType, Optional, Set
def is_windows():
return platform.system() == "Windows"
ProcessId = NewType("ProcessId", int)
@dataclass
class ProcessInfo:
pid: ProcessId
binary_path: Optional[str]
if is_windows():
from ctypes import byref, sizeof, windll, create_unicode_buffer, FormatError, WinError
from ctypes.wintypes import DWORD
def pids() -> Iterable[ProcessId]:
_PROC_ID_T = DWORD
list_size = 4096
def try_get_pids(list_size: int) -> Set[ProcessId]:
result_size = DWORD()
proc_id_list = (_PROC_ID_T * list_size)()
if not windll.psapi.EnumProcesses(byref(proc_id_list), sizeof(proc_id_list), byref(result_size)):
raise WinError(descr="Failed to get process ID list: %s" % FormatError())
return proc_id_list[:int(result_size.value / sizeof(_PROC_ID_T()))]
while True:
proc_ids = try_get_pids(list_size)
if len(proc_ids) < list_size:
return proc_ids
list_size *= 2
def get_process_info(pid: ProcessId) -> Optional[ProcessInfo]:
_PROC_QUERY_LIMITED_INFORMATION = 0x1000
process_info = ProcessInfo(pid=pid, binary_path=None)
h_process = windll.kernel32.OpenProcess(_PROC_QUERY_LIMITED_INFORMATION, False, pid)
if not h_process:
return process_info
try:
def get_exe_path() -> Optional[str]:
_MAX_PATH = 260
_WIN32_PATH_FORMAT = 0x0000
exe_path_buffer = create_unicode_buffer(_MAX_PATH)
exe_path_len = DWORD(len(exe_path_buffer))
return exe_path_buffer[:exe_path_len.value] if windll.kernel32.QueryFullProcessImageNameW(
h_process, _WIN32_PATH_FORMAT, exe_path_buffer, byref(exe_path_len)
) else None
process_info.binary_path = get_exe_path()
finally:
windll.kernel32.CloseHandle(h_process)
return process_info
else:
import psutil
def pids() -> Iterable[ProcessId]:
for pid in psutil.pids():
yield pid
def get_process_info(pid: ProcessId) -> Optional[ProcessInfo]:
process_info = ProcessInfo(pid=pid, binary_path=None)
try:
process_info.binary_path = psutil.Process(pid=pid).as_dict(attrs=["exe"])["exe"]
except psutil.NoSuchProcess:
pass
finally:
return process_info
def process_iter() -> Iterable[ProcessInfo]:
for pid in pids():
yield get_process_info(pid)

View File

@@ -3,6 +3,7 @@ import os
import zipfile import zipfile
from glob import glob from glob import glob
def zip_folder(folder): def zip_folder(folder):
files = glob(os.path.join(folder, "**"), recursive=True) files = glob(os.path.join(folder, "**"), recursive=True)
files = [file.replace(folder + os.sep, "") for file in files] files = [file.replace(folder + os.sep, "") for file in files]
@@ -14,6 +15,7 @@ def zip_folder(folder):
zipf.write(os.path.join(folder, file), arcname=file) zipf.write(os.path.join(folder, file), arcname=file)
return zip_buffer return zip_buffer
def zip_folder_to_file(folder, filename): def zip_folder_to_file(folder, filename):
zip_content = zip_folder(folder).getbuffer() zip_content = zip_folder(folder).getbuffer()
with open(filename, "wb") as archive: with open(filename, "wb") as archive:

View File

@@ -58,6 +58,7 @@ def plugin(reader, writer):
stack.enter_context(patch.object(Plugin, method)) stack.enter_context(patch.object(Plugin, method))
yield Plugin(Platform.Generic, "0.1", reader, writer, "token") yield Plugin(Platform.Generic, "0.1", reader, writer, "token")
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def my_caplog(caplog): def my_caplog(caplog):
caplog.set_level(logging.DEBUG) caplog.set_level(logging.DEBUG)

View File

@@ -1,21 +1,49 @@
from galaxy.api.consts import Feature, Platform
from galaxy.api.plugin import Plugin from galaxy.api.plugin import Plugin
from galaxy.api.consts import Platform, Feature
def test_base_class(): def test_base_class():
plugin = Plugin(Platform.Generic, "0.1", None, None, None) plugin = Plugin(Platform.Generic, "0.1", None, None, None)
assert plugin.features == [] assert set(plugin.features) == {
Feature.ImportInstalledGames,
Feature.ImportOwnedGames,
Feature.LaunchGame,
Feature.InstallGame,
Feature.UninstallGame,
Feature.ImportAchievements,
Feature.ImportGameTime,
Feature.ImportFriends,
Feature.ShutdownPlatformClient
}
def test_no_overloads(): def test_no_overloads():
class PluginImpl(Plugin): #pylint: disable=abstract-method class PluginImpl(Plugin): # pylint: disable=abstract-method
pass pass
plugin = PluginImpl(Platform.Generic, "0.1", None, None, None) plugin = PluginImpl(Platform.Generic, "0.1", None, None, None)
assert plugin.features == [] assert plugin.features == []
def test_one_method_feature(): def test_one_method_feature():
class PluginImpl(Plugin): #pylint: disable=abstract-method class PluginImpl(Plugin): # pylint: disable=abstract-method
async def get_owned_games(self): async def get_owned_games(self):
pass pass
plugin = PluginImpl(Platform.Generic, "0.1", None, None, None) plugin = PluginImpl(Platform.Generic, "0.1", None, None, None)
assert plugin.features == [Feature.ImportOwnedGames] assert plugin.features == [Feature.ImportOwnedGames]
def test_multi_features():
class PluginImpl(Plugin): # pylint: disable=abstract-method
async def get_owned_games(self):
pass
async def import_games_achievements(self, game_ids) -> None:
pass
async def start_game_times_import(self, game_ids) -> None:
pass
plugin = PluginImpl(Platform.Generic, "0.1", None, None, None)
assert set(plugin.features) == {Feature.ImportAchievements, Feature.ImportOwnedGames}