#!/usr/bin/env python3
"""
OpenCode adapter for Spacedrive.
Indexes coding session transcripts from OpenCode.
Reads the opencode.db SQLite database and extracts sessions with their
conversation messages, aggregating token usage, cost, and tool call metadata.
The DB is first copied to a temp file because OpenCode holds an exclusive lock
via redb; the SQLite portion may still be readable in place, but copying is safer.
Supports incremental sync via time_updated cursor.
"""
import json
import sys
import os
import sqlite3
import shutil
import tempfile
from datetime import datetime, timezone
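# Illustrative I/O sketch (shapes inferred from this adapter's own reads and
# emits, not from a published Spacedrive spec): the host writes one JSON object
# to stdin and reads newline-delimited JSON operations from stdout.
#
#   stdin:
#     {"config": {"db_path": "~/.local/share/opencode/opencode.db",
#                 "include_tool_calls": true,
#                 "project_filter": "spacedrive"},
#      "cursor": "1711234567890"}
#
#   stdout (one JSON object per line):
#     {"log": "info", "message": "..."}
#     {"upsert": "session", "external_id": "...", "fields": {...}}
#     {"upsert": "message", "external_id": "...", "fields": {...},
#      "relations": {"session": "..."}}
#     {"cursor": "1711234567890"}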
def log(level: str, message: str):
print(json.dumps({"log": level, "message": message}), flush=True)
def emit(operation: dict):
print(json.dumps(operation), flush=True)
def ms_to_iso(timestamp_ms: int) -> str:
"""Convert millisecond Unix timestamp to ISO 8601."""
try:
        if not timestamp_ms:
            return ""
dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
return dt.isoformat()
except (ValueError, OSError):
return ""
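# Example: 1,700,000,000,000 ms is 2023-11-14 22:13:20 UTC, so
#   ms_to_iso(1700000000000) == "2023-11-14T22:13:20+00:00"
# while falsy or out-of-range inputs come back as "".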
def extract_text_parts(parts_data: list[dict]) -> str:
"""Extract and concatenate text content from message parts."""
texts = []
for part in parts_data:
data = part.get("data", {})
ptype = data.get("type", "")
if ptype == "text":
text = data.get("text", "").strip()
if text:
texts.append(text)
return "\n\n".join(texts)
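# Example (the {"data": {...}} wrapper mirrors how main() groups part rows):
#   extract_text_parts([
#       {"data": {"type": "text", "text": "Fix the login bug"}},
#       {"data": {"type": "tool", "tool": "read"}},  # non-text parts are skipped
#       {"data": {"type": "text", "text": "Done."}},
#   ]) == "Fix the login bug\n\nDone."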
def extract_tool_summary(parts_data: list[dict]) -> str:
"""Extract a compact summary of tool calls from message parts."""
tools = []
for part in parts_data:
data = part.get("data", {})
if data.get("type") == "tool":
tool_name = data.get("tool", "")
# Extract a brief description of what the tool did
tool_input = data.get("input", {})
if tool_name == "read":
path = tool_input.get("filePath", tool_input.get("path", ""))
if path:
tools.append(f"read:{os.path.basename(path)}")
elif tool_name == "edit":
path = tool_input.get("filePath", tool_input.get("path", ""))
if path:
tools.append(f"edit:{os.path.basename(path)}")
elif tool_name == "write":
path = tool_input.get("filePath", tool_input.get("path", ""))
if path:
tools.append(f"write:{os.path.basename(path)}")
elif tool_name == "bash":
cmd = tool_input.get("command", "")[:80]
if cmd:
tools.append(f"bash:{cmd}")
elif tool_name == "glob":
pattern = tool_input.get("pattern", "")
if pattern:
tools.append(f"glob:{pattern}")
elif tool_name == "grep":
pattern = tool_input.get("pattern", "")
if pattern:
tools.append(f"grep:{pattern}")
elif tool_name:
tools.append(tool_name)
return "; ".join(tools[:50]) # Cap at 50 tool calls
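# Example (illustrative input; the "filePath"/"command" keys mirror the
# lookups above):
#   extract_tool_summary([
#       {"data": {"type": "tool", "tool": "read",
#                 "input": {"filePath": "/src/main.rs"}}},
#       {"data": {"type": "tool", "tool": "bash",
#                 "input": {"command": "cargo build"}}},
#   ]) == "read:main.rs; bash:cargo build"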
def main():
try:
input_data = json.loads(sys.stdin.read())
except json.JSONDecodeError as e:
log("error", f"Invalid input JSON: {e}")
sys.exit(2)
config = input_data.get("config", {})
cursor = input_data.get("cursor")
db_path = config.get("db_path", "~/.local/share/opencode/opencode.db")
db_path = os.path.expanduser(db_path)
include_tool_calls = config.get("include_tool_calls", False)
project_filter = config.get("project_filter", "")
if not os.path.exists(db_path):
log("error", f"OpenCode database not found: {db_path}")
sys.exit(2)
# Copy the database to avoid lock conflicts
    # mkstemp avoids the race window of the deprecated tempfile.mktemp
    tmp_fd, tmp_db = tempfile.mkstemp(suffix=".db")
    os.close(tmp_fd)
try:
shutil.copy2(db_path, tmp_db)
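        # Copy the WAL/SHM sidecars too, so writes not yet checkpointed into
        # the main database file are visible in our snapshot.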
for ext in ["-wal", "-shm"]:
src = db_path + ext
if os.path.exists(src):
shutil.copy2(src, tmp_db + ext)
except PermissionError:
log("error", f"Permission denied reading: {db_path}")
sys.exit(2)
except Exception as e:
log("error", f"Failed to copy database: {e}")
sys.exit(2)
try:
conn = sqlite3.connect(tmp_db)
conn.row_factory = sqlite3.Row
# ── Fetch sessions ──────────────────────────────────────────────
conditions = ["s.title != ''"]
params = []
if cursor:
conditions.append("s.time_updated > ?")
params.append(int(cursor))
if project_filter:
conditions.append("s.directory LIKE ?")
params.append(f"%{project_filter}%")
where = " AND ".join(conditions)
sessions = conn.execute(f"""
SELECT s.id, s.title, s.directory, s.parent_id,
s.summary_files, s.summary_additions, s.summary_deletions,
s.time_created, s.time_updated,
p.name as project_name, p.worktree as project_worktree
FROM session s
LEFT JOIN project p ON s.project_id = p.id
WHERE {where}
ORDER BY s.time_updated ASC
""", params).fetchall()
max_updated = int(cursor) if cursor else 0
session_count = 0
message_count = 0
for session in sessions:
sid = session["id"]
time_updated = session["time_updated"] or 0
# ── Fetch messages for this session ─────────────────────────
messages = conn.execute("""
SELECT m.id, m.data, m.time_created, m.time_updated
FROM message m
WHERE m.session_id = ?
ORDER BY m.time_created ASC
""", (sid,)).fetchall()
if not messages:
continue
# ── Fetch parts for all messages in this session ────────────
parts_by_message = {}
parts = conn.execute("""
SELECT p.id, p.message_id, p.data, p.time_created
FROM part p
WHERE p.session_id = ?
ORDER BY p.time_created ASC
""", (sid,)).fetchall()
for part in parts:
mid = part["message_id"]
try:
part_data = json.loads(part["data"])
except json.JSONDecodeError:
continue
if mid not in parts_by_message:
parts_by_message[mid] = []
parts_by_message[mid].append({"data": part_data})
# ── Aggregate session stats ─────────────────────────────────
total_input = 0
total_output = 0
total_cost = 0.0
            models_used = {}  # dict keys keep insertion order: first model seen stays first
            session_ended = session["time_created"] or 0
summary_parts = []
summary_budget = 4000 # chars for FTS summary
for msg in messages:
try:
msg_data = json.loads(msg["data"])
except json.JSONDecodeError:
continue
tokens = msg_data.get("tokens", {})
total_input += (tokens.get("input", 0) or 0)
total_output += (tokens.get("output", 0) or 0)
cache = tokens.get("cache", {})
total_input += (cache.get("read", 0) or 0)
total_input += (cache.get("write", 0) or 0)
total_cost += (msg_data.get("cost", 0) or 0)
model = msg_data.get("modelID", "")
                if model:
                    models_used[model] = None
msg_time = msg["time_created"] or msg["time_updated"] or 0
if msg_time > session_ended:
session_ended = msg_time
# Build summary from conversation text
if summary_budget > 0:
msg_parts = parts_by_message.get(msg["id"], [])
text = extract_text_parts(msg_parts)
if text:
                        # Take the first slice of each message, within the budget
                        chunk = text[:min(800, summary_budget)]
summary_parts.append(chunk)
summary_budget -= len(chunk)
session_summary = "\n".join(summary_parts)
            # Primary model: the first one seen in this session
            primary_model = next(iter(models_used), "")
# Clean model name — strip provider prefix
if "/" in primary_model:
primary_model = primary_model.split("/", 1)[1]
project_name = session["project_name"] or ""
if not project_name:
# Derive from directory
directory = session["directory"] or ""
if directory:
project_name = os.path.basename(directory.rstrip("/"))
# ── Emit session ────────────────────────────────────────────
emit({
"upsert": "session",
"external_id": sid,
"fields": {
"title": (session["title"] or "Untitled session")[:500],
"summary": session_summary,
"directory": session["directory"] or "",
"project": project_name,
"model": primary_model,
"message_count": len(messages),
"total_input_tokens": total_input,
"total_output_tokens": total_output,
"total_cost": round(total_cost, 6),
"files_changed": session["summary_files"] or 0,
"lines_added": session["summary_additions"] or 0,
"lines_deleted": session["summary_deletions"] or 0,
"started_at": ms_to_iso(session["time_created"]),
"ended_at": ms_to_iso(session_ended),
}
})
session_count += 1
# ── Emit messages ───────────────────────────────────────────
for msg in messages:
try:
msg_data = json.loads(msg["data"])
except json.JSONDecodeError:
continue
role = msg_data.get("role", "unknown")
msg_parts = parts_by_message.get(msg["id"], [])
# Build message body from text parts
body = extract_text_parts(msg_parts)
# Optionally include tool call summaries
tool_summary = ""
if include_tool_calls:
tool_summary = extract_tool_summary(msg_parts)
if tool_summary and body:
body = body + "\n\n[Tools: " + tool_summary + "]"
elif tool_summary:
body = "[Tools: " + tool_summary + "]"
# Skip empty messages (e.g. step-start/step-finish only)
if not body:
continue
tokens = msg_data.get("tokens", {})
input_tokens = (tokens.get("input", 0) or 0)
cache = tokens.get("cache", {})
input_tokens += (cache.get("read", 0) or 0) + (cache.get("write", 0) or 0)
output_tokens = (tokens.get("output", 0) or 0)
cost = msg_data.get("cost", 0) or 0
model = msg_data.get("modelID", "")
if "/" in model:
model = model.split("/", 1)[1]
                msg_time = msg["time_created"] or msg["time_updated"] or 0
emit({
"upsert": "message",
"external_id": msg["id"],
"fields": {
"role": role,
"body": body[:50000], # Cap at 50KB per message
"model": model,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"cost": round(cost, 6),
"tool_calls": tool_summary[:5000] if tool_summary else "",
"timestamp": ms_to_iso(msg_time),
},
"relations": {
"session": sid
}
})
message_count += 1
if time_updated > max_updated:
max_updated = time_updated
# Emit cursor
if max_updated > 0:
emit({"cursor": str(max_updated)})
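        # The emitted cursor (e.g. {"cursor": "1711234567890"}) is fed back on
        # the next run, which resumes via the `s.time_updated > ?` filter above.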
log("info", f"Synced {session_count} sessions, {message_count} messages")
conn.close()
except sqlite3.Error as e:
log("error", f"SQLite error: {e}")
sys.exit(1)
finally:
for f in [tmp_db, tmp_db + "-wal", tmp_db + "-shm"]:
if os.path.exists(f):
os.unlink(f)
if __name__ == "__main__":
main()