diff --git a/packages/twenty-server/src/modules/messaging/message-folder-manager/drivers/microsoft/services/microsoft-get-all-folders.service.ts b/packages/twenty-server/src/modules/messaging/message-folder-manager/drivers/microsoft/services/microsoft-get-all-folders.service.ts index d926125f078..60eab11e92e 100644 --- a/packages/twenty-server/src/modules/messaging/message-folder-manager/drivers/microsoft/services/microsoft-get-all-folders.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-folder-manager/drivers/microsoft/services/microsoft-get-all-folders.service.ts @@ -55,9 +55,8 @@ export class MicrosoftGetAllFoldersService implements MessageFolderDriver { this.logger.error( `Connected account ${connectedAccount.id}: Error fetching folders: ${error.message}`, ); - this.microsoftMessageListFetchErrorHandler.handleError(error); - return { value: [] }; + return this.microsoftMessageListFetchErrorHandler.handleError(error); }); const folders = (response.value as MicrosoftGraphFolder[]) || []; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.spec.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.spec.ts index e8184ea0b44..6a1b35a753c 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.spec.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.spec.ts @@ -42,10 +42,21 @@ describe('MicrosoftGetMessageListService', () => { api: jest.fn().mockReturnThis(), version: jest.fn().mockReturnThis(), headers: jest.fn().mockReturnThis(), - get: jest.fn().mockResolvedValue({ - value: [{ id: 'msg-1' }, { id: 'msg-2' }], - '@odata.deltaLink': 'https://graph.microsoft.com/delta?token=abc', - }), + post: jest + .fn() + .mockImplementation((batchRequestBody: { requests: { id: string }[] }) => + Promise.resolve({ + responses: batchRequestBody.requests.map((request) => ({ + id: request.id, + status: 200, + body: { + value: [{ id: 'msg-1' }, { id: 'msg-2' }], + '@odata.deltaLink': + 'https://graph.microsoft.com/beta/delta?token=abc', + }, + })), + }), + ), }); beforeEach(async () => { diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.ts index de4997b0af8..1cbe2707119 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.ts @@ -1,29 +1,37 @@ import { Injectable, Logger } from '@nestjs/common'; import { + type BatchRequestData, + type Client, PageIterator, type PageCollection, type PageIteratorCallback, } from '@microsoft/microsoft-graph-client'; import { isNonEmptyString } from '@sniptt/guards'; -import pLimit from 'p-limit'; import { MessageFolderImportPolicy } from 'twenty-shared/types'; import { MicrosoftOAuth2ClientProvider } from 'src/modules/connected-account/oauth2-client-manager/drivers/microsoft/microsoft-oauth2-client.provider'; -import { type ConnectedAccountEntity } from 'src/engine/metadata-modules/connected-account/entities/connected-account.entity'; import { type MessageFolderEntity } from 'src/engine/metadata-modules/message-folder/entities/message-folder.entity'; import { MicrosoftMessageListFetchErrorHandler } from 'src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-message-list-fetch-error-handler.service'; +import { type MicrosoftGraphBatchResponse } from 'src/modules/messaging/message-import-manager/drivers/microsoft/types/microsoft-graph-batch-response.type'; +import { type MicrosoftGraphDeltaListResponseBody } from 'src/modules/messaging/message-import-manager/drivers/microsoft/types/microsoft-graph-delta-list-response-body.type'; +import { toRelativeGraphUrl } from 'src/modules/messaging/message-import-manager/drivers/microsoft/utils/to-relative-graph-url.util'; import { type GetMessageListsArgs } from 'src/modules/messaging/message-import-manager/types/get-message-lists-args.type'; import { type GetMessageListsResponse, type GetOneMessageListResponse, } from 'src/modules/messaging/message-import-manager/types/get-message-lists-response.type'; +import { isDefined } from 'twenty-shared/utils'; // Microsoft API limit is 999 messages per request on this endpoint const MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT = 999; +const MESSAGE_LIST_PREFER_HEADER = `odata.maxpagesize=${MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT}, IdType="ImmutableId"`; +const MICROSOFT_GRAPH_BATCH_LIMIT = 20; -/* reference: https://learn.microsoft.com/en-us/graph/throttling-limits#limits-per-mailbox */ -const FOLDER_PROCESSING_CONCURRENCY = 4; +type FolderToProcess = Pick< + MessageFolderEntity, + 'id' | 'name' | 'syncCursor' | 'externalId' +>; @Injectable() export class MicrosoftGetMessageListService { @@ -52,59 +60,108 @@ export class MicrosoftGetMessageListService { return []; } - const limit = pLimit(FOLDER_PROCESSING_CONCURRENCY); - - const results = await Promise.all( - foldersToProcess.map((folder) => - limit(async () => { - const response = await this.getMessageList(connectedAccount, folder); - - return { - ...response, - folderId: folder.id, - }; - }), - ), - ); - - return results; - } - - public async getMessageList( - connectedAccount: Pick, - messageFolder: Pick< - MessageFolderEntity, - 'name' | 'syncCursor' | 'externalId' - >, - ): Promise { - const messageExternalIds: string[] = []; - const messageExternalIdsToDelete: string[] = []; - const microsoftClient = await this.microsoftOAuth2ClientProvider.getClient( connectedAccount.id, ); - const folderId = messageFolder.externalId || messageFolder.name; - const apiUrl = isNonEmptyString(messageFolder.syncCursor) - ? messageFolder.syncCursor - : `/me/mailfolders/${folderId}/messages/delta?$select=id`; + const results: GetMessageListsResponse = []; - const response: PageCollection = await microsoftClient - .api(apiUrl) - .version('beta') - .headers({ - Prefer: `odata.maxpagesize=${MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT}, IdType="ImmutableId"`, - }) - .get() - .catch((error) => { - this.logger.error( - `Connected account ${connectedAccount.id}: Error fetching message list: ${JSON.stringify(error)}`, + for ( + let batchStart = 0; + batchStart < foldersToProcess.length; + batchStart += MICROSOFT_GRAPH_BATCH_LIMIT + ) { + const foldersBatch = foldersToProcess.slice( + batchStart, + batchStart + MICROSOFT_GRAPH_BATCH_LIMIT, + ); + + const batchResults = await this.getMessageListBatch( + microsoftClient, + foldersBatch, + ); + + results.push(...batchResults); + } + + return results; + } + + private async getMessageListBatch( + microsoftClient: Client, + foldersBatch: FolderToProcess[], + ): Promise { + const folderByRequestId = new Map(); + + const requests: BatchRequestData[] = foldersBatch.map((folder, index) => { + const requestId = (index + 1).toString(); + + folderByRequestId.set(requestId, folder); + + return { + id: requestId, + method: 'GET', + url: this.buildInitialDeltaUrl(folder), + headers: { Prefer: MESSAGE_LIST_PREFER_HEADER }, + }; + }); + + const batchResponse: MicrosoftGraphBatchResponse = + await microsoftClient + .api('/$batch') + .version('beta') + .post({ requests }) + .catch((error: unknown) => + this.microsoftMessageListFetchErrorHandler.handleError(error), ); - this.microsoftMessageListFetchErrorHandler.handleError(error); - }); - const callback: PageIteratorCallback = (data) => { - if (data['@removed']) { + const results: GetOneMessageListResponse[] = []; + + for (const response of batchResponse.responses) { + const folder = folderByRequestId.get(response.id); + + if (!isDefined(folder)) { + throw new Error( + `Microsoft batch response references unknown request id ${response.id}`, + ); + } + + if (response.status !== 200) { + this.microsoftMessageListFetchErrorHandler.handleError({ + statusCode: response.status, + message: response.body?.error?.message, + code: response.body?.error?.code, + }); + } + + results.push( + await this.iterateFolderPages(microsoftClient, folder, response.body), + ); + } + + return results; + } + + private buildInitialDeltaUrl(folder: FolderToProcess): string { + if (isNonEmptyString(folder.syncCursor)) { + return toRelativeGraphUrl(folder.syncCursor); + } + + const folderId = folder.externalId || folder.name; + + return `/me/mailfolders/${folderId}/messages/delta?$select=id`; + } + + private async iterateFolderPages( + microsoftClient: Client, + folder: FolderToProcess, + firstPage?: MicrosoftGraphDeltaListResponseBody, + ): Promise { + const messageExternalIds: string[] = []; + const messageExternalIdsToDelete: string[] = []; + + const callback: PageIteratorCallback = (data: { id: string }) => { + if ('@removed' in data) { messageExternalIdsToDelete.push(data.id); } else { messageExternalIds.push(data.id); @@ -113,22 +170,29 @@ export class MicrosoftGetMessageListService { return true; }; - const pageIterator = new PageIterator(microsoftClient, response, callback, { - headers: { - Prefer: `odata.maxpagesize=${MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT}, IdType="ImmutableId"`, - }, - }); + const pageCollection: PageCollection = { + value: firstPage?.value ?? [], + '@odata.nextLink': firstPage?.['@odata.nextLink'], + '@odata.deltaLink': firstPage?.['@odata.deltaLink'], + }; - await pageIterator.iterate().catch((error) => { + const pageIterator = new PageIterator( + microsoftClient, + pageCollection, + callback, + { headers: { Prefer: MESSAGE_LIST_PREFER_HEADER } }, + ); + + await pageIterator.iterate().catch((error: unknown) => { this.microsoftMessageListFetchErrorHandler.handleError(error); }); return { messageExternalIds, messageExternalIdsToDelete, - previousSyncCursor: messageFolder.syncCursor, + previousSyncCursor: folder.syncCursor, nextSyncCursor: pageIterator.getDeltaLink() || '', - folderId: undefined, + folderId: folder.id, }; } } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-message-list-fetch-error-handler.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-message-list-fetch-error-handler.service.ts index bd186e97289..a961e623e03 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-message-list-fetch-error-handler.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-message-list-fetch-error-handler.service.ts @@ -14,7 +14,7 @@ export class MicrosoftMessageListFetchErrorHandler { ) {} // oxlint-disable-next-line typescript/no-explicit-any - public handleError(error: any): void { + public handleError(error: any): never { this.logger.error(`Error fetching message list: ${JSON.stringify(error)}`); const networkError = this.microsoftNetworkErrorHandler.handleError(error); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/types/microsoft-graph-batch-response.type.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/types/microsoft-graph-batch-response.type.ts new file mode 100644 index 00000000000..729a8dd7c99 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/types/microsoft-graph-batch-response.type.ts @@ -0,0 +1,7 @@ +export type MicrosoftGraphBatchResponse = { + responses: { + id: string; + status: number; + body?: TBody; + }[]; +}; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/types/microsoft-graph-delta-list-response-body.type.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/types/microsoft-graph-delta-list-response-body.type.ts new file mode 100644 index 00000000000..e930aa981f3 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/types/microsoft-graph-delta-list-response-body.type.ts @@ -0,0 +1,6 @@ +export type MicrosoftGraphDeltaListResponseBody = { + value?: { id?: string; '@removed'?: { reason?: string } }[]; + '@odata.nextLink'?: string; + '@odata.deltaLink'?: string; + error?: { code?: string; message?: string }; +}; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/utils/__tests__/to-relative-graph-url.util.spec.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/utils/__tests__/to-relative-graph-url.util.spec.ts new file mode 100644 index 00000000000..18b1358d3ee --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/utils/__tests__/to-relative-graph-url.util.spec.ts @@ -0,0 +1,31 @@ +import { toRelativeGraphUrl } from 'src/modules/messaging/message-import-manager/drivers/microsoft/utils/to-relative-graph-url.util'; + +describe('toRelativeGraphUrl', () => { + it('strips origin and beta version segment from an absolute deltaLink', () => { + expect( + toRelativeGraphUrl( + 'https://graph.microsoft.com/beta/me/mailfolders/inbox-id/messages/delta?$deltatoken=abc', + ), + ).toBe('/me/mailfolders/inbox-id/messages/delta?$deltatoken=abc'); + }); + + it('strips the v1.0 version segment', () => { + expect( + toRelativeGraphUrl('https://graph.microsoft.com/v1.0/me/messages'), + ).toBe('/me/messages'); + }); + + it('returns an already-relative url unchanged', () => { + expect(toRelativeGraphUrl('/me/mailfolders/inbox-id/messages/delta')).toBe( + '/me/mailfolders/inbox-id/messages/delta', + ); + }); + + it('preserves query parameters on the relative url', () => { + expect( + toRelativeGraphUrl( + 'https://graph.microsoft.com/beta/me/messages?$select=id&$top=999', + ), + ).toBe('/me/messages?$select=id&$top=999'); + }); +}); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/utils/to-relative-graph-url.util.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/utils/to-relative-graph-url.util.ts new file mode 100644 index 00000000000..3429d38d875 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/utils/to-relative-graph-url.util.ts @@ -0,0 +1,21 @@ +import { isNonEmptyString } from '@sniptt/guards'; + +const GRAPH_VERSION_SEGMENTS = ['beta', 'v1.0']; + +export const toRelativeGraphUrl = (url: string): string => { + if (!isNonEmptyString(url) || !url.startsWith('http')) { + return url; + } + + const parsedUrl = new URL(url); + const pathSegments = parsedUrl.pathname.split('/').filter(isNonEmptyString); + + if ( + pathSegments.length > 0 && + GRAPH_VERSION_SEGMENTS.includes(pathSegments[0]) + ) { + pathSegments.shift(); + } + + return `/${pathSegments.join('/')}${parsedUrl.search}`; +};