From cece97924c1937de7bf73f0d25bae7959f654f7b Mon Sep 17 00:00:00 2001 From: ehconitin Date: Fri, 17 Apr 2026 17:43:23 +0530 Subject: [PATCH] more --- .../ai/components/ThinkingStepsDisplay.tsx | 10 +- .../ai/components/ToolStepRenderer.tsx | 8 +- .../thinkingStepsDisplayState.test.ts | 16 +- .../ai/utils/isThinkingStepPartActive.ts | 9 +- .../ai/utils/mapDBPartToUIMessagePart.ts | 23 +- .../WorkflowRunAiAgentTraceDetail.tsx | 66 +-- ...ow-agent-trace-persistence.service.spec.ts | 297 +++++++----- ...orkflow-agent-trace-persistence.service.ts | 423 +++++++++--------- .../utils/mapDBPartToUIMessagePart.ts | 31 +- .../mapPersistablePartsToDatabaseParts.ts | 2 +- .../utils/mapUIMessagePartsToDBParts.ts | 4 +- packages/twenty-shared/src/ai/index.ts | 3 + .../__tests__/get-tool-approval.util.spec.ts | 45 ++ .../is-tool-part-errored.util.spec.ts | 16 + .../src/ai/utils/get-tool-approval.util.ts | 29 ++ .../src/ai/utils/is-tool-part-errored.util.ts | 4 + 16 files changed, 554 insertions(+), 432 deletions(-) create mode 100644 packages/twenty-shared/src/ai/utils/__tests__/get-tool-approval.util.spec.ts create mode 100644 packages/twenty-shared/src/ai/utils/__tests__/is-tool-part-errored.util.spec.ts create mode 100644 packages/twenty-shared/src/ai/utils/get-tool-approval.util.ts create mode 100644 packages/twenty-shared/src/ai/utils/is-tool-part-errored.util.ts diff --git a/packages/twenty-front/src/modules/ai/components/ThinkingStepsDisplay.tsx b/packages/twenty-front/src/modules/ai/components/ThinkingStepsDisplay.tsx index b78959bcbd4..2f6d65e50d3 100644 --- a/packages/twenty-front/src/modules/ai/components/ThinkingStepsDisplay.tsx +++ b/packages/twenty-front/src/modules/ai/components/ThinkingStepsDisplay.tsx @@ -2,6 +2,7 @@ import { styled } from '@linaria/react'; import { plural, t } from '@lingui/core/macro'; import { useState } from 'react'; import { type ToolUIPart } from 'ai'; +import { isToolPartErrored } from 'twenty-shared/ai'; import { IconChevronRight, IconCpu, @@ -271,7 +272,7 @@ const ThinkingToolStepRow = ({ const ToolIcon = getToolIcon(resolvedToolName); const label = getToolDisplayMessage(part.input, rawToolName, !isActive); - const hasError = (part.errorText?.trim().length ?? 0) > 0; + const hasError = isToolPartErrored(part.state); const isExpandable = isToolOutputInspectable(part.output) || hasError; const outputObj = @@ -329,7 +330,12 @@ const ThinkingToolStepRow = ({ {hasError ? ( - {part.errorText} + // Legacy persisted tool parts can have an empty errorText while + // state === 'output-error' (old mapper stored errorMessage ?? ''). + // Show a fallback label so the expanded panel isn't blank. + + {part.errorText?.trim() || t`Unknown error`} + ) : ( diff --git a/packages/twenty-front/src/modules/ai/components/ToolStepRenderer.tsx b/packages/twenty-front/src/modules/ai/components/ToolStepRenderer.tsx index dbf777406e0..6594d749602 100644 --- a/packages/twenty-front/src/modules/ai/components/ToolStepRenderer.tsx +++ b/packages/twenty-front/src/modules/ai/components/ToolStepRenderer.tsx @@ -16,6 +16,7 @@ import { getToolIcon } from '@/ai/utils/getToolIcon'; import { isToolOutputInspectable } from '@/ai/utils/isToolOutputInspectable'; import { useLingui } from '@lingui/react/macro'; import { type ToolUIPart } from 'ai'; +import { isToolPartErrored } from 'twenty-shared/ai'; import { type JsonValue } from 'type-fest'; import { useCopyToClipboard } from '~/hooks/useCopyToClipboard'; @@ -146,7 +147,7 @@ export const ToolStepRenderer = ({ const { resolvedInput: toolInput, resolvedToolName: toolName } = resolveToolInput(input, rawToolName); - const hasError = (errorText?.trim().length ?? 0) > 0; + const hasError = isToolPartErrored(toolPart.state); const isExpandable = isToolOutputInspectable(output) || hasError; const ToolIcon = getToolIcon(toolName); @@ -258,7 +259,10 @@ export const ToolStepRenderer = ({ {hasError ? ( - errorText + // Legacy persisted tool parts can have an empty errorText while + // state === 'output-error' (old mapper stored errorMessage ?? ''). + // Show a fallback label so the expanded panel isn't blank. + errorText?.trim() || t`Unknown error` ) : ( <> diff --git a/packages/twenty-front/src/modules/ai/utils/__tests__/thinkingStepsDisplayState.test.ts b/packages/twenty-front/src/modules/ai/utils/__tests__/thinkingStepsDisplayState.test.ts index 9a6fbc3417a..614ef067c53 100644 --- a/packages/twenty-front/src/modules/ai/utils/__tests__/thinkingStepsDisplayState.test.ts +++ b/packages/twenty-front/src/modules/ai/utils/__tests__/thinkingStepsDisplayState.test.ts @@ -23,12 +23,14 @@ const createToolPart = ({ errorText, input = {}, output, + state = 'output-available', type = 'tool-web_search', }: { type?: `tool-${string}`; input?: Record; output?: unknown; errorText?: string; + state?: string; } = {}): ThinkingStepPart => ({ type, @@ -36,7 +38,7 @@ const createToolPart = ({ input, output, errorText, - state: 'output-available', + state, }) as ThinkingStepPart; describe('thinkingStepsDisplayState', () => { @@ -102,8 +104,9 @@ describe('thinkingStepsDisplayState', () => { expect(isThinkingStepPartActive(toolPart, false)).toBe(false); }); - it('should not treat empty persisted error text as a tool error', () => { + it('should treat non-error states as active regardless of errorText', () => { const toolPart = createToolPart({ + state: 'input-available', output: undefined, errorText: '', }); @@ -111,17 +114,24 @@ describe('thinkingStepsDisplayState', () => { expect(isThinkingStepPartActive(toolPart, true)).toBe(true); }); - it('should mark tool parts with output or error as inactive', () => { + it('should mark tool parts with output or error state as inactive', () => { const completedToolPart = createToolPart({ + state: 'output-available', output: { result: { ok: true } }, }); const failedToolPart = createToolPart({ + state: 'output-error', output: undefined, errorText: 'Tool failed', }); + const deniedToolPart = createToolPart({ + state: 'output-denied', + output: undefined, + }); expect(isThinkingStepPartActive(completedToolPart, true)).toBe(false); expect(isThinkingStepPartActive(failedToolPart, true)).toBe(false); + expect(isThinkingStepPartActive(deniedToolPart, true)).toBe(false); }); }); diff --git a/packages/twenty-front/src/modules/ai/utils/isThinkingStepPartActive.ts b/packages/twenty-front/src/modules/ai/utils/isThinkingStepPartActive.ts index a138e2a69f7..368e5113a7c 100644 --- a/packages/twenty-front/src/modules/ai/utils/isThinkingStepPartActive.ts +++ b/packages/twenty-front/src/modules/ai/utils/isThinkingStepPartActive.ts @@ -1,3 +1,4 @@ +import { isToolPartErrored } from 'twenty-shared/ai'; import { isDefined } from 'twenty-shared/utils'; import { type ThinkingStepPart } from '@/ai/utils/thinkingStepPart'; @@ -10,7 +11,9 @@ export const isThinkingStepPartActive = ( return part.state === 'streaming'; } - const hasError = (part.errorText?.trim().length ?? 0) > 0; - - return isLastMessageStreaming && !isDefined(part.output) && !hasError; + return ( + isLastMessageStreaming && + !isDefined(part.output) && + !isToolPartErrored(part.state) + ); }; diff --git a/packages/twenty-front/src/modules/ai/utils/mapDBPartToUIMessagePart.ts b/packages/twenty-front/src/modules/ai/utils/mapDBPartToUIMessagePart.ts index d427a526cfb..77550bbee7c 100644 --- a/packages/twenty-front/src/modules/ai/utils/mapDBPartToUIMessagePart.ts +++ b/packages/twenty-front/src/modules/ai/utils/mapDBPartToUIMessagePart.ts @@ -2,6 +2,7 @@ import { type ReasoningUIPart, type ToolUIPart } from 'ai'; import { type ExtendedFileUIPart, type ExtendedUIMessagePart, + getToolApproval, } from 'twenty-shared/ai'; import { type AgentMessagePart } from '~/generated-metadata/graphql'; @@ -9,28 +10,6 @@ import { type AgentMessagePart } from '~/generated-metadata/graphql'; // A parallel mapping for TypeORM entities exists in the server at: // packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/utils/mapDBPartsToUIMessageParts.ts -const getToolApproval = (errorDetails: unknown) => { - const approval = - errorDetails && - typeof errorDetails === 'object' && - 'approval' in errorDetails && - typeof errorDetails.approval === 'object' && - errorDetails.approval !== null - ? (errorDetails.approval as Record) - : null; - - if (!approval || typeof approval.id !== 'string') { - return undefined; - } - - return { - id: approval.id, - approved: - typeof approval.approved === 'boolean' ? approval.approved : undefined, - reason: typeof approval.reason === 'string' ? approval.reason : undefined, - }; -}; - export const mapDBPartToUIMessagePart = ( part: AgentMessagePart, ): ExtendedUIMessagePart => { diff --git a/packages/twenty-front/src/modules/workflow/workflow-steps/components/WorkflowRunAiAgentTraceDetail.tsx b/packages/twenty-front/src/modules/workflow/workflow-steps/components/WorkflowRunAiAgentTraceDetail.tsx index 4a747c9075b..2c1d4475e0f 100644 --- a/packages/twenty-front/src/modules/workflow/workflow-steps/components/WorkflowRunAiAgentTraceDetail.tsx +++ b/packages/twenty-front/src/modules/workflow/workflow-steps/components/WorkflowRunAiAgentTraceDetail.tsx @@ -4,7 +4,7 @@ import { mapDBMessagesToUIMessages } from '@/ai/utils/mapDBMessagesToUIMessages' import { useQuery } from '@apollo/client/react'; import { styled } from '@linaria/react'; import { t } from '@lingui/core/macro'; -import { type ReactNode, useState } from 'react'; +import { useState } from 'react'; import Skeleton from 'react-loading-skeleton'; import { IconChevronRight } from 'twenty-ui/display'; import { AnimatedExpandableContainer } from 'twenty-ui/layout'; @@ -24,13 +24,13 @@ const StyledContainer = styled.div` gap: ${themeCssVariables.spacing[2]}; `; -const StyledCollapsibleSection = styled.div` +const StyledPromptSection = styled.div` display: flex; flex-direction: column; gap: ${themeCssVariables.spacing[1]}; `; -const StyledSectionToggle = styled.button` +const StyledPromptToggle = styled.button` align-items: center; background: none; border: none; @@ -69,11 +69,6 @@ const StyledChevron = styled.span<{ isExpanded: boolean }>` ease-in-out; `; -const StyledSectionBody = styled.div` - color: ${themeCssVariables.font.color.primary}; - padding-top: ${themeCssVariables.spacing[1]}; -`; - const StyledPromptBody = styled.div` background: ${themeCssVariables.background.transparent.lighter}; border: 1px solid ${themeCssVariables.border.color.light}; @@ -81,6 +76,7 @@ const StyledPromptBody = styled.div` color: ${themeCssVariables.font.color.tertiary}; font-size: ${themeCssVariables.font.size.md}; line-height: ${themeCssVariables.text.lineHeight.lg}; + margin-top: ${themeCssVariables.spacing[1]}; padding: ${themeCssVariables.spacing[3]}; white-space: pre-wrap; `; @@ -100,38 +96,6 @@ const StyledTraceUnavailable = styled.div` font-size: ${themeCssVariables.font.size.md}; `; -type CollapsibleSectionProps = { - label: string; - defaultOpen?: boolean; - children: ReactNode; -}; - -const CollapsibleSection = ({ - label, - defaultOpen = false, - children, -}: CollapsibleSectionProps) => { - const [isExpanded, setIsExpanded] = useState(defaultOpen); - - return ( - - setIsExpanded((previous) => !previous)} - > - - - - {label} - - - {children} - - - ); -}; - const extractPromptText = (userMessage: AgentMessage | undefined): string => { if (!userMessage) return ''; @@ -157,6 +121,7 @@ export const WorkflowRunAiAgentTraceDetail = ({ variables: { workflowRunId, workflowStepId }, }, ); + const [isPromptExpanded, setIsPromptExpanded] = useState(false); if (loading) { return ; @@ -180,9 +145,24 @@ export const WorkflowRunAiAgentTraceDetail = ({ return ( {promptText.length > 0 && ( - - {promptText} - + + setIsPromptExpanded((previous) => !previous)} + > + + + + {t`Prompt`} + + + {promptText} + + )} {uiMessages.map((message) => ( diff --git a/packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/services/workflow-agent-trace-persistence.service.spec.ts b/packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/services/workflow-agent-trace-persistence.service.spec.ts index 56d3d8860e2..4463495cf18 100644 --- a/packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/services/workflow-agent-trace-persistence.service.spec.ts +++ b/packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/services/workflow-agent-trace-persistence.service.spec.ts @@ -12,133 +12,216 @@ const createInsertResult = (id: string) => identifiers: [{ id }], }) as never; +type TraceTestHarness = { + service: WorkflowAgentTracePersistenceService; + threadRepository: { + findOne: jest.Mock; + insert: jest.Mock; + manager: { transaction: jest.Mock }; + }; + threadRepositoryInTx: { update: jest.Mock }; + turnRepository: { insert: jest.Mock }; + messageRepository: { insert: jest.Mock }; + messagePartRepository: { insert: jest.Mock }; + fileAIChatService: jest.Mocked; +}; + +const createHarness = (): TraceTestHarness => { + const threadRepositoryInTx = { + update: jest.fn().mockResolvedValue(undefined), + }; + const turnRepository = { + insert: jest.fn().mockResolvedValue(createInsertResult('turn-1')), + }; + const messageRepository = { + insert: jest + .fn() + .mockResolvedValueOnce(createInsertResult('user-message-1')) + .mockResolvedValueOnce(createInsertResult('assistant-message-1')) + .mockResolvedValueOnce(createInsertResult('user-message-2')) + .mockResolvedValueOnce(createInsertResult('assistant-message-2')), + }; + const messagePartRepository = { + insert: jest.fn().mockResolvedValue(undefined), + }; + const entityManager = { + getRepository: jest.fn((entity) => { + if (entity === AgentChatThreadEntity) { + return threadRepositoryInTx; + } + if (entity === AgentTurnEntity) { + return turnRepository; + } + if (entity === AgentMessageEntity) { + return messageRepository; + } + if (entity === AgentMessagePartEntity) { + return messagePartRepository; + } + + throw new Error(`Unexpected entity: ${String(entity)}`); + }), + }; + const threadRepository = { + findOne: jest.fn(), + insert: jest.fn(), + manager: { + transaction: jest.fn(async (callback) => callback(entityManager)), + }, + }; + const fileAIChatService = { + uploadFile: jest.fn(), + } as unknown as jest.Mocked; + + const service = new WorkflowAgentTracePersistenceService( + threadRepository as never, + fileAIChatService, + ); + + return { + service, + threadRepository, + threadRepositoryInTx, + turnRepository, + messageRepository, + messagePartRepository, + fileAIChatService, + }; +}; + +const basePersistParams = { + userPrompt: 'Do the thing', + agentId: 'agent-1', + workspaceId: 'workspace-1', + workflowRunId: 'workflow-run-1', + workflowStepId: 'workflow-step-1', + totalInputTokens: 10, + totalOutputTokens: 20, + totalInputCredits: 30, + totalOutputCredits: 40, + contextWindowTokens: 50, + conversationSize: 60, +}; + +const oneTextStep = [ + { content: [{ type: 'text', text: 'Answer' }] }, +] as StepResult[]; + describe('WorkflowAgentTracePersistenceService', () => { - it('reuses the workflow trace thread when the same step executes again', async () => { - const threadEntityRepository = { - findOne: jest - .fn() - .mockResolvedValueOnce(null) - .mockResolvedValueOnce({ id: 'thread-1' }), - insert: jest.fn().mockResolvedValue(createInsertResult('thread-1')), - update: jest.fn().mockResolvedValue(undefined), - }; - const turnRepository = { - insert: jest - .fn() - .mockResolvedValueOnce(createInsertResult('turn-1')) - .mockResolvedValueOnce(createInsertResult('turn-2')), - }; - const messageRepository = { - insert: jest - .fn() - .mockResolvedValueOnce(createInsertResult('user-message-1')) - .mockResolvedValueOnce(createInsertResult('assistant-message-1')) - .mockResolvedValueOnce(createInsertResult('user-message-2')) - .mockResolvedValueOnce(createInsertResult('assistant-message-2')), - }; - const messagePartRepository = { - insert: jest.fn().mockResolvedValue(undefined), - }; + it('creates a new thread with zero-initialized stats and applies the run stats inside the transaction', async () => { + const harness = createHarness(); - const entityManager = { - getRepository: jest.fn((entity) => { - if (entity === AgentChatThreadEntity) { - return threadEntityRepository; - } - if (entity === AgentTurnEntity) { - return turnRepository; - } - if (entity === AgentMessageEntity) { - return messageRepository; - } - if (entity === AgentMessagePartEntity) { - return messagePartRepository; - } - - throw new Error(`Unexpected entity: ${String(entity)}`); - }), - }; - - const threadRepository = { - manager: { - transaction: jest.fn(async (callback) => callback(entityManager)), - }, - } as never; - const fileAIChatService = { - uploadFile: jest.fn(), - } as unknown as jest.Mocked; - - const service = new WorkflowAgentTracePersistenceService( - threadRepository, - fileAIChatService, + harness.threadRepository.findOne.mockResolvedValueOnce(null); + harness.threadRepository.insert.mockResolvedValueOnce( + createInsertResult('thread-1'), ); - const steps = [ - { - content: [{ type: 'text', text: 'Answer' }], - }, - ] as StepResult[]; - const firstPersistedTrace = await service.persistTrace({ - steps, - userPrompt: 'First prompt', - agentId: 'agent-1', - workspaceId: 'workspace-1', - workflowRunId: 'workflow-run-1', - workflowStepId: 'workflow-step-1', - totalInputTokens: 10, - totalOutputTokens: 20, - totalInputCredits: 30, - totalOutputCredits: 40, - contextWindowTokens: 50, - conversationSize: 60, + const persistedTrace = await harness.service.persistTrace({ + ...basePersistParams, + steps: oneTextStep, }); - const secondPersistedTrace = await service.persistTrace({ - steps, - userPrompt: 'Second prompt', - agentId: 'agent-1', - workspaceId: 'workspace-1', - workflowRunId: 'workflow-run-1', - workflowStepId: 'workflow-step-1', - totalInputTokens: 11, - totalOutputTokens: 21, - totalInputCredits: 31, - totalOutputCredits: 41, - contextWindowTokens: 51, - conversationSize: 61, - }); - - expect(firstPersistedTrace).toEqual({ + expect(persistedTrace).toEqual({ turnId: 'turn-1', threadId: 'thread-1', }); - expect(secondPersistedTrace).toEqual({ - turnId: 'turn-2', - threadId: 'thread-1', - }); - expect(threadEntityRepository.insert).toHaveBeenCalledTimes(1); - expect(threadEntityRepository.update).toHaveBeenCalledWith( + + expect(harness.threadRepository.insert).toHaveBeenCalledWith( + expect.objectContaining({ + workspaceId: 'workspace-1', + userWorkspaceId: null, + workflowRunId: 'workflow-run-1', + workflowStepId: 'workflow-step-1', + title: 'Do the thing', + totalInputTokens: 0, + totalOutputTokens: 0, + totalInputCredits: 0, + totalOutputCredits: 0, + }), + ); + + expect(harness.threadRepositoryInTx.update).toHaveBeenCalledWith( 'thread-1', expect.objectContaining({ - title: 'Second prompt', + title: 'Do the thing', totalInputTokens: expect.any(Function), totalOutputTokens: expect.any(Function), totalInputCredits: expect.any(Function), totalOutputCredits: expect.any(Function), - contextWindowTokens: 51, - conversationSize: 61, + contextWindowTokens: 50, + conversationSize: 60, }), ); - const updatePayload = threadEntityRepository.update.mock.calls[0][1]; + const updatePayload = harness.threadRepositoryInTx.update.mock.calls[0][1]; - expect(updatePayload.totalInputTokens()).toBe('"totalInputTokens" + 11'); - expect(updatePayload.totalOutputTokens()).toBe('"totalOutputTokens" + 21'); - expect(updatePayload.totalInputCredits()).toBe('"totalInputCredits" + 31'); + expect(updatePayload.totalInputTokens()).toBe('"totalInputTokens" + 10'); + expect(updatePayload.totalOutputTokens()).toBe('"totalOutputTokens" + 20'); + expect(updatePayload.totalInputCredits()).toBe('"totalInputCredits" + 30'); expect(updatePayload.totalOutputCredits()).toBe( - '"totalOutputCredits" + 41', + '"totalOutputCredits" + 40', ); - expect(messagePartRepository.insert).toHaveBeenCalledTimes(4); - expect(fileAIChatService.uploadFile).not.toHaveBeenCalled(); + + expect(harness.messageRepository.insert).toHaveBeenCalledTimes(2); + expect(harness.messagePartRepository.insert).toHaveBeenCalledTimes(2); + expect(harness.fileAIChatService.uploadFile).not.toHaveBeenCalled(); + }); + + it('reuses the existing thread when the same workflow step executes again', async () => { + const harness = createHarness(); + + harness.threadRepository.findOne.mockResolvedValueOnce({ id: 'thread-1' }); + + const persistedTrace = await harness.service.persistTrace({ + ...basePersistParams, + userPrompt: 'Second prompt', + totalInputTokens: 11, + steps: oneTextStep, + }); + + expect(persistedTrace.threadId).toBe('thread-1'); + expect(harness.threadRepository.insert).not.toHaveBeenCalled(); + + const updatePayload = harness.threadRepositoryInTx.update.mock.calls[0][1]; + + expect(updatePayload.title).toBe('Second prompt'); + expect(updatePayload.totalInputTokens()).toBe('"totalInputTokens" + 11'); + }); + + it('recovers when a concurrent persist wins the unique-index race', async () => { + const harness = createHarness(); + const uniqueViolation = Object.assign(new Error('duplicate'), { + code: '23505', + }); + + harness.threadRepository.findOne + .mockResolvedValueOnce(null) + .mockResolvedValueOnce({ id: 'thread-winner' }); + harness.threadRepository.insert.mockRejectedValueOnce(uniqueViolation); + + const persistedTrace = await harness.service.persistTrace({ + ...basePersistParams, + steps: oneTextStep, + }); + + expect(persistedTrace.threadId).toBe('thread-winner'); + expect(harness.threadRepository.findOne).toHaveBeenCalledTimes(2); + }); + + it('skips assistant part inserts when the step produced no persistable parts', async () => { + const harness = createHarness(); + + harness.threadRepository.findOne.mockResolvedValueOnce(null); + harness.threadRepository.insert.mockResolvedValueOnce( + createInsertResult('thread-1'), + ); + + await harness.service.persistTrace({ + ...basePersistParams, + steps: [] as StepResult[], + }); + + // Only the user prompt part is inserted; no assistant parts. + expect(harness.messagePartRepository.insert).toHaveBeenCalledTimes(1); }); }); diff --git a/packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/services/workflow-agent-trace-persistence.service.ts b/packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/services/workflow-agent-trace-persistence.service.ts index 59e8d00620c..17b279c6341 100644 --- a/packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/services/workflow-agent-trace-persistence.service.ts +++ b/packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/services/workflow-agent-trace-persistence.service.ts @@ -12,11 +12,44 @@ import { import { AgentMessagePartEntity } from 'src/engine/metadata-modules/ai/ai-agent-execution/entities/agent-message-part.entity'; import { AgentTurnEntity } from 'src/engine/metadata-modules/ai/ai-agent-execution/entities/agent-turn.entity'; import { mapGenerateTextStepsToPersistableParts } from 'src/engine/metadata-modules/ai/ai-agent-execution/utils/mapGenerateTextStepsToPersistableParts'; -import { mapPersistablePartsToDBParts } from 'src/engine/metadata-modules/ai/ai-agent-execution/utils/mapPersistablePartsToDBParts'; +import { mapPersistablePartsToDatabaseParts } from 'src/engine/metadata-modules/ai/ai-agent-execution/utils/mapPersistablePartsToDatabaseParts'; import { AgentChatThreadEntity } from 'src/engine/metadata-modules/ai/ai-chat/entities/agent-chat-thread.entity'; import { FileAIChatService } from 'src/engine/core-modules/file/file-ai-chat/services/file-ai-chat.service'; const MAX_THREAD_TITLE_LENGTH = 100; +const POSTGRES_UNIQUE_VIOLATION_CODE = '23505'; + +type WorkflowTraceThreadKey = { + workspaceId: string; + workflowRunId: string; + workflowStepId: string; +}; + +type WorkflowTraceThreadStats = { + totalInputTokens: number; + totalOutputTokens: number; + totalInputCredits: number; + totalOutputCredits: number; + contextWindowTokens: number; + conversationSize: number; +}; + +type PersistTraceParams = WorkflowTraceThreadKey & + WorkflowTraceThreadStats & { + steps: StepResult[]; + userPrompt: string; + agentId: string | null; + }; + +const isUniqueViolationError = (error: unknown): error is { code: string } => + typeof error === 'object' && + error !== null && + 'code' in error && + (error as { code: unknown }).code === POSTGRES_UNIQUE_VIOLATION_CODE; + +const buildColumnIncrementExpression = + (columnName: string, delta: number) => () => + `"${columnName}" + ${delta}`; @Injectable() export class WorkflowAgentTracePersistenceService { @@ -30,36 +63,11 @@ export class WorkflowAgentTracePersistenceService { private readonly fileAIChatService: FileAIChatService, ) {} - async persistTrace({ - steps, - userPrompt, - agentId, - workspaceId, - workflowRunId, - workflowStepId, - totalInputTokens, - totalOutputTokens, - totalInputCredits, - totalOutputCredits, - contextWindowTokens, - conversationSize, - }: { - steps: StepResult[]; - userPrompt: string; - agentId: string | null; - workspaceId: string; - workflowRunId: string; - workflowStepId: string; - totalInputTokens: number; - totalOutputTokens: number; - totalInputCredits: number; - totalOutputCredits: number; - contextWindowTokens: number; - conversationSize: number; - }): Promise<{ turnId: string; threadId: string }> { - const title = userPrompt.substring(0, MAX_THREAD_TITLE_LENGTH); + async persistTrace( + params: PersistTraceParams, + ): Promise<{ turnId: string; threadId: string }> { const persistableParts = await mapGenerateTextStepsToPersistableParts({ - steps, + steps: params.steps, uploadGeneratedFile: async ({ file, filename }) => { // Files are uploaded before the DB transaction; a later transaction // failure can leave orphan uploads, which is acceptable here because @@ -68,7 +76,7 @@ export class WorkflowAgentTracePersistenceService { const uploadedFile = await this.fileAIChatService.uploadFile({ file, filename, - workspaceId, + workspaceId: params.workspaceId, }); return { @@ -78,220 +86,201 @@ export class WorkflowAgentTracePersistenceService { }, }); - const persistedTrace = await this.threadRepository.manager.transaction( + const threadId = await this.ensureWorkflowTraceThread(params); + + const turnId = await this.threadRepository.manager.transaction( async (entityManager) => { - const threadId = await this.getOrCreateWorkflowTraceThread({ - entityManager, - workspaceId, - title, - workflowRunId, - workflowStepId, - totalInputTokens, - totalOutputTokens, - totalInputCredits, - totalOutputCredits, - contextWindowTokens, - conversationSize, - }); - const turnId = await this.insertTurn({ - entityManager, + await this.applyThreadAggregates(entityManager, threadId, params); + + return this.insertTurnWithMessages(entityManager, { threadId, - agentId, - workspaceId, + persistableParts, + userPrompt: params.userPrompt, + agentId: params.agentId, + workspaceId: params.workspaceId, }); - const userMessageId = await this.insertUserMessage({ - entityManager, - threadId, - turnId, - workspaceId, - }); - - await entityManager.getRepository(AgentMessagePartEntity).insert({ - messageId: userMessageId, - orderIndex: 0, - type: 'text', - textContent: userPrompt, - workspaceId, - }); - - const assistantMessageId = await this.insertAssistantMessage({ - entityManager, - threadId, - turnId, - agentId, - workspaceId, - }); - - if (persistableParts.length > 0) { - const dbParts = mapPersistablePartsToDBParts( - persistableParts, - assistantMessageId, - workspaceId, - ); - - if (dbParts.length > 0) { - await entityManager - .getRepository(AgentMessagePartEntity) - .insert( - dbParts as QueryDeepPartialEntity[], - ); - } - } - - return { turnId, threadId }; }, ); this.logger.log( - `Persisted workflow agent trace: turnId=${persistedTrace.turnId} threadId=${persistedTrace.threadId} steps=${steps.length} parts=${persistableParts.length}`, + `Persisted workflow agent trace: turnId=${turnId} threadId=${threadId} steps=${params.steps.length} parts=${persistableParts.length}`, ); - return persistedTrace; + return { turnId, threadId }; } - private async getOrCreateWorkflowTraceThread({ - entityManager, - workspaceId, - title, - workflowRunId, - workflowStepId, - totalInputTokens, - totalOutputTokens, - totalInputCredits, - totalOutputCredits, - contextWindowTokens, - conversationSize, - }: { - entityManager: EntityManager; - workspaceId: string; - title: string; - workflowRunId: string; - workflowStepId: string; - totalInputTokens: number; - totalOutputTokens: number; - totalInputCredits: number; - totalOutputCredits: number; - contextWindowTokens: number; - conversationSize: number; - }) { - const threadRepository = entityManager.getRepository(AgentChatThreadEntity); - // Workflow actions currently execute sequentially for a given - // (workspaceId, workflowRunId, workflowStepId), so this read-then-insert - // flow is sufficient despite the usual TOCTOU caveat. - const existingThread = await threadRepository.findOne({ - where: { - workspaceId, - workflowRunId, - workflowStepId, - }, - select: ['id'], + private async ensureWorkflowTraceThread( + params: WorkflowTraceThreadKey & { userPrompt: string }, + ): Promise { + const { workspaceId, workflowRunId, workflowStepId } = params; + const existingThread = await this.findWorkflowTraceThread({ + workspaceId, + workflowRunId, + workflowStepId, }); - const threadInsertPayload = { - title, - totalInputTokens, - totalOutputTokens, - totalInputCredits, - totalOutputCredits, - contextWindowTokens, - conversationSize, - }; - if (existingThread) { - await threadRepository.update(existingThread.id, { - title, - totalInputTokens: () => `"totalInputTokens" + ${totalInputTokens}`, - totalOutputTokens: () => `"totalOutputTokens" + ${totalOutputTokens}`, - totalInputCredits: () => `"totalInputCredits" + ${totalInputCredits}`, - totalOutputCredits: () => - `"totalOutputCredits" + ${totalOutputCredits}`, - contextWindowTokens, - conversationSize, - }); - return existingThread.id; } - const insertResult = await threadRepository.insert({ + try { + const insertResult = await this.threadRepository.insert({ + workspaceId, + userWorkspaceId: null, + workflowRunId, + workflowStepId, + title: params.userPrompt.substring(0, MAX_THREAD_TITLE_LENGTH), + totalInputTokens: 0, + totalOutputTokens: 0, + totalInputCredits: 0, + totalOutputCredits: 0, + }); + + return insertResult.identifiers[0].id as string; + } catch (error) { + // A concurrent persistTrace for the same (workspaceId, workflowRunId, + // workflowStepId) can race past findOne and hit the partial unique + // index. Re-read to get the winning thread id instead of failing. + if (!isUniqueViolationError(error)) { + throw error; + } + + const winningThread = await this.findWorkflowTraceThread({ + workspaceId, + workflowRunId, + workflowStepId, + }); + + if (!winningThread) { + throw error; + } + + return winningThread.id; + } + } + + private findWorkflowTraceThread(key: WorkflowTraceThreadKey) { + return this.threadRepository.findOne({ + where: key, + select: ['id'], + }); + } + + private async applyThreadAggregates( + entityManager: EntityManager, + threadId: string, + params: WorkflowTraceThreadStats & { userPrompt: string }, + ): Promise { + await entityManager.getRepository(AgentChatThreadEntity).update(threadId, { + title: params.userPrompt.substring(0, MAX_THREAD_TITLE_LENGTH), + totalInputTokens: buildColumnIncrementExpression( + 'totalInputTokens', + params.totalInputTokens, + ), + totalOutputTokens: buildColumnIncrementExpression( + 'totalOutputTokens', + params.totalOutputTokens, + ), + totalInputCredits: buildColumnIncrementExpression( + 'totalInputCredits', + params.totalInputCredits, + ), + totalOutputCredits: buildColumnIncrementExpression( + 'totalOutputCredits', + params.totalOutputCredits, + ), + contextWindowTokens: params.contextWindowTokens, + conversationSize: params.conversationSize, + }); + } + + private async insertTurnWithMessages( + entityManager: EntityManager, + params: { + threadId: string; + persistableParts: Awaited< + ReturnType + >; + userPrompt: string; + agentId: string | null; + workspaceId: string; + }, + ): Promise { + const { threadId, persistableParts, userPrompt, agentId, workspaceId } = + params; + const turnId = await this.insertAndGetId( + entityManager.getRepository(AgentTurnEntity), + { threadId, agentId, workspaceId }, + ); + const userMessageId = await this.insertMessage(entityManager, { + threadId, + turnId, + role: AgentMessageRole.USER, + agentId: null, workspaceId, - userWorkspaceId: null, - workflowRunId, - workflowStepId, - ...threadInsertPayload, }); - return insertResult.identifiers[0].id as string; - } + await entityManager.getRepository(AgentMessagePartEntity).insert({ + messageId: userMessageId, + orderIndex: 0, + type: 'text', + textContent: userPrompt, + workspaceId, + }); - private async insertTurn({ - entityManager, - threadId, - agentId, - workspaceId, - }: { - entityManager: EntityManager; - threadId: string; - agentId: string | null; - workspaceId: string; - }) { - const insertResult = await entityManager - .getRepository(AgentTurnEntity) - .insert({ - threadId, - agentId, + const assistantMessageId = await this.insertMessage(entityManager, { + threadId, + turnId, + role: AgentMessageRole.ASSISTANT, + agentId, + workspaceId, + }); + + if (persistableParts.length > 0) { + const databaseParts = mapPersistablePartsToDatabaseParts( + persistableParts, + assistantMessageId, workspaceId, - }); + ); - return insertResult.identifiers[0].id as string; + await entityManager + .getRepository(AgentMessagePartEntity) + .insert( + databaseParts as QueryDeepPartialEntity[], + ); + } + + return turnId; } - private async insertUserMessage({ - entityManager, - threadId, - turnId, - workspaceId, - }: { - entityManager: EntityManager; - threadId: string; - turnId: string; - workspaceId: string; - }) { - const insertResult = await entityManager - .getRepository(AgentMessageEntity) - .insert({ - threadId, - turnId, - role: AgentMessageRole.USER, + private insertMessage( + entityManager: EntityManager, + params: { + threadId: string; + turnId: string; + role: AgentMessageRole; + agentId: string | null; + workspaceId: string; + }, + ): Promise { + return this.insertAndGetId( + entityManager.getRepository(AgentMessageEntity), + { + threadId: params.threadId, + turnId: params.turnId, + role: params.role, + agentId: params.agentId, processedAt: new Date(), - workspaceId, - }); - - return insertResult.identifiers[0].id as string; + workspaceId: params.workspaceId, + }, + ); } - private async insertAssistantMessage({ - entityManager, - threadId, - turnId, - agentId, - workspaceId, - }: { - entityManager: EntityManager; - threadId: string; - turnId: string; - agentId: string | null; - workspaceId: string; - }) { - const insertResult = await entityManager - .getRepository(AgentMessageEntity) - .insert({ - threadId, - turnId, - role: AgentMessageRole.ASSISTANT, - agentId, - processedAt: new Date(), - workspaceId, - }); + private async insertAndGetId( + repository: Repository, + payload: QueryDeepPartialEntity, + ): Promise { + const insertResult = await repository.insert(payload); return insertResult.identifiers[0].id as string; } diff --git a/packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/utils/mapDBPartToUIMessagePart.ts b/packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/utils/mapDBPartToUIMessagePart.ts index c3ac3eb0fe5..fdadd19e9f0 100644 --- a/packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/utils/mapDBPartToUIMessagePart.ts +++ b/packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/utils/mapDBPartToUIMessagePart.ts @@ -1,6 +1,7 @@ import { type ExtendedFileUIPart, type ExtendedUIMessagePart, + getToolApproval, } from 'twenty-shared/ai'; import { type AgentMessagePartEntity } from 'src/engine/metadata-modules/ai/ai-agent-execution/entities/agent-message-part.entity'; @@ -9,36 +10,6 @@ import { type AgentMessagePartEntity } from 'src/engine/metadata-modules/ai/ai-a // A parallel mapping for GraphQL DTOs exists in the frontend at: // packages/twenty-front/src/modules/ai/utils/mapDBPartToUIMessagePart.ts -const getToolApproval = ( - errorDetails: Record | null, -): - | { - id: string; - approved?: boolean; - reason?: string; - } - | undefined => { - const approval = - errorDetails && - typeof errorDetails === 'object' && - 'approval' in errorDetails && - typeof errorDetails.approval === 'object' && - errorDetails.approval !== null - ? (errorDetails.approval as Record) - : null; - - if (!approval || typeof approval.id !== 'string') { - return undefined; - } - - return { - id: approval.id, - approved: - typeof approval.approved === 'boolean' ? approval.approved : undefined, - reason: typeof approval.reason === 'string' ? approval.reason : undefined, - }; -}; - export const mapDBPartToUIMessagePart = ( part: AgentMessagePartEntity, ): ExtendedUIMessagePart | null => { diff --git a/packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/utils/mapPersistablePartsToDatabaseParts.ts b/packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/utils/mapPersistablePartsToDatabaseParts.ts index 97dfe4fd86b..f4e102a4d73 100644 --- a/packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/utils/mapPersistablePartsToDatabaseParts.ts +++ b/packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/utils/mapPersistablePartsToDatabaseParts.ts @@ -11,7 +11,7 @@ const assertUnreachable = (value: never): never => { throw new Error(`Unsupported persistable part: ${JSON.stringify(value)}`); }; -export const mapPersistablePartsToDBParts = ( +export const mapPersistablePartsToDatabaseParts = ( parts: PersistableAgentMessagePart[], messageId: string, workspaceId: string, diff --git a/packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/utils/mapUIMessagePartsToDBParts.ts b/packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/utils/mapUIMessagePartsToDBParts.ts index dcdbac79fbc..b9b18a75253 100644 --- a/packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/utils/mapUIMessagePartsToDBParts.ts +++ b/packages/twenty-server/src/engine/metadata-modules/ai/ai-agent-execution/utils/mapUIMessagePartsToDBParts.ts @@ -1,7 +1,7 @@ import { type ExtendedUIMessagePart } from 'twenty-shared/ai'; import { type AgentMessagePartEntity } from 'src/engine/metadata-modules/ai/ai-agent-execution/entities/agent-message-part.entity'; -import { mapPersistablePartsToDBParts } from 'src/engine/metadata-modules/ai/ai-agent-execution/utils/mapPersistablePartsToDBParts'; +import { mapPersistablePartsToDatabaseParts } from 'src/engine/metadata-modules/ai/ai-agent-execution/utils/mapPersistablePartsToDatabaseParts'; import { mapUIMessagePartsToPersistableParts } from 'src/engine/metadata-modules/ai/ai-agent-execution/utils/mapUIMessagePartsToPersistableParts'; export const mapUIMessagePartsToDBParts = ( @@ -9,7 +9,7 @@ export const mapUIMessagePartsToDBParts = ( messageId: string, workspaceId: string, ): Partial[] => { - return mapPersistablePartsToDBParts( + return mapPersistablePartsToDatabaseParts( mapUIMessagePartsToPersistableParts(uiMessageParts), messageId, workspaceId, diff --git a/packages/twenty-shared/src/ai/index.ts b/packages/twenty-shared/src/ai/index.ts index 5d8bfb7d164..46f0a129afa 100644 --- a/packages/twenty-shared/src/ai/index.ts +++ b/packages/twenty-shared/src/ai/index.ts @@ -38,7 +38,10 @@ export type { export type { ExtendedUIMessagePart } from './types/ExtendedUIMessagePart'; export type { ModelConfiguration } from './types/model-configuration.type'; export type { NavigateAppToolOutput } from './types/NavigateAppToolOutput'; +export type { ToolApproval } from './utils/get-tool-approval.util'; +export { getToolApproval } from './utils/get-tool-approval.util'; export { inferAiSdkPackage } from './utils/infer-ai-sdk-package.util'; export { isAgentCapabilityEnabled } from './utils/is-agent-capability-enabled.util'; export { isAiSdkPackage } from './utils/is-ai-sdk-package.util'; export { isDataResidency } from './utils/is-data-residency.util'; +export { isToolPartErrored } from './utils/is-tool-part-errored.util'; diff --git a/packages/twenty-shared/src/ai/utils/__tests__/get-tool-approval.util.spec.ts b/packages/twenty-shared/src/ai/utils/__tests__/get-tool-approval.util.spec.ts new file mode 100644 index 00000000000..1288d2aa782 --- /dev/null +++ b/packages/twenty-shared/src/ai/utils/__tests__/get-tool-approval.util.spec.ts @@ -0,0 +1,45 @@ +import { getToolApproval } from '../get-tool-approval.util'; + +describe('getToolApproval', () => { + it('returns undefined when errorDetails is missing or malformed', () => { + expect(getToolApproval(null)).toBeUndefined(); + expect(getToolApproval(undefined)).toBeUndefined(); + expect(getToolApproval('string')).toBeUndefined(); + expect(getToolApproval({})).toBeUndefined(); + expect(getToolApproval({ approval: null })).toBeUndefined(); + expect(getToolApproval({ approval: 'not-an-object' })).toBeUndefined(); + expect(getToolApproval({ approval: {} })).toBeUndefined(); + }); + + it('returns the approval with only the id when optional fields are missing', () => { + expect(getToolApproval({ approval: { id: 'approval-1' } })).toEqual({ + id: 'approval-1', + approved: undefined, + reason: undefined, + }); + }); + + it('returns all fields when present and correctly typed', () => { + expect( + getToolApproval({ + approval: { id: 'approval-1', approved: true, reason: 'Looks good' }, + }), + ).toEqual({ + id: 'approval-1', + approved: true, + reason: 'Looks good', + }); + }); + + it('omits optional fields that have the wrong runtime type', () => { + expect( + getToolApproval({ + approval: { id: 'approval-1', approved: 'yes', reason: 42 }, + }), + ).toEqual({ + id: 'approval-1', + approved: undefined, + reason: undefined, + }); + }); +}); diff --git a/packages/twenty-shared/src/ai/utils/__tests__/is-tool-part-errored.util.spec.ts b/packages/twenty-shared/src/ai/utils/__tests__/is-tool-part-errored.util.spec.ts new file mode 100644 index 00000000000..01d35b27b21 --- /dev/null +++ b/packages/twenty-shared/src/ai/utils/__tests__/is-tool-part-errored.util.spec.ts @@ -0,0 +1,16 @@ +import { isToolPartErrored } from '../is-tool-part-errored.util'; + +describe('isToolPartErrored', () => { + it('returns true for error states', () => { + expect(isToolPartErrored('output-error')).toBe(true); + expect(isToolPartErrored('output-denied')).toBe(true); + }); + + it('returns false for non-error states', () => { + expect(isToolPartErrored('input-streaming')).toBe(false); + expect(isToolPartErrored('input-available')).toBe(false); + expect(isToolPartErrored('approval-requested')).toBe(false); + expect(isToolPartErrored('approval-responded')).toBe(false); + expect(isToolPartErrored('output-available')).toBe(false); + }); +}); diff --git a/packages/twenty-shared/src/ai/utils/get-tool-approval.util.ts b/packages/twenty-shared/src/ai/utils/get-tool-approval.util.ts new file mode 100644 index 00000000000..d77d0f39a3f --- /dev/null +++ b/packages/twenty-shared/src/ai/utils/get-tool-approval.util.ts @@ -0,0 +1,29 @@ +export type ToolApproval = { + id: string; + approved?: boolean; + reason?: string; +}; + +export const getToolApproval = ( + errorDetails: unknown, +): ToolApproval | undefined => { + const approval = + errorDetails && + typeof errorDetails === 'object' && + 'approval' in errorDetails && + typeof errorDetails.approval === 'object' && + errorDetails.approval !== null + ? (errorDetails.approval as Record) + : null; + + if (!approval || typeof approval.id !== 'string') { + return undefined; + } + + return { + id: approval.id, + approved: + typeof approval.approved === 'boolean' ? approval.approved : undefined, + reason: typeof approval.reason === 'string' ? approval.reason : undefined, + }; +}; diff --git a/packages/twenty-shared/src/ai/utils/is-tool-part-errored.util.ts b/packages/twenty-shared/src/ai/utils/is-tool-part-errored.util.ts new file mode 100644 index 00000000000..6317727ee56 --- /dev/null +++ b/packages/twenty-shared/src/ai/utils/is-tool-part-errored.util.ts @@ -0,0 +1,4 @@ +import { type ToolUIPart } from 'ai'; + +export const isToolPartErrored = (state: ToolUIPart['state']): boolean => + state === 'output-error' || state === 'output-denied';