mirror of
https://github.com/CompassConnections/Compass.git
synced 2026-04-07 16:19:44 -04:00
Implement async profile extraction with caching and processing state handling
This commit is contained in:
@@ -35,11 +35,20 @@ const MAX_CONTEXT_LENGTH = 7 * 10 * 30 * 50
|
||||
const USE_CACHE = true
|
||||
const CACHE_DIR = join(tmpdir(), 'compass-llm-cache')
|
||||
const CACHE_TTL_MS = 24 * 60 * 60 * 1000 // 24 hours
|
||||
const PROCESSING_TTL_MS = 10 * 60 * 1000 // 10 minutes
|
||||
|
||||
function getCacheKey(content: string): string {
|
||||
interface ParsedBody {
|
||||
content?: string
|
||||
url?: string
|
||||
locale?: string
|
||||
}
|
||||
|
||||
function getCacheKey(parsedBody: ParsedBody): string {
|
||||
if (!USE_CACHE) return ''
|
||||
const hash = createHash('sha256')
|
||||
hash.update(content)
|
||||
// Normalize: sort keys for consistent hashing
|
||||
const normalized = JSON.stringify(parsedBody, Object.keys(parsedBody).sort())
|
||||
hash.update(normalized)
|
||||
return hash.digest('hex')
|
||||
}
|
||||
|
||||
@@ -215,6 +224,69 @@ async function setCachedResult(
|
||||
}
|
||||
}
|
||||
|
||||
async function isProcessing(cacheKey: string): Promise<boolean> {
|
||||
if (!USE_CACHE) return false
|
||||
try {
|
||||
const processingFile = join(CACHE_DIR, `${cacheKey}.processing`)
|
||||
const stats = await fs.stat(processingFile)
|
||||
// Check if processing lock is still valid (not expired)
|
||||
if (Date.now() - stats.mtime.getTime() > PROCESSING_TTL_MS) {
|
||||
// Stale processing lock, remove it
|
||||
await fs.unlink(processingFile).catch(() => {})
|
||||
return false
|
||||
}
|
||||
return true
|
||||
} catch {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
async function setProcessing(cacheKey: string): Promise<void> {
|
||||
if (!USE_CACHE) return
|
||||
try {
|
||||
await fs.mkdir(CACHE_DIR, {recursive: true})
|
||||
const processingFile = join(CACHE_DIR, `${cacheKey}.processing`)
|
||||
await fs.writeFile(processingFile, Date.now().toString(), 'utf-8')
|
||||
} catch {
|
||||
// Don't throw - processing flag failure shouldn't break the main flow
|
||||
}
|
||||
}
|
||||
|
||||
async function clearProcessing(cacheKey: string): Promise<void> {
|
||||
if (!USE_CACHE) return
|
||||
try {
|
||||
const processingFile = join(CACHE_DIR, `${cacheKey}.processing`)
|
||||
await fs.unlink(processingFile)
|
||||
} catch {
|
||||
// Ignore errors
|
||||
}
|
||||
}
|
||||
|
||||
async function processAndCache(
|
||||
cacheKey: string,
|
||||
content?: string | undefined,
|
||||
url?: string | undefined,
|
||||
locale?: string,
|
||||
): Promise<void> {
|
||||
try {
|
||||
let bio: JSONContent | undefined
|
||||
if (!content) {
|
||||
bio = await fetchOnlineProfile(url)
|
||||
debug(JSON.stringify(bio, null, 2))
|
||||
content = parseJsonContentToText(bio)
|
||||
}
|
||||
const profile = await callLLM(content, locale)
|
||||
if (bio) {
|
||||
profile.bio = bio
|
||||
}
|
||||
await setCachedResult(cacheKey, profile)
|
||||
} catch (error) {
|
||||
log('Async LLM processing failed', {cacheKey, error})
|
||||
} finally {
|
||||
await clearProcessing(cacheKey)
|
||||
}
|
||||
}
|
||||
|
||||
async function callGemini(text: string) {
|
||||
const apiKey = process.env.GEMINI_API_KEY
|
||||
|
||||
@@ -311,7 +383,10 @@ async function _callClaude(text: string) {
|
||||
return outputText
|
||||
}
|
||||
|
||||
async function callLLM(content: string, locale?: string): Promise<Partial<ProfileWithoutUser>> {
|
||||
export async function callLLM(
|
||||
content: string,
|
||||
locale?: string,
|
||||
): Promise<Partial<ProfileWithoutUser>> {
|
||||
const [INTERESTS, CAUSE_AREAS, WORK_AREAS] = await Promise.all([
|
||||
getOptions('interests', locale),
|
||||
getOptions('causes', locale),
|
||||
@@ -437,19 +512,10 @@ TEXT TO ANALYZE:
|
||||
const text = EXTRACTION_PROMPT + content
|
||||
if (text.length > MAX_CONTEXT_LENGTH) {
|
||||
log('Content exceeds maximum length, will be cropped', {length: text.length})
|
||||
// throw APIErrors.badRequest('Content exceeds maximum length')
|
||||
}
|
||||
debug({text})
|
||||
|
||||
const cacheKey = getCacheKey(text)
|
||||
const cached = await getCachedResult(cacheKey)
|
||||
if (cached) {
|
||||
debug('Using cached LLM result', {cacheKey: cacheKey.substring(0, 8)})
|
||||
return cached
|
||||
}
|
||||
|
||||
const outputText = await callGemini(text)
|
||||
// const outputText = JSON.stringify({})
|
||||
|
||||
if (!outputText) {
|
||||
throw APIErrors.internalServerError('Failed to parse LLM response')
|
||||
@@ -465,8 +531,6 @@ TEXT TO ANALYZE:
|
||||
throw APIErrors.internalServerError('Failed to parse extracted data')
|
||||
}
|
||||
|
||||
await setCachedResult(cacheKey, parsed)
|
||||
|
||||
return parsed
|
||||
}
|
||||
|
||||
@@ -511,7 +575,7 @@ export const llmExtractProfileEndpoint: APIHandler<'llm-extract-profile'> = asyn
|
||||
auth,
|
||||
) => {
|
||||
const {url, locale} = parsedBody
|
||||
let content = parsedBody.content
|
||||
const content = parsedBody.content
|
||||
|
||||
log('Extracting profile from content', {
|
||||
contentLength: content?.length,
|
||||
@@ -524,22 +588,28 @@ export const llmExtractProfileEndpoint: APIHandler<'llm-extract-profile'> = asyn
|
||||
throw APIErrors.badRequest('Content and URL cannot be provided together')
|
||||
}
|
||||
|
||||
let bio
|
||||
if (!content) {
|
||||
bio = await fetchOnlineProfile(url)
|
||||
debug(JSON.stringify(bio, null, 2))
|
||||
content = parseJsonContentToText(bio)
|
||||
// Check cache based on parsedBody hash
|
||||
const cacheKey = getCacheKey(parsedBody)
|
||||
const cached = await getCachedResult(cacheKey)
|
||||
if (cached) {
|
||||
log('Returning cached profile', {cacheKey: cacheKey.substring(0, 8)})
|
||||
return {profile: cached, status: 'success'}
|
||||
}
|
||||
|
||||
const extracted = await callLLM(content, locale)
|
||||
|
||||
if (bio) {
|
||||
extracted.bio = bio
|
||||
// Check if already processing
|
||||
if (await isProcessing(cacheKey)) {
|
||||
log('Profile extraction already in progress', {cacheKey: cacheKey.substring(0, 8)})
|
||||
return {profile: {}, status: 'pending'}
|
||||
}
|
||||
|
||||
debug(JSON.stringify(bio))
|
||||
// Start processing asynchronously
|
||||
await setProcessing(cacheKey)
|
||||
|
||||
log('Profile extracted successfully', {extracted})
|
||||
// Kick off async processing (don't await)
|
||||
processAndCache(cacheKey, content, url, locale).catch((err) => {
|
||||
log('Unexpected error in async processing', {cacheKey, error: err})
|
||||
})
|
||||
|
||||
return extracted
|
||||
log('Started async profile extraction', {cacheKey: cacheKey.substring(0, 8)})
|
||||
return {profile: {}, status: 'pending'}
|
||||
}
|
||||
|
||||
@@ -1283,7 +1283,10 @@ export const API = (_apiTypeCheck = {
|
||||
locale: z.string().optional(),
|
||||
})
|
||||
.strict(),
|
||||
returns: {} as Partial<ProfileWithoutUser>,
|
||||
returns: {} as {
|
||||
profile: Partial<ProfileWithoutUser>
|
||||
status: string
|
||||
},
|
||||
summary: 'Extract profile information from text using LLM',
|
||||
tag: 'Profiles',
|
||||
},
|
||||
|
||||
@@ -102,7 +102,17 @@ export const OptionalProfileUserForm = (props: {
|
||||
...(isInputUrl ? {url: urlize(llmContent).trim()} : {content: llmContent.trim()}),
|
||||
}
|
||||
try {
|
||||
let extractedProfile = await api('llm-extract-profile', payload)
|
||||
let extractedProfile: Partial<ProfileWithoutUser> = {}
|
||||
let status: string | undefined = 'pending'
|
||||
while (status === 'pending') {
|
||||
const response = await api('llm-extract-profile', payload)
|
||||
status = response.status
|
||||
console.log(status)
|
||||
if (status === 'pending') {
|
||||
await new Promise((resolve) => setTimeout(resolve, 1000))
|
||||
}
|
||||
extractedProfile = response.profile
|
||||
}
|
||||
extractedProfile = removeNullOrUndefinedProps(extractedProfile)
|
||||
for (const data of Object.entries(extractedProfile)) {
|
||||
const key = data[0] as keyof ProfileWithoutUser
|
||||
|
||||
Reference in New Issue
Block a user