mirror of
https://github.com/vernu/textbee.git
synced 2026-04-25 09:20:07 -04:00
feat(api): implement optional job queue for sending sms with delay and scheduling
This commit is contained in:
@@ -19,3 +19,7 @@ MAIL_USER=
|
||||
MAIL_PASS=
|
||||
MAIL_FROM=
|
||||
MAIL_REPLY_TO=textbee.dev@gmail.com
|
||||
|
||||
# SMS Queue Configuration
|
||||
USE_SMS_QUEUE=false
|
||||
REDIS_URL=redis://localhost:6379 # if queue is enabled, redis url is required
|
||||
|
||||
@@ -21,9 +21,12 @@
|
||||
},
|
||||
"dependencies": {
|
||||
"@nest-modules/mailer": "^1.3.22",
|
||||
"@nestjs/bull": "^11.0.2",
|
||||
"@nestjs/common": "^10.4.5",
|
||||
"@nestjs/config": "^4.0.1",
|
||||
"@nestjs/core": "^10.4.5",
|
||||
"@nestjs/jwt": "^10.2.0",
|
||||
"@nestjs/mapped-types": "^2.1.0",
|
||||
"@nestjs/mongoose": "^10.0.10",
|
||||
"@nestjs/passport": "^10.0.3",
|
||||
"@nestjs/platform-express": "^10.4.5",
|
||||
@@ -33,10 +36,13 @@
|
||||
"@polar-sh/sdk": "^0.30.0",
|
||||
"axios": "^1.8.2",
|
||||
"bcryptjs": "^2.4.3",
|
||||
"bull": "^4.16.5",
|
||||
"class-validator": "^0.14.1",
|
||||
"dotenv": "^16.4.5",
|
||||
"express": "^4.21.2",
|
||||
"firebase-admin": "^12.6.0",
|
||||
"handlebars": "^4.7.8",
|
||||
"ioredis": "^5.6.0",
|
||||
"mongoose": "^8.12.1",
|
||||
"nodemailer": "^6.10.0",
|
||||
"passport": "^0.7.0",
|
||||
|
||||
536
api/pnpm-lock.yaml
generated
536
api/pnpm-lock.yaml
generated
File diff suppressed because it is too large
Load Diff
@@ -1,4 +1,9 @@
|
||||
import { Module } from '@nestjs/common'
|
||||
import {
|
||||
MiddlewareConsumer,
|
||||
Module,
|
||||
NestModule,
|
||||
RequestMethod,
|
||||
} from '@nestjs/common'
|
||||
import { MongooseModule } from '@nestjs/mongoose'
|
||||
import { GatewayModule } from './gateway/gateway.module'
|
||||
import { AuthModule } from './auth/auth.module'
|
||||
@@ -7,12 +12,29 @@ import { ThrottlerModule } from '@nestjs/throttler'
|
||||
import { APP_GUARD } from '@nestjs/core/constants'
|
||||
import { WebhookModule } from './webhook/webhook.module'
|
||||
import { ThrottlerByIpGuard } from './auth/guards/throttle-by-ip.guard'
|
||||
import { Injectable, NestMiddleware } from '@nestjs/common'
|
||||
import { Request, Response, NextFunction } from 'express'
|
||||
import { ScheduleModule } from '@nestjs/schedule'
|
||||
import { BillingModule } from './billing/billing.module';
|
||||
import { BillingModule } from './billing/billing.module'
|
||||
import { ConfigModule, ConfigService } from '@nestjs/config'
|
||||
import { BullModule } from '@nestjs/bull'
|
||||
|
||||
@Injectable()
|
||||
export class LoggerMiddleware implements NestMiddleware {
|
||||
use(req: Request, res: Response, next: NextFunction) {
|
||||
console.log('req.originalUrl: ', req.originalUrl)
|
||||
if (next) {
|
||||
next()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
MongooseModule.forRoot(process.env.MONGO_URI),
|
||||
ConfigModule.forRoot({
|
||||
isGlobal: true,
|
||||
}),
|
||||
ThrottlerModule.forRoot([
|
||||
{
|
||||
ttl: 60000,
|
||||
@@ -20,6 +42,15 @@ import { BillingModule } from './billing/billing.module';
|
||||
},
|
||||
]),
|
||||
ScheduleModule.forRoot(),
|
||||
BullModule.forRootAsync({
|
||||
imports: [ConfigModule],
|
||||
inject: [ConfigService],
|
||||
useFactory: async (configService: ConfigService) => {
|
||||
return {
|
||||
redis: configService.get('REDIS_URL'),
|
||||
}
|
||||
},
|
||||
}),
|
||||
AuthModule,
|
||||
UsersModule,
|
||||
GatewayModule,
|
||||
@@ -34,4 +65,10 @@ import { BillingModule } from './billing/billing.module';
|
||||
},
|
||||
],
|
||||
})
|
||||
export class AppModule {}
|
||||
export class AppModule implements NestModule {
|
||||
configure(consumer: MiddlewareConsumer) {
|
||||
consumer
|
||||
.apply(LoggerMiddleware)
|
||||
.forRoutes({ path: '*', method: RequestMethod.ALL })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,10 @@ import { SMS, SMSSchema } from './schemas/sms.schema'
|
||||
import { SMSBatch, SMSBatchSchema } from './schemas/sms-batch.schema'
|
||||
import { WebhookModule } from 'src/webhook/webhook.module'
|
||||
import { BillingModule } from 'src/billing/billing.module'
|
||||
import { BullModule } from '@nestjs/bull'
|
||||
import { ConfigModule } from '@nestjs/config'
|
||||
import { SmsQueueService } from './queue/sms-queue.service'
|
||||
import { SmsQueueProcessor } from './queue/sms-queue.processor'
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
@@ -26,13 +30,26 @@ import { BillingModule } from 'src/billing/billing.module'
|
||||
schema: SMSBatchSchema,
|
||||
},
|
||||
]),
|
||||
BullModule.registerQueue({
|
||||
name: 'sms',
|
||||
defaultJobOptions: {
|
||||
attempts: 2,
|
||||
backoff: {
|
||||
type: 'exponential',
|
||||
delay: 1000,
|
||||
},
|
||||
removeOnComplete: false,
|
||||
removeOnFail: false,
|
||||
},
|
||||
}),
|
||||
AuthModule,
|
||||
UsersModule,
|
||||
WebhookModule,
|
||||
forwardRef(() => BillingModule),
|
||||
ConfigModule,
|
||||
],
|
||||
controllers: [GatewayController],
|
||||
providers: [GatewayService],
|
||||
exports: [MongooseModule, GatewayService],
|
||||
providers: [GatewayService, SmsQueueService, SmsQueueProcessor],
|
||||
exports: [MongooseModule, GatewayService, SmsQueueService],
|
||||
})
|
||||
export class GatewayModule {}
|
||||
|
||||
@@ -15,13 +15,12 @@ import { AuthService } from 'src/auth/auth.service'
|
||||
import { SMS } from './schemas/sms.schema'
|
||||
import { SMSType } from './sms-type.enum'
|
||||
import { SMSBatch } from './schemas/sms-batch.schema'
|
||||
import {
|
||||
BatchResponse,
|
||||
Message,
|
||||
} from 'firebase-admin/messaging'
|
||||
import { BatchResponse, Message } from 'firebase-admin/messaging'
|
||||
import { WebhookEvent } from 'src/webhook/webhook-event.enum'
|
||||
import { WebhookService } from 'src/webhook/webhook.service'
|
||||
import { BillingService } from 'src/billing/billing.service'
|
||||
import { SmsQueueService } from './queue/sms-queue.service'
|
||||
|
||||
@Injectable()
|
||||
export class GatewayService {
|
||||
constructor(
|
||||
@@ -31,6 +30,7 @@ export class GatewayService {
|
||||
private authService: AuthService,
|
||||
private webhookService: WebhookService,
|
||||
private billingService: BillingService,
|
||||
private smsQueueService: SmsQueueService,
|
||||
) {}
|
||||
|
||||
async registerDevice(
|
||||
@@ -151,6 +151,7 @@ export class GatewayService {
|
||||
message,
|
||||
recipientCount: recipients.length,
|
||||
recipientPreview: this.getRecipientsPreview(recipients),
|
||||
status: 'pending',
|
||||
})
|
||||
} catch (e) {
|
||||
throw new HttpException(
|
||||
@@ -173,6 +174,7 @@ export class GatewayService {
|
||||
type: SMSType.SENT,
|
||||
recipient,
|
||||
requestedAt: new Date(),
|
||||
status: 'pending',
|
||||
})
|
||||
const updatedSMSData = {
|
||||
smsId: sms._id,
|
||||
@@ -198,6 +200,50 @@ export class GatewayService {
|
||||
fcmMessages.push(fcmMessage)
|
||||
}
|
||||
|
||||
// Check if we should use the queue
|
||||
if (this.smsQueueService.isQueueEnabled()) {
|
||||
try {
|
||||
// Update batch status to processing
|
||||
await this.smsBatchModel.findByIdAndUpdate(smsBatch._id, {
|
||||
$set: { status: 'processing' },
|
||||
})
|
||||
|
||||
// Add to queue
|
||||
await this.smsQueueService.addSendSmsJob(
|
||||
deviceId,
|
||||
fcmMessages,
|
||||
smsBatch._id.toString(),
|
||||
)
|
||||
|
||||
return {
|
||||
success: true,
|
||||
message: 'SMS added to queue for processing',
|
||||
smsBatchId: smsBatch._id,
|
||||
recipientCount: recipients.length,
|
||||
}
|
||||
} catch (e) {
|
||||
// Update batch status to failed
|
||||
await this.smsBatchModel.findByIdAndUpdate(smsBatch._id, {
|
||||
$set: { status: 'failed', error: e.message },
|
||||
})
|
||||
|
||||
// Update all SMS in batch to failed
|
||||
await this.smsModel.updateMany(
|
||||
{ smsBatch: smsBatch._id },
|
||||
{ $set: { status: 'failed', error: e.message } },
|
||||
)
|
||||
|
||||
throw new HttpException(
|
||||
{
|
||||
success: false,
|
||||
error: 'Failed to add SMS to queue',
|
||||
additionalInfo: e,
|
||||
},
|
||||
HttpStatus.INTERNAL_SERVER_ERROR,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
const response = await firebaseAdmin.messaging().sendEach(fcmMessages)
|
||||
|
||||
@@ -223,8 +269,26 @@ export class GatewayService {
|
||||
console.log('Failed to update sentSMSCount')
|
||||
console.log(e)
|
||||
})
|
||||
|
||||
this.smsBatchModel
|
||||
.findByIdAndUpdate(smsBatch._id, {
|
||||
$set: { status: 'completed' },
|
||||
})
|
||||
.exec()
|
||||
.catch((e) => {
|
||||
console.error('failed to update sms batch status to completed')
|
||||
})
|
||||
|
||||
return response
|
||||
} catch (e) {
|
||||
this.smsBatchModel
|
||||
.findByIdAndUpdate(smsBatch._id, {
|
||||
$set: { status: 'failed', error: e.message },
|
||||
})
|
||||
.exec()
|
||||
.catch((e) => {
|
||||
console.error('failed to update sms batch status to failed')
|
||||
})
|
||||
throw new HttpException(
|
||||
{
|
||||
success: false,
|
||||
@@ -249,7 +313,6 @@ export class GatewayService {
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
if (
|
||||
!Array.isArray(body.messages) ||
|
||||
body.messages.length === 0 ||
|
||||
@@ -281,9 +344,11 @@ export class GatewayService {
|
||||
recipientPreview: this.getRecipientsPreview(
|
||||
messages.map((m) => m.recipients).flat(),
|
||||
),
|
||||
status: 'pending',
|
||||
})
|
||||
|
||||
const fcmResponses: BatchResponse[] = []
|
||||
const fcmMessages: Message[] = []
|
||||
|
||||
for (const smsData of messages) {
|
||||
const message = smsData.message
|
||||
const recipients = smsData.recipients
|
||||
@@ -296,8 +361,6 @@ export class GatewayService {
|
||||
continue
|
||||
}
|
||||
|
||||
const fcmMessages: Message[] = []
|
||||
|
||||
for (const recipient of recipients) {
|
||||
const sms = await this.smsModel.create({
|
||||
device: device._id,
|
||||
@@ -306,6 +369,7 @@ export class GatewayService {
|
||||
type: SMSType.SENT,
|
||||
recipient,
|
||||
requestedAt: new Date(),
|
||||
status: 'pending',
|
||||
})
|
||||
const updatedSMSData = {
|
||||
smsId: sms._id,
|
||||
@@ -330,9 +394,58 @@ export class GatewayService {
|
||||
}
|
||||
fcmMessages.push(fcmMessage)
|
||||
}
|
||||
}
|
||||
|
||||
// Check if we should use the queue
|
||||
if (this.smsQueueService.isQueueEnabled()) {
|
||||
try {
|
||||
const response = await firebaseAdmin.messaging().sendEach(fcmMessages)
|
||||
// Add to queue
|
||||
await this.smsQueueService.addSendSmsJob(
|
||||
deviceId,
|
||||
fcmMessages,
|
||||
smsBatch._id.toString(),
|
||||
)
|
||||
|
||||
return {
|
||||
success: true,
|
||||
message: 'Bulk SMS added to queue for processing',
|
||||
smsBatchId: smsBatch._id,
|
||||
recipientCount: messages.map((m) => m.recipients).flat().length,
|
||||
}
|
||||
} catch (e) {
|
||||
// Update batch status to failed
|
||||
await this.smsBatchModel.findByIdAndUpdate(smsBatch._id, {
|
||||
$set: {
|
||||
status: 'failed',
|
||||
error: e.message,
|
||||
successCount: 0,
|
||||
failureCount: fcmMessages.length,
|
||||
},
|
||||
})
|
||||
|
||||
// Update all SMS in batch to failed
|
||||
await this.smsModel.updateMany(
|
||||
{ smsBatch: smsBatch._id },
|
||||
{ $set: { status: 'failed', error: e.message } },
|
||||
)
|
||||
|
||||
throw new HttpException(
|
||||
{
|
||||
success: false,
|
||||
error: 'Failed to add bulk SMS to queue',
|
||||
additionalInfo: e,
|
||||
},
|
||||
HttpStatus.INTERNAL_SERVER_ERROR,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
const fcmMessagesBatches = fcmMessages.map((m) => [m])
|
||||
const fcmResponses: BatchResponse[] = []
|
||||
|
||||
for (const batch of fcmMessagesBatches) {
|
||||
try {
|
||||
const response = await firebaseAdmin.messaging().sendEach(batch)
|
||||
|
||||
console.log(response)
|
||||
fcmResponses.push(response)
|
||||
@@ -346,9 +459,27 @@ export class GatewayService {
|
||||
console.log('Failed to update sentSMSCount')
|
||||
console.log(e)
|
||||
})
|
||||
|
||||
this.smsBatchModel
|
||||
.findByIdAndUpdate(smsBatch._id, {
|
||||
$set: { status: 'completed' },
|
||||
})
|
||||
.exec()
|
||||
.catch((e) => {
|
||||
console.error('failed to update sms batch status to completed')
|
||||
})
|
||||
} catch (e) {
|
||||
console.log('Failed to send SMS: FCM')
|
||||
console.log(e)
|
||||
|
||||
this.smsBatchModel
|
||||
.findByIdAndUpdate(smsBatch._id, {
|
||||
$set: { status: 'failed', error: e.message },
|
||||
})
|
||||
.exec()
|
||||
.catch((e) => {
|
||||
console.error('failed to update sms batch status to failed')
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -438,7 +569,11 @@ export class GatewayService {
|
||||
return sms
|
||||
}
|
||||
|
||||
async getReceivedSMS(deviceId: string, page = 1, limit = 50): Promise<{ data: any[], meta: any }> {
|
||||
async getReceivedSMS(
|
||||
deviceId: string,
|
||||
page = 1,
|
||||
limit = 50,
|
||||
): Promise<{ data: any[]; meta: any }> {
|
||||
const device = await this.deviceModel.findById(deviceId)
|
||||
|
||||
if (!device) {
|
||||
@@ -452,13 +587,13 @@ export class GatewayService {
|
||||
}
|
||||
|
||||
// Calculate skip value for pagination
|
||||
const skip = (page - 1) * limit;
|
||||
const skip = (page - 1) * limit
|
||||
|
||||
// Get total count for pagination metadata
|
||||
const total = await this.smsModel.countDocuments({
|
||||
device: device._id,
|
||||
type: SMSType.RECEIVED,
|
||||
});
|
||||
})
|
||||
|
||||
// @ts-ignore
|
||||
const data = await this.smsModel
|
||||
@@ -468,10 +603,10 @@ export class GatewayService {
|
||||
type: SMSType.RECEIVED,
|
||||
},
|
||||
null,
|
||||
{
|
||||
sort: { receivedAt: -1 },
|
||||
{
|
||||
sort: { receivedAt: -1 },
|
||||
limit: limit,
|
||||
skip: skip
|
||||
skip: skip,
|
||||
},
|
||||
)
|
||||
.populate({
|
||||
@@ -481,8 +616,8 @@ export class GatewayService {
|
||||
.lean() // Use lean() to return plain JavaScript objects instead of Mongoose documents
|
||||
|
||||
// Calculate pagination metadata
|
||||
const totalPages = Math.ceil(total / limit);
|
||||
|
||||
const totalPages = Math.ceil(total / limit)
|
||||
|
||||
return {
|
||||
meta: {
|
||||
page,
|
||||
@@ -491,10 +626,15 @@ export class GatewayService {
|
||||
totalPages,
|
||||
},
|
||||
data,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async getMessages(deviceId: string, type = '', page = 1, limit = 50): Promise<{ data: any[], meta: any }> {
|
||||
async getMessages(
|
||||
deviceId: string,
|
||||
type = '',
|
||||
page = 1,
|
||||
limit = 50,
|
||||
): Promise<{ data: any[]; meta: any }> {
|
||||
const device = await this.deviceModel.findById(deviceId)
|
||||
|
||||
if (!device) {
|
||||
@@ -508,32 +648,28 @@ export class GatewayService {
|
||||
}
|
||||
|
||||
// Calculate skip value for pagination
|
||||
const skip = (page - 1) * limit;
|
||||
const skip = (page - 1) * limit
|
||||
|
||||
// Build query based on type filter
|
||||
const query: any = { device: device._id };
|
||||
|
||||
const query: any = { device: device._id }
|
||||
|
||||
if (type === 'sent') {
|
||||
query.type = SMSType.SENT;
|
||||
query.type = SMSType.SENT
|
||||
} else if (type === 'received') {
|
||||
query.type = SMSType.RECEIVED;
|
||||
query.type = SMSType.RECEIVED
|
||||
}
|
||||
|
||||
// Get total count for pagination metadata
|
||||
const total = await this.smsModel.countDocuments(query);
|
||||
const total = await this.smsModel.countDocuments(query)
|
||||
|
||||
// @ts-ignore
|
||||
const data = await this.smsModel
|
||||
.find(
|
||||
query,
|
||||
null,
|
||||
{
|
||||
// Sort by the most recent timestamp (receivedAt for received, sentAt for sent)
|
||||
sort: { createdAt: -1 },
|
||||
limit: limit,
|
||||
skip: skip
|
||||
},
|
||||
)
|
||||
.find(query, null, {
|
||||
// Sort by the most recent timestamp (receivedAt for received, sentAt for sent)
|
||||
sort: { createdAt: -1 },
|
||||
limit: limit,
|
||||
skip: skip,
|
||||
})
|
||||
.populate({
|
||||
path: 'device',
|
||||
select: '_id brand model buildId enabled',
|
||||
@@ -541,8 +677,8 @@ export class GatewayService {
|
||||
.lean() // Use lean() to return plain JavaScript objects instead of Mongoose documents
|
||||
|
||||
// Calculate pagination metadata
|
||||
const totalPages = Math.ceil(total / limit);
|
||||
|
||||
const totalPages = Math.ceil(total / limit)
|
||||
|
||||
return {
|
||||
meta: {
|
||||
page,
|
||||
@@ -551,7 +687,7 @@ export class GatewayService {
|
||||
totalPages,
|
||||
},
|
||||
data,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async getStatsForUser(user: User) {
|
||||
|
||||
102
api/src/gateway/queue/sms-queue.processor.ts
Normal file
102
api/src/gateway/queue/sms-queue.processor.ts
Normal file
@@ -0,0 +1,102 @@
|
||||
import { Process, Processor } from '@nestjs/bull'
|
||||
import { InjectModel } from '@nestjs/mongoose'
|
||||
import { Job } from 'bull'
|
||||
import { Model } from 'mongoose'
|
||||
import * as firebaseAdmin from 'firebase-admin'
|
||||
import { Device } from '../schemas/device.schema'
|
||||
import { SMS } from '../schemas/sms.schema'
|
||||
import { SMSBatch } from '../schemas/sms-batch.schema'
|
||||
import { WebhookService } from 'src/webhook/webhook.service'
|
||||
import { Logger } from '@nestjs/common'
|
||||
|
||||
@Processor('sms')
|
||||
export class SmsQueueProcessor {
|
||||
private readonly logger = new Logger(SmsQueueProcessor.name)
|
||||
|
||||
constructor(
|
||||
@InjectModel(Device.name) private deviceModel: Model<Device>,
|
||||
@InjectModel(SMS.name) private smsModel: Model<SMS>,
|
||||
@InjectModel(SMSBatch.name) private smsBatchModel: Model<SMSBatch>,
|
||||
private webhookService: WebhookService,
|
||||
) {}
|
||||
|
||||
@Process({
|
||||
name: 'send-sms',
|
||||
concurrency: 10,
|
||||
})
|
||||
async handleSendSms(job: Job<any>) {
|
||||
this.logger.debug(`Processing send-sms job ${job.id}`)
|
||||
const { deviceId, fcmMessages, smsBatchId } = job.data
|
||||
|
||||
try {
|
||||
this.smsBatchModel
|
||||
.findByIdAndUpdate(smsBatchId, {
|
||||
$set: { status: 'processing' },
|
||||
})
|
||||
.exec()
|
||||
.catch((error) => {
|
||||
this.logger.error(
|
||||
`Failed to update sms batch status to processing ${smsBatchId}`,
|
||||
error,
|
||||
)
|
||||
throw error
|
||||
})
|
||||
|
||||
const response = await firebaseAdmin.messaging().sendEach(fcmMessages)
|
||||
|
||||
this.logger.debug(
|
||||
`SMS Job ${job.id} completed, success: ${response.successCount}, failures: ${response.failureCount}`,
|
||||
)
|
||||
|
||||
// Update device SMS count
|
||||
await this.deviceModel
|
||||
.findByIdAndUpdate(deviceId, {
|
||||
$inc: { sentSMSCount: response.successCount },
|
||||
})
|
||||
.exec()
|
||||
|
||||
// Update batch status
|
||||
const smsBatch = await this.smsBatchModel.findByIdAndUpdate(
|
||||
smsBatchId,
|
||||
{
|
||||
$inc: {
|
||||
successCount: response.successCount,
|
||||
failureCount: response.failureCount,
|
||||
},
|
||||
},
|
||||
{ returnDocument: 'after' },
|
||||
)
|
||||
|
||||
if (smsBatch.successCount === smsBatch.recipientCount) {
|
||||
await this.smsBatchModel.findByIdAndUpdate(smsBatchId, {
|
||||
$set: { status: 'completed' },
|
||||
})
|
||||
}
|
||||
|
||||
return response
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to process SMS job ${job.id}`, error)
|
||||
|
||||
const smsBatch = await this.smsBatchModel.findByIdAndUpdate(
|
||||
smsBatchId,
|
||||
{
|
||||
$inc: {
|
||||
failureCount: fcmMessages.length,
|
||||
},
|
||||
},
|
||||
{ returnDocument: 'after' },
|
||||
)
|
||||
|
||||
const newStatus =
|
||||
smsBatch.failureCount === smsBatch.recipientCount
|
||||
? 'failed'
|
||||
: 'partial_success'
|
||||
|
||||
await this.smsBatchModel.findByIdAndUpdate(smsBatchId, {
|
||||
$set: { status: newStatus },
|
||||
})
|
||||
|
||||
throw error
|
||||
}
|
||||
}
|
||||
}
|
||||
66
api/src/gateway/queue/sms-queue.service.ts
Normal file
66
api/src/gateway/queue/sms-queue.service.ts
Normal file
@@ -0,0 +1,66 @@
|
||||
import { Injectable, Logger } from '@nestjs/common'
|
||||
import { InjectQueue } from '@nestjs/bull'
|
||||
import { Queue } from 'bull'
|
||||
import { ConfigService } from '@nestjs/config'
|
||||
import { Message } from 'firebase-admin/messaging'
|
||||
|
||||
@Injectable()
|
||||
export class SmsQueueService {
|
||||
private readonly logger = new Logger(SmsQueueService.name)
|
||||
private readonly useSmsQueue: boolean
|
||||
private readonly maxSmsBatchSize: number
|
||||
|
||||
constructor(
|
||||
@InjectQueue('sms') private readonly smsQueue: Queue,
|
||||
private readonly configService: ConfigService,
|
||||
) {
|
||||
this.useSmsQueue = this.configService.get<boolean>('USE_SMS_QUEUE', false)
|
||||
this.maxSmsBatchSize = this.configService.get<number>(
|
||||
'MAX_SMS_BATCH_SIZE',
|
||||
5,
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if queue is enabled based on environment variable
|
||||
*/
|
||||
isQueueEnabled(): boolean {
|
||||
return this.useSmsQueue
|
||||
}
|
||||
|
||||
async addSendSmsJob(
|
||||
deviceId: string,
|
||||
fcmMessages: Message[],
|
||||
smsBatchId: string,
|
||||
) {
|
||||
this.logger.debug(`Adding send-sms job for batch ${smsBatchId}`)
|
||||
|
||||
// Split messages into batches of max smsBatchSize messages
|
||||
const batches = []
|
||||
for (let i = 0; i < fcmMessages.length; i += this.maxSmsBatchSize) {
|
||||
batches.push(fcmMessages.slice(i, i + this.maxSmsBatchSize))
|
||||
}
|
||||
|
||||
for (const batch of batches) {
|
||||
await this.smsQueue.add(
|
||||
'send-sms',
|
||||
{
|
||||
deviceId,
|
||||
fcmMessages: batch,
|
||||
smsBatchId,
|
||||
},
|
||||
{
|
||||
priority: 1, // TODO: Make this dynamic based on users subscription plan
|
||||
attempts: 1,
|
||||
delay: 1000, // 1 second
|
||||
backoff: {
|
||||
type: 'exponential',
|
||||
delay: 5000, // 5 seconds
|
||||
},
|
||||
removeOnComplete: false,
|
||||
removeOnFail: false,
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -26,6 +26,21 @@ export class SMSBatch {
|
||||
@Prop({ type: String })
|
||||
recipientPreview: string
|
||||
|
||||
@Prop({ type: Number, default: 0 })
|
||||
successCount: number
|
||||
|
||||
@Prop({ type: Number, default: 0 })
|
||||
failureCount: number
|
||||
|
||||
@Prop({ type: String, default: 'pending', enum: ['pending', 'processing', 'completed', 'partial_success', 'failed'] })
|
||||
status: string
|
||||
|
||||
@Prop({ type: String })
|
||||
error: string
|
||||
|
||||
@Prop({ type: Date })
|
||||
completedAt: Date
|
||||
|
||||
// misc metadata for debugging
|
||||
@Prop({ type: Object })
|
||||
metadata: Record<string, any>
|
||||
|
||||
@@ -53,8 +53,11 @@ export class SMS {
|
||||
// @Prop({ type: String })
|
||||
// failureReason: string
|
||||
|
||||
// @Prop({ type: String })
|
||||
// status: string
|
||||
@Prop({ type: String, default: 'pending', enum: ['pending', 'sent', 'delivered', 'failed'] })
|
||||
status: string
|
||||
|
||||
@Prop({ type: String })
|
||||
error: string
|
||||
|
||||
// misc metadata for debugging
|
||||
@Prop({ type: Object })
|
||||
|
||||
Reference in New Issue
Block a user