mirror of
https://github.com/twentyhq/twenty.git
synced 2026-05-24 08:22:01 -04:00
fix(server): drop correlated subquery in getWorkspaceLastAttemptedCommandName (#20591)
## Summary
- The upgrade runner calls `getWorkspaceLastAttemptedCommandName` twice
per workspace step. Grafana showed it averaging ~4.4s and trending
upward as the `core.upgradeMigration` table grows during an in-flight
upgrade.
- The old query joined every outer row against a correlated subquery
(`attempt = (SELECT MAX(sub.attempt) ... WHERE sub.name = m.name AND
sub."workspaceId" = m."workspaceId")`). Even with the `(workspaceId,
name, attempt)` index added in 2.3, each outer row triggers an index
lookup — fine for a few rows, painful at production scale.
- Replaced with a two-level `DISTINCT ON`:
- Inner `DISTINCT ON ("workspaceId", name) ORDER BY "workspaceId", name,
attempt DESC` walks `IDX_UPGRADE_MIGRATION_WORKSPACE_ID_NAME_ATTEMPT`
directly and yields one row per `(workspaceId, name)` at max attempt.
- Outer `DISTINCT ON ("workspaceId") ORDER BY "workspaceId", "createdAt"
DESC` picks the most recent row per workspace.
- Semantically identical; planner now does a single index walk + one
sort instead of N correlated lookups.
The same correlated-subquery shape exists in
`getLastAttemptedCommandNameOrThrow`, `areAllWorkspacesAtCommand`, and
`getLastAttemptedInstanceCommand`. They run far less often during an
upgrade (per instance step, not per workspace step), so they're out of
scope for this hotfix — happy to follow up if we want them too.
## Benchmark (prod)
Run over all distinct workspaceIds in `core."upgradeMigration"`:
| Variant | Execution Time |
| --- | --- |
| Before (correlated subquery) | **2979.659 ms** |
| After (two-level DISTINCT ON) | **1225.690 ms** |
~2.4× faster, and the gap widens as the table grows over the course of
an upgrade.
Equivalence confirmed: the diff query below returned `0` divergent
workspaces on prod.
### Variant A — original (correlated subquery)
```sql
SELECT DISTINCT ON (m."workspaceId")
m."workspaceId", m.name, m.status, m."executedByVersion",
m."errorMessage", m."createdAt", m."isInitial"
FROM core."upgradeMigration" m
WHERE m."workspaceId" IN ($1, $2, ...)
AND m.attempt = (
SELECT MAX(sub.attempt)
FROM core."upgradeMigration" sub
WHERE sub.name = m.name
AND sub."workspaceId" = m."workspaceId"
)
ORDER BY m."workspaceId", m."createdAt" DESC;
```
### Variant B — new (two-level DISTINCT ON)
```sql
SELECT DISTINCT ON (latest_per_name."workspaceId")
latest_per_name."workspaceId",
latest_per_name.name,
latest_per_name.status,
latest_per_name."executedByVersion",
latest_per_name."errorMessage",
latest_per_name."createdAt",
latest_per_name."isInitial"
FROM (
SELECT DISTINCT ON ("workspaceId", name)
"workspaceId", name, status, "executedByVersion",
"errorMessage", "createdAt", "isInitial"
FROM core."upgradeMigration"
WHERE "workspaceId" = ANY($1)
ORDER BY "workspaceId", name, attempt DESC
) latest_per_name
ORDER BY latest_per_name."workspaceId", latest_per_name."createdAt" DESC;
```
### Equivalence check (returned 0 on prod)
```sql
WITH target_ids AS (
SELECT DISTINCT "workspaceId"
FROM core."upgradeMigration"
WHERE "workspaceId" IS NOT NULL
),
old_result AS (
SELECT DISTINCT ON (m."workspaceId")
m."workspaceId", m.name, m.status, m."executedByVersion",
m."errorMessage", m."createdAt", m."isInitial"
FROM core."upgradeMigration" m
WHERE m."workspaceId" IN (SELECT "workspaceId" FROM target_ids)
AND m.attempt = (
SELECT MAX(sub.attempt)
FROM core."upgradeMigration" sub
WHERE sub.name = m.name
AND sub."workspaceId" = m."workspaceId"
)
ORDER BY m."workspaceId", m."createdAt" DESC
),
new_result AS (
SELECT DISTINCT ON (latest_per_name."workspaceId")
latest_per_name."workspaceId", latest_per_name.name, latest_per_name.status,
latest_per_name."executedByVersion", latest_per_name."errorMessage",
latest_per_name."createdAt", latest_per_name."isInitial"
FROM (
SELECT DISTINCT ON ("workspaceId", name)
"workspaceId", name, status, "executedByVersion",
"errorMessage", "createdAt", "isInitial"
FROM core."upgradeMigration"
WHERE "workspaceId" IN (SELECT "workspaceId" FROM target_ids)
ORDER BY "workspaceId", name, attempt DESC
) latest_per_name
ORDER BY latest_per_name."workspaceId", latest_per_name."createdAt" DESC
),
diffs AS (
SELECT 'only_in_old' AS bucket, o."workspaceId", o.name, o.status, o."createdAt"
FROM old_result o
LEFT JOIN new_result n ON n."workspaceId" = o."workspaceId"
WHERE n."workspaceId" IS NULL OR n.name <> o.name OR n.status <> o.status
UNION ALL
SELECT 'only_in_new', n."workspaceId", n.name, n.status, n."createdAt"
FROM new_result n
LEFT JOIN old_result o ON o."workspaceId" = n."workspaceId"
WHERE o."workspaceId" IS NULL OR o.name <> n.name OR o.status <> n.status
)
SELECT COUNT(*) AS divergent_workspaces FROM diffs;
```
## Test plan
- [ ] `npx nx test twenty-server --testPathPattern upgrade-migration`
- [ ] Integration tests: `npx nx run
twenty-server:test:integration:with-db-reset --testPathPattern
sequence-runner`
- [ ] Verify on staging that the slow query disappears from the
PostgreSQL Grafana board during the next upgrade run
This commit is contained in:
@@ -231,48 +231,55 @@ export class UpgradeMigrationService {
|
||||
return new Map();
|
||||
}
|
||||
|
||||
const migrations = await this.upgradeMigrationRepository
|
||||
.createQueryBuilder('migration')
|
||||
.select([
|
||||
'migration.workspaceId',
|
||||
'migration.name',
|
||||
'migration.status',
|
||||
'migration.executedByVersion',
|
||||
'migration.errorMessage',
|
||||
'migration.createdAt',
|
||||
'migration.isInitial',
|
||||
])
|
||||
.where({
|
||||
workspaceId: In(workspaceIds),
|
||||
})
|
||||
.andWhere(
|
||||
`migration.attempt = (
|
||||
SELECT MAX(sub.attempt)
|
||||
FROM core."upgradeMigration" sub
|
||||
WHERE sub.name = migration.name
|
||||
AND sub."workspaceId" = migration."workspaceId"
|
||||
)`,
|
||||
)
|
||||
.orderBy('migration.workspaceId')
|
||||
.addOrderBy('migration.createdAt', 'DESC')
|
||||
.distinctOn(['migration.workspaceId'])
|
||||
.getMany();
|
||||
const rows = await this.upgradeMigrationRepository.manager.query<
|
||||
Array<{
|
||||
workspaceId: string;
|
||||
name: string;
|
||||
status: UpgradeMigrationStatus;
|
||||
executedByVersion: string;
|
||||
errorMessage: string | null;
|
||||
createdAt: Date;
|
||||
isInitial: boolean;
|
||||
}>
|
||||
>(
|
||||
`
|
||||
SELECT DISTINCT ON (latest_per_name."workspaceId")
|
||||
latest_per_name."workspaceId",
|
||||
latest_per_name.name,
|
||||
latest_per_name.status,
|
||||
latest_per_name."executedByVersion",
|
||||
latest_per_name."errorMessage",
|
||||
latest_per_name."createdAt",
|
||||
latest_per_name."isInitial"
|
||||
FROM (
|
||||
SELECT DISTINCT ON ("workspaceId", name)
|
||||
"workspaceId",
|
||||
name,
|
||||
status,
|
||||
"executedByVersion",
|
||||
"errorMessage",
|
||||
"createdAt",
|
||||
"isInitial"
|
||||
FROM core."upgradeMigration"
|
||||
WHERE "workspaceId" = ANY($1)
|
||||
ORDER BY "workspaceId", name, attempt DESC
|
||||
) latest_per_name
|
||||
ORDER BY latest_per_name."workspaceId", latest_per_name."createdAt" DESC
|
||||
`,
|
||||
[workspaceIds],
|
||||
);
|
||||
|
||||
const cursors = new Map<string, WorkspaceLastAttemptedCommand>();
|
||||
|
||||
for (const migration of migrations) {
|
||||
if (migration.workspaceId === null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
cursors.set(migration.workspaceId, {
|
||||
workspaceId: migration.workspaceId,
|
||||
name: migration.name,
|
||||
status: migration.status,
|
||||
executedByVersion: migration.executedByVersion,
|
||||
errorMessage: migration.errorMessage,
|
||||
createdAt: migration.createdAt,
|
||||
isInitial: migration.isInitial,
|
||||
for (const row of rows) {
|
||||
cursors.set(row.workspaceId, {
|
||||
workspaceId: row.workspaceId,
|
||||
name: row.name,
|
||||
status: row.status,
|
||||
executedByVersion: row.executedByVersion,
|
||||
errorMessage: row.errorMessage,
|
||||
createdAt: row.createdAt,
|
||||
isInitial: row.isInitial,
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user