This commit is contained in:
ehconitin
2026-04-17 17:43:23 +05:30
parent d2104b2fae
commit cece97924c
16 changed files with 554 additions and 432 deletions

View File

@@ -2,6 +2,7 @@ import { styled } from '@linaria/react';
import { plural, t } from '@lingui/core/macro';
import { useState } from 'react';
import { type ToolUIPart } from 'ai';
import { isToolPartErrored } from 'twenty-shared/ai';
import {
IconChevronRight,
IconCpu,
@@ -271,7 +272,7 @@ const ThinkingToolStepRow = ({
const ToolIcon = getToolIcon(resolvedToolName);
const label = getToolDisplayMessage(part.input, rawToolName, !isActive);
const hasError = (part.errorText?.trim().length ?? 0) > 0;
const hasError = isToolPartErrored(part.state);
const isExpandable = isToolOutputInspectable(part.output) || hasError;
const outputObj =
@@ -329,7 +330,12 @@ const ThinkingToolStepRow = ({
<AnimatedExpandableContainer isExpanded={isExpanded} mode="fit-content">
<StyledToolDetailsContainer>
{hasError ? (
<StyledToolErrorText>{part.errorText}</StyledToolErrorText>
// Legacy persisted tool parts can have an empty errorText while
// state === 'output-error' (old mapper stored errorMessage ?? '').
// Show a fallback label so the expanded panel isn't blank.
<StyledToolErrorText>
{part.errorText?.trim() || t`Unknown error`}
</StyledToolErrorText>
) : (
<StyledToolDetailsContent>
<StyledToolTabListContainer>

View File

@@ -16,6 +16,7 @@ import { getToolIcon } from '@/ai/utils/getToolIcon';
import { isToolOutputInspectable } from '@/ai/utils/isToolOutputInspectable';
import { useLingui } from '@lingui/react/macro';
import { type ToolUIPart } from 'ai';
import { isToolPartErrored } from 'twenty-shared/ai';
import { type JsonValue } from 'type-fest';
import { useCopyToClipboard } from '~/hooks/useCopyToClipboard';
@@ -146,7 +147,7 @@ export const ToolStepRenderer = ({
const { resolvedInput: toolInput, resolvedToolName: toolName } =
resolveToolInput(input, rawToolName);
const hasError = (errorText?.trim().length ?? 0) > 0;
const hasError = isToolPartErrored(toolPart.state);
const isExpandable = isToolOutputInspectable(output) || hasError;
const ToolIcon = getToolIcon(toolName);
@@ -258,7 +259,10 @@ export const ToolStepRenderer = ({
<AnimatedExpandableContainer isExpanded={isExpanded} mode="fit-content">
<StyledContentContainer>
{hasError ? (
errorText
// Legacy persisted tool parts can have an empty errorText while
// state === 'output-error' (old mapper stored errorMessage ?? '').
// Show a fallback label so the expanded panel isn't blank.
errorText?.trim() || t`Unknown error`
) : (
<>
<StyledTabContainer>

View File

@@ -23,12 +23,14 @@ const createToolPart = ({
errorText,
input = {},
output,
state = 'output-available',
type = 'tool-web_search',
}: {
type?: `tool-${string}`;
input?: Record<string, unknown>;
output?: unknown;
errorText?: string;
state?: string;
} = {}): ThinkingStepPart =>
({
type,
@@ -36,7 +38,7 @@ const createToolPart = ({
input,
output,
errorText,
state: 'output-available',
state,
}) as ThinkingStepPart;
describe('thinkingStepsDisplayState', () => {
@@ -102,8 +104,9 @@ describe('thinkingStepsDisplayState', () => {
expect(isThinkingStepPartActive(toolPart, false)).toBe(false);
});
it('should not treat empty persisted error text as a tool error', () => {
it('should treat non-error states as active regardless of errorText', () => {
const toolPart = createToolPart({
state: 'input-available',
output: undefined,
errorText: '',
});
@@ -111,17 +114,24 @@ describe('thinkingStepsDisplayState', () => {
expect(isThinkingStepPartActive(toolPart, true)).toBe(true);
});
it('should mark tool parts with output or error as inactive', () => {
it('should mark tool parts with output or error state as inactive', () => {
const completedToolPart = createToolPart({
state: 'output-available',
output: { result: { ok: true } },
});
const failedToolPart = createToolPart({
state: 'output-error',
output: undefined,
errorText: 'Tool failed',
});
const deniedToolPart = createToolPart({
state: 'output-denied',
output: undefined,
});
expect(isThinkingStepPartActive(completedToolPart, true)).toBe(false);
expect(isThinkingStepPartActive(failedToolPart, true)).toBe(false);
expect(isThinkingStepPartActive(deniedToolPart, true)).toBe(false);
});
});

View File

@@ -1,3 +1,4 @@
import { isToolPartErrored } from 'twenty-shared/ai';
import { isDefined } from 'twenty-shared/utils';
import { type ThinkingStepPart } from '@/ai/utils/thinkingStepPart';
@@ -10,7 +11,9 @@ export const isThinkingStepPartActive = (
return part.state === 'streaming';
}
const hasError = (part.errorText?.trim().length ?? 0) > 0;
return isLastMessageStreaming && !isDefined(part.output) && !hasError;
return (
isLastMessageStreaming &&
!isDefined(part.output) &&
!isToolPartErrored(part.state)
);
};

View File

@@ -2,6 +2,7 @@ import { type ReasoningUIPart, type ToolUIPart } from 'ai';
import {
type ExtendedFileUIPart,
type ExtendedUIMessagePart,
getToolApproval,
} from 'twenty-shared/ai';
import { type AgentMessagePart } from '~/generated-metadata/graphql';
@@ -9,28 +10,6 @@ import { type AgentMessagePart } from '~/generated-metadata/graphql';
// A parallel mapping for TypeORM entities exists in the server at:
// packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/utils/mapDBPartsToUIMessageParts.ts
const getToolApproval = (errorDetails: unknown) => {
const approval =
errorDetails &&
typeof errorDetails === 'object' &&
'approval' in errorDetails &&
typeof errorDetails.approval === 'object' &&
errorDetails.approval !== null
? (errorDetails.approval as Record<string, unknown>)
: null;
if (!approval || typeof approval.id !== 'string') {
return undefined;
}
return {
id: approval.id,
approved:
typeof approval.approved === 'boolean' ? approval.approved : undefined,
reason: typeof approval.reason === 'string' ? approval.reason : undefined,
};
};
export const mapDBPartToUIMessagePart = (
part: AgentMessagePart,
): ExtendedUIMessagePart => {

View File

@@ -4,7 +4,7 @@ import { mapDBMessagesToUIMessages } from '@/ai/utils/mapDBMessagesToUIMessages'
import { useQuery } from '@apollo/client/react';
import { styled } from '@linaria/react';
import { t } from '@lingui/core/macro';
import { type ReactNode, useState } from 'react';
import { useState } from 'react';
import Skeleton from 'react-loading-skeleton';
import { IconChevronRight } from 'twenty-ui/display';
import { AnimatedExpandableContainer } from 'twenty-ui/layout';
@@ -24,13 +24,13 @@ const StyledContainer = styled.div`
gap: ${themeCssVariables.spacing[2]};
`;
const StyledCollapsibleSection = styled.div`
const StyledPromptSection = styled.div`
display: flex;
flex-direction: column;
gap: ${themeCssVariables.spacing[1]};
`;
const StyledSectionToggle = styled.button`
const StyledPromptToggle = styled.button`
align-items: center;
background: none;
border: none;
@@ -69,11 +69,6 @@ const StyledChevron = styled.span<{ isExpanded: boolean }>`
ease-in-out;
`;
const StyledSectionBody = styled.div`
color: ${themeCssVariables.font.color.primary};
padding-top: ${themeCssVariables.spacing[1]};
`;
const StyledPromptBody = styled.div`
background: ${themeCssVariables.background.transparent.lighter};
border: 1px solid ${themeCssVariables.border.color.light};
@@ -81,6 +76,7 @@ const StyledPromptBody = styled.div`
color: ${themeCssVariables.font.color.tertiary};
font-size: ${themeCssVariables.font.size.md};
line-height: ${themeCssVariables.text.lineHeight.lg};
margin-top: ${themeCssVariables.spacing[1]};
padding: ${themeCssVariables.spacing[3]};
white-space: pre-wrap;
`;
@@ -100,38 +96,6 @@ const StyledTraceUnavailable = styled.div`
font-size: ${themeCssVariables.font.size.md};
`;
type CollapsibleSectionProps = {
label: string;
defaultOpen?: boolean;
children: ReactNode;
};
const CollapsibleSection = ({
label,
defaultOpen = false,
children,
}: CollapsibleSectionProps) => {
const [isExpanded, setIsExpanded] = useState(defaultOpen);
return (
<StyledCollapsibleSection>
<StyledSectionToggle
type="button"
aria-expanded={isExpanded}
onClick={() => setIsExpanded((previous) => !previous)}
>
<StyledChevron isExpanded={isExpanded}>
<IconChevronRight size={14} />
</StyledChevron>
{label}
</StyledSectionToggle>
<AnimatedExpandableContainer isExpanded={isExpanded} mode="fit-content">
<StyledSectionBody>{children}</StyledSectionBody>
</AnimatedExpandableContainer>
</StyledCollapsibleSection>
);
};
const extractPromptText = (userMessage: AgentMessage | undefined): string => {
if (!userMessage) return '';
@@ -157,6 +121,7 @@ export const WorkflowRunAiAgentTraceDetail = ({
variables: { workflowRunId, workflowStepId },
},
);
const [isPromptExpanded, setIsPromptExpanded] = useState(false);
if (loading) {
return <Skeleton height={100} />;
@@ -180,9 +145,24 @@ export const WorkflowRunAiAgentTraceDetail = ({
return (
<StyledContainer>
{promptText.length > 0 && (
<CollapsibleSection label={t`Prompt`}>
<StyledPromptBody>{promptText}</StyledPromptBody>
</CollapsibleSection>
<StyledPromptSection>
<StyledPromptToggle
type="button"
aria-expanded={isPromptExpanded}
onClick={() => setIsPromptExpanded((previous) => !previous)}
>
<StyledChevron isExpanded={isPromptExpanded}>
<IconChevronRight size={14} />
</StyledChevron>
{t`Prompt`}
</StyledPromptToggle>
<AnimatedExpandableContainer
isExpanded={isPromptExpanded}
mode="fit-content"
>
<StyledPromptBody>{promptText}</StyledPromptBody>
</AnimatedExpandableContainer>
</StyledPromptSection>
)}
<StyledMessagesList>
{uiMessages.map((message) => (

View File

@@ -12,133 +12,216 @@ const createInsertResult = (id: string) =>
identifiers: [{ id }],
}) as never;
type TraceTestHarness = {
service: WorkflowAgentTracePersistenceService;
threadRepository: {
findOne: jest.Mock;
insert: jest.Mock;
manager: { transaction: jest.Mock };
};
threadRepositoryInTx: { update: jest.Mock };
turnRepository: { insert: jest.Mock };
messageRepository: { insert: jest.Mock };
messagePartRepository: { insert: jest.Mock };
fileAIChatService: jest.Mocked<FileAIChatService>;
};
const createHarness = (): TraceTestHarness => {
const threadRepositoryInTx = {
update: jest.fn().mockResolvedValue(undefined),
};
const turnRepository = {
insert: jest.fn().mockResolvedValue(createInsertResult('turn-1')),
};
const messageRepository = {
insert: jest
.fn()
.mockResolvedValueOnce(createInsertResult('user-message-1'))
.mockResolvedValueOnce(createInsertResult('assistant-message-1'))
.mockResolvedValueOnce(createInsertResult('user-message-2'))
.mockResolvedValueOnce(createInsertResult('assistant-message-2')),
};
const messagePartRepository = {
insert: jest.fn().mockResolvedValue(undefined),
};
const entityManager = {
getRepository: jest.fn((entity) => {
if (entity === AgentChatThreadEntity) {
return threadRepositoryInTx;
}
if (entity === AgentTurnEntity) {
return turnRepository;
}
if (entity === AgentMessageEntity) {
return messageRepository;
}
if (entity === AgentMessagePartEntity) {
return messagePartRepository;
}
throw new Error(`Unexpected entity: ${String(entity)}`);
}),
};
const threadRepository = {
findOne: jest.fn(),
insert: jest.fn(),
manager: {
transaction: jest.fn(async (callback) => callback(entityManager)),
},
};
const fileAIChatService = {
uploadFile: jest.fn(),
} as unknown as jest.Mocked<FileAIChatService>;
const service = new WorkflowAgentTracePersistenceService(
threadRepository as never,
fileAIChatService,
);
return {
service,
threadRepository,
threadRepositoryInTx,
turnRepository,
messageRepository,
messagePartRepository,
fileAIChatService,
};
};
const basePersistParams = {
userPrompt: 'Do the thing',
agentId: 'agent-1',
workspaceId: 'workspace-1',
workflowRunId: 'workflow-run-1',
workflowStepId: 'workflow-step-1',
totalInputTokens: 10,
totalOutputTokens: 20,
totalInputCredits: 30,
totalOutputCredits: 40,
contextWindowTokens: 50,
conversationSize: 60,
};
const oneTextStep = [
{ content: [{ type: 'text', text: 'Answer' }] },
] as StepResult<ToolSet>[];
describe('WorkflowAgentTracePersistenceService', () => {
it('reuses the workflow trace thread when the same step executes again', async () => {
const threadEntityRepository = {
findOne: jest
.fn()
.mockResolvedValueOnce(null)
.mockResolvedValueOnce({ id: 'thread-1' }),
insert: jest.fn().mockResolvedValue(createInsertResult('thread-1')),
update: jest.fn().mockResolvedValue(undefined),
};
const turnRepository = {
insert: jest
.fn()
.mockResolvedValueOnce(createInsertResult('turn-1'))
.mockResolvedValueOnce(createInsertResult('turn-2')),
};
const messageRepository = {
insert: jest
.fn()
.mockResolvedValueOnce(createInsertResult('user-message-1'))
.mockResolvedValueOnce(createInsertResult('assistant-message-1'))
.mockResolvedValueOnce(createInsertResult('user-message-2'))
.mockResolvedValueOnce(createInsertResult('assistant-message-2')),
};
const messagePartRepository = {
insert: jest.fn().mockResolvedValue(undefined),
};
it('creates a new thread with zero-initialized stats and applies the run stats inside the transaction', async () => {
const harness = createHarness();
const entityManager = {
getRepository: jest.fn((entity) => {
if (entity === AgentChatThreadEntity) {
return threadEntityRepository;
}
if (entity === AgentTurnEntity) {
return turnRepository;
}
if (entity === AgentMessageEntity) {
return messageRepository;
}
if (entity === AgentMessagePartEntity) {
return messagePartRepository;
}
throw new Error(`Unexpected entity: ${String(entity)}`);
}),
};
const threadRepository = {
manager: {
transaction: jest.fn(async (callback) => callback(entityManager)),
},
} as never;
const fileAIChatService = {
uploadFile: jest.fn(),
} as unknown as jest.Mocked<FileAIChatService>;
const service = new WorkflowAgentTracePersistenceService(
threadRepository,
fileAIChatService,
harness.threadRepository.findOne.mockResolvedValueOnce(null);
harness.threadRepository.insert.mockResolvedValueOnce(
createInsertResult('thread-1'),
);
const steps = [
{
content: [{ type: 'text', text: 'Answer' }],
},
] as StepResult<ToolSet>[];
const firstPersistedTrace = await service.persistTrace({
steps,
userPrompt: 'First prompt',
agentId: 'agent-1',
workspaceId: 'workspace-1',
workflowRunId: 'workflow-run-1',
workflowStepId: 'workflow-step-1',
totalInputTokens: 10,
totalOutputTokens: 20,
totalInputCredits: 30,
totalOutputCredits: 40,
contextWindowTokens: 50,
conversationSize: 60,
const persistedTrace = await harness.service.persistTrace({
...basePersistParams,
steps: oneTextStep,
});
const secondPersistedTrace = await service.persistTrace({
steps,
userPrompt: 'Second prompt',
agentId: 'agent-1',
workspaceId: 'workspace-1',
workflowRunId: 'workflow-run-1',
workflowStepId: 'workflow-step-1',
totalInputTokens: 11,
totalOutputTokens: 21,
totalInputCredits: 31,
totalOutputCredits: 41,
contextWindowTokens: 51,
conversationSize: 61,
});
expect(firstPersistedTrace).toEqual({
expect(persistedTrace).toEqual({
turnId: 'turn-1',
threadId: 'thread-1',
});
expect(secondPersistedTrace).toEqual({
turnId: 'turn-2',
threadId: 'thread-1',
});
expect(threadEntityRepository.insert).toHaveBeenCalledTimes(1);
expect(threadEntityRepository.update).toHaveBeenCalledWith(
expect(harness.threadRepository.insert).toHaveBeenCalledWith(
expect.objectContaining({
workspaceId: 'workspace-1',
userWorkspaceId: null,
workflowRunId: 'workflow-run-1',
workflowStepId: 'workflow-step-1',
title: 'Do the thing',
totalInputTokens: 0,
totalOutputTokens: 0,
totalInputCredits: 0,
totalOutputCredits: 0,
}),
);
expect(harness.threadRepositoryInTx.update).toHaveBeenCalledWith(
'thread-1',
expect.objectContaining({
title: 'Second prompt',
title: 'Do the thing',
totalInputTokens: expect.any(Function),
totalOutputTokens: expect.any(Function),
totalInputCredits: expect.any(Function),
totalOutputCredits: expect.any(Function),
contextWindowTokens: 51,
conversationSize: 61,
contextWindowTokens: 50,
conversationSize: 60,
}),
);
const updatePayload = threadEntityRepository.update.mock.calls[0][1];
const updatePayload = harness.threadRepositoryInTx.update.mock.calls[0][1];
expect(updatePayload.totalInputTokens()).toBe('"totalInputTokens" + 11');
expect(updatePayload.totalOutputTokens()).toBe('"totalOutputTokens" + 21');
expect(updatePayload.totalInputCredits()).toBe('"totalInputCredits" + 31');
expect(updatePayload.totalInputTokens()).toBe('"totalInputTokens" + 10');
expect(updatePayload.totalOutputTokens()).toBe('"totalOutputTokens" + 20');
expect(updatePayload.totalInputCredits()).toBe('"totalInputCredits" + 30');
expect(updatePayload.totalOutputCredits()).toBe(
'"totalOutputCredits" + 41',
'"totalOutputCredits" + 40',
);
expect(messagePartRepository.insert).toHaveBeenCalledTimes(4);
expect(fileAIChatService.uploadFile).not.toHaveBeenCalled();
expect(harness.messageRepository.insert).toHaveBeenCalledTimes(2);
expect(harness.messagePartRepository.insert).toHaveBeenCalledTimes(2);
expect(harness.fileAIChatService.uploadFile).not.toHaveBeenCalled();
});
it('reuses the existing thread when the same workflow step executes again', async () => {
const harness = createHarness();
harness.threadRepository.findOne.mockResolvedValueOnce({ id: 'thread-1' });
const persistedTrace = await harness.service.persistTrace({
...basePersistParams,
userPrompt: 'Second prompt',
totalInputTokens: 11,
steps: oneTextStep,
});
expect(persistedTrace.threadId).toBe('thread-1');
expect(harness.threadRepository.insert).not.toHaveBeenCalled();
const updatePayload = harness.threadRepositoryInTx.update.mock.calls[0][1];
expect(updatePayload.title).toBe('Second prompt');
expect(updatePayload.totalInputTokens()).toBe('"totalInputTokens" + 11');
});
it('recovers when a concurrent persist wins the unique-index race', async () => {
const harness = createHarness();
const uniqueViolation = Object.assign(new Error('duplicate'), {
code: '23505',
});
harness.threadRepository.findOne
.mockResolvedValueOnce(null)
.mockResolvedValueOnce({ id: 'thread-winner' });
harness.threadRepository.insert.mockRejectedValueOnce(uniqueViolation);
const persistedTrace = await harness.service.persistTrace({
...basePersistParams,
steps: oneTextStep,
});
expect(persistedTrace.threadId).toBe('thread-winner');
expect(harness.threadRepository.findOne).toHaveBeenCalledTimes(2);
});
it('skips assistant part inserts when the step produced no persistable parts', async () => {
const harness = createHarness();
harness.threadRepository.findOne.mockResolvedValueOnce(null);
harness.threadRepository.insert.mockResolvedValueOnce(
createInsertResult('thread-1'),
);
await harness.service.persistTrace({
...basePersistParams,
steps: [] as StepResult<ToolSet>[],
});
// Only the user prompt part is inserted; no assistant parts.
expect(harness.messagePartRepository.insert).toHaveBeenCalledTimes(1);
});
});

View File

@@ -12,11 +12,44 @@ import {
import { AgentMessagePartEntity } from 'src/engine/metadata-modules/ai/ai-agent-execution/entities/agent-message-part.entity';
import { AgentTurnEntity } from 'src/engine/metadata-modules/ai/ai-agent-execution/entities/agent-turn.entity';
import { mapGenerateTextStepsToPersistableParts } from 'src/engine/metadata-modules/ai/ai-agent-execution/utils/mapGenerateTextStepsToPersistableParts';
import { mapPersistablePartsToDBParts } from 'src/engine/metadata-modules/ai/ai-agent-execution/utils/mapPersistablePartsToDBParts';
import { mapPersistablePartsToDatabaseParts } from 'src/engine/metadata-modules/ai/ai-agent-execution/utils/mapPersistablePartsToDatabaseParts';
import { AgentChatThreadEntity } from 'src/engine/metadata-modules/ai/ai-chat/entities/agent-chat-thread.entity';
import { FileAIChatService } from 'src/engine/core-modules/file/file-ai-chat/services/file-ai-chat.service';
const MAX_THREAD_TITLE_LENGTH = 100;
const POSTGRES_UNIQUE_VIOLATION_CODE = '23505';
type WorkflowTraceThreadKey = {
workspaceId: string;
workflowRunId: string;
workflowStepId: string;
};
type WorkflowTraceThreadStats = {
totalInputTokens: number;
totalOutputTokens: number;
totalInputCredits: number;
totalOutputCredits: number;
contextWindowTokens: number;
conversationSize: number;
};
type PersistTraceParams = WorkflowTraceThreadKey &
WorkflowTraceThreadStats & {
steps: StepResult<ToolSet>[];
userPrompt: string;
agentId: string | null;
};
const isUniqueViolationError = (error: unknown): error is { code: string } =>
typeof error === 'object' &&
error !== null &&
'code' in error &&
(error as { code: unknown }).code === POSTGRES_UNIQUE_VIOLATION_CODE;
const buildColumnIncrementExpression =
(columnName: string, delta: number) => () =>
`"${columnName}" + ${delta}`;
@Injectable()
export class WorkflowAgentTracePersistenceService {
@@ -30,36 +63,11 @@ export class WorkflowAgentTracePersistenceService {
private readonly fileAIChatService: FileAIChatService,
) {}
async persistTrace({
steps,
userPrompt,
agentId,
workspaceId,
workflowRunId,
workflowStepId,
totalInputTokens,
totalOutputTokens,
totalInputCredits,
totalOutputCredits,
contextWindowTokens,
conversationSize,
}: {
steps: StepResult<ToolSet>[];
userPrompt: string;
agentId: string | null;
workspaceId: string;
workflowRunId: string;
workflowStepId: string;
totalInputTokens: number;
totalOutputTokens: number;
totalInputCredits: number;
totalOutputCredits: number;
contextWindowTokens: number;
conversationSize: number;
}): Promise<{ turnId: string; threadId: string }> {
const title = userPrompt.substring(0, MAX_THREAD_TITLE_LENGTH);
async persistTrace(
params: PersistTraceParams,
): Promise<{ turnId: string; threadId: string }> {
const persistableParts = await mapGenerateTextStepsToPersistableParts({
steps,
steps: params.steps,
uploadGeneratedFile: async ({ file, filename }) => {
// Files are uploaded before the DB transaction; a later transaction
// failure can leave orphan uploads, which is acceptable here because
@@ -68,7 +76,7 @@ export class WorkflowAgentTracePersistenceService {
const uploadedFile = await this.fileAIChatService.uploadFile({
file,
filename,
workspaceId,
workspaceId: params.workspaceId,
});
return {
@@ -78,220 +86,201 @@ export class WorkflowAgentTracePersistenceService {
},
});
const persistedTrace = await this.threadRepository.manager.transaction(
const threadId = await this.ensureWorkflowTraceThread(params);
const turnId = await this.threadRepository.manager.transaction(
async (entityManager) => {
const threadId = await this.getOrCreateWorkflowTraceThread({
entityManager,
workspaceId,
title,
workflowRunId,
workflowStepId,
totalInputTokens,
totalOutputTokens,
totalInputCredits,
totalOutputCredits,
contextWindowTokens,
conversationSize,
});
const turnId = await this.insertTurn({
entityManager,
await this.applyThreadAggregates(entityManager, threadId, params);
return this.insertTurnWithMessages(entityManager, {
threadId,
agentId,
workspaceId,
persistableParts,
userPrompt: params.userPrompt,
agentId: params.agentId,
workspaceId: params.workspaceId,
});
const userMessageId = await this.insertUserMessage({
entityManager,
threadId,
turnId,
workspaceId,
});
await entityManager.getRepository(AgentMessagePartEntity).insert({
messageId: userMessageId,
orderIndex: 0,
type: 'text',
textContent: userPrompt,
workspaceId,
});
const assistantMessageId = await this.insertAssistantMessage({
entityManager,
threadId,
turnId,
agentId,
workspaceId,
});
if (persistableParts.length > 0) {
const dbParts = mapPersistablePartsToDBParts(
persistableParts,
assistantMessageId,
workspaceId,
);
if (dbParts.length > 0) {
await entityManager
.getRepository(AgentMessagePartEntity)
.insert(
dbParts as QueryDeepPartialEntity<AgentMessagePartEntity>[],
);
}
}
return { turnId, threadId };
},
);
this.logger.log(
`Persisted workflow agent trace: turnId=${persistedTrace.turnId} threadId=${persistedTrace.threadId} steps=${steps.length} parts=${persistableParts.length}`,
`Persisted workflow agent trace: turnId=${turnId} threadId=${threadId} steps=${params.steps.length} parts=${persistableParts.length}`,
);
return persistedTrace;
return { turnId, threadId };
}
private async getOrCreateWorkflowTraceThread({
entityManager,
workspaceId,
title,
workflowRunId,
workflowStepId,
totalInputTokens,
totalOutputTokens,
totalInputCredits,
totalOutputCredits,
contextWindowTokens,
conversationSize,
}: {
entityManager: EntityManager;
workspaceId: string;
title: string;
workflowRunId: string;
workflowStepId: string;
totalInputTokens: number;
totalOutputTokens: number;
totalInputCredits: number;
totalOutputCredits: number;
contextWindowTokens: number;
conversationSize: number;
}) {
const threadRepository = entityManager.getRepository(AgentChatThreadEntity);
// Workflow actions currently execute sequentially for a given
// (workspaceId, workflowRunId, workflowStepId), so this read-then-insert
// flow is sufficient despite the usual TOCTOU caveat.
const existingThread = await threadRepository.findOne({
where: {
workspaceId,
workflowRunId,
workflowStepId,
},
select: ['id'],
private async ensureWorkflowTraceThread(
params: WorkflowTraceThreadKey & { userPrompt: string },
): Promise<string> {
const { workspaceId, workflowRunId, workflowStepId } = params;
const existingThread = await this.findWorkflowTraceThread({
workspaceId,
workflowRunId,
workflowStepId,
});
const threadInsertPayload = {
title,
totalInputTokens,
totalOutputTokens,
totalInputCredits,
totalOutputCredits,
contextWindowTokens,
conversationSize,
};
if (existingThread) {
await threadRepository.update(existingThread.id, {
title,
totalInputTokens: () => `"totalInputTokens" + ${totalInputTokens}`,
totalOutputTokens: () => `"totalOutputTokens" + ${totalOutputTokens}`,
totalInputCredits: () => `"totalInputCredits" + ${totalInputCredits}`,
totalOutputCredits: () =>
`"totalOutputCredits" + ${totalOutputCredits}`,
contextWindowTokens,
conversationSize,
});
return existingThread.id;
}
const insertResult = await threadRepository.insert({
try {
const insertResult = await this.threadRepository.insert({
workspaceId,
userWorkspaceId: null,
workflowRunId,
workflowStepId,
title: params.userPrompt.substring(0, MAX_THREAD_TITLE_LENGTH),
totalInputTokens: 0,
totalOutputTokens: 0,
totalInputCredits: 0,
totalOutputCredits: 0,
});
return insertResult.identifiers[0].id as string;
} catch (error) {
// A concurrent persistTrace for the same (workspaceId, workflowRunId,
// workflowStepId) can race past findOne and hit the partial unique
// index. Re-read to get the winning thread id instead of failing.
if (!isUniqueViolationError(error)) {
throw error;
}
const winningThread = await this.findWorkflowTraceThread({
workspaceId,
workflowRunId,
workflowStepId,
});
if (!winningThread) {
throw error;
}
return winningThread.id;
}
}
private findWorkflowTraceThread(key: WorkflowTraceThreadKey) {
return this.threadRepository.findOne({
where: key,
select: ['id'],
});
}
private async applyThreadAggregates(
entityManager: EntityManager,
threadId: string,
params: WorkflowTraceThreadStats & { userPrompt: string },
): Promise<void> {
await entityManager.getRepository(AgentChatThreadEntity).update(threadId, {
title: params.userPrompt.substring(0, MAX_THREAD_TITLE_LENGTH),
totalInputTokens: buildColumnIncrementExpression(
'totalInputTokens',
params.totalInputTokens,
),
totalOutputTokens: buildColumnIncrementExpression(
'totalOutputTokens',
params.totalOutputTokens,
),
totalInputCredits: buildColumnIncrementExpression(
'totalInputCredits',
params.totalInputCredits,
),
totalOutputCredits: buildColumnIncrementExpression(
'totalOutputCredits',
params.totalOutputCredits,
),
contextWindowTokens: params.contextWindowTokens,
conversationSize: params.conversationSize,
});
}
private async insertTurnWithMessages(
entityManager: EntityManager,
params: {
threadId: string;
persistableParts: Awaited<
ReturnType<typeof mapGenerateTextStepsToPersistableParts>
>;
userPrompt: string;
agentId: string | null;
workspaceId: string;
},
): Promise<string> {
const { threadId, persistableParts, userPrompt, agentId, workspaceId } =
params;
const turnId = await this.insertAndGetId(
entityManager.getRepository(AgentTurnEntity),
{ threadId, agentId, workspaceId },
);
const userMessageId = await this.insertMessage(entityManager, {
threadId,
turnId,
role: AgentMessageRole.USER,
agentId: null,
workspaceId,
userWorkspaceId: null,
workflowRunId,
workflowStepId,
...threadInsertPayload,
});
return insertResult.identifiers[0].id as string;
}
await entityManager.getRepository(AgentMessagePartEntity).insert({
messageId: userMessageId,
orderIndex: 0,
type: 'text',
textContent: userPrompt,
workspaceId,
});
private async insertTurn({
entityManager,
threadId,
agentId,
workspaceId,
}: {
entityManager: EntityManager;
threadId: string;
agentId: string | null;
workspaceId: string;
}) {
const insertResult = await entityManager
.getRepository(AgentTurnEntity)
.insert({
threadId,
agentId,
const assistantMessageId = await this.insertMessage(entityManager, {
threadId,
turnId,
role: AgentMessageRole.ASSISTANT,
agentId,
workspaceId,
});
if (persistableParts.length > 0) {
const databaseParts = mapPersistablePartsToDatabaseParts(
persistableParts,
assistantMessageId,
workspaceId,
});
);
return insertResult.identifiers[0].id as string;
await entityManager
.getRepository(AgentMessagePartEntity)
.insert(
databaseParts as QueryDeepPartialEntity<AgentMessagePartEntity>[],
);
}
return turnId;
}
private async insertUserMessage({
entityManager,
threadId,
turnId,
workspaceId,
}: {
entityManager: EntityManager;
threadId: string;
turnId: string;
workspaceId: string;
}) {
const insertResult = await entityManager
.getRepository(AgentMessageEntity)
.insert({
threadId,
turnId,
role: AgentMessageRole.USER,
private insertMessage(
entityManager: EntityManager,
params: {
threadId: string;
turnId: string;
role: AgentMessageRole;
agentId: string | null;
workspaceId: string;
},
): Promise<string> {
return this.insertAndGetId(
entityManager.getRepository(AgentMessageEntity),
{
threadId: params.threadId,
turnId: params.turnId,
role: params.role,
agentId: params.agentId,
processedAt: new Date(),
workspaceId,
});
return insertResult.identifiers[0].id as string;
workspaceId: params.workspaceId,
},
);
}
private async insertAssistantMessage({
entityManager,
threadId,
turnId,
agentId,
workspaceId,
}: {
entityManager: EntityManager;
threadId: string;
turnId: string;
agentId: string | null;
workspaceId: string;
}) {
const insertResult = await entityManager
.getRepository(AgentMessageEntity)
.insert({
threadId,
turnId,
role: AgentMessageRole.ASSISTANT,
agentId,
processedAt: new Date(),
workspaceId,
});
private async insertAndGetId<T extends { id: string }>(
repository: Repository<T>,
payload: QueryDeepPartialEntity<T>,
): Promise<string> {
const insertResult = await repository.insert(payload);
return insertResult.identifiers[0].id as string;
}

View File

@@ -1,6 +1,7 @@
import {
type ExtendedFileUIPart,
type ExtendedUIMessagePart,
getToolApproval,
} from 'twenty-shared/ai';
import { type AgentMessagePartEntity } from 'src/engine/metadata-modules/ai/ai-agent-execution/entities/agent-message-part.entity';
@@ -9,36 +10,6 @@ import { type AgentMessagePartEntity } from 'src/engine/metadata-modules/ai/ai-a
// A parallel mapping for GraphQL DTOs exists in the frontend at:
// packages/twenty-front/src/modules/ai/utils/mapDBPartToUIMessagePart.ts
const getToolApproval = (
errorDetails: Record<string, unknown> | null,
):
| {
id: string;
approved?: boolean;
reason?: string;
}
| undefined => {
const approval =
errorDetails &&
typeof errorDetails === 'object' &&
'approval' in errorDetails &&
typeof errorDetails.approval === 'object' &&
errorDetails.approval !== null
? (errorDetails.approval as Record<string, unknown>)
: null;
if (!approval || typeof approval.id !== 'string') {
return undefined;
}
return {
id: approval.id,
approved:
typeof approval.approved === 'boolean' ? approval.approved : undefined,
reason: typeof approval.reason === 'string' ? approval.reason : undefined,
};
};
export const mapDBPartToUIMessagePart = (
part: AgentMessagePartEntity,
): ExtendedUIMessagePart | null => {

View File

@@ -11,7 +11,7 @@ const assertUnreachable = (value: never): never => {
throw new Error(`Unsupported persistable part: ${JSON.stringify(value)}`);
};
export const mapPersistablePartsToDBParts = (
export const mapPersistablePartsToDatabaseParts = (
parts: PersistableAgentMessagePart[],
messageId: string,
workspaceId: string,

View File

@@ -1,7 +1,7 @@
import { type ExtendedUIMessagePart } from 'twenty-shared/ai';
import { type AgentMessagePartEntity } from 'src/engine/metadata-modules/ai/ai-agent-execution/entities/agent-message-part.entity';
import { mapPersistablePartsToDBParts } from 'src/engine/metadata-modules/ai/ai-agent-execution/utils/mapPersistablePartsToDBParts';
import { mapPersistablePartsToDatabaseParts } from 'src/engine/metadata-modules/ai/ai-agent-execution/utils/mapPersistablePartsToDatabaseParts';
import { mapUIMessagePartsToPersistableParts } from 'src/engine/metadata-modules/ai/ai-agent-execution/utils/mapUIMessagePartsToPersistableParts';
export const mapUIMessagePartsToDBParts = (
@@ -9,7 +9,7 @@ export const mapUIMessagePartsToDBParts = (
messageId: string,
workspaceId: string,
): Partial<AgentMessagePartEntity>[] => {
return mapPersistablePartsToDBParts(
return mapPersistablePartsToDatabaseParts(
mapUIMessagePartsToPersistableParts(uiMessageParts),
messageId,
workspaceId,

View File

@@ -38,7 +38,10 @@ export type {
export type { ExtendedUIMessagePart } from './types/ExtendedUIMessagePart';
export type { ModelConfiguration } from './types/model-configuration.type';
export type { NavigateAppToolOutput } from './types/NavigateAppToolOutput';
export type { ToolApproval } from './utils/get-tool-approval.util';
export { getToolApproval } from './utils/get-tool-approval.util';
export { inferAiSdkPackage } from './utils/infer-ai-sdk-package.util';
export { isAgentCapabilityEnabled } from './utils/is-agent-capability-enabled.util';
export { isAiSdkPackage } from './utils/is-ai-sdk-package.util';
export { isDataResidency } from './utils/is-data-residency.util';
export { isToolPartErrored } from './utils/is-tool-part-errored.util';

View File

@@ -0,0 +1,45 @@
import { getToolApproval } from '../get-tool-approval.util';
describe('getToolApproval', () => {
it('returns undefined when errorDetails is missing or malformed', () => {
expect(getToolApproval(null)).toBeUndefined();
expect(getToolApproval(undefined)).toBeUndefined();
expect(getToolApproval('string')).toBeUndefined();
expect(getToolApproval({})).toBeUndefined();
expect(getToolApproval({ approval: null })).toBeUndefined();
expect(getToolApproval({ approval: 'not-an-object' })).toBeUndefined();
expect(getToolApproval({ approval: {} })).toBeUndefined();
});
it('returns the approval with only the id when optional fields are missing', () => {
expect(getToolApproval({ approval: { id: 'approval-1' } })).toEqual({
id: 'approval-1',
approved: undefined,
reason: undefined,
});
});
it('returns all fields when present and correctly typed', () => {
expect(
getToolApproval({
approval: { id: 'approval-1', approved: true, reason: 'Looks good' },
}),
).toEqual({
id: 'approval-1',
approved: true,
reason: 'Looks good',
});
});
it('omits optional fields that have the wrong runtime type', () => {
expect(
getToolApproval({
approval: { id: 'approval-1', approved: 'yes', reason: 42 },
}),
).toEqual({
id: 'approval-1',
approved: undefined,
reason: undefined,
});
});
});

View File

@@ -0,0 +1,16 @@
import { isToolPartErrored } from '../is-tool-part-errored.util';
describe('isToolPartErrored', () => {
it('returns true for error states', () => {
expect(isToolPartErrored('output-error')).toBe(true);
expect(isToolPartErrored('output-denied')).toBe(true);
});
it('returns false for non-error states', () => {
expect(isToolPartErrored('input-streaming')).toBe(false);
expect(isToolPartErrored('input-available')).toBe(false);
expect(isToolPartErrored('approval-requested')).toBe(false);
expect(isToolPartErrored('approval-responded')).toBe(false);
expect(isToolPartErrored('output-available')).toBe(false);
});
});

View File

@@ -0,0 +1,29 @@
export type ToolApproval = {
id: string;
approved?: boolean;
reason?: string;
};
export const getToolApproval = (
errorDetails: unknown,
): ToolApproval | undefined => {
const approval =
errorDetails &&
typeof errorDetails === 'object' &&
'approval' in errorDetails &&
typeof errorDetails.approval === 'object' &&
errorDetails.approval !== null
? (errorDetails.approval as Record<string, unknown>)
: null;
if (!approval || typeof approval.id !== 'string') {
return undefined;
}
return {
id: approval.id,
approved:
typeof approval.approved === 'boolean' ? approval.approved : undefined,
reason: typeof approval.reason === 'string' ? approval.reason : undefined,
};
};

View File

@@ -0,0 +1,4 @@
import { type ToolUIPart } from 'ai';
export const isToolPartErrored = (state: ToolUIPart['state']): boolean =>
state === 'output-error' || state === 'output-denied';