mirror of
https://github.com/spacedriveapp/spacedrive.git
synced 2026-04-18 05:27:38 -04:00
354 lines
13 KiB
Python
354 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
OpenCode adapter for Spacedrive.
|
|
|
|
Indexes coding session transcripts from OpenCode.
|
|
Reads the opencode.db SQLite database and extracts sessions with their
|
|
conversation messages, aggregating token usage, cost, and tool call metadata.
|
|
|
|
The DB is copied to a temp file first since OpenCode holds an exclusive lock
|
|
via redb (though the SQLite portion may be readable, we copy for safety).
|
|
|
|
Supports incremental sync via time_updated cursor.
|
|
"""
|
|
|
|
import json
|
|
import sys
|
|
import os
|
|
import sqlite3
|
|
import shutil
|
|
import tempfile
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
def log(level: str, message: str):
    """Write a structured log record to stdout as a single JSON line.

    Args:
        level: Severity label stored under the "log" key (e.g. "info", "error").
        message: Human-readable text stored under the "message" key.
    """
    record = {"log": level, "message": message}
    # Flush right away so each record is written out as soon as it is produced.
    print(json.dumps(record), flush=True)
|
|
|
|
|
|
def emit(operation: dict):
    """Serialize one adapter operation to JSON and print it on its own line.

    Args:
        operation: JSON-serializable mapping describing the operation
            (the callers in this file pass upsert and cursor records).
    """
    serialized = json.dumps(operation)
    # Flush per operation so output is not held back in a buffer.
    print(serialized, flush=True)
|
|
|
|
|
|
def ms_to_iso(timestamp_ms: int) -> str:
    """Convert a millisecond Unix timestamp to an ISO 8601 string in UTC.

    Args:
        timestamp_ms: Milliseconds since the Unix epoch. ``None`` and ``0``
            are treated as "no timestamp".

    Returns:
        The timezone-aware ISO 8601 representation, or an empty string when
        the timestamp is missing or cannot be converted.
    """
    if not timestamp_ms:
        return ""
    try:
        moment = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    except (TypeError, ValueError, OverflowError, OSError):
        # TypeError: non-numeric value; ValueError/OverflowError/OSError:
        # timestamp outside the range datetime or the platform can represent.
        return ""
    return moment.isoformat()
|
|
|
|
|
|
def extract_text_parts(parts_data: list[dict]) -> str:
    """Extract and concatenate text content from message parts.

    Args:
        parts_data: Part wrappers of the form ``{"data": {...}}``. Only parts
            whose ``data["type"]`` is ``"text"`` contribute.

    Returns:
        The stripped, non-empty text fragments joined by blank lines, or an
        empty string when there is no text.
    """
    texts = []
    for part in parts_data:
        data = part.get("data") if isinstance(part, dict) else None
        # Decoded JSON may be null or a non-object; skip rather than crash.
        if not isinstance(data, dict) or data.get("type") != "text":
            continue
        text = data.get("text")
        if not isinstance(text, str):
            # A JSON null (or other non-string) text has nothing to contribute.
            continue
        text = text.strip()
        if text:
            texts.append(text)
    return "\n\n".join(texts)
|
|
|
|
|
|
def extract_tool_summary(parts_data: list[dict]) -> str:
    """Extract a compact summary of tool calls from message parts.

    Each part whose ``data["type"]`` is ``"tool"`` yields at most one entry:

    * ``read`` / ``edit`` / ``write`` -> ``"<tool>:<file basename>"``
    * ``bash`` -> ``"bash:<first 80 chars of the command>"``
    * ``glob`` / ``grep`` -> ``"<tool>:<pattern>"``
    * any other named tool -> the bare tool name

    A known tool whose detail (path, command, pattern) is missing is omitted.

    Args:
        parts_data: Part wrappers of the form ``{"data": {...}}``.

    Returns:
        Up to 50 entries joined by ``"; "``, or an empty string.
    """
    path_tools = ("read", "edit", "write")
    pattern_tools = ("glob", "grep")
    max_tools = 50  # Cap at 50 tool calls

    tools = []
    for part in parts_data:
        if len(tools) >= max_tools:
            # Anything beyond the cap would be discarded, so stop scanning.
            break

        data = part.get("data") if isinstance(part, dict) else None
        if not isinstance(data, dict) or data.get("type") != "tool":
            continue

        tool_name = data.get("tool")
        if not isinstance(tool_name, str):
            tool_name = ""

        tool_input = data.get("input")
        if not isinstance(tool_input, dict):
            # NOTE(review): OpenCode tool parts appear to nest the call
            # arguments under state.input; confirm against the part schema.
            state = data.get("state")
            tool_input = state.get("input") if isinstance(state, dict) else None
        if not isinstance(tool_input, dict):
            tool_input = {}

        if tool_name in path_tools:
            path = tool_input.get("filePath") or tool_input.get("path")
            if isinstance(path, str) and path:
                tools.append(f"{tool_name}:{os.path.basename(path)}")
        elif tool_name == "bash":
            command = tool_input.get("command")
            if isinstance(command, str) and command:
                tools.append(f"bash:{command[:80]}")
        elif tool_name in pattern_tools:
            pattern = tool_input.get("pattern")
            if pattern:
                tools.append(f"{tool_name}:{pattern}")
        elif tool_name:
            tools.append(tool_name)

    return "; ".join(tools[:max_tools])
|
|
|
|
|
|
def _parse_json_object(raw):
    """Decode a JSON column value; return the dict, or None if it is unusable.

    None is returned when the value is missing, is not valid JSON, or decodes
    to something other than a JSON object.
    """
    try:
        value = json.loads(raw)
    except (TypeError, ValueError):
        # TypeError: NULL column; ValueError covers json.JSONDecodeError.
        return None
    return value if isinstance(value, dict) else None


def _message_usage(msg_data: dict) -> tuple:
    """Return (input_tokens, output_tokens, cost) for one decoded message.

    Cache reads and writes are counted as input tokens. Missing or null
    fields count as zero.
    """
    tokens = msg_data.get("tokens")
    if not isinstance(tokens, dict):
        tokens = {}
    cache = tokens.get("cache")
    if not isinstance(cache, dict):
        cache = {}
    input_tokens = (
        (tokens.get("input") or 0)
        + (cache.get("read") or 0)
        + (cache.get("write") or 0)
    )
    output_tokens = tokens.get("output") or 0
    cost = msg_data.get("cost") or 0
    return input_tokens, output_tokens, cost


def _strip_provider(model: str) -> str:
    """Drop a leading "provider/" prefix from a model identifier."""
    return model.split("/", 1)[1] if "/" in model else model


def _copy_database(db_path: str, dest_dir: str) -> str:
    """Copy the database and its -wal/-shm sidecars into dest_dir; return the copy's path."""
    tmp_db = os.path.join(dest_dir, "opencode.db")
    shutil.copy2(db_path, tmp_db)
    for ext in ("-wal", "-shm"):
        src = db_path + ext
        if os.path.exists(src):
            shutil.copy2(src, tmp_db + ext)
    return tmp_db


def _fetch_sessions(conn, since, project_filter):
    """Return titled session rows, oldest update first, after optional filters."""
    conditions = ["s.title != ''"]
    params = []

    if since is not None:
        conditions.append("s.time_updated > ?")
        params.append(since)

    if project_filter:
        conditions.append("s.directory LIKE ?")
        params.append(f"%{project_filter}%")

    # Only fixed SQL fragments are interpolated; every value is bound via "?".
    where = " AND ".join(conditions)
    return conn.execute(f"""
        SELECT s.id, s.title, s.directory, s.parent_id,
               s.summary_files, s.summary_additions, s.summary_deletions,
               s.time_created, s.time_updated,
               p.name as project_name, p.worktree as project_worktree
        FROM session s
        LEFT JOIN project p ON s.project_id = p.id
        WHERE {where}
        ORDER BY s.time_updated ASC
    """, params).fetchall()


def _load_parts(conn, sid):
    """Return {message_id: [{"data": part_dict}, ...]} for one session, oldest part first."""
    parts_by_message = {}
    rows = conn.execute("""
        SELECT p.id, p.message_id, p.data, p.time_created
        FROM part p
        WHERE p.session_id = ?
        ORDER BY p.time_created ASC
    """, (sid,)).fetchall()

    for row in rows:
        part_data = _parse_json_object(row["data"])
        if part_data is None:
            continue
        parts_by_message.setdefault(row["message_id"], []).append({"data": part_data})
    return parts_by_message


def _sync_session(conn, session, include_tool_calls):
    """Emit one session and its messages.

    Returns the number of message records emitted, or None when the session
    has no message rows and was therefore skipped entirely.
    """
    sid = session["id"]

    messages = conn.execute("""
        SELECT m.id, m.data, m.time_created, m.time_updated
        FROM message m
        WHERE m.session_id = ?
        ORDER BY m.time_created ASC
    """, (sid,)).fetchall()

    if not messages:
        return None

    parts_by_message = _load_parts(conn, sid)

    # Decode every message once; rows with unusable JSON are ignored for both
    # the aggregates and the emitted message records.
    decoded = []
    for msg in messages:
        msg_data = _parse_json_object(msg["data"])
        if msg_data is not None:
            decoded.append((msg, msg_data))

    # ── Aggregate session stats ─────────────────────────────────────
    total_input = 0
    total_output = 0
    total_cost = 0.0
    model_counts = {}  # insertion-ordered: first-seen model wins ties
    session_ended = session["time_created"] or 0
    summary_parts = []
    summary_budget = 4000  # chars for FTS summary

    for msg, msg_data in decoded:
        input_tokens, output_tokens, cost = _message_usage(msg_data)
        total_input += input_tokens
        total_output += output_tokens
        total_cost += cost

        model = msg_data.get("modelID") or ""
        if model:
            model_counts[model] = model_counts.get(model, 0) + 1

        msg_time = msg["time_created"] or msg["time_updated"] or 0
        if msg_time > session_ended:
            session_ended = msg_time

        # Build summary from conversation text: first 800 chars of each
        # message until the overall budget is used up.
        if summary_budget > 0:
            text = extract_text_parts(parts_by_message.get(msg["id"], []))
            if text:
                chunk = text[:min(800, summary_budget)]
                summary_parts.append(chunk)
                summary_budget -= len(chunk)

    session_summary = "\n".join(summary_parts)

    # Primary model = most frequently used; max() returns the first maximal
    # key in insertion order, so ties go to the model seen first.
    primary_model = _strip_provider(
        max(model_counts, key=model_counts.get, default="")
    )

    project_name = session["project_name"] or ""
    if not project_name:
        # Derive from directory
        directory = session["directory"] or ""
        if directory:
            project_name = os.path.basename(directory.rstrip("/"))

    # ── Emit session ────────────────────────────────────────────────
    emit({
        "upsert": "session",
        "external_id": sid,
        "fields": {
            "title": (session["title"] or "Untitled session")[:500],
            "summary": session_summary,
            "directory": session["directory"] or "",
            "project": project_name,
            "model": primary_model,
            "message_count": len(messages),
            "total_input_tokens": total_input,
            "total_output_tokens": total_output,
            "total_cost": round(total_cost, 6),
            "files_changed": session["summary_files"] or 0,
            "lines_added": session["summary_additions"] or 0,
            "lines_deleted": session["summary_deletions"] or 0,
            "started_at": ms_to_iso(session["time_created"]),
            "ended_at": ms_to_iso(session_ended),
        }
    })

    # ── Emit messages ───────────────────────────────────────────────
    emitted = 0
    for msg, msg_data in decoded:
        msg_parts = parts_by_message.get(msg["id"], [])

        # Build message body from text parts
        body = extract_text_parts(msg_parts)

        # Optionally include tool call summaries
        tool_summary = ""
        if include_tool_calls:
            tool_summary = extract_tool_summary(msg_parts)
            if tool_summary and body:
                body = body + "\n\n[Tools: " + tool_summary + "]"
            elif tool_summary:
                body = "[Tools: " + tool_summary + "]"

        # Skip empty messages (e.g. step-start/step-finish only)
        if not body:
            continue

        input_tokens, output_tokens, cost = _message_usage(msg_data)

        emit({
            "upsert": "message",
            "external_id": msg["id"],
            "fields": {
                "role": msg_data.get("role", "unknown"),
                "body": body[:50000],  # Cap at 50KB per message
                "model": _strip_provider(msg_data.get("modelID") or ""),
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
                "cost": round(cost, 6),
                "tool_calls": tool_summary[:5000] if tool_summary else "",
                "timestamp": ms_to_iso(msg["time_created"] or msg["time_updated"]),
            },
            "relations": {
                "session": sid
            }
        })
        emitted += 1

    return emitted


def main():
    """Run one sync: read the request from stdin, emit records on stdout.

    The request is a JSON object with an optional "config" object
    (``db_path``, ``include_tool_calls``, ``project_filter``) and an optional
    "cursor" holding the largest ``time_updated`` already synced.

    The database is copied into a private temporary directory and read from
    there; the directory is removed on every exit path.

    Exit codes: 2 for bad input, a missing database or a failed copy;
    1 for SQLite errors.
    """
    try:
        input_data = json.loads(sys.stdin.read())
    except json.JSONDecodeError as e:
        log("error", f"Invalid input JSON: {e}")
        sys.exit(2)
    if not isinstance(input_data, dict):
        log("error", "Invalid input JSON: expected an object")
        sys.exit(2)

    config = input_data.get("config")
    if not isinstance(config, dict):
        config = {}
    cursor = input_data.get("cursor")

    try:
        since = int(cursor) if cursor else None
    except (TypeError, ValueError):
        log("error", f"Invalid cursor: {cursor!r}")
        sys.exit(2)

    db_path = config.get("db_path", "~/.local/share/opencode/opencode.db")
    db_path = os.path.expanduser(db_path)
    include_tool_calls = config.get("include_tool_calls", False)
    project_filter = config.get("project_filter", "")

    if not os.path.exists(db_path):
        log("error", f"OpenCode database not found: {db_path}")
        sys.exit(2)

    # Copy the database to avoid lock conflicts. A private temporary directory
    # replaces the deprecated, race-prone tempfile.mktemp() and guarantees the
    # copy and its sidecar files are removed even when we exit early.
    with tempfile.TemporaryDirectory(prefix="opencode-adapter-") as tmp_dir:
        try:
            tmp_db = _copy_database(db_path, tmp_dir)
        except PermissionError:
            log("error", f"Permission denied reading: {db_path}")
            sys.exit(2)
        except Exception as e:
            log("error", f"Failed to copy database: {e}")
            sys.exit(2)

        conn = None
        try:
            conn = sqlite3.connect(tmp_db)
            conn.row_factory = sqlite3.Row

            max_updated = since or 0
            session_count = 0
            message_count = 0

            for session in _fetch_sessions(conn, since, project_filter):
                emitted = _sync_session(conn, session, include_tool_calls)
                if emitted is None:
                    # Sessions without messages are neither emitted nor
                    # allowed to move the cursor.
                    continue
                session_count += 1
                message_count += emitted
                max_updated = max(max_updated, session["time_updated"] or 0)

            # Emit cursor
            if max_updated > 0:
                emit({"cursor": str(max_updated)})

            log("info", f"Synced {session_count} sessions, {message_count} messages")
        except sqlite3.Error as e:
            log("error", f"SQLite error: {e}")
            sys.exit(1)
        finally:
            # Close before the temporary directory is removed.
            if conn is not None:
                conn.close()
|
|
|
|
|
|
# Run the adapter only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == "__main__":
    main()
|