messaging: Microsoft driver migrate p-limit to native batching (#21132)

This PR migrates the p-limit library to Native graph SDK batching fixing
the concurrency and rate limit issues in production seen for some larger
accounts
This commit is contained in:
neo773
2026-06-08 22:07:37 +05:30
committed by GitHub
parent 772b807490
commit 296c202be4
8 changed files with 204 additions and 65 deletions

View File

@@ -55,9 +55,8 @@ export class MicrosoftGetAllFoldersService implements MessageFolderDriver {
this.logger.error(
`Connected account ${connectedAccount.id}: Error fetching folders: ${error.message}`,
);
this.microsoftMessageListFetchErrorHandler.handleError(error);
return { value: [] };
return this.microsoftMessageListFetchErrorHandler.handleError(error);
});
const folders = (response.value as MicrosoftGraphFolder[]) || [];

View File

@@ -42,10 +42,21 @@ describe('MicrosoftGetMessageListService', () => {
api: jest.fn().mockReturnThis(),
version: jest.fn().mockReturnThis(),
headers: jest.fn().mockReturnThis(),
get: jest.fn().mockResolvedValue({
value: [{ id: 'msg-1' }, { id: 'msg-2' }],
'@odata.deltaLink': 'https://graph.microsoft.com/delta?token=abc',
}),
post: jest
.fn()
.mockImplementation((batchRequestBody: { requests: { id: string }[] }) =>
Promise.resolve({
responses: batchRequestBody.requests.map((request) => ({
id: request.id,
status: 200,
body: {
value: [{ id: 'msg-1' }, { id: 'msg-2' }],
'@odata.deltaLink':
'https://graph.microsoft.com/beta/delta?token=abc',
},
})),
}),
),
});
beforeEach(async () => {

View File

@@ -1,29 +1,37 @@
import { Injectable, Logger } from '@nestjs/common';
import {
type BatchRequestData,
type Client,
PageIterator,
type PageCollection,
type PageIteratorCallback,
} from '@microsoft/microsoft-graph-client';
import { isNonEmptyString } from '@sniptt/guards';
import pLimit from 'p-limit';
import { MessageFolderImportPolicy } from 'twenty-shared/types';
import { MicrosoftOAuth2ClientProvider } from 'src/modules/connected-account/oauth2-client-manager/drivers/microsoft/microsoft-oauth2-client.provider';
import { type ConnectedAccountEntity } from 'src/engine/metadata-modules/connected-account/entities/connected-account.entity';
import { type MessageFolderEntity } from 'src/engine/metadata-modules/message-folder/entities/message-folder.entity';
import { MicrosoftMessageListFetchErrorHandler } from 'src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-message-list-fetch-error-handler.service';
import { type MicrosoftGraphBatchResponse } from 'src/modules/messaging/message-import-manager/drivers/microsoft/types/microsoft-graph-batch-response.type';
import { type MicrosoftGraphDeltaListResponseBody } from 'src/modules/messaging/message-import-manager/drivers/microsoft/types/microsoft-graph-delta-list-response-body.type';
import { toRelativeGraphUrl } from 'src/modules/messaging/message-import-manager/drivers/microsoft/utils/to-relative-graph-url.util';
import { type GetMessageListsArgs } from 'src/modules/messaging/message-import-manager/types/get-message-lists-args.type';
import {
type GetMessageListsResponse,
type GetOneMessageListResponse,
} from 'src/modules/messaging/message-import-manager/types/get-message-lists-response.type';
import { isDefined } from 'twenty-shared/utils';
// Microsoft API limit is 999 messages per request on this endpoint
const MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT = 999;
const MESSAGE_LIST_PREFER_HEADER = `odata.maxpagesize=${MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT}, IdType="ImmutableId"`;
const MICROSOFT_GRAPH_BATCH_LIMIT = 20;
/* reference: https://learn.microsoft.com/en-us/graph/throttling-limits#limits-per-mailbox */
const FOLDER_PROCESSING_CONCURRENCY = 4;
type FolderToProcess = Pick<
MessageFolderEntity,
'id' | 'name' | 'syncCursor' | 'externalId'
>;
@Injectable()
export class MicrosoftGetMessageListService {
@@ -52,59 +60,108 @@ export class MicrosoftGetMessageListService {
return [];
}
const limit = pLimit(FOLDER_PROCESSING_CONCURRENCY);
const results = await Promise.all(
foldersToProcess.map((folder) =>
limit(async () => {
const response = await this.getMessageList(connectedAccount, folder);
return {
...response,
folderId: folder.id,
};
}),
),
);
return results;
}
public async getMessageList(
connectedAccount: Pick<ConnectedAccountEntity, 'provider' | 'id'>,
messageFolder: Pick<
MessageFolderEntity,
'name' | 'syncCursor' | 'externalId'
>,
): Promise<GetOneMessageListResponse> {
const messageExternalIds: string[] = [];
const messageExternalIdsToDelete: string[] = [];
const microsoftClient = await this.microsoftOAuth2ClientProvider.getClient(
connectedAccount.id,
);
const folderId = messageFolder.externalId || messageFolder.name;
const apiUrl = isNonEmptyString(messageFolder.syncCursor)
? messageFolder.syncCursor
: `/me/mailfolders/${folderId}/messages/delta?$select=id`;
const results: GetMessageListsResponse = [];
const response: PageCollection = await microsoftClient
.api(apiUrl)
.version('beta')
.headers({
Prefer: `odata.maxpagesize=${MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT}, IdType="ImmutableId"`,
})
.get()
.catch((error) => {
this.logger.error(
`Connected account ${connectedAccount.id}: Error fetching message list: ${JSON.stringify(error)}`,
for (
let batchStart = 0;
batchStart < foldersToProcess.length;
batchStart += MICROSOFT_GRAPH_BATCH_LIMIT
) {
const foldersBatch = foldersToProcess.slice(
batchStart,
batchStart + MICROSOFT_GRAPH_BATCH_LIMIT,
);
const batchResults = await this.getMessageListBatch(
microsoftClient,
foldersBatch,
);
results.push(...batchResults);
}
return results;
}
private async getMessageListBatch(
microsoftClient: Client,
foldersBatch: FolderToProcess[],
): Promise<GetOneMessageListResponse[]> {
const folderByRequestId = new Map<string, FolderToProcess>();
const requests: BatchRequestData[] = foldersBatch.map((folder, index) => {
const requestId = (index + 1).toString();
folderByRequestId.set(requestId, folder);
return {
id: requestId,
method: 'GET',
url: this.buildInitialDeltaUrl(folder),
headers: { Prefer: MESSAGE_LIST_PREFER_HEADER },
};
});
const batchResponse: MicrosoftGraphBatchResponse<MicrosoftGraphDeltaListResponseBody> =
await microsoftClient
.api('/$batch')
.version('beta')
.post({ requests })
.catch((error: unknown) =>
this.microsoftMessageListFetchErrorHandler.handleError(error),
);
this.microsoftMessageListFetchErrorHandler.handleError(error);
});
const callback: PageIteratorCallback = (data) => {
if (data['@removed']) {
const results: GetOneMessageListResponse[] = [];
for (const response of batchResponse.responses) {
const folder = folderByRequestId.get(response.id);
if (!isDefined(folder)) {
throw new Error(
`Microsoft batch response references unknown request id ${response.id}`,
);
}
if (response.status !== 200) {
this.microsoftMessageListFetchErrorHandler.handleError({
statusCode: response.status,
message: response.body?.error?.message,
code: response.body?.error?.code,
});
}
results.push(
await this.iterateFolderPages(microsoftClient, folder, response.body),
);
}
return results;
}
private buildInitialDeltaUrl(folder: FolderToProcess): string {
if (isNonEmptyString(folder.syncCursor)) {
return toRelativeGraphUrl(folder.syncCursor);
}
const folderId = folder.externalId || folder.name;
return `/me/mailfolders/${folderId}/messages/delta?$select=id`;
}
private async iterateFolderPages(
microsoftClient: Client,
folder: FolderToProcess,
firstPage?: MicrosoftGraphDeltaListResponseBody,
): Promise<GetOneMessageListResponse> {
const messageExternalIds: string[] = [];
const messageExternalIdsToDelete: string[] = [];
const callback: PageIteratorCallback = (data: { id: string }) => {
if ('@removed' in data) {
messageExternalIdsToDelete.push(data.id);
} else {
messageExternalIds.push(data.id);
@@ -113,22 +170,29 @@ export class MicrosoftGetMessageListService {
return true;
};
const pageIterator = new PageIterator(microsoftClient, response, callback, {
headers: {
Prefer: `odata.maxpagesize=${MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT}, IdType="ImmutableId"`,
},
});
const pageCollection: PageCollection = {
value: firstPage?.value ?? [],
'@odata.nextLink': firstPage?.['@odata.nextLink'],
'@odata.deltaLink': firstPage?.['@odata.deltaLink'],
};
await pageIterator.iterate().catch((error) => {
const pageIterator = new PageIterator(
microsoftClient,
pageCollection,
callback,
{ headers: { Prefer: MESSAGE_LIST_PREFER_HEADER } },
);
await pageIterator.iterate().catch((error: unknown) => {
this.microsoftMessageListFetchErrorHandler.handleError(error);
});
return {
messageExternalIds,
messageExternalIdsToDelete,
previousSyncCursor: messageFolder.syncCursor,
previousSyncCursor: folder.syncCursor,
nextSyncCursor: pageIterator.getDeltaLink() || '',
folderId: undefined,
folderId: folder.id,
};
}
}

View File

@@ -14,7 +14,7 @@ export class MicrosoftMessageListFetchErrorHandler {
) {}
// oxlint-disable-next-line typescript/no-explicit-any
public handleError(error: any): void {
public handleError(error: any): never {
this.logger.error(`Error fetching message list: ${JSON.stringify(error)}`);
const networkError = this.microsoftNetworkErrorHandler.handleError(error);

View File

@@ -0,0 +1,7 @@
export type MicrosoftGraphBatchResponse<TBody> = {
responses: {
id: string;
status: number;
body?: TBody;
}[];
};

View File

@@ -0,0 +1,6 @@
export type MicrosoftGraphDeltaListResponseBody = {
value?: { id?: string; '@removed'?: { reason?: string } }[];
'@odata.nextLink'?: string;
'@odata.deltaLink'?: string;
error?: { code?: string; message?: string };
};

View File

@@ -0,0 +1,31 @@
import { toRelativeGraphUrl } from 'src/modules/messaging/message-import-manager/drivers/microsoft/utils/to-relative-graph-url.util';
describe('toRelativeGraphUrl', () => {
it('strips origin and beta version segment from an absolute deltaLink', () => {
expect(
toRelativeGraphUrl(
'https://graph.microsoft.com/beta/me/mailfolders/inbox-id/messages/delta?$deltatoken=abc',
),
).toBe('/me/mailfolders/inbox-id/messages/delta?$deltatoken=abc');
});
it('strips the v1.0 version segment', () => {
expect(
toRelativeGraphUrl('https://graph.microsoft.com/v1.0/me/messages'),
).toBe('/me/messages');
});
it('returns an already-relative url unchanged', () => {
expect(toRelativeGraphUrl('/me/mailfolders/inbox-id/messages/delta')).toBe(
'/me/mailfolders/inbox-id/messages/delta',
);
});
it('preserves query parameters on the relative url', () => {
expect(
toRelativeGraphUrl(
'https://graph.microsoft.com/beta/me/messages?$select=id&$top=999',
),
).toBe('/me/messages?$select=id&$top=999');
});
});

View File

@@ -0,0 +1,21 @@
import { isNonEmptyString } from '@sniptt/guards';
const GRAPH_VERSION_SEGMENTS = ['beta', 'v1.0'];
export const toRelativeGraphUrl = (url: string): string => {
if (!isNonEmptyString(url) || !url.startsWith('http')) {
return url;
}
const parsedUrl = new URL(url);
const pathSegments = parsedUrl.pathname.split('/').filter(isNonEmptyString);
if (
pathSegments.length > 0 &&
GRAPH_VERSION_SEGMENTS.includes(pathSegments[0])
) {
pathSegments.shift();
}
return `/${pathSegments.join('/')}${parsedUrl.search}`;
};