mirror of
https://github.com/gogcom/galaxy-integrations-python-api.git
synced 2026-01-02 03:48:16 -05:00
48 lines
1.7 KiB
Python
48 lines
1.7 KiB
Python
import asyncio
|
|
import ssl
|
|
from http import HTTPStatus
|
|
|
|
import aiohttp
|
|
import certifi
|
|
|
|
from galaxy.api.errors import (
|
|
AccessDenied, AuthenticationRequired,
|
|
BackendTimeout, BackendNotAvailable, BackendError, NetworkError, UnknownBackendResponse, UnknownError
|
|
)
|
|
|
|
class HttpClient:
    """Wrapper around ``aiohttp.ClientSession`` that reports failures as ``galaxy.api.errors`` exceptions.

    The session uses a TLS client context whose trusted CAs are loaded from the
    ``certifi`` bundle. Transport errors raised by aiohttp and HTTP statuses of
    400 and above are converted to exceptions from ``galaxy.api.errors``.
    """

    def __init__(self, limit=20, timeout=aiohttp.ClientTimeout(total=60), cookie_jar=None):
        """Create the underlying ``aiohttp.ClientSession``.

        Args:
            limit: forwarded to ``aiohttp.TCPConnector`` as ``limit``.
            timeout: forwarded to ``aiohttp.ClientSession`` as ``timeout``.
            cookie_jar: forwarded to ``aiohttp.ClientSession`` as ``cookie_jar``.
        """
        # Verify server certificates against the CA bundle shipped with certifi.
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ssl_context.load_verify_locations(certifi.where())

        connector = aiohttp.TCPConnector(limit=limit, ssl=ssl_context)
        self._session = aiohttp.ClientSession(connector=connector, timeout=timeout, cookie_jar=cookie_jar)

    async def close(self):
        """Close the underlying session."""
        await self._session.close()

    async def request(self, method, *args, **kwargs):
        """Perform an HTTP request through the shared session.

        Args:
            method: HTTP method, forwarded to ``ClientSession.request``.
            *args: forwarded unchanged to ``ClientSession.request``.
            **kwargs: forwarded unchanged to ``ClientSession.request``.

        Returns:
            The aiohttp response object when the status is below 400.

        Raises:
            BackendTimeout: the request timed out.
            BackendNotAvailable: the server disconnected, or answered 503.
            NetworkError: another connection-level failure occurred.
            UnknownBackendResponse: aiohttp reported a content type error.
            AuthenticationRequired: the server answered 401.
            AccessDenied: the server answered 403.
            BackendError: the server answered with another 5xx status.
            UnknownError: any other aiohttp client error or 4xx status.
        """
        # Handlers are ordered from most specific to most general; the broad
        # ClientConnectionError / ClientError handlers must stay after the
        # narrower aiohttp exceptions or they would shadow them.
        try:
            response = await self._session.request(method, *args, **kwargs)
        except asyncio.TimeoutError:
            raise BackendTimeout()
        except aiohttp.ServerDisconnectedError:
            raise BackendNotAvailable()
        except aiohttp.ClientConnectionError:
            raise NetworkError()
        except aiohttp.ContentTypeError:
            raise UnknownBackendResponse()
        except aiohttp.ClientError:
            raise UnknownError()

        if response.status < 400:
            return response

        # The caller never receives the response on this path, so nobody else
        # can release it. Give the connection back to the connector now instead
        # of leaving it checked out until the response is garbage collected.
        response.release()
        raise self._error_for_status(response.status)

    @staticmethod
    def _error_for_status(status):
        """Return the ``galaxy.api.errors`` exception instance for an HTTP status >= 400."""
        if status == HTTPStatus.UNAUTHORIZED:
            return AuthenticationRequired()
        if status == HTTPStatus.FORBIDDEN:
            return AccessDenied()
        if status == HTTPStatus.SERVICE_UNAVAILABLE:
            return BackendNotAvailable()
        if status >= 500:
            return BackendError()
        return UnknownError()
|