mirror of
https://github.com/morpheus65535/bazarr.git
synced 2026-04-18 13:19:12 -04:00
952 lines
33 KiB
Python
952 lines
33 KiB
Python
# BSD 2-Clause License
|
|
#
|
|
# Apprise - Push Notification Library.
|
|
# Copyright (c) 2026, Chris Caron <lead2gold@gmail.com>
|
|
#
|
|
# Redistribution and use in source and binary forms, with or without
|
|
# modification, are permitted provided that the following conditions are met:
|
|
#
|
|
# 1. Redistributions of source code must retain the above copyright notice,
|
|
# this list of conditions and the following disclaimer.
|
|
#
|
|
# 2. Redistributions in binary form must reproduce the above copyright notice,
|
|
# this list of conditions and the following disclaimer in the documentation
|
|
# and/or other materials provided with the distribution.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
|
|
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
# POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
# Simple API Reference:
|
|
# - https://www.notificationapi.com/docs/reference/server#send
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
from email.utils import formataddr
|
|
from itertools import chain
|
|
from json import dumps, loads
|
|
import re
|
|
|
|
import requests
|
|
|
|
from ..common import NotifyFormat, NotifyImageSize, NotifyType
|
|
from ..conversion import convert_between
|
|
from ..locale import gettext_lazy as _
|
|
from ..utils.parse import (
|
|
is_email,
|
|
is_phone_no,
|
|
parse_emails,
|
|
parse_list,
|
|
validate_regex,
|
|
)
|
|
from .base import NotifyBase
|
|
|
|
# Used to detect ID
|
|
IS_VALID_ID_RE = re.compile(
|
|
r"^\s*(@|%40)?(?P<id>[\w_-]+)\s*$", re.I)
|
|
|
|
|
|
class NotificationAPIRegion:
|
|
"""Regions."""
|
|
|
|
CA = "ca"
|
|
US = "us"
|
|
EU = "eu"
|
|
|
|
|
|
# NotificationAPI endpoints
|
|
NOTIFICATIONAPI_API_LOOKUP = {
|
|
NotificationAPIRegion.US: "https://api.notificationapi.com",
|
|
NotificationAPIRegion.CA: "https://api.ca.notificationapi.com",
|
|
NotificationAPIRegion.EU: "https://api.eu.notificationapi.com",
|
|
}
|
|
|
|
# A List of our regions we can use for verification
|
|
NOTIFICATIONAPI_REGIONS = (
|
|
NotificationAPIRegion.US,
|
|
NotificationAPIRegion.CA,
|
|
NotificationAPIRegion.EU,
|
|
)
|
|
|
|
|
|
class NotificationAPIChannel:
|
|
"""Channels"""
|
|
|
|
EMAIL = "email"
|
|
SMS = "sms"
|
|
INAPP = "inapp"
|
|
WEB_PUSH = "web_push"
|
|
MOBILE_PUSH = "mobile_push"
|
|
SLACK = "slack"
|
|
|
|
|
|
# A List of our channels we can use for verification
|
|
NOTIFICATIONAPI_CHANNELS: frozenset[str] = frozenset([
|
|
NotificationAPIChannel.EMAIL,
|
|
NotificationAPIChannel.SMS,
|
|
NotificationAPIChannel.INAPP,
|
|
NotificationAPIChannel.WEB_PUSH,
|
|
NotificationAPIChannel.MOBILE_PUSH,
|
|
NotificationAPIChannel.SLACK,
|
|
])
|
|
|
|
|
|
class NotificationAPIMode:
|
|
"""Modes"""
|
|
|
|
TEMPLATE = "template"
|
|
MESSAGE = "message"
|
|
|
|
|
|
# A List of our channels we can use for verification
|
|
NOTIFICATIONAPI_MODES: frozenset[str] = frozenset([
|
|
NotificationAPIMode.TEMPLATE,
|
|
NotificationAPIMode.MESSAGE,
|
|
])
|
|
|
|
|
|
class NotifyNotificationAPI(NotifyBase):
|
|
"""
|
|
A wrapper for NotificationAPI Notifications
|
|
"""
|
|
|
|
# The default descriptive name associated with the Notification
|
|
service_name = "NotificationAPI"
|
|
|
|
# The services URL
|
|
service_url = "https://www.notificationapi.com/"
|
|
|
|
# The default secure protocol
|
|
secure_protocol = ("napi", "notificationapi")
|
|
|
|
# A URL that takes you to the setup/help of the specific protocol
|
|
setup_url = "https://appriseit.com/services/notificationapi/"
|
|
|
|
# If no NotificationAPI Message Type is specified, then the following is
|
|
# used
|
|
default_message_type = "apprise"
|
|
|
|
# Allow 300 requests per minute.
|
|
# 60/300 = 0.2
|
|
request_rate_per_sec = 0.2
|
|
|
|
# Allows the user to specify the NotifyImageSize object
|
|
image_size = NotifyImageSize.XY_72
|
|
|
|
# Define object templates
|
|
templates = (
|
|
"{schema}://{client_id}/{client_secret}/{targets}",
|
|
"{schema}://{type}@{client_id}/{client_secret}/{targets}",
|
|
)
|
|
|
|
# Explicit URL tokens we care about (all others from base are ignored)
|
|
template_tokens = dict(NotifyBase.template_tokens, **{
|
|
"type": {
|
|
"name": _("Message Type"),
|
|
"type": "string",
|
|
"regex": (r"^[A-Z0-9_-]+$", "i"),
|
|
"required": True,
|
|
"map_to": "message_type",
|
|
},
|
|
"client_id": {
|
|
"name": _("Client ID"),
|
|
"type": "string",
|
|
"required": True,
|
|
},
|
|
"client_secret": {
|
|
"name": _("Client Secret"),
|
|
"type": "string",
|
|
"required": True,
|
|
"private": True,
|
|
},
|
|
"target_email": {
|
|
"name": _("Target Email"),
|
|
"type": "string",
|
|
"map_to": "targets",
|
|
},
|
|
"target_id": {
|
|
"name": _("Target ID"),
|
|
"type": "string",
|
|
"map_to": "targets",
|
|
},
|
|
"target_sms": {
|
|
"name": _("Target SMS"),
|
|
"type": "string",
|
|
"map_to": "targets",
|
|
},
|
|
"targets": {
|
|
"name": _("Targets"),
|
|
"type": "list:string",
|
|
},
|
|
})
|
|
|
|
# Supported query args
|
|
template_args = dict(NotifyBase.template_args, **{
|
|
"type": {
|
|
"alias_of": "type",
|
|
},
|
|
"channels": {
|
|
"name": _("Channels"),
|
|
"type": "list:string",
|
|
"values": NOTIFICATIONAPI_CHANNELS,
|
|
},
|
|
"region": {
|
|
"name": _("Region Name"),
|
|
"type": "choice:string",
|
|
"values": NOTIFICATIONAPI_REGIONS,
|
|
"default": NotificationAPIRegion.US,
|
|
},
|
|
"mode": {
|
|
"name": _("Mode"),
|
|
"type": "choice:string",
|
|
"values": NOTIFICATIONAPI_MODES,
|
|
},
|
|
"to": {
|
|
"alias_of": "targets",
|
|
},
|
|
# Email Values
|
|
"cc": {
|
|
"name": _("Carbon Copy"),
|
|
"type": "list:string",
|
|
},
|
|
"bcc": {
|
|
"name": _("Blind Carbon Copy"),
|
|
"type": "list:string",
|
|
},
|
|
"reply": {
|
|
"name": _("Reply To"),
|
|
"type": "string",
|
|
"map_to": "reply_to",
|
|
},
|
|
"from": {
|
|
"name": _("From Email"),
|
|
"type": "string",
|
|
"map_to": "from_addr",
|
|
},
|
|
"id": {
|
|
"alias_of": "client_id",
|
|
},
|
|
"secret": {
|
|
"alias_of": "client_secret",
|
|
}
|
|
})
|
|
|
|
# Define our token control
|
|
template_kwargs = {
|
|
"tokens": {
|
|
"name": _("Template Tokens"),
|
|
"prefix": ":",
|
|
},
|
|
}
|
|
|
|
def __init__(self, client_id, client_secret, message_type=None,
|
|
targets=None, cc=None, bcc=None, reply_to=None,
|
|
channels=None, region=None, mode=None, from_addr=None,
|
|
tokens=None, **kwargs):
|
|
"""
|
|
Initialize Notify NotificationAPI Object
|
|
"""
|
|
super().__init__(**kwargs)
|
|
|
|
# Client ID
|
|
self.client_id = validate_regex(client_id)
|
|
if not self.client_id:
|
|
msg = "An invalid NotificationAPI Client ID " \
|
|
"({}) was specified.".format(client_id)
|
|
self.logger.warning(msg)
|
|
raise TypeError(msg)
|
|
|
|
# Client Secret
|
|
self.client_secret = validate_regex(client_secret)
|
|
if not self.client_secret:
|
|
msg = "An invalid NotificationAPI Client Secret " \
|
|
"({}) was specified.".format(client_secret)
|
|
self.logger.warning(msg)
|
|
raise TypeError(msg)
|
|
|
|
# For tracking our email -> name lookups
|
|
self.names = {}
|
|
|
|
# Prepare our From Address
|
|
from_addr_ = [self.app_id, ""]
|
|
self.from_addr = None
|
|
if isinstance(from_addr, str):
|
|
result = is_email(from_addr)
|
|
if result:
|
|
from_addr_ = (
|
|
result["name"] if result["name"] else from_addr_[0],
|
|
result["full_email"])
|
|
else:
|
|
# Only update the string but use the already detected info
|
|
from_addr_[0] = from_addr
|
|
|
|
# Store our lookup
|
|
self.from_addr = from_addr_[1]
|
|
self.names[from_addr_[1]] = from_addr_[0]
|
|
|
|
# Prepare our Reply-To Address
|
|
self.reply_to = {}
|
|
if isinstance(reply_to, str):
|
|
result = is_email(reply_to)
|
|
if result and "full_email" in result:
|
|
self.reply_to = {
|
|
"senderName": result["name"]
|
|
if result["name"] else from_addr_[0],
|
|
"senderEmail": result["full_email"],
|
|
}
|
|
|
|
# Our Targets are delimited by found ids
|
|
self.targets = []
|
|
if mode and isinstance(mode, str):
|
|
self.mode = next(
|
|
(a for a in NOTIFICATIONAPI_MODES if a.startswith(mode)), None
|
|
)
|
|
if self.mode not in NOTIFICATIONAPI_MODES:
|
|
msg = \
|
|
f"The NotificationAPI mode specified ({mode}) is invalid."
|
|
self.logger.warning(msg)
|
|
raise TypeError(msg)
|
|
else:
|
|
# Detect mode based on whether or not a message_type was provided
|
|
self.mode = NotificationAPIMode.MESSAGE if not message_type else \
|
|
NotificationAPIMode.TEMPLATE
|
|
|
|
if not message_type:
|
|
# Assign a default message type
|
|
self.message_type = self.default_message_type
|
|
|
|
else:
|
|
self.message_type = validate_regex(
|
|
message_type, *self.template_tokens["type"]["regex"])
|
|
if not self.message_type:
|
|
msg = "An invalid NotificationAPI Message Type " \
|
|
"({}) was specified.".format(message_type)
|
|
self.logger.warning(msg)
|
|
raise TypeError(msg)
|
|
|
|
# Precompute auth header
|
|
# Ruby/docs show POST "/{client_id}/sender" with:
|
|
# Basic base64(client_id:client_secret)
|
|
# https://www.notificationapi.com/docs/reference/server
|
|
token = base64.b64encode(
|
|
f"{self.client_id}:{self.client_secret}".encode()).decode("ascii")
|
|
self.auth_header = f"Basic {token}"
|
|
|
|
# Acquire Carbon Copies
|
|
self.cc = set()
|
|
|
|
# Acquire Blind Carbon Copies
|
|
self.bcc = set()
|
|
|
|
# Store our region
|
|
try:
|
|
self.region = (
|
|
self.template_args["region"]["default"]
|
|
if region is None else region.lower()
|
|
)
|
|
|
|
if self.region not in NOTIFICATIONAPI_REGIONS:
|
|
# allow the outer except to handle this common response
|
|
raise IndexError()
|
|
|
|
except (AttributeError, IndexError, TypeError):
|
|
# Invalid region specified
|
|
msg = \
|
|
f"The NotificationAPI region specified ({region}) is invalid."
|
|
self.logger.warning(msg)
|
|
raise TypeError(msg) from None
|
|
|
|
# Initialize an empty set of channels
|
|
self.channels = set()
|
|
for channel_ in parse_list(channels):
|
|
channel = channel_.lower()
|
|
if channel not in NOTIFICATIONAPI_CHANNELS:
|
|
# Invalid channel specified
|
|
msg = (
|
|
"The NotificationAPI forced channel specified "
|
|
f"({channel}) is invalid.")
|
|
self.logger.warning(msg)
|
|
raise TypeError(msg) from None
|
|
self.channels.add(channel)
|
|
|
|
# Used for URL generation afterwards only
|
|
self._invalid_targets = []
|
|
|
|
if targets:
|
|
current_target = {}
|
|
for entry in parse_list(targets, sort=False):
|
|
result = is_email(entry)
|
|
if result:
|
|
if "email" not in current_target:
|
|
current_target["email"] = result["full_email"]
|
|
if not self.channels:
|
|
self.channels.add(NotificationAPIChannel.EMAIL)
|
|
self.logger.info(
|
|
"The NotificationAPI default channel of "
|
|
f"{NotificationAPIChannel.EMAIL} was set.")
|
|
continue
|
|
|
|
elif "id" in current_target:
|
|
# Store and move on
|
|
self.targets.append(current_target)
|
|
current_target = {
|
|
"email": result["full_email"]
|
|
}
|
|
continue
|
|
|
|
# if we got here, we have to many emails making it now
|
|
# ambiguous as to who the sender intended to notify
|
|
msg = (
|
|
"The NotificationAPI received too many emails "
|
|
"creating an ambiguous situation; aborted at "
|
|
f"'{entry}'.")
|
|
self.logger.warning(msg)
|
|
raise TypeError(msg) from None
|
|
|
|
result = is_phone_no(entry)
|
|
if result:
|
|
if "number" not in current_target:
|
|
current_target["number"] = \
|
|
("+" if entry[0] == "+" else "") + result["full"]
|
|
if not self.channels:
|
|
self.channels.add(NotificationAPIChannel.SMS)
|
|
self.logger.info(
|
|
"The NotificationAPI default channel of "
|
|
f"{NotificationAPIChannel.SMS} was set.")
|
|
continue
|
|
|
|
elif "id" in current_target:
|
|
# Store and move on
|
|
self.targets.append(current_target)
|
|
current_target = {
|
|
"number": result["full"]
|
|
}
|
|
continue
|
|
|
|
# if we got here, we have to many emails making it now
|
|
# ambiguous as to who the sender intended to notify
|
|
msg = (
|
|
"The NotificationAPI received too many phone no's "
|
|
"creating an ambiguous situation; aborted at "
|
|
f"'{entry}'.")
|
|
self.logger.warning(msg)
|
|
raise TypeError(msg) from None
|
|
|
|
result = IS_VALID_ID_RE.match(entry)
|
|
if result:
|
|
if "id" not in current_target:
|
|
current_target["id"] = result.group("id")
|
|
continue
|
|
|
|
# Store id in next target and move on
|
|
self.targets.append(current_target)
|
|
current_target = {
|
|
"id": result.group("id")
|
|
}
|
|
continue
|
|
|
|
self.logger.warning(
|
|
"Dropped invalid NotificationAPI target "
|
|
f"({entry}) specified")
|
|
self._invalid_targets.append(entry)
|
|
continue
|
|
|
|
if "id" in current_target:
|
|
# Store our final entry
|
|
self.targets.append(current_target)
|
|
current_target = {}
|
|
|
|
if current_target:
|
|
# we have email or sms, but no id to go with it
|
|
msg = (
|
|
"The NotificationAPI did not detect an id to "
|
|
"correlate the following with {}".format(
|
|
str(current_target)))
|
|
self.logger.warning(msg)
|
|
raise TypeError(msg) from None
|
|
|
|
# Validate recipients (cc:) and drop bad ones:
|
|
for recipient in parse_emails(cc):
|
|
result = is_email(recipient)
|
|
if result:
|
|
self.cc.add(result["full_email"])
|
|
if result["name"]:
|
|
self.names[result["full_email"]] = result["name"]
|
|
continue
|
|
|
|
self.logger.warning(
|
|
"Dropped invalid Carbon Copy email "
|
|
"({}) specified.".format(recipient),
|
|
)
|
|
|
|
# Validate recipients (bcc:) and drop bad ones:
|
|
for recipient in parse_emails(bcc):
|
|
|
|
result = is_email(recipient)
|
|
if result:
|
|
self.bcc.add(result["full_email"])
|
|
if result["name"]:
|
|
self.names[result["full_email"]] = result["name"]
|
|
continue
|
|
|
|
self.logger.warning(
|
|
"Dropped invalid Blind Carbon Copy email "
|
|
"({}) specified.".format(recipient),
|
|
)
|
|
|
|
# Template functionality
|
|
self.tokens = {}
|
|
if isinstance(tokens, dict):
|
|
self.tokens.update(tokens)
|
|
|
|
return
|
|
|
|
@property
|
|
def url_identifier(self):
|
|
"""
|
|
Returns all of the identifiers that make this URL unique from
|
|
another similar one. Targets or end points should never be identified
|
|
here.
|
|
"""
|
|
return (self.secure_protocol[0], self.client_id, self.client_secret)
|
|
|
|
def url(self, privacy=False, *args, **kwargs):
|
|
"""
|
|
Returns the URL built dynamically based on specified arguments.
|
|
"""
|
|
|
|
# Define any URL parameters
|
|
params = {
|
|
"mode": self.mode,
|
|
}
|
|
|
|
# Extend our parameters
|
|
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
|
|
|
|
if len(self.cc) > 0:
|
|
# Handle our Carbon Copy Addresses
|
|
params["cc"] = ",".join([
|
|
formataddr(
|
|
(self.names.get(e, False), e),
|
|
# Swap comma for it's escaped url code (if detected) since
|
|
# we're using that as a delimiter
|
|
charset="utf-8").replace(",", "%2C")
|
|
for e in self.cc])
|
|
|
|
if len(self.bcc) > 0:
|
|
# Handle our Blind Carbon Copy Addresses
|
|
params["bcc"] = ",".join([
|
|
formataddr(
|
|
(self.names.get(e, False), e),
|
|
# Swap comma for it's escaped url code (if detected) since
|
|
# we're using that as a delimiter
|
|
charset="utf-8").replace(",", "%2C")
|
|
for e in self.bcc])
|
|
|
|
if self.reply_to:
|
|
# Handle our Reply-To Address
|
|
params["reply"] = formataddr(
|
|
(self.reply_to["senderName"], self.reply_to["senderEmail"]),
|
|
# Swap comma for its escaped url code (if detected) since
|
|
# we're using that as a delimiter
|
|
charset="utf-8",
|
|
)
|
|
|
|
if self.channels:
|
|
# Prepare our default channel
|
|
params["channels"] = ",".join(self.channels)
|
|
|
|
if self.region != self.template_args["region"]["default"]:
|
|
# Prepare our default region
|
|
params["region"] = self.region
|
|
|
|
# handle from=
|
|
if self.from_addr and self.names[self.from_addr] != self.app_id:
|
|
params["from"] = self.names[self.from_addr]
|
|
|
|
# Extend our parameters
|
|
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
|
|
# Store any template entries if specified
|
|
params.update({f":{k}": v for k, v in self.tokens.items()})
|
|
|
|
targets = []
|
|
for target in self.targets:
|
|
# ID is always present
|
|
targets.append(f"@{target['id']}")
|
|
if "number" in target:
|
|
targets.append(f"{target['number']}")
|
|
if "email" in target:
|
|
targets.append(f"{target['email']}")
|
|
|
|
mtype = f"{self.message_type}@" \
|
|
if self.message_type != self.default_message_type else ""
|
|
return "{schema}://{mtype}{cid}/{secret}/{targets}?{params}".format(
|
|
schema=self.secure_protocol[0],
|
|
mtype=mtype,
|
|
cid=self.pprint(self.client_id, privacy, safe=""),
|
|
secret=self.pprint(self.client_secret, privacy, safe=""),
|
|
targets=NotifyNotificationAPI.quote("/".join(
|
|
chain(targets, self._invalid_targets)), safe="/"),
|
|
params=NotifyNotificationAPI.urlencode(params),
|
|
)
|
|
|
|
def __len__(self):
|
|
"""
|
|
Returns the number of targets associated with this notification
|
|
"""
|
|
|
|
return max(1, len(self.targets))
|
|
|
|
def gen_payload(self, body, title="", notify_type=NotifyType.INFO,
|
|
**kwargs):
|
|
"""
|
|
generates our NotificationAPI payload
|
|
"""
|
|
|
|
payload_ = {
|
|
"type": self.message_type,
|
|
}
|
|
if self.mode == NotificationAPIMode.TEMPLATE:
|
|
# Take a copy of our token dictionary
|
|
parameters = self.tokens.copy()
|
|
|
|
# Apply some defaults template values
|
|
parameters["appBody"] = body
|
|
parameters["appTitle"] = title
|
|
parameters["appType"] = notify_type.value
|
|
parameters["appId"] = self.app_id
|
|
parameters["appDescription"] = self.app_desc
|
|
parameters["appColor"] = self.color(notify_type)
|
|
parameters["appImageUrl"] = self.image_url(notify_type)
|
|
parameters["appUrl"] = self.app_url
|
|
|
|
# A Simple Email Payload Template
|
|
payload_.update({
|
|
"parameters": {**parameters},
|
|
})
|
|
|
|
else:
|
|
# Acquire text version of body if provided
|
|
text_body = convert_between(
|
|
NotifyFormat.HTML, NotifyFormat.TEXT, body) \
|
|
if self.notify_format == NotifyFormat.HTML else body
|
|
|
|
for channel in self.channels:
|
|
# Python v3.10 supports `match/case` but since Apprise aims to
|
|
# be compatible with Python v3.9+, we must use if/else for the
|
|
# time being
|
|
if channel == NotificationAPIChannel.SMS:
|
|
payload_.update({
|
|
NotificationAPIChannel.SMS: {
|
|
"message": (title + "\n" + text_body)
|
|
if title else text_body,
|
|
},
|
|
})
|
|
|
|
elif channel == NotificationAPIChannel.EMAIL:
|
|
html_body = convert_between(
|
|
NotifyFormat.TEXT, NotifyFormat.HTML, body) \
|
|
if self.notify_format != NotifyFormat.HTML else body
|
|
|
|
payload_.update({
|
|
NotificationAPIChannel.EMAIL: {
|
|
"subject": title if title else self.app_id,
|
|
"html": html_body,
|
|
},
|
|
})
|
|
|
|
if self.from_addr:
|
|
payload_[NotificationAPIChannel.EMAIL].update({
|
|
"senderEmail": self.from_addr,
|
|
"senderName": self.names[self.from_addr],
|
|
})
|
|
|
|
elif channel == NotificationAPIChannel.INAPP:
|
|
payload_.update({
|
|
NotificationAPIChannel.INAPP: {
|
|
"title": title if title else self.app_id,
|
|
"image": self.image_url(notify_type),
|
|
},
|
|
})
|
|
|
|
elif channel == NotificationAPIChannel.WEB_PUSH:
|
|
payload_.update({
|
|
NotificationAPIChannel.WEB_PUSH: {
|
|
"title": title if title else self.app_id,
|
|
"message": text_body,
|
|
"icon": self.image_url(notify_type),
|
|
},
|
|
})
|
|
|
|
elif channel == NotificationAPIChannel.MOBILE_PUSH:
|
|
payload_.update({
|
|
NotificationAPIChannel.MOBILE_PUSH: {
|
|
"title": title if title else self.app_id,
|
|
"message": text_body,
|
|
},
|
|
})
|
|
|
|
else: # channel == NotificationAPIChannel.SLACK
|
|
payload_.update({
|
|
NotificationAPIChannel.SLACK: {
|
|
"text": (title + "\n" + text_body)
|
|
if title else text_body,
|
|
},
|
|
})
|
|
|
|
# Copy our list to work with
|
|
targets = list(self.targets)
|
|
if self.from_addr:
|
|
payload_.update({
|
|
"options": {
|
|
"email": {
|
|
"fromAddress": self.from_addr,
|
|
"fromName": self.names[self.from_addr]}}})
|
|
|
|
elif self.cc or self.bcc:
|
|
# Set up shell
|
|
payload_.update({"options": {"email": {}}})
|
|
|
|
while len(targets) > 0:
|
|
target = targets.pop(0)
|
|
|
|
# Create a copy of our template
|
|
payload = payload_.copy()
|
|
|
|
# the cc, bcc, to field must be unique or SendMail will fail,
|
|
# the below code prepares this by ensuring the target isn't in
|
|
# the cc list or bcc list. It also makes sure the cc list does
|
|
# not contain any of the bcc entries
|
|
if "email" in target:
|
|
cc = (self.cc - self.bcc - {target["email"]})
|
|
bcc = (self.bcc - {target["email"]})
|
|
|
|
else:
|
|
# Assume defaults
|
|
cc = self.cc
|
|
bcc = self.bcc
|
|
|
|
#
|
|
# Prepare our 'to'
|
|
#
|
|
payload["to"] = {**target}
|
|
|
|
# Support cc/bcc
|
|
if len(cc):
|
|
payload["options"]["email"]["ccAddresses"] = list(cc)
|
|
if len(bcc):
|
|
payload["options"]["email"]["bccAddresses"] = list(bcc)
|
|
|
|
yield payload
|
|
|
|
def send(self, body, title="", notify_type=NotifyType.INFO, **kwargs):
|
|
"""
|
|
Perform NotificationAPI Notification
|
|
"""
|
|
|
|
# error tracking (used for function return)
|
|
has_error = False
|
|
|
|
if not self.targets:
|
|
# There is no one to email or send an sms message to; we're done
|
|
self.logger.warning(
|
|
"There are no NotificationAPI recipients to notify"
|
|
)
|
|
return False
|
|
|
|
# Prepare our URL
|
|
url = (
|
|
f"{NOTIFICATIONAPI_API_LOOKUP[self.region]}/"
|
|
f"{self.client_id}/sender")
|
|
|
|
headers = {
|
|
"User-Agent": self.app_id,
|
|
"Content-Type": "application/json",
|
|
"Authorization": self.auth_header,
|
|
}
|
|
|
|
for payload in self.gen_payload(
|
|
body, title=title, notify_type=notify_type, **kwargs):
|
|
# Perform our post
|
|
self.logger.debug(
|
|
"NotificationAPI POST URL: {} (cert_verify={!r})".format(
|
|
url, self.verify_certificate))
|
|
self.logger.debug(
|
|
"NotificationAPI Payload: %s", payload["to"]["id"])
|
|
|
|
# Always call throttle before any remote server i/o is made
|
|
self.throttle()
|
|
try:
|
|
r = requests.post(
|
|
url,
|
|
data=dumps(payload),
|
|
headers=headers,
|
|
verify=self.verify_certificate,
|
|
timeout=self.request_timeout,
|
|
)
|
|
|
|
try:
|
|
loads(r.content)
|
|
|
|
except (AttributeError, TypeError, ValueError):
|
|
# This gets thrown if we can't parse our JSON Response
|
|
# - ValueError = r.content is Unparsable
|
|
# - TypeError = r.content is None
|
|
# - AttributeError = r is None
|
|
self.logger.warning(
|
|
"Invalid response from NotificationAPI server.")
|
|
self.logger.debug(
|
|
"Response Details:\r\n%r", (r.content or b"")[:2000])
|
|
|
|
# Record our failure
|
|
has_error = True
|
|
continue
|
|
|
|
# Reference status code
|
|
status_code = r.status_code
|
|
|
|
if status_code not in (
|
|
requests.codes.ok, requests.codes.accepted):
|
|
# We had a problem
|
|
status_str = \
|
|
NotifyNotificationAPI.http_response_code_lookup(
|
|
status_code)
|
|
|
|
self.logger.warning(
|
|
"Failed to send NotificationAPI notification to %s: "
|
|
"%s%serror=%d",
|
|
payload["to"]["id"],
|
|
status_str,
|
|
", " if status_str else "",
|
|
status_code)
|
|
|
|
self.logger.debug(
|
|
"Response Details:\r\n%r", (r.content or b"")[:2000])
|
|
|
|
# Record our failure
|
|
has_error = True
|
|
|
|
else:
|
|
self.logger.info(
|
|
"Sent NotificationAPI notification to %s.",
|
|
payload["to"]["id"])
|
|
|
|
except requests.RequestException as e:
|
|
self.logger.warning(
|
|
"A Connection error occurred sending NotificationAPI "
|
|
"notification to %s.", payload["to"]["id"])
|
|
self.logger.debug("Socket Exception: {}".format(str(e)))
|
|
|
|
# Record our failure
|
|
has_error = True
|
|
|
|
return not has_error
|
|
|
|
@staticmethod
|
|
def parse_url(url):
|
|
"""
|
|
Parses the URL and returns enough arguments that can allow
|
|
us to re-instantiate this object.
|
|
|
|
"""
|
|
|
|
results = NotifyBase.parse_url(url, verify_host=False)
|
|
if not results:
|
|
# We're done early as we couldn't load the results
|
|
return results
|
|
|
|
# Define our minimum requirements; defining them now saves us from
|
|
# having to if/else all kinds of branches below...
|
|
results["client_id"] = None
|
|
results["client_secret"] = None
|
|
|
|
# Prepare our targets (starting with our host)
|
|
results["targets"] = []
|
|
if results["host"]:
|
|
results["targets"].append(
|
|
NotifyNotificationAPI.unquote(results["host"]))
|
|
|
|
# For tracking email sources
|
|
results["from_addr"] = None
|
|
if "from" in results["qsd"] and len(results["qsd"]["from"]):
|
|
results["from_addr"] = \
|
|
NotifyNotificationAPI.unquote(results["qsd"]["from"].rstrip())
|
|
|
|
# First 2 elements are the client_id and client_secret
|
|
# Following are targets
|
|
results["targets"] += \
|
|
NotifyNotificationAPI.split_path(results["fullpath"])
|
|
# check for our client id
|
|
if "id" in results["qsd"] and len(results["qsd"]["id"]):
|
|
# Store our Client ID
|
|
results["client_id"] = \
|
|
NotifyNotificationAPI.unquote(results["qsd"]["id"])
|
|
|
|
elif results["targets"]:
|
|
# Store our Client ID
|
|
results["client_id"] = results["targets"].pop(0)
|
|
|
|
if "secret" in results["qsd"] and len(results["qsd"]["secret"]):
|
|
# Store our Client Secret
|
|
results["client_secret"] = \
|
|
NotifyNotificationAPI.unquote(results["qsd"]["secret"])
|
|
|
|
elif results["targets"]:
|
|
# Store our Client Secret
|
|
results["client_secret"] = results["targets"].pop(0)
|
|
|
|
if "region" in results["qsd"] and len(results["qsd"]["region"]):
|
|
results["region"] = \
|
|
NotifyNotificationAPI.unquote(results["qsd"]["region"])
|
|
|
|
if "channels" in results["qsd"] and len(results["qsd"]["channels"]):
|
|
results["channels"] = \
|
|
NotifyNotificationAPI.unquote(results["qsd"]["channels"])
|
|
|
|
if "mode" in results["qsd"] and len(results["qsd"]["mode"]):
|
|
results["mode"] = \
|
|
NotifyNotificationAPI.unquote(results["qsd"]["mode"])
|
|
|
|
if "reply" in results["qsd"] and len(results["qsd"]["reply"]):
|
|
results["reply_to"] = \
|
|
NotifyNotificationAPI.unquote(results["qsd"]["reply"])
|
|
|
|
# Handling of Message Type
|
|
if "type" in results["qsd"] and len(results["qsd"]["type"]):
|
|
results["message_type"] = \
|
|
NotifyNotificationAPI.unquote(results["qsd"]["type"])
|
|
|
|
elif results["user"]:
|
|
# Pull from user
|
|
results["message_type"] = \
|
|
NotifyNotificationAPI.unquote(results["user"])
|
|
|
|
# The 'to' makes it easier to use yaml configuration
|
|
if "to" in results["qsd"] and len(results["qsd"]["to"]):
|
|
results["targets"].append(
|
|
NotifyNotificationAPI.unquote(results["qsd"]["to"]))
|
|
|
|
# Handle Carbon Copy Addresses
|
|
if "cc" in results["qsd"] and len(results["qsd"]["cc"]):
|
|
results["cc"] = \
|
|
NotifyNotificationAPI.unquote(results["qsd"]["cc"])
|
|
|
|
# Handle Blind Carbon Copy Addresses
|
|
if "bcc" in results["qsd"] and len(results["qsd"]["bcc"]):
|
|
results["bcc"] = \
|
|
NotifyNotificationAPI.unquote(results["qsd"]["bcc"])
|
|
|
|
# Store our tokens
|
|
results["tokens"] = results["qsd:"]
|
|
|
|
return results
|