From bfb63a42bd37bdfb7e9227b745dc3c6616d62588 Mon Sep 17 00:00:00 2001 From: Romuald Juchnowicz-Bierbasz Date: Tue, 23 Jul 2019 17:14:09 +0200 Subject: [PATCH] SDK-2951: Add create_task method --- src/galaxy/api/plugin.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/galaxy/api/plugin.py b/src/galaxy/api/plugin.py index 96bdd7c..31e4da2 100644 --- a/src/galaxy/api/plugin.py +++ b/src/galaxy/api/plugin.py @@ -5,6 +5,7 @@ import logging.handlers import dataclasses from enum import Enum from collections import OrderedDict +from itertools import count import sys from typing import Any, List, Dict, Optional, Union @@ -56,6 +57,9 @@ class Plugin: self._persistent_cache = dict() + self._tasks = OrderedDict() + self._task_counter = count() + # internal self._register_method("shutdown", self._shutdown, internal=True) self._register_method("get_capabilities", self._get_capabilities, internal=True) @@ -220,6 +224,24 @@ class Plugin: if self._pass_control_task is not None: await self._pass_control_task + def create_task(self, coro, description): + """Wrapper around asyncio.create_task - takes care of canceling tasks on shutdown""" + async def task_wrapper(task_id): + try: + return await coro + except asyncio.CancelledError: + logging.debug("Canceled task %d (%s)", task_id, description) + except Exception: + logging.exception("Exception raised in task %d (%s)", task_id, description) + finally: + del self._tasks[task_id] + + task_id = next(self._task_counter) + logging.debug("Creating task %d (%s)", task_id, description) + task = asyncio.create_task(task_wrapper(task_id)) + self._tasks[task_id] = task + return task + async def _pass_control(self): while self._active: try: @@ -233,6 +255,8 @@ class Plugin: self._server.stop() self._active = False self.shutdown() + for task in self._tasks.values(): + task.cancel() def _get_capabilities(self): return {