mirror of
https://github.com/twentyhq/twenty.git
synced 2026-05-23 15:59:30 -04:00
fix(server): decouple SDK client generation from workspace activation (#20514)
`activateWorkspace` enqueues SDK gen job inside `WorkspaceManagerService.init()` introduced by https://github.com/twentyhq/twenty/pull/19271 But if enqueue call fails it crashes cuz it doesn't have try catch so created workspace is in corrupted state <img width="636" height="812" alt="image" src="https://github.com/user-attachments/assets/09acd042-46d0-4225-adc0-c74ea770785d" /> FIx: Move SDK enqueue out of `init()` Call after `activateAndInitializeUpgradeState` succeeds, wrap in try catch. Mirror preInstalledAppsService.installOnWorkspace pattern. Assuming enqueue failure if Redis is unavailable we fallback to `SdkClientArchiveService.downloadArchiveBufferOrGenerate` which generates it on the fly Around 19 workspaces in prod affected with status `ONGOING_CREATION`
This commit is contained in:
@@ -0,0 +1,125 @@
|
||||
import { Test, type TestingModule } from '@nestjs/testing';
|
||||
import { getRepositoryToken } from '@nestjs/typeorm';
|
||||
|
||||
import { Repository } from 'typeorm';
|
||||
|
||||
import { WorkspaceSchemaFactory } from 'src/engine/api/graphql/workspace-schema.factory';
|
||||
import { ApplicationEntity } from 'src/engine/core-modules/application/application.entity';
|
||||
import { ApplicationService } from 'src/engine/core-modules/application/application.service';
|
||||
import { FileStorageService } from 'src/engine/core-modules/file-storage/file-storage.service';
|
||||
import { getQueueToken } from 'src/engine/core-modules/message-queue/utils/get-queue-token.util';
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
|
||||
import { GENERATE_SDK_CLIENT_JOB_NAME } from 'src/engine/core-modules/sdk-client/jobs/generate-sdk-client.job-constants';
|
||||
import { SdkClientGenerationService } from 'src/engine/core-modules/sdk-client/sdk-client-generation.service';
|
||||
import { WorkspaceEntity } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||
import { WorkspaceCacheService } from 'src/engine/workspace-cache/services/workspace-cache.service';
|
||||
|
||||
describe('SdkClientGenerationService', () => {
|
||||
let service: SdkClientGenerationService;
|
||||
let applicationService: jest.Mocked<
|
||||
Pick<
|
||||
ApplicationService,
|
||||
'findWorkspaceTwentyStandardAndCustomApplicationOrThrow'
|
||||
>
|
||||
>;
|
||||
let messageQueueService: jest.Mocked<Pick<MessageQueueService, 'add'>>;
|
||||
|
||||
beforeEach(async () => {
|
||||
applicationService = {
|
||||
findWorkspaceTwentyStandardAndCustomApplicationOrThrow: jest.fn(),
|
||||
};
|
||||
messageQueueService = {
|
||||
add: jest.fn().mockResolvedValue(undefined),
|
||||
};
|
||||
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
providers: [
|
||||
SdkClientGenerationService,
|
||||
{ provide: FileStorageService, useValue: {} },
|
||||
{
|
||||
provide: getRepositoryToken(ApplicationEntity),
|
||||
useValue: {} as Repository<ApplicationEntity>,
|
||||
},
|
||||
{
|
||||
provide: getRepositoryToken(WorkspaceEntity),
|
||||
useValue: {} as Repository<WorkspaceEntity>,
|
||||
},
|
||||
{ provide: WorkspaceCacheService, useValue: {} },
|
||||
{ provide: WorkspaceSchemaFactory, useValue: {} },
|
||||
{ provide: ApplicationService, useValue: applicationService },
|
||||
{
|
||||
provide: getQueueToken(MessageQueue.workspaceQueue),
|
||||
useValue: messageQueueService,
|
||||
},
|
||||
],
|
||||
}).compile();
|
||||
|
||||
service = module.get<SdkClientGenerationService>(
|
||||
SdkClientGenerationService,
|
||||
);
|
||||
});
|
||||
|
||||
describe('enqueueSdkClientGenerationForWorkspace', () => {
|
||||
const workspaceId = 'workspace-1';
|
||||
const apps = {
|
||||
twentyStandardFlatApplication: {
|
||||
id: 'std-app-id',
|
||||
universalIdentifier: 'twenty-standard',
|
||||
},
|
||||
workspaceCustomFlatApplication: {
|
||||
id: 'custom-app-id',
|
||||
universalIdentifier: 'workspace-custom',
|
||||
},
|
||||
};
|
||||
|
||||
it('enqueues one job per application with dedup id and retry limit', async () => {
|
||||
applicationService.findWorkspaceTwentyStandardAndCustomApplicationOrThrow.mockResolvedValue(
|
||||
apps as never,
|
||||
);
|
||||
|
||||
await service.enqueueSdkClientGenerationForWorkspace(workspaceId);
|
||||
|
||||
expect(messageQueueService.add).toHaveBeenCalledTimes(2);
|
||||
expect(messageQueueService.add).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
GENERATE_SDK_CLIENT_JOB_NAME,
|
||||
{
|
||||
workspaceId,
|
||||
applicationId: 'std-app-id',
|
||||
applicationUniversalIdentifier: 'twenty-standard',
|
||||
},
|
||||
{
|
||||
id: `sdk-client:${workspaceId}:std-app-id`,
|
||||
retryLimit: 3,
|
||||
},
|
||||
);
|
||||
expect(messageQueueService.add).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
GENERATE_SDK_CLIENT_JOB_NAME,
|
||||
{
|
||||
workspaceId,
|
||||
applicationId: 'custom-app-id',
|
||||
applicationUniversalIdentifier: 'workspace-custom',
|
||||
},
|
||||
{
|
||||
id: `sdk-client:${workspaceId}:custom-app-id`,
|
||||
retryLimit: 3,
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
it('propagates errors thrown by the message queue service', async () => {
|
||||
applicationService.findWorkspaceTwentyStandardAndCustomApplicationOrThrow.mockResolvedValue(
|
||||
apps as never,
|
||||
);
|
||||
const failure = new Error('Redis unavailable');
|
||||
|
||||
messageQueueService.add.mockRejectedValueOnce(failure);
|
||||
|
||||
await expect(
|
||||
service.enqueueSdkClientGenerationForWorkspace(workspaceId),
|
||||
).rejects.toBe(failure);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,7 @@
|
||||
export const GENERATE_SDK_CLIENT_JOB_NAME = 'GenerateSdkClientJob';
|
||||
|
||||
export type GenerateSdkClientJobData = {
|
||||
workspaceId: string;
|
||||
applicationId: string;
|
||||
applicationUniversalIdentifier: string;
|
||||
};
|
||||
@@ -1,21 +1,19 @@
|
||||
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
|
||||
import {
|
||||
GENERATE_SDK_CLIENT_JOB_NAME,
|
||||
type GenerateSdkClientJobData,
|
||||
} from 'src/engine/core-modules/sdk-client/jobs/generate-sdk-client.job-constants';
|
||||
import { SdkClientGenerationService } from 'src/engine/core-modules/sdk-client/sdk-client-generation.service';
|
||||
|
||||
export type GenerateSdkClientJobData = {
|
||||
workspaceId: string;
|
||||
applicationId: string;
|
||||
applicationUniversalIdentifier: string;
|
||||
};
|
||||
|
||||
@Processor(MessageQueue.workspaceQueue)
|
||||
export class GenerateSdkClientJob {
|
||||
constructor(
|
||||
private readonly sdkClientGenerationService: SdkClientGenerationService,
|
||||
) {}
|
||||
|
||||
@Process(GenerateSdkClientJob.name)
|
||||
@Process(GENERATE_SDK_CLIENT_JOB_NAME)
|
||||
async handle(data: GenerateSdkClientJobData): Promise<void> {
|
||||
await this.sdkClientGenerationService.generateSdkClientForApplication({
|
||||
workspaceId: data.workspaceId,
|
||||
|
||||
@@ -11,19 +11,28 @@ import { Repository } from 'typeorm';
|
||||
|
||||
import { WorkspaceSchemaFactory } from 'src/engine/api/graphql/workspace-schema.factory';
|
||||
import { ApplicationEntity } from 'src/engine/core-modules/application/application.entity';
|
||||
import { ApplicationService } from 'src/engine/core-modules/application/application.service';
|
||||
import { FileStorageService } from 'src/engine/core-modules/file-storage/file-storage.service';
|
||||
import { createZipFile } from 'src/engine/core-modules/logic-function/logic-function-drivers/utils/create-zip-file';
|
||||
import { TemporaryDirManager } from 'src/engine/core-modules/logic-function/logic-function-drivers/utils/temporary-dir-manager';
|
||||
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
|
||||
import { SDK_CLIENT_PACKAGE_DIRNAME } from 'src/engine/core-modules/sdk-client/constants/sdk-client-package-dirname';
|
||||
import {
|
||||
SdkClientException,
|
||||
SdkClientExceptionCode,
|
||||
} from 'src/engine/core-modules/sdk-client/exceptions/sdk-client.exception';
|
||||
import {
|
||||
GENERATE_SDK_CLIENT_JOB_NAME,
|
||||
type GenerateSdkClientJobData,
|
||||
} from 'src/engine/core-modules/sdk-client/jobs/generate-sdk-client.job-constants';
|
||||
import { fromWorkspaceEntityToFlat } from 'src/engine/core-modules/workspace/utils/from-workspace-entity-to-flat.util';
|
||||
import { WorkspaceEntity } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||
import { WorkspaceCacheService } from 'src/engine/workspace-cache/services/workspace-cache.service';
|
||||
|
||||
const SDK_CLIENT_ARCHIVE_NAME = 'twenty-client-sdk.zip';
|
||||
const SDK_CLIENT_GENERATION_RETRY_LIMIT = 3;
|
||||
|
||||
@Injectable()
|
||||
export class SdkClientGenerationService {
|
||||
@@ -37,8 +46,38 @@ export class SdkClientGenerationService {
|
||||
private readonly workspaceRepository: Repository<WorkspaceEntity>,
|
||||
private readonly workspaceCacheService: WorkspaceCacheService,
|
||||
private readonly workspaceSchemaFactory: WorkspaceSchemaFactory,
|
||||
private readonly applicationService: ApplicationService,
|
||||
@InjectMessageQueue(MessageQueue.workspaceQueue)
|
||||
private readonly messageQueueService: MessageQueueService,
|
||||
) {}
|
||||
|
||||
async enqueueSdkClientGenerationForWorkspace(
|
||||
workspaceId: string,
|
||||
): Promise<void> {
|
||||
const { twentyStandardFlatApplication, workspaceCustomFlatApplication } =
|
||||
await this.applicationService.findWorkspaceTwentyStandardAndCustomApplicationOrThrow(
|
||||
{ workspaceId },
|
||||
);
|
||||
|
||||
await Promise.all(
|
||||
[twentyStandardFlatApplication, workspaceCustomFlatApplication].map(
|
||||
(application) =>
|
||||
this.messageQueueService.add<GenerateSdkClientJobData>(
|
||||
GENERATE_SDK_CLIENT_JOB_NAME,
|
||||
{
|
||||
workspaceId,
|
||||
applicationId: application.id,
|
||||
applicationUniversalIdentifier: application.universalIdentifier,
|
||||
},
|
||||
{
|
||||
id: `sdk-client:${workspaceId}:${application.id}`,
|
||||
retryLimit: SDK_CLIENT_GENERATION_RETRY_LIMIT,
|
||||
},
|
||||
),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
async generateSdkClientForApplication({
|
||||
workspaceId,
|
||||
applicationId,
|
||||
|
||||
@@ -3,6 +3,7 @@ import { TypeOrmModule } from '@nestjs/typeorm';
|
||||
import { CoreGraphQLApiModule } from 'src/engine/api/graphql/core-graphql-api.module';
|
||||
|
||||
import { ApplicationEntity } from 'src/engine/core-modules/application/application.entity';
|
||||
import { ApplicationModule } from 'src/engine/core-modules/application/application.module';
|
||||
import { SdkClientController } from 'src/engine/core-modules/sdk-client/controllers/sdk-client.controller';
|
||||
import { SdkClientArchiveService } from 'src/engine/core-modules/sdk-client/sdk-client-archive.service';
|
||||
import { SdkClientGenerationService } from 'src/engine/core-modules/sdk-client/sdk-client-generation.service';
|
||||
@@ -14,6 +15,7 @@ import { WorkspaceCacheModule } from 'src/engine/workspace-cache/workspace-cache
|
||||
TypeOrmModule.forFeature([ApplicationEntity, WorkspaceEntity]),
|
||||
WorkspaceCacheModule,
|
||||
CoreGraphQLApiModule,
|
||||
ApplicationModule,
|
||||
],
|
||||
controllers: [SdkClientController],
|
||||
providers: [SdkClientGenerationService, SdkClientArchiveService],
|
||||
|
||||
@@ -39,6 +39,7 @@ import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/work
|
||||
import { PrefillLogicFunctionService } from 'src/engine/workspace-manager/standard-objects-prefill-data/services/prefill-logic-function.service';
|
||||
import { ApplicationService } from 'src/engine/core-modules/application/application.service';
|
||||
import { PreInstalledAppsService } from 'src/engine/core-modules/application/pre-installed-apps/pre-installed-apps.service';
|
||||
import { SdkClientGenerationService } from 'src/engine/core-modules/sdk-client/sdk-client-generation.service';
|
||||
import { WorkspaceMigrationValidateBuildAndRunService } from 'src/engine/workspace-manager/workspace-migration/services/workspace-migration-validate-build-and-run-service';
|
||||
import { WorkspaceManagerService } from 'src/engine/workspace-manager/workspace-manager.service';
|
||||
|
||||
@@ -128,6 +129,7 @@ describe('WorkspaceService', () => {
|
||||
AiModelRegistryService,
|
||||
ApplicationService,
|
||||
PreInstalledAppsService,
|
||||
SdkClientGenerationService,
|
||||
PrefillLogicFunctionService,
|
||||
WorkspaceMigrationValidateBuildAndRunService,
|
||||
UpgradeMigrationService,
|
||||
|
||||
@@ -58,6 +58,7 @@ import { PermissionsService } from 'src/engine/metadata-modules/permissions/perm
|
||||
import { WorkspaceCacheStorageService } from 'src/engine/workspace-cache-storage/workspace-cache-storage.service';
|
||||
import { getWorkspaceSchemaName } from 'src/engine/workspace-datasource/utils/get-workspace-schema-name.util';
|
||||
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
|
||||
import { SdkClientGenerationService } from 'src/engine/core-modules/sdk-client/sdk-client-generation.service';
|
||||
import { PrefillLogicFunctionService } from 'src/engine/workspace-manager/standard-objects-prefill-data/services/prefill-logic-function.service';
|
||||
import { prefillCompanies } from 'src/engine/workspace-manager/standard-objects-prefill-data/utils/prefill-companies.util';
|
||||
import { prefillDashboards } from 'src/engine/workspace-manager/standard-objects-prefill-data/utils/prefill-dashboards.util';
|
||||
@@ -136,6 +137,7 @@ export class WorkspaceService extends TypeOrmQueryService<WorkspaceEntity> {
|
||||
private readonly coreEntityCacheService: CoreEntityCacheService,
|
||||
private readonly upgradeMigrationService: UpgradeMigrationService,
|
||||
private readonly upgradeSequenceReaderService: UpgradeSequenceReaderService,
|
||||
private readonly sdkClientGenerationService: SdkClientGenerationService,
|
||||
) {
|
||||
super(workspaceRepository);
|
||||
}
|
||||
@@ -369,6 +371,18 @@ export class WorkspaceService extends TypeOrmQueryService<WorkspaceEntity> {
|
||||
displayName: data.displayName,
|
||||
});
|
||||
|
||||
try {
|
||||
await this.sdkClientGenerationService.enqueueSdkClientGenerationForWorkspace(
|
||||
workspace.id,
|
||||
);
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`failed to enqueue SDK client generation jobs for workspace ${workspace.id}`,
|
||||
error,
|
||||
);
|
||||
this.exceptionHandlerService.captureExceptions([error as Error]);
|
||||
}
|
||||
|
||||
await this.coreEntityCacheService.invalidate(
|
||||
'workspaceEntity',
|
||||
workspace.id,
|
||||
|
||||
@@ -21,6 +21,7 @@ import { FileModule } from 'src/engine/core-modules/file/file.module';
|
||||
import { MetricsModule } from 'src/engine/core-modules/metrics/metrics.module';
|
||||
import { OnboardingModule } from 'src/engine/core-modules/onboarding/onboarding.module';
|
||||
import { PublicDomainEntity } from 'src/engine/core-modules/public-domain/public-domain.entity';
|
||||
import { SdkClientModule } from 'src/engine/core-modules/sdk-client/sdk-client.module';
|
||||
import { UserWorkspaceEntity } from 'src/engine/core-modules/user-workspace/user-workspace.entity';
|
||||
import { UserWorkspaceModule } from 'src/engine/core-modules/user-workspace/user-workspace.module';
|
||||
import { UserEntity } from 'src/engine/core-modules/user/user.entity';
|
||||
@@ -89,6 +90,7 @@ import { StandardObjectsPrefillModule } from 'src/engine/workspace-manager/stand
|
||||
WorkspaceMigrationModule,
|
||||
CoreEntityCacheModule,
|
||||
UpgradeModule,
|
||||
SdkClientModule,
|
||||
],
|
||||
services: [WorkspaceService],
|
||||
resolvers: workspaceAutoResolverOpts,
|
||||
|
||||
@@ -5,13 +5,6 @@ import { Repository } from 'typeorm';
|
||||
|
||||
import { ApplicationService } from 'src/engine/core-modules/application/application.service';
|
||||
import { FlatApplication } from 'src/engine/core-modules/application/types/flat-application.type';
|
||||
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
|
||||
import {
|
||||
GenerateSdkClientJob,
|
||||
GenerateSdkClientJobData,
|
||||
} from 'src/engine/core-modules/sdk-client/jobs/generate-sdk-client.job';
|
||||
import { UserWorkspaceEntity } from 'src/engine/core-modules/user-workspace/user-workspace.entity';
|
||||
import { WorkspaceEntity } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||
import { RoleEntity } from 'src/engine/metadata-modules/role/role.entity';
|
||||
@@ -37,8 +30,6 @@ export class WorkspaceManagerService {
|
||||
@InjectRepository(RoleEntity)
|
||||
private readonly roleRepository: Repository<RoleEntity>,
|
||||
private readonly applicationService: ApplicationService,
|
||||
@InjectMessageQueue(MessageQueue.workspaceQueue)
|
||||
private readonly messageQueueService: MessageQueueService,
|
||||
) {}
|
||||
|
||||
public async init({
|
||||
@@ -83,34 +74,13 @@ export class WorkspaceManagerService {
|
||||
`Metadata creation took ${dataSourceMetadataCreationEnd - dataSourceMetadataCreationStart}ms`,
|
||||
);
|
||||
|
||||
const { workspaceCustomFlatApplication, twentyStandardFlatApplication } =
|
||||
const { workspaceCustomFlatApplication } =
|
||||
await this.applicationService.findWorkspaceTwentyStandardAndCustomApplicationOrThrow(
|
||||
{
|
||||
workspaceId,
|
||||
},
|
||||
);
|
||||
|
||||
await Promise.all([
|
||||
this.messageQueueService.add<GenerateSdkClientJobData>(
|
||||
GenerateSdkClientJob.name,
|
||||
{
|
||||
workspaceId,
|
||||
applicationId: twentyStandardFlatApplication.id,
|
||||
applicationUniversalIdentifier:
|
||||
twentyStandardFlatApplication.universalIdentifier,
|
||||
},
|
||||
),
|
||||
this.messageQueueService.add<GenerateSdkClientJobData>(
|
||||
GenerateSdkClientJob.name,
|
||||
{
|
||||
workspaceId,
|
||||
applicationId: workspaceCustomFlatApplication.id,
|
||||
applicationUniversalIdentifier:
|
||||
workspaceCustomFlatApplication.universalIdentifier,
|
||||
},
|
||||
),
|
||||
]);
|
||||
|
||||
await this.setupDefaultRoles({
|
||||
workspaceId,
|
||||
userId,
|
||||
|
||||
Reference in New Issue
Block a user