diff --git a/src/galaxy/api/jsonrpc.py b/src/galaxy/api/jsonrpc.py index 753a7a7..4af843e 100644 --- a/src/galaxy/api/jsonrpc.py +++ b/src/galaxy/api/jsonrpc.py @@ -8,6 +8,10 @@ import json from galaxy.reader import StreamLineReader from galaxy.task_manager import TaskManager + +logger = logging.getLogger(__name__) + + class JsonRpcError(Exception): def __init__(self, code, message, data=None): self.code = code @@ -133,7 +137,7 @@ class Connection(): future = loop.create_future() self._requests_futures[self._last_request_id] = (future, sensitive_params) - logging.info( + logger.info( "Sending request: id=%s, method=%s, params=%s", request_id, method, anonymise_sensitive_params(params, sensitive_params) ) @@ -151,7 +155,7 @@ class Connection(): if False - no params are considered sensitive, if True - all params are considered sensitive """ - logging.info( + logger.info( "Sending notification: method=%s, params=%s", method, anonymise_sensitive_params(params, sensitive_params) ) @@ -169,20 +173,20 @@ class Connection(): self._eof() continue data = data.strip() - logging.debug("Received %d bytes of data", len(data)) + logger.debug("Received %d bytes of data", len(data)) self._handle_input(data) await asyncio.sleep(0) # To not starve task queue def close(self): if self._active: - logging.info("Closing JSON-RPC server - not more messages will be read") + logger.info("Closing JSON-RPC server - not more messages will be read") self._active = False async def wait_closed(self): await self._task_manager.wait() def _eof(self): - logging.info("Received EOF") + logger.info("Received EOF") self.close() def _handle_input(self, data): @@ -204,7 +208,7 @@ class Connection(): request_future = self._requests_futures.get(int(response.id)) if request_future is None: response_type = "response" if response.result is not None else "error" - logging.warning("Received %s for unknown request: %s", response_type, response.id) + logger.warning("Received %s for unknown request: %s", response_type, response.id) return future, sensitive_params = request_future @@ -225,7 +229,7 @@ class Connection(): def _handle_notification(self, request): method = self._notifications.get(request.method) if not method: - logging.error("Received unknown notification: %s", request.method) + logger.error("Received unknown notification: %s", request.method) return callback, signature, immediate, sensitive_params = method @@ -242,12 +246,12 @@ class Connection(): try: self._task_manager.create_task(callback(*bound_args.args, **bound_args.kwargs), request.method) except Exception: - logging.exception("Unexpected exception raised in notification handler") + logger.exception("Unexpected exception raised in notification handler") def _handle_request(self, request): method = self._methods.get(request.method) if not method: - logging.error("Received unknown request: %s", request.method) + logger.error("Received unknown request: %s", request.method) self._send_error(request.id, MethodNotFound()) return @@ -274,7 +278,7 @@ class Connection(): except asyncio.CancelledError: self._send_error(request.id, Aborted()) except Exception as e: #pylint: disable=broad-except - logging.exception("Unexpected exception raised in plugin handler") + logger.exception("Unexpected exception raised in plugin handler") self._send_error(request.id, UnknownError(str(e))) self._task_manager.create_task(handle(), request.method) @@ -305,10 +309,10 @@ class Connection(): try: line = self._encoder.encode(data) data = (line + "\n").encode("utf-8") - logging.debug("Sending %d byte of data", len(data)) + logger.debug("Sending %d byte of data", len(data)) self._task_manager.create_task(send_task(data), "send") except TypeError as error: - logging.error(str(error)) + logger.error(str(error)) def _send_response(self, request_id, result): response = { @@ -348,18 +352,18 @@ class Connection(): def _log_request(request, sensitive_params): params = anonymise_sensitive_params(request.params, sensitive_params) if request.id is not None: - logging.info("Handling request: id=%s, method=%s, params=%s", request.id, request.method, params) + logger.info("Handling request: id=%s, method=%s, params=%s", request.id, request.method, params) else: - logging.info("Handling notification: method=%s, params=%s", request.method, params) + logger.info("Handling notification: method=%s, params=%s", request.method, params) @staticmethod def _log_response(response, sensitive_params): result = anonymise_sensitive_params(response.result, sensitive_params) - logging.info("Handling response: id=%s, result=%s", response.id, result) + logger.info("Handling response: id=%s, result=%s", response.id, result) @staticmethod def _log_error(response, error, sensitive_params): data = anonymise_sensitive_params(error.data, sensitive_params) - logging.info("Handling error: id=%s, code=%s, description=%s, data=%s", + logger.info("Handling error: id=%s, code=%s, description=%s, data=%s", response.id, error.code, error.message, data ) diff --git a/src/galaxy/api/plugin.py b/src/galaxy/api/plugin.py index 7eb47d7..1ccfa7f 100644 --- a/src/galaxy/api/plugin.py +++ b/src/galaxy/api/plugin.py @@ -2,7 +2,6 @@ import asyncio import dataclasses import json import logging -import logging.handlers import sys from enum import Enum from typing import Any, Dict, List, Optional, Set, Union @@ -16,6 +15,9 @@ from galaxy.api.types import ( from galaxy.task_manager import TaskManager +logger = logging.getLogger(__name__) + + class JSONEncoder(json.JSONEncoder): def default(self, o): # pylint: disable=method-hidden if dataclasses.is_dataclass(o): @@ -33,7 +35,7 @@ class Plugin: """Use and override methods of this class to create a new platform integration.""" def __init__(self, platform, version, reader, writer, handshake_token): - logging.info("Creating plugin for platform %s, version %s", platform.value, version) + logger.info("Creating plugin for platform %s, version %s", platform.value, version) self._platform = platform self._version = version @@ -189,24 +191,24 @@ class Plugin: async def run(self): """Plugin's main coroutine.""" await self._connection.run() - logging.debug("Plugin run loop finished") + logger.debug("Plugin run loop finished") def close(self) -> None: if not self._active: return - logging.info("Closing plugin") + logger.info("Closing plugin") self._connection.close() self._external_task_manager.cancel() self._internal_task_manager.create_task(self.shutdown(), "shutdown") self._active = False async def wait_closed(self) -> None: - logging.debug("Waiting for plugin to close") + logger.debug("Waiting for plugin to close") await self._external_task_manager.wait() await self._internal_task_manager.wait() await self._connection.wait_closed() - logging.debug("Plugin closed") + logger.debug("Plugin closed") def create_task(self, coro, description): """Wrapper around asyncio.create_task - takes care of canceling tasks on shutdown""" @@ -217,11 +219,11 @@ class Plugin: try: self.tick() except Exception: - logging.exception("Unexpected exception raised in plugin tick") + logger.exception("Unexpected exception raised in plugin tick") await asyncio.sleep(1) async def _shutdown(self): - logging.info("Shutting down") + logger.info("Shutting down") self.close() await self._external_task_manager.wait() await self._internal_task_manager.wait() @@ -238,7 +240,7 @@ class Plugin: try: self.handshake_complete() except Exception: - logging.exception("Unhandled exception during `handshake_complete` step") + logger.exception("Unhandled exception during `handshake_complete` step") self._internal_task_manager.create_task(self._pass_control(), "tick") @staticmethod @@ -639,7 +641,7 @@ class Plugin: except ApplicationError as error: self._game_achievements_import_failure(game_id, error) except Exception: - logging.exception("Unexpected exception raised in import_game_achievements") + logger.exception("Unexpected exception raised in import_game_achievements") self._game_achievements_import_failure(game_id, UnknownError()) async def import_games_achievements(game_ids_, context_): @@ -803,7 +805,7 @@ class Plugin: except ApplicationError as error: self._game_time_import_failure(game_id, error) except Exception: - logging.exception("Unexpected exception raised in import_game_time") + logger.exception("Unexpected exception raised in import_game_time") self._game_time_import_failure(game_id, UnknownError()) async def import_game_times(game_ids_, context_): @@ -861,7 +863,7 @@ class Plugin: except ApplicationError as error: self._game_library_settings_import_failure(game_id, error) except Exception: - logging.exception("Unexpected exception raised in import_game_library_settings") + logger.exception("Unexpected exception raised in import_game_library_settings") self._game_library_settings_import_failure(game_id, UnknownError()) async def import_game_library_settings_set(game_ids_, context_): @@ -919,7 +921,7 @@ class Plugin: except ApplicationError as error: self._os_compatibility_import_failure(game_id, error) except Exception: - logging.exception("Unexpected exception raised in import_os_compatibility") + logger.exception("Unexpected exception raised in import_os_compatibility") self._os_compatibility_import_failure(game_id, UnknownError()) async def import_os_compatibility_set(game_ids_, context_): @@ -974,7 +976,7 @@ class Plugin: except ApplicationError as error: self._user_presence_import_failure(user_id, error) except Exception: - logging.exception("Unexpected exception raised in import_user_presence") + logger.exception("Unexpected exception raised in import_user_presence") self._user_presence_import_failure(user_id, UnknownError()) async def import_user_presence_set(user_ids_, context_) -> None: @@ -1037,7 +1039,7 @@ def create_and_run_plugin(plugin_class, argv): main() """ if len(argv) < 3: - logging.critical("Not enough parameters, required: token, port") + logger.critical("Not enough parameters, required: token, port") sys.exit(1) token = argv[1] @@ -1045,21 +1047,21 @@ def create_and_run_plugin(plugin_class, argv): try: port = int(argv[2]) except ValueError: - logging.critical("Failed to parse port value: %s", argv[2]) + logger.critical("Failed to parse port value: %s", argv[2]) sys.exit(2) if not (1 <= port <= 65535): - logging.critical("Port value out of range (1, 65535)") + logger.critical("Port value out of range (1, 65535)") sys.exit(3) if not issubclass(plugin_class, Plugin): - logging.critical("plugin_class must be subclass of Plugin") + logger.critical("plugin_class must be subclass of Plugin") sys.exit(4) async def coroutine(): reader, writer = await asyncio.open_connection("127.0.0.1", port) extra_info = writer.get_extra_info("sockname") - logging.info("Using local address: %s:%u", *extra_info) + logger.info("Using local address: %s:%u", *extra_info) async with plugin_class(reader, writer, token) as plugin: await plugin.run() @@ -1069,5 +1071,5 @@ def create_and_run_plugin(plugin_class, argv): asyncio.run(coroutine()) except Exception: - logging.exception("Error while running plugin") + logger.exception("Error while running plugin") sys.exit(5) diff --git a/src/galaxy/http.py b/src/galaxy/http.py index 0c5f8f6..b68c7cd 100644 --- a/src/galaxy/http.py +++ b/src/galaxy/http.py @@ -44,6 +44,8 @@ from galaxy.api.errors import ( ) +logger = logging.getLogger(__name__) + #: Default limit of the simultaneous connections for ssl connector. DEFAULT_LIMIT = 20 #: Default timeout in seconds used for client session. @@ -136,11 +138,11 @@ def handle_exception(): if error.status >= 500: raise BackendError() if error.status >= 400: - logging.warning( + logger.warning( "Got status %d while performing %s request for %s", error.status, error.request_info.method, str(error.request_info.url) ) raise UnknownError() except aiohttp.ClientError: - logging.exception("Caught exception while performing request") + logger.exception("Caught exception while performing request") raise UnknownError() diff --git a/src/galaxy/proc_tools.py b/src/galaxy/proc_tools.py index b0de0bc..4c2df33 100644 --- a/src/galaxy/proc_tools.py +++ b/src/galaxy/proc_tools.py @@ -3,7 +3,6 @@ from dataclasses import dataclass from typing import Iterable, NewType, Optional, List, cast - ProcessId = NewType("ProcessId", int) diff --git a/src/galaxy/task_manager.py b/src/galaxy/task_manager.py index 1f6d457..e7bb517 100644 --- a/src/galaxy/task_manager.py +++ b/src/galaxy/task_manager.py @@ -3,6 +3,10 @@ import logging from collections import OrderedDict from itertools import count + +logger = logging.getLogger(__name__) + + class TaskManager: def __init__(self, name): self._name = name @@ -15,23 +19,23 @@ class TaskManager: async def task_wrapper(task_id): try: result = await coro - logging.debug("Task manager %s: finished task %d (%s)", self._name, task_id, description) + logger.debug("Task manager %s: finished task %d (%s)", self._name, task_id, description) return result except asyncio.CancelledError: if handle_exceptions: - logging.debug("Task manager %s: canceled task %d (%s)", self._name, task_id, description) + logger.debug("Task manager %s: canceled task %d (%s)", self._name, task_id, description) else: raise except Exception: if handle_exceptions: - logging.exception("Task manager %s: exception raised in task %d (%s)", self._name, task_id, description) + logger.exception("Task manager %s: exception raised in task %d (%s)", self._name, task_id, description) else: raise finally: del self._tasks[task_id] task_id = next(self._task_counter) - logging.debug("Task manager %s: creating task %d (%s)", self._name, task_id, description) + logger.debug("Task manager %s: creating task %d (%s)", self._name, task_id, description) task = asyncio.create_task(task_wrapper(task_id)) self._tasks[task_id] = task return task