mirror of
https://github.com/vernu/textbee.git
synced 2026-02-19 23:26:14 -05:00
Merge pull request #172 from vernu/feat/delayedsms
delayed sms sending feature
This commit is contained in:
@@ -101,6 +101,14 @@ export class SMSData {
|
||||
})
|
||||
simSubscriptionId?: number
|
||||
|
||||
@ApiProperty({
|
||||
type: String,
|
||||
required: false,
|
||||
description: 'Optional ISO 8601 date string to schedule SMS for future delivery (e.g., "2024-01-15T10:30:00Z"). Must be a future date.',
|
||||
example: '2024-01-15T10:30:00Z',
|
||||
})
|
||||
scheduledAt?: string
|
||||
|
||||
// TODO: restructure the Payload such that it contains bactchId, smsId, recipients and message in an optimized way
|
||||
// message: string
|
||||
// bactchId: string
|
||||
|
||||
@@ -126,6 +126,55 @@ export class GatewayService {
|
||||
// return await this.deviceModel.findByIdAndDelete(deviceId)
|
||||
}
|
||||
|
||||
private calculateDelayFromScheduledAt(scheduledAt?: string): number | undefined {
|
||||
if (!scheduledAt) {
|
||||
return undefined
|
||||
}
|
||||
|
||||
try {
|
||||
const scheduledDate = new Date(scheduledAt)
|
||||
|
||||
// Check if date is valid
|
||||
if (isNaN(scheduledDate.getTime())) {
|
||||
throw new HttpException(
|
||||
{
|
||||
success: false,
|
||||
error: 'Invalid scheduledAt format. Must be a valid ISO 8601 date string.',
|
||||
},
|
||||
HttpStatus.BAD_REQUEST,
|
||||
)
|
||||
}
|
||||
|
||||
const now = Date.now()
|
||||
const scheduledTime = scheduledDate.getTime()
|
||||
const delayMs = scheduledTime - now
|
||||
|
||||
// Reject past dates
|
||||
if (delayMs < 0) {
|
||||
throw new HttpException(
|
||||
{
|
||||
success: false,
|
||||
error: 'scheduledAt must be a future date',
|
||||
},
|
||||
HttpStatus.BAD_REQUEST,
|
||||
)
|
||||
}
|
||||
|
||||
return delayMs
|
||||
} catch (error) {
|
||||
if (error instanceof HttpException) {
|
||||
throw error
|
||||
}
|
||||
throw new HttpException(
|
||||
{
|
||||
success: false,
|
||||
error: 'Invalid scheduledAt format. Must be a valid ISO 8601 date string.',
|
||||
},
|
||||
HttpStatus.BAD_REQUEST,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
async sendSMS(deviceId: string, smsData: SendSMSInputDTO): Promise<any> {
|
||||
const device = await this.deviceModel.findById(deviceId)
|
||||
|
||||
@@ -162,6 +211,20 @@ export class GatewayService {
|
||||
)
|
||||
}
|
||||
|
||||
// Calculate delay from scheduledAt if provided
|
||||
const delayMs = this.calculateDelayFromScheduledAt(smsData.scheduledAt)
|
||||
|
||||
// Validate that scheduling requires queue to be enabled
|
||||
if (delayMs !== undefined && !this.smsQueueService.isQueueEnabled()) {
|
||||
throw new HttpException(
|
||||
{
|
||||
success: false,
|
||||
error: 'SMS scheduling requires queue to be enabled',
|
||||
},
|
||||
HttpStatus.BAD_REQUEST,
|
||||
)
|
||||
}
|
||||
|
||||
await this.billingService.canPerformAction(
|
||||
device.user.toString(),
|
||||
'send_sms',
|
||||
@@ -247,6 +310,7 @@ export class GatewayService {
|
||||
deviceId,
|
||||
fcmMessages,
|
||||
smsBatch._id.toString(),
|
||||
delayMs,
|
||||
)
|
||||
|
||||
return {
|
||||
@@ -367,6 +431,18 @@ export class GatewayService {
|
||||
body.messages.map((m) => m.recipients).flat().length,
|
||||
)
|
||||
|
||||
// Check if any message has scheduledAt and validate queue is enabled
|
||||
const hasScheduledMessages = body.messages.some((m) => m.scheduledAt)
|
||||
if (hasScheduledMessages && !this.smsQueueService.isQueueEnabled()) {
|
||||
throw new HttpException(
|
||||
{
|
||||
success: false,
|
||||
error: 'SMS scheduling requires queue to be enabled',
|
||||
},
|
||||
HttpStatus.BAD_REQUEST,
|
||||
)
|
||||
}
|
||||
|
||||
const { messageTemplate, messages } = body
|
||||
|
||||
const smsBatch = await this.smsBatchModel.create({
|
||||
@@ -381,7 +457,8 @@ export class GatewayService {
|
||||
status: 'pending',
|
||||
})
|
||||
|
||||
const fcmMessages: Message[] = []
|
||||
// Track FCM messages with their calculated delays for grouping
|
||||
const fcmMessagesWithDelays: Array<{ message: Message; delayMs?: number }> = []
|
||||
|
||||
for (const smsData of messages) {
|
||||
const message = smsData.message
|
||||
@@ -395,6 +472,9 @@ export class GatewayService {
|
||||
continue
|
||||
}
|
||||
|
||||
// Calculate delay for this message's scheduledAt
|
||||
const delayMs = this.calculateDelayFromScheduledAt(smsData.scheduledAt)
|
||||
|
||||
for (let recipient of recipients) {
|
||||
recipient = recipient.replace(/\s+/g, "")
|
||||
const sms = await this.smsModel.create({
|
||||
@@ -433,19 +513,32 @@ export class GatewayService {
|
||||
priority: 'high',
|
||||
},
|
||||
}
|
||||
fcmMessages.push(fcmMessage)
|
||||
fcmMessagesWithDelays.push({ message: fcmMessage, delayMs })
|
||||
}
|
||||
}
|
||||
|
||||
// Check if we should use the queue
|
||||
if (this.smsQueueService.isQueueEnabled()) {
|
||||
try {
|
||||
// Add to queue
|
||||
await this.smsQueueService.addSendSmsJob(
|
||||
deviceId,
|
||||
fcmMessages,
|
||||
smsBatch._id.toString(),
|
||||
)
|
||||
// Group messages by delay (undefined delay means immediate, group together)
|
||||
const messagesByDelay = new Map<number | undefined, Message[]>()
|
||||
for (const { message, delayMs } of fcmMessagesWithDelays) {
|
||||
const delayKey = delayMs !== undefined ? delayMs : undefined
|
||||
if (!messagesByDelay.has(delayKey)) {
|
||||
messagesByDelay.set(delayKey, [])
|
||||
}
|
||||
messagesByDelay.get(delayKey)!.push(message)
|
||||
}
|
||||
|
||||
// Queue each group with its respective delay
|
||||
for (const [delayMs, messages] of messagesByDelay.entries()) {
|
||||
await this.smsQueueService.addSendSmsJob(
|
||||
deviceId,
|
||||
messages,
|
||||
smsBatch._id.toString(),
|
||||
delayMs,
|
||||
)
|
||||
}
|
||||
|
||||
return {
|
||||
success: true,
|
||||
@@ -460,7 +553,7 @@ export class GatewayService {
|
||||
status: 'failed',
|
||||
error: e.message,
|
||||
successCount: 0,
|
||||
failureCount: fcmMessages.length,
|
||||
failureCount: fcmMessagesWithDelays.length,
|
||||
},
|
||||
})
|
||||
|
||||
@@ -481,6 +574,8 @@ export class GatewayService {
|
||||
}
|
||||
}
|
||||
|
||||
// For non-queue path, convert back to simple array
|
||||
const fcmMessages = fcmMessagesWithDelays.map(({ message }) => message)
|
||||
const fcmMessagesBatches = fcmMessages.map((m) => [m])
|
||||
const fcmResponses: BatchResponse[] = []
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ export class SmsQueueService {
|
||||
deviceId: string,
|
||||
fcmMessages: Message[],
|
||||
smsBatchId: string,
|
||||
delayMs?: number,
|
||||
) {
|
||||
this.logger.debug(`Adding send-sms job for batch ${smsBatchId}`)
|
||||
|
||||
@@ -41,8 +42,13 @@ export class SmsQueueService {
|
||||
batches.push(fcmMessages.slice(i, i + this.maxSmsBatchSize))
|
||||
}
|
||||
|
||||
// If delayMs is provided, use it for all batches (scheduled send)
|
||||
// Otherwise, use the existing delay multiplier logic
|
||||
const useScheduledDelay = delayMs !== undefined && delayMs >= 0
|
||||
|
||||
let delayMultiplier = 1;
|
||||
for (const batch of batches) {
|
||||
const delay = useScheduledDelay ? delayMs : 1000 * delayMultiplier++
|
||||
await this.smsQueue.add(
|
||||
'send-sms',
|
||||
{
|
||||
@@ -53,7 +59,7 @@ export class SmsQueueService {
|
||||
{
|
||||
priority: 1, // TODO: Make this dynamic based on users subscription plan
|
||||
attempts: 1,
|
||||
delay: 1000 * delayMultiplier++,
|
||||
delay: delay,
|
||||
backoff: {
|
||||
type: 'exponential',
|
||||
delay: 5000, // 5 seconds
|
||||
|
||||
Reference in New Issue
Block a user