diff --git a/api/package.json b/api/package.json index 7850e92..463fe3d 100644 --- a/api/package.json +++ b/api/package.json @@ -27,6 +27,7 @@ "@nestjs/mongoose": "^10.0.10", "@nestjs/passport": "^10.0.3", "@nestjs/platform-express": "^10.4.5", + "@nestjs/schedule": "^4.1.1", "@nestjs/swagger": "^7.4.2", "@nestjs/throttler": "^6.2.1", "axios": "^1.7.7", diff --git a/api/pnpm-lock.yaml b/api/pnpm-lock.yaml index 5f7b043..f50b029 100644 --- a/api/pnpm-lock.yaml +++ b/api/pnpm-lock.yaml @@ -29,6 +29,9 @@ importers: '@nestjs/platform-express': specifier: ^10.4.5 version: 10.4.5(@nestjs/common@10.4.5(reflect-metadata@0.2.2)(rxjs@7.8.1))(@nestjs/core@10.4.5) + '@nestjs/schedule': + specifier: ^4.1.1 + version: 4.1.1(@nestjs/common@10.4.5(reflect-metadata@0.2.2)(rxjs@7.8.1))(@nestjs/core@10.4.5(@nestjs/common@10.4.5(reflect-metadata@0.2.2)(rxjs@7.8.1))(@nestjs/platform-express@10.4.5)(reflect-metadata@0.2.2)(rxjs@7.8.1)) '@nestjs/swagger': specifier: ^7.4.2 version: 7.4.2(@nestjs/common@10.4.5(reflect-metadata@0.2.2)(rxjs@7.8.1))(@nestjs/core@10.4.5(@nestjs/common@10.4.5(reflect-metadata@0.2.2)(rxjs@7.8.1))(@nestjs/platform-express@10.4.5)(reflect-metadata@0.2.2)(rxjs@7.8.1))(reflect-metadata@0.2.2) @@ -806,6 +809,12 @@ packages: '@nestjs/common': ^10.0.0 '@nestjs/core': ^10.0.0 + '@nestjs/schedule@4.1.1': + resolution: {integrity: sha512-VxAnCiU4HP0wWw8IdWAVfsGC/FGjyToNjjUtXDEQL6oj+w/N5QDd2VT9k6d7Jbr8PlZuBZNdWtDKSkH5bZ+RXQ==} + peerDependencies: + '@nestjs/common': ^8.0.0 || ^9.0.0 || ^10.0.0 + '@nestjs/core': ^8.0.0 || ^9.0.0 || ^10.0.0 + '@nestjs/schematics@10.2.2': resolution: {integrity: sha512-D4pJ46E8llCA7WPr3cV6sfRqDlvnTjQWnF1fLyKYD3Ldl+KPtlLyIcxaqlLTB0YR9ItKNKIZTJzUehRxR7UUsQ==} peerDependencies: @@ -1178,6 +1187,9 @@ packages: '@types/long@4.0.2': resolution: {integrity: sha512-MqTGEo5bj5t157U6fA/BiDynNkn0YknVdh48CMPkTSpFTVmvao5UQmm7uEF6xBEo7qIMAlY/JSleYaE6VOdpaA==} + '@types/luxon@3.4.2': + resolution: {integrity: sha512-TifLZlFudklWlMBfhubvgqTXRzLDI5pCbGa4P8a3wPyUQSW+1xQ5eDsreP9DWHX3tjq1ke96uYG/nwundroWcA==} + '@types/methods@1.1.4': resolution: {integrity: sha512-ymXWVrDiCxTBE3+RIrrP533E70eA+9qu7zdWoHuOmGujkYtzf4HQF96b8nwHLqhuf4ykX61IGRIB38CC6/sImQ==} @@ -1807,6 +1819,9 @@ packages: create-require@1.1.1: resolution: {integrity: sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==} + cron@3.1.7: + resolution: {integrity: sha512-tlBg7ARsAMQLzgwqVxy8AZl/qlTc5nibqYwtNGoCrd+cV+ugI+tvZC1oT/8dFH8W455YrywGykx/KMmAqOr7Jw==} + cross-spawn@7.0.3: resolution: {integrity: sha512-iRDPJKUPVEND7dHPO8rkbOnPpyDygcDFtWjpeWNCgy8WP2rXcxXL8TskReQl6OrB2G7+UJrags1q15Fudc7G6w==} engines: {node: '>= 8'} @@ -2985,6 +3000,10 @@ packages: lru-memoizer@2.3.0: resolution: {integrity: sha512-GXn7gyHAMhO13WSKrIiNfztwxodVsP8IoZ3XfrJV4yH2x0/OeTO/FIaAHTY5YekdGgW94njfuKmyyt1E0mR6Ug==} + luxon@3.4.4: + resolution: {integrity: sha512-zobTr7akeGHnv7eBOXcRgMeCP6+uyYsczwmeRCauvpvaAltgNyTbLH/+VaEAPUeWBT+1GuNmz4wC/6jtQzbbVA==} + engines: {node: '>=12'} + magic-string@0.30.8: resolution: {integrity: sha512-ISQTe55T2ao7XtlAStud6qwYPZjE4GK1S/BeVPus4jrq6JuOnQ00YKQC581RWhR122W7msZV263KzVeLoqidyQ==} engines: {node: '>=12'} @@ -5373,6 +5392,13 @@ snapshots: transitivePeerDependencies: - supports-color + '@nestjs/schedule@4.1.1(@nestjs/common@10.4.5(reflect-metadata@0.2.2)(rxjs@7.8.1))(@nestjs/core@10.4.5(@nestjs/common@10.4.5(reflect-metadata@0.2.2)(rxjs@7.8.1))(@nestjs/platform-express@10.4.5)(reflect-metadata@0.2.2)(rxjs@7.8.1))': + dependencies: + '@nestjs/common': 10.4.5(reflect-metadata@0.2.2)(rxjs@7.8.1) + '@nestjs/core': 10.4.5(@nestjs/common@10.4.5(reflect-metadata@0.2.2)(rxjs@7.8.1))(@nestjs/platform-express@10.4.5)(reflect-metadata@0.2.2)(rxjs@7.8.1) + cron: 3.1.7 + uuid: 10.0.0 + '@nestjs/schematics@10.2.2(chokidar@3.6.0)(typescript@5.3.3)': dependencies: '@angular-devkit/core': 17.3.10(chokidar@3.6.0) @@ -5927,6 +5953,8 @@ snapshots: '@types/long@4.0.2': optional: true + '@types/luxon@3.4.2': {} + '@types/methods@1.1.4': {} '@types/mime@1.3.5': {} @@ -6691,6 +6719,11 @@ snapshots: create-require@1.1.1: {} + cron@3.1.7: + dependencies: + '@types/luxon': 3.4.2 + luxon: 3.4.4 + cross-spawn@7.0.3: dependencies: path-key: 3.1.1 @@ -8219,6 +8252,8 @@ snapshots: lodash.clonedeep: 4.5.0 lru-cache: 6.0.0 + luxon@3.4.4: {} + magic-string@0.30.8: dependencies: '@jridgewell/sourcemap-codec': 1.5.0 diff --git a/api/src/app.module.ts b/api/src/app.module.ts index ffa951e..fce66fe 100644 --- a/api/src/app.module.ts +++ b/api/src/app.module.ts @@ -5,7 +5,9 @@ import { AuthModule } from './auth/auth.module' import { UsersModule } from './users/users.module' import { ThrottlerModule } from '@nestjs/throttler' import { APP_GUARD } from '@nestjs/core/constants' +import { WebhookModule } from './webhook/webhook.module' import { ThrottlerByIpGuard } from './auth/guards/throttle-by-ip.guard' +import { ScheduleModule } from '@nestjs/schedule' @Module({ imports: [ @@ -16,9 +18,11 @@ import { ThrottlerByIpGuard } from './auth/guards/throttle-by-ip.guard' limit: 30, }, ]), + ScheduleModule.forRoot(), AuthModule, UsersModule, GatewayModule, + WebhookModule, ], controllers: [], providers: [ diff --git a/api/src/webhook/schemas/webhook-notification.schema.ts b/api/src/webhook/schemas/webhook-notification.schema.ts new file mode 100644 index 0000000..3e366dc --- /dev/null +++ b/api/src/webhook/schemas/webhook-notification.schema.ts @@ -0,0 +1,41 @@ +import { Prop, Schema, SchemaFactory } from '@nestjs/mongoose' +import { Document, Types } from 'mongoose' +import { WebhookSubscription } from './webhook-subscription.schema' +import { SMS } from 'src/gateway/schemas/sms.schema' + +export type WebhookNotificationDocument = WebhookNotification & Document + +@Schema({ timestamps: true }) +export class WebhookNotification { + _id?: Types.ObjectId + + @Prop({ type: Types.ObjectId, ref: WebhookSubscription.name, required: true }) + webhookSubscription: WebhookSubscription + + @Prop({ type: String, required: true }) + event: string + + @Prop({ type: Object, required: true }) + payload: object + + @Prop({ type: Types.ObjectId, ref: SMS.name }) + sms: SMS + + @Prop({ type: String }) + deliveredAt: Date + + @Prop({ type: Date }) + lastDeliveryAttemptAt: Date + + @Prop({ type: Date }) + nextDeliveryAttemptAt: Date + + @Prop({ type: Number, default: 0 }) + deliveryAttemptCount: number + + @Prop({ type: Date }) + deliveryAttemptAbortedAt: Date +} + +export const WebhookNotificationSchema = + SchemaFactory.createForClass(WebhookNotification) diff --git a/api/src/webhook/schemas/webhook-subscription.schema.ts b/api/src/webhook/schemas/webhook-subscription.schema.ts new file mode 100644 index 0000000..3e69d04 --- /dev/null +++ b/api/src/webhook/schemas/webhook-subscription.schema.ts @@ -0,0 +1,43 @@ +import { Prop, Schema, SchemaFactory } from '@nestjs/mongoose' +import { Document, Types } from 'mongoose' +import { User } from 'src/users/schemas/user.schema' +import { WebhookEvent } from '../webhook-event.enum' + +export type WebhookSubscriptionDocument = WebhookSubscription & Document + +@Schema({ timestamps: true }) +export class WebhookSubscription { + _id?: Types.ObjectId + + @Prop({ type: Types.ObjectId, ref: User.name }) + user: User + + @Prop({ type: Boolean, default: true }) + isActive: boolean + + @Prop({ type: [String], default: [WebhookEvent.MESSAGE_RECEIVED] }) + events: string[] + + @Prop({ type: String, required: true }) + deliveryUrl: string + + @Prop({ type: String, required: true }) + signingSecret: string + + @Prop({ type: Number, default: 0 }) + successfulDeliveryCount: number + + @Prop({ type: Number, default: 0 }) + deliveryAttemptCount: number + + @Prop({ type: Date }) + lastDeliveryAttemptAt: Date + + @Prop({ type: Date }) + lastDeliverySuccessAt: Date +} + +export const WebhookSubscriptionSchema = + SchemaFactory.createForClass(WebhookSubscription) + +WebhookSubscriptionSchema.index({ user: 1, events: 1 }, { unique: true }) diff --git a/api/src/webhook/webhook-event.enum.ts b/api/src/webhook/webhook-event.enum.ts new file mode 100644 index 0000000..b3b1ecd --- /dev/null +++ b/api/src/webhook/webhook-event.enum.ts @@ -0,0 +1,3 @@ +export enum WebhookEvent { + MESSAGE_RECEIVED = 'MESSAGE_RECEIVED', +} diff --git a/api/src/webhook/webhook.controller.ts b/api/src/webhook/webhook.controller.ts new file mode 100644 index 0000000..2539e57 --- /dev/null +++ b/api/src/webhook/webhook.controller.ts @@ -0,0 +1,68 @@ +import { + Body, + Request, + Param, + Post, + Patch, + Controller, + Get, + UseGuards, +} from '@nestjs/common' +import { WebhookService } from './webhook.service' +import { ApiBearerAuth, ApiTags } from '@nestjs/swagger' +import { CreateWebhookDto, UpdateWebhookDto } from './webhook.dto' +import { AuthGuard } from 'src/auth/guards/auth.guard' + +@ApiTags('webhooks') +@ApiBearerAuth() +@Controller('webhooks') +export class WebhookController { + constructor(private readonly webhookService: WebhookService) {} + + @Get() + @UseGuards(AuthGuard) + async getWebhooks(@Request() req) { + const data = await this.webhookService.findWebhooksForUser({ + user: req.user, + }) + return { data } + } + + @Get(':webhookId') + @UseGuards(AuthGuard) + async getWebhook(@Request() req, @Param('webhookId') webhookId: string) { + const data = await this.webhookService.findOne({ + user: req.user, + webhookId, + }) + return { data } + } + + @Post() + @UseGuards(AuthGuard) + async createWebhook( + @Request() req, + @Body() createWebhookDto: CreateWebhookDto, + ) { + const data = await this.webhookService.create({ + user: req.user, + createWebhookDto, + }) + return { data } + } + + @Patch(':webhookId') + @UseGuards(AuthGuard) + async updateWebhook( + @Request() req, + @Param('webhookId') webhookId: string, + @Body() updateWebhookDto: UpdateWebhookDto, + ) { + const data = await this.webhookService.update({ + user: req.user, + webhookId, + updateWebhookDto, + }) + return { data } + } +} diff --git a/api/src/webhook/webhook.dto.ts b/api/src/webhook/webhook.dto.ts new file mode 100644 index 0000000..e71f9ad --- /dev/null +++ b/api/src/webhook/webhook.dto.ts @@ -0,0 +1,14 @@ +import { WebhookEvent } from './webhook-event.enum' + +export class CreateWebhookDto { + deliveryUrl: string + signingSecret?: string + events: WebhookEvent[] +} + +export class UpdateWebhookDto { + isActive: boolean + deliveryUrl: string + signingSecret: string + events: WebhookEvent[] +} diff --git a/api/src/webhook/webhook.module.ts b/api/src/webhook/webhook.module.ts new file mode 100644 index 0000000..ad7ca92 --- /dev/null +++ b/api/src/webhook/webhook.module.ts @@ -0,0 +1,35 @@ +import { Module } from '@nestjs/common' +import { MongooseModule } from '@nestjs/mongoose' +import { WebhookController } from './webhook.controller' +import { WebhookService } from './webhook.service' +import { + WebhookSubscription, + WebhookSubscriptionSchema, +} from './schemas/webhook-subscription.schema' +import { + WebhookNotification, + WebhookNotificationSchema, +} from './schemas/webhook-notification.schema' +import { AuthModule } from 'src/auth/auth.module' +import { UsersModule } from 'src/users/users.module' + +@Module({ + imports: [ + MongooseModule.forFeature([ + { + name: WebhookSubscription.name, + schema: WebhookSubscriptionSchema, + }, + { + name: WebhookNotification.name, + schema: WebhookNotificationSchema, + }, + ]), + AuthModule, + UsersModule, + ], + controllers: [WebhookController], + providers: [WebhookService], + exports: [MongooseModule, WebhookService], +}) +export class WebhookModule {} diff --git a/api/src/webhook/webhook.service.ts b/api/src/webhook/webhook.service.ts new file mode 100644 index 0000000..2fae90b --- /dev/null +++ b/api/src/webhook/webhook.service.ts @@ -0,0 +1,251 @@ +import { HttpException, HttpStatus, Injectable } from '@nestjs/common' +import { Model } from 'mongoose' +import { + WebhookSubscription, + WebhookSubscriptionDocument, +} from './schemas/webhook-subscription.schema' +import { InjectModel } from '@nestjs/mongoose' +import { WebhookEvent } from './webhook-event.enum' +import { + WebhookNotification, + WebhookNotificationDocument, +} from './schemas/webhook-notification.schema' +import axios from 'axios' +import { v4 as uuidv4 } from 'uuid' +import { Cron } from '@nestjs/schedule' +import { CronExpression } from '@nestjs/schedule' +import * as crypto from 'crypto' + +@Injectable() +export class WebhookService { + constructor( + @InjectModel(WebhookSubscription.name) + private webhookSubscriptionModel: Model, + @InjectModel(WebhookNotification.name) + private webhookNotificationModel: Model, + ) {} + + async findOne({ user, webhookId }) { + const webhook = await this.webhookSubscriptionModel.findOne({ + _id: webhookId, + user: user._id, + }) + + if (!webhook) { + throw new HttpException('Subscription not found', HttpStatus.NOT_FOUND) + } + return webhook + } + + async findWebhooksForUser({ user }) { + return await this.webhookSubscriptionModel.find({ user: user._id }) + } + + async create({ user, createWebhookDto }) { + const { events, deliveryUrl } = createWebhookDto + + // Add URL validation + try { + new URL(deliveryUrl) + } catch (e) { + throw new HttpException('Invalid delivery URL', HttpStatus.BAD_REQUEST) + } + + const existingSubscription = await this.webhookSubscriptionModel.findOne({ + user: user._id, + events, + }) + + if (existingSubscription) { + throw new HttpException( + 'You have already subscribed to this event', + HttpStatus.BAD_REQUEST, + ) + } + + if (!events.every((event) => Object.values(WebhookEvent).includes(event))) { + throw new HttpException('Invalid event type', HttpStatus.BAD_REQUEST) + } + + const signingSecret = uuidv4() + + // TODO: Encrypt signing secret + // const webhookSignatureKey = process.env.WEBHOOK_SIGNATURE_KEY + // const encryptedSigningSecret = encrypt(signingSecret, webhookSignatureKey) + + const webhookSubscription = await this.webhookSubscriptionModel.create({ + user: user._id, + events, + deliveryUrl, + signingSecret, + }) + + return webhookSubscription + } + + async update({ user, webhookId, updateWebhookDto }) { + const webhookSubscription = await this.webhookSubscriptionModel.findOne({ + _id: webhookId, + user: user._id, + }) + + if (!webhookSubscription) { + throw new HttpException('Subscription not found', HttpStatus.NOT_FOUND) + } + + if (updateWebhookDto.hasOwnProperty('isActive')) { + webhookSubscription.isActive = updateWebhookDto.isActive + } + + if (updateWebhookDto.hasOwnProperty('deliveryUrl')) { + webhookSubscription.deliveryUrl = updateWebhookDto.deliveryUrl + } + + // if there is a valid uuid signing secret, update it + if ( + updateWebhookDto.hasOwnProperty('signingSecret') && + updateWebhookDto.signingSecret.length < 20 + ) { + throw new HttpException('Invalid signing secret', HttpStatus.BAD_REQUEST) + } else if (updateWebhookDto.hasOwnProperty('signingSecret')) { + webhookSubscription.signingSecret = updateWebhookDto.signingSecret + } + + await webhookSubscription.save() + + return webhookSubscription + } + + async deliverNotification({ sms, user, event }) { + console.log('deliverNotification') + console.log(sms) + console.log(user) + console.log(event) + const webhookSubscription = await this.webhookSubscriptionModel.findOne({ + user: user._id, + events: { $in: [event] }, + }) + + if (!webhookSubscription || !webhookSubscription.isActive) { + return + } + + if (event === WebhookEvent.MESSAGE_RECEIVED) { + const payload = { + smsId: sms._id, + sender: sms.sender, + message: sms.message, + receivedAt: sms.receivedAt, + deviceId: sms.device, + webhookSubscriptionId: webhookSubscription._id, + webhookEvent: event, + } + const webhookNotification = await this.webhookNotificationModel.create({ + webhookSubscription: webhookSubscription._id, + event, + payload, + sms, + }) + + await this.attemptWebhookDelivery(webhookNotification) + } else { + throw new HttpException('Invalid event type', HttpStatus.BAD_REQUEST) + } + } + + private async attemptWebhookDelivery( + webhookNotification: WebhookNotificationDocument, + ) { + const now = new Date() + + const webhookSubscription = await this.webhookSubscriptionModel.findById( + webhookNotification.webhookSubscription, + ) + const deliveryUrl = webhookSubscription?.deliveryUrl + const signingSecret = webhookSubscription?.signingSecret + + const signature = crypto + .createHmac('sha256', signingSecret) + .update(JSON.stringify(webhookNotification.payload)) + .digest('hex') + + try { + await axios.post(deliveryUrl, webhookNotification.payload, { + headers: { + 'X-Signature': signature, + }, + timeout: 10000, + }) + webhookNotification.deliveryAttemptCount += 1 + webhookNotification.lastDeliveryAttemptAt = now + webhookNotification.nextDeliveryAttemptAt = this.getNextDeliveryAttemptAt( + webhookNotification.deliveryAttemptCount, + ) + webhookNotification.deliveredAt = now + await webhookNotification.save() + + + webhookSubscription.successfulDeliveryCount += 1 + webhookSubscription.lastDeliverySuccessAt = now + } catch (e) { + console.error( + `Failed to deliver webhook notification ${webhookNotification._id}: received response status code ${e.response.status}`, + ) + webhookNotification.deliveryAttemptCount += 1 + webhookNotification.lastDeliveryAttemptAt = now + webhookNotification.nextDeliveryAttemptAt = this.getNextDeliveryAttemptAt( + webhookNotification.deliveryAttemptCount, + ) + await webhookNotification.save() + + } finally { + webhookSubscription.deliveryAttemptCount += 1 + await webhookSubscription.save() + } + } + + private getNextDeliveryAttemptAt(deliveryAttemptCount: number): Date { + // Delays in minutes + const delaySequence = [ + 1, // 1 minute + 5, // 5 minutes + 30, // 30 minutes + 60, // 1 hour + 360, // 6 hours + 1440, // 1 day + 4320, // 3 days + 10080, // 7 days + 43200, // 30 days + ] + + // Get the delay in minutes (use last value if attempt count exceeds sequence length) + const delayInMinutes = + delaySequence[ + Math.min(deliveryAttemptCount - 1, delaySequence.length - 1) + ] || delaySequence[delaySequence.length - 1] + + // Convert minutes to milliseconds and add to current time + return new Date(Date.now() + delayInMinutes * 60 * 1000) + } + + // Check for notifications that need to be delivered every minute + @Cron(CronExpression.EVERY_MINUTE) + async checkForNotificationsToDeliver() { + const now = new Date() + const notifications = await this.webhookNotificationModel + .find({ + nextDeliveryAttemptAt: { $lte: now }, + deliveredAt: null, + deliveryAttemptCount: { $lt: 10 }, + deliveryAttemptAbortedAt: null, + }) + .sort({ nextDeliveryAttemptAt: 1 }) + .limit(50) + + console.log(`delivering ${notifications.length} webhook notifications`) + + for (const notification of notifications) { + await this.attemptWebhookDelivery(notification) + } + } +}