Félix Malfait f39fccc3c4 fix(messaging): split thread create/update statements and stop clobbering accumulator (#19388)
## Summary

Fixes two bugs in
`MessagingMessageService.saveMessagesWithinTransaction`.

### Bug 1: `ON CONFLICT DO UPDATE command cannot affect row a second time`

Reported in production logs from `MessagingMessagesImportService`.
Postgres rejects a single `INSERT … ON CONFLICT DO UPDATE` when the same
conflict target appears twice in the values list, and that's exactly
what was happening to `messageThread`.

Where it came from: #19351 added the message-thread subject refresh
feature and, in doing so, switched the existing thread `insert` to a
bulk `upsert(['id'])` over a list built by concatenating two sources:

```ts
const threadsToUpsert = [
  // brand-new thread rows
  ...messageThreadsToCreate,
  // subject refreshes for existing threads
  ...Array.from(threadSubjectUpdates.entries()).map(([id, { subject }]) => ({ id, subject })),
];
await messageThreadRepository.upsert(threadsToUpsert, ['id'], txManager);
```

Each list is internally unique, but they are **not disjoint**.
`enrichMessageAccumulatorWithMessageThreadToCreate`, when it sees two
messages in the same batch sharing a brand-new thread external id,
copies the first sibling's freshly-minted thread id into the second
sibling's `existingThreadInDB`. The subject-update gate later in the
loop then trusts that field and queues a subject refresh for that id —
which is also already in `messageThreadsToCreate`. Same id, same
statement, two rows → Postgres aborts the transaction and the import
retries forever on the same batch.
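
As a rough illustration of the overlap described above, the sketch below merges two internally unique collections and reports the ids that end up in the combined values list twice. The `ThreadRow` shape, the `findDuplicateIds` helper, and the sample ids are hypothetical and are not part of the service; the real entities carry more fields.

```typescript
// Hypothetical, simplified row shape used only for this illustration.
type ThreadRow = { id: string; subject: string };

// Returns every id that appears more than once in the given values list.
function findDuplicateIds(rows: ThreadRow[]): string[] {
  const seen = new Set<string>();
  const duplicates = new Set<string>();

  for (const { id } of rows) {
    if (seen.has(id)) {
      duplicates.add(id);
    }
    seen.add(id);
  }

  return Array.from(duplicates);
}

// Each source is internally unique...
const messageThreadsToCreate: ThreadRow[] = [
  { id: 'thread-1', subject: 'Hello' },
];
const threadSubjectUpdates = new Map<string, { subject: string }>([
  ['thread-1', { subject: 'Re: Hello' }], // same id as the thread being created
  ['thread-2', { subject: 'Other' }],
]);

// ...but the merged list is not, which is what a single upsert would reject.
const merged: ThreadRow[] = [
  ...messageThreadsToCreate,
  ...Array.from(threadSubjectUpdates.entries()).map(([id, { subject }]) => ({
    id,
    subject,
  })),
];

const duplicateIds = findDuplicateIds(merged);
```

A check like this could serve as a guard or a test assertion in front of any bulk upsert, though the fix in this PR avoids the problem structurally rather than by detection.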

**Fix:** stop merging the two lists. Issue creates and subject updates
as two separate statements within the same transaction:

```ts
if (messageThreadsToCreate.length > 0) {
  await messageThreadRepository.insert(messageThreadsToCreate, txManager);
}
if (threadSubjectUpdates.size > 0) {
  await messageThreadRepository.upsert(
    Array.from(threadSubjectUpdates.entries()).map(([id, { subject }]) => ({ id, subject })),
    ['id'],
    txManager,
  );
}
```

This is closer to the pre-#19351 shape (`insert` for new rows) and
side-steps the duplicate-row constraint entirely: each statement is
internally unique (creates use freshly minted UUIDs; updates are keyed
by a `Map<id, …>`), and within the same transaction Postgres happily
applies a subject update to a row inserted by a previous statement.

### Bug 2: `enrichMessageAccumulatorWithExistingMessageChannelMessageAssociations` clobbers the accumulator

Independent latent bug spotted while tracing the flow. The helper did:

```ts
if (existingMessageChannelMessageAssociation) {
  messageAccumulatorMap.set(message.externalId, {
    existingMessageInDB: existingMessage,
    existingMessageChannelMessageAssociationInDB: existingMessageChannelMessageAssociation,
  });
}
```

i.e. it **replaces** the accumulator object, dropping the
`existingThreadInDB` set just before by
`enrichMessageAccumulatorWithExistingMessageThreadIds`.

The branch only fires when re-encountering a message that's already been
fully synced on this channel (matched on `headerMessageId` AND already
has an association row) — i.e. routinely on Gmail/IMAP incremental syncs
whenever the connector re-delivers an existing message (label change,
read/unread, archive, full-resync after error, …).

When it fires, the next enrichment step sees `existingThreadInDB` as
`undefined`, falls into the "create a new thread" branch, mints a fresh
`threadToCreate`, but the main loop never queues a `messageToCreate` or
association for it (because both `existingMessageInDB` and the existing
association are still set). Net effect: **one orphan `messageThread` row
inserted per re-encountered message, with nothing referencing it.**

The existing message in the DB keeps pointing at its real thread, so
this is invisible to users — no thread fragmentation, no UI symptoms, no
error logs. Just slow accumulation of orphan thread rows that no query
joins onto. Probably worth running

```sql
SELECT COUNT(*)
FROM "messageThread" mt
WHERE NOT EXISTS (
  SELECT 1 FROM message m WHERE m."messageThreadId" = mt.id
);
```

on a busy production workspace once this lands to size whether a cleanup
migration is warranted.

**Fix:** mutate the existing accumulator in place instead of replacing
it.
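
A minimal sketch of what mutating in place might look like, assuming a simplified accumulator shape. The `MessageAccumulator` type, the `recordExistingAssociation` helper, and the sample ids are hypothetical stand-ins, not the actual code in this PR:

```typescript
// Hypothetical, simplified accumulator; the real one carries more fields.
type MessageAccumulator = {
  existingThreadInDB?: { id: string };
  existingMessageInDB?: { id: string };
  existingMessageChannelMessageAssociationInDB?: { id: string };
};

// Updates the entry for `externalId` without discarding fields that
// earlier enrichment steps already set (e.g. existingThreadInDB).
function recordExistingAssociation(
  accumulatorMap: Map<string, MessageAccumulator>,
  externalId: string,
  existingMessage: { id: string },
  existingAssociation: { id: string },
): void {
  // Reuse the current entry if there is one; otherwise start from an empty one.
  const accumulator = accumulatorMap.get(externalId) ?? {};

  accumulator.existingMessageInDB = existingMessage;
  accumulator.existingMessageChannelMessageAssociationInDB = existingAssociation;

  accumulatorMap.set(externalId, accumulator);
}

// An earlier step has already resolved the thread for this message.
const accumulatorMap = new Map<string, MessageAccumulator>([
  ['ext-1', { existingThreadInDB: { id: 'thread-1' } }],
]);

recordExistingAssociation(
  accumulatorMap,
  'ext-1',
  { id: 'msg-1' },
  { id: 'assoc-1' },
);
```

After the call, the entry for `ext-1` still carries `existingThreadInDB`, so a later step would not fall into the "create a new thread" branch.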

## Test plan

- [x] `oxlint --type-aware` clean on touched file
- [x] `prettier` clean

🤖 Generated with [Claude Code](https://claude.com/claude-code)
2026-04-07 13:23:31 +02:00


The #1 Open-Source CRM

🌐 Website · 📚 Documentation · Roadmap · Discord · Figma


Installation

See: 🚀 Self-hosting 🖥️ Local Setup

Why Twenty

We built Twenty for three reasons:

CRMs are too expensive, and users are trapped. Companies use locked-in customer data to hike prices. It shouldn't be that way.

A fresh start is required to build a better experience. We can learn from past mistakes and craft a cohesive experience inspired by new UX patterns from tools like Notion, Airtable or Linear.

We believe in open-source and community. Hundreds of developers are already building Twenty together. Once we have plugin capabilities, a whole ecosystem will grow around it.


What You Can Do With Twenty

Please feel free to flag any specific needs you have by creating an issue.

Below are a few features we have implemented to date:

Personalize layouts with filters, sort, group by, kanban and table views

Customize your objects and fields

Create and manage permissions with custom roles

Automate workflow with triggers and actions

Emails, calendar events, files, and more


Stack

Thanks

Thanks to these amazing services that we use and recommend for UI testing (Chromatic), code review (Greptile), catching bugs (Sentry) and translating (Crowdin).

Join the Community
