Files
spacedrive/adapters/gmail/sync.py
Jamie Pine c02e3404b1 data
2026-03-26 12:19:56 -07:00

499 lines
16 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Gmail adapter for Spacedrive.
Syncs threads, messages, labels, and attachments from Gmail via the Gmail API.
Uses only Python stdlib (no pip dependencies).
Protocol: reads JSON config from stdin, writes JSONL operations to stdout.
Incremental sync: uses Gmail history ID as cursor. First sync fetches all
messages; subsequent syncs fetch only changes since the last history ID.
"""
import json
import sys
import time
import urllib.request
import urllib.error
import urllib.parse
import base64
import email
import email.utils
import email.header
from datetime import datetime, timezone
# ── Constants ────────────────────────────────────────────────────────────────
# Base URL of the Gmail REST API (v1); request paths such as
# "/users/me/labels" are appended to it.
GMAIL_API = "https://gmail.googleapis.com/gmail/v1"
# Total number of attempts api_get makes per request, including the first one.
MAX_RETRIES = 3
# Base backoff delay between attempts.
RETRY_DELAY = 2 # seconds, doubled on each retry
# Page size for list requests (messages and history); callers cap it at the
# configured max_results.
BATCH_SIZE = 100 # items per page (messages or history records)
# ── Helpers ──────────────────────────────────────────────────────────────────
def log(level: str, message: str):
    """Write a diagnostic record ``{"log": level, "message": message}`` as one JSON line on stdout."""
    line = json.dumps({"log": level, "message": message})
    print(line, flush=True)
def emit(operation: dict):
    """Serialize *operation* to JSON and write it to stdout as a single JSONL line."""
    serialized = json.dumps(operation)
    print(serialized, flush=True)
def api_get(path: str, token: str, params: dict = None) -> dict:
    """GET a Gmail API endpoint and return the decoded JSON response.

    Transient failures are retried, up to ``MAX_RETRIES`` attempts in total,
    with exponential backoff (``RETRY_DELAY`` seconds, doubled per attempt).
    The following are treated as transient:

    * HTTP 429 and HTTP 5xx responses;
    * HTTP 403 responses whose body reports ``rateLimitExceeded`` or
      ``userRateLimitExceeded`` (Gmail reports quota exhaustion this way);
    * network-level ``OSError``s, which covers URLError, socket timeouts and
      connection resets.

    No sleep happens after the final failed attempt.

    Args:
        path: API path appended to ``GMAIL_API``, e.g. ``/users/me/labels``.
        token: OAuth 2.0 bearer token.
        params: Optional query parameters, URL-encoded onto the request.

    Returns:
        The parsed JSON response body.

    Raises:
        urllib.error.HTTPError: For any other HTTP error (e.g. 400, 404), so
            callers can react to specific status codes.
        OSError: If a network error persists on the final attempt.
        SystemExit: With code 2 in three cases: on 401, on a 403 that is not
            a rate-limit error, or when retryable HTTP errors exhaust every
            attempt.
    """
    url = f"{GMAIL_API}{path}"
    if params:
        url += "?" + urllib.parse.urlencode(params)
    req = urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})

    for attempt in range(MAX_RETRIES):
        final_attempt = attempt == MAX_RETRIES - 1
        delay = RETRY_DELAY * (2 ** attempt)
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return json.loads(resp.read().decode("utf-8"))
        except urllib.error.HTTPError as e:
            if e.code == 401:
                log("error", "OAuth token expired or invalid (401 Unauthorized)")
                sys.exit(2)
            if e.code == 403:
                detail = e.read().decode("utf-8", errors="replace")
                # Gmail returns 403 (not only 429) for per-user/per-project
                # rate limits; those are retryable, other 403s are not.
                if "rateLimitExceeded" not in detail and "userRateLimitExceeded" not in detail:
                    log("error", f"Access denied (403 Forbidden): {detail}")
                    sys.exit(2)
            elif e.code != 429 and e.code < 500:
                raise
            # Retryable HTTP error: back off, unless no attempts remain.
            if final_attempt:
                break
            log("warn", f"API returned {e.code}, retrying in {delay}s...")
            time.sleep(delay)
        except OSError as e:
            # HTTPError is handled above. This catches URLError (DNS/connect
            # failures) plus timeouts and resets raised while reading.
            if final_attempt:
                raise
            log("warn", f"Network error: {e}, retrying in {delay}s...")
            time.sleep(delay)

    log("error", f"Failed after {MAX_RETRIES} retries")
    sys.exit(2)
def decode_header(header_value: str) -> str:
    """Decode an RFC 2047 MIME-encoded header value into a Unicode string.

    Decoded chunks are concatenated with no separator. This is deliberate:
    ``email.header.decode_header`` already keeps the whitespace that belongs
    to unencoded text. RFC 2047 also says whitespace between two adjacent
    encoded-words must be dropped. Joining with a space would therefore
    introduce spurious spaces, e.g. ``"Re:  Hello"``.

    Chunks whose charset label Python does not recognise are decoded as UTF-8.
    Undecodable bytes are replaced rather than raising.

    Returns:
        The decoded header, or ``""`` for an empty/None value.
    """
    if not header_value:
        return ""
    pieces = []
    for chunk, charset in email.header.decode_header(header_value):
        if isinstance(chunk, str):
            # No encoded-words at all: decode_header hands back the input as-is.
            pieces.append(chunk)
            continue
        try:
            pieces.append(chunk.decode(charset or "utf-8", errors="replace"))
        except LookupError:
            # Unknown/bogus charset label in the encoded-word.
            pieces.append(chunk.decode("utf-8", errors="replace"))
    return "".join(pieces)
def get_header(headers: list, name: str) -> str:
    """Return the value of the first header called *name* (case-insensitive).

    *headers* is Gmail's list of ``{"name": ..., "value": ...}`` dicts. A
    missing header, or a matching entry without a ``value``, yields ``""``.
    """
    wanted = name.lower()
    matches = (entry.get("value", "") for entry in headers if entry.get("name", "").lower() == wanted)
    return next(matches, "")
def extract_body(payload: dict) -> str:
    """Extract a plain-text body from a Gmail message payload.

    Candidates are tried in this order:

    1. the payload itself, if it is ``text/plain``;
    2. a direct ``text/plain`` child;
    3. the payload itself, if it is ``text/html`` (converted to text);
    4. the first non-empty result of recursing into each child part.

    Part data is base64url-decoded, tolerating missing ``=`` padding. It is
    then decoded with the charset declared in the part's ``Content-Type``
    header. UTF-8 is used when no charset is declared or the declared one is
    ASCII. Undecodable bytes are replaced.

    Returns:
        The body text, or ``""`` if no textual part is found.
    """
    mime_type = payload.get("mimeType", "")
    parts = payload.get("parts", [])

    if mime_type == "text/plain":
        text = _decode_part_data(payload)
        if text:
            return text

    for part in parts:
        if part.get("mimeType", "") == "text/plain":
            text = _decode_part_data(part)
            if text:
                return text

    if mime_type == "text/html":
        markup = _decode_part_data(payload)
        if markup:
            return _html_to_text(markup)

    for part in parts:
        body = extract_body(part)
        if body:
            return body
    return ""


def _decode_part_data(part: dict) -> str:
    """Decode a part's base64url ``body.data`` into text using its declared charset."""
    data = part.get("body", {}).get("data", "")
    if not data:
        return ""
    # Tolerate base64url data delivered without '=' padding.
    raw = base64.urlsafe_b64decode(data + "=" * (-len(data) % 4))
    charset = _part_charset(part)
    # UTF-8 is a superset of ASCII, so use it when no charset is declared or
    # the label is ASCII (this also copes with 8-bit text mislabelled as ASCII).
    if not charset or charset.lower() in ("us-ascii", "ascii"):
        charset = "utf-8"
    try:
        return raw.decode(charset, errors="replace")
    except LookupError:
        # Unknown charset label: fall back to UTF-8.
        return raw.decode("utf-8", errors="replace")


def _part_charset(part: dict):
    """Return the ``charset`` parameter of the part's Content-Type header, or None."""
    import re
    for header in part.get("headers", []):
        if header.get("name", "").lower() == "content-type":
            match = re.search(r'charset\s*=\s*"?([^";\s]+)', header.get("value", ""), re.IGNORECASE)
            return match.group(1) if match else None
    return None


def _html_to_text(markup: str) -> str:
    """Crudely convert HTML to text: drop script/style blocks, strip tags, unescape entities."""
    import html
    import re
    markup = re.sub(r"(?is)<(script|style)\b.*?</\1\s*>", "", markup)
    text = re.sub(r"<[^>]+>", "", markup)
    return html.unescape(text).strip()
def extract_attachments(payload: dict, message_id: str) -> list:
    """Collect attachment metadata from a Gmail message payload.

    Every MIME part with a non-empty ``filename`` counts as an attachment.
    This includes the root payload itself, which covers a message whose whole
    body is a single attachment. Parts are visited depth-first in document
    order: each part comes before its children, and children come in order.

    Args:
        payload: The message's ``payload`` part tree.
        message_id: Gmail message ID. Not used here; kept for interface
            compatibility.

    Returns:
        A list of ``{"filename", "mime_type", "size"}`` dicts. ``mime_type``
        defaults to ``application/octet-stream`` and ``size`` defaults to 0.
    """
    attachments = []
    pending = [payload]
    while pending:
        part = pending.pop()
        filename = part.get("filename", "")
        if filename:
            body = part.get("body", {})
            attachments.append({
                "filename": filename,
                "mime_type": part.get("mimeType", "application/octet-stream"),
                "size": body.get("size", 0),
            })
        # Push children reversed so the first child is visited next (pre-order).
        pending.extend(reversed(part.get("parts", [])))
    return attachments
def parse_date(date_str: str) -> str:
    """Parse an email ``Date`` header into an ISO 8601 string normalized to UTC.

    ``email.utils.parsedate_to_datetime`` returns a *naive* datetime in two
    cases: a ``-0000`` zone ("UTC, source zone unknown"), or no zone at all.
    Such values are taken to be UTC. Calling ``astimezone`` on them directly
    would treat them as host-local time, making the result depend on the
    machine's timezone.

    Missing or unparseable dates fall back to the current UTC time.
    """
    if not date_str:
        return datetime.now(timezone.utc).isoformat()
    try:
        parsed = email.utils.parsedate_to_datetime(date_str)
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc).isoformat()
    except Exception:
        # Best effort. Malformed dates raise TypeError or ValueError depending
        # on the Python version, and out-of-range values can overflow.
        return datetime.now(timezone.utc).isoformat()
# ── Sync Logic ───────────────────────────────────────────────────────────────
def sync_labels(token: str, user: str, label_filter: list = None):
    """Emit an ``upsert`` record for each Gmail label, optionally filtered by name.

    Lists the mailbox's labels, then fetches each kept label individually to
    read its background colour, name and (lower-cased) type.

    Args:
        token: OAuth 2.0 bearer token.
        user: Gmail user ID (``"me"`` for the authenticated account).
        label_filter: Optional list of label *names*. When non-empty, only
            labels whose name appears in it are emitted.

    Returns:
        The number of labels emitted.
    """
    listing = api_get(f"/users/{user}/labels", token)
    emitted = 0
    for summary in listing.get("labels", []):
        label_id = summary["id"]
        wanted = not label_filter or summary["name"] in label_filter
        if not wanted:
            continue
        detail = api_get(f"/users/{user}/labels/{label_id}", token)
        background = detail["color"].get("backgroundColor", "") if "color" in detail else ""
        emit({
            "upsert": "label",
            "external_id": label_id,
            "fields": {
                "name": detail.get("name", label_id),
                "color": background,
                "type": detail.get("type", "user").lower(),
            },
        })
        emitted += 1
    log("info", f"Synced {emitted} labels")
    return emitted
def sync_messages_full(token: str, user: str, max_results: int, label_filter: list = None):
    """Full initial sync: fetch and emit up to ``max_results`` messages.

    When ``label_filter`` is given, its entries are label *names*; raw label
    IDs are also accepted. They are resolved to Gmail label IDs via
    ``labels.list``. A message is synced if it carries *any* of those labels,
    and messages matching several labels are processed only once. Entries
    that match no label are skipped with a warning.

    Args:
        token: OAuth 2.0 bearer token.
        user: Gmail user ID (``"me"`` for the authenticated account).
        max_results: Upper bound on the number of messages processed.
        label_filter: Optional list of label names/IDs to restrict the sync to.

    Returns:
        The number of messages processed.
    """
    log("info", "Starting full sync...")

    if label_filter:
        scopes = _resolve_label_ids(token, user, label_filter)
        if not scopes:
            log("warn", "None of the configured labels were found; no messages synced")
            return 0
    else:
        scopes = [None]  # a single, unfiltered listing

    page_size = min(BATCH_SIZE, max_results)
    total_fetched = 0
    threads_seen = set()
    messages_seen = set()

    for label_id in scopes:
        if total_fetched >= max_results:
            break
        for msg_id in _iter_message_ids(token, user, label_id, page_size):
            if msg_id in messages_seen:
                continue  # already synced via an earlier label
            messages_seen.add(msg_id)
            msg = api_get(
                f"/users/{user}/messages/{msg_id}",
                token,
                {"format": "full"}
            )
            process_message(msg, user, threads_seen)
            total_fetched += 1
            if total_fetched % 50 == 0:
                log("info", f"Processed {total_fetched} messages...")
            # Stop before the generator requests another page.
            if total_fetched >= max_results:
                break

    log("info", f"Full sync complete: {total_fetched} messages, {len(threads_seen)} threads")
    return total_fetched


def _resolve_label_ids(token: str, user: str, label_filter: list) -> list:
    """Map configured label names (or raw IDs) to Gmail label IDs, keeping order and dropping duplicates."""
    labels = api_get(f"/users/{user}/labels", token).get("labels", [])
    id_by_name = {label.get("name"): label.get("id") for label in labels}
    known_ids = {label.get("id") for label in labels}
    resolved = []
    for wanted in label_filter:
        label_id = id_by_name.get(wanted) or (wanted if wanted in known_ids else None)
        if label_id is None:
            log("warn", f"Label '{wanted}' not found; skipping it")
            continue
        if label_id not in resolved:
            resolved.append(label_id)
    return resolved


def _iter_message_ids(token: str, user: str, label_id, page_size: int):
    """Yield message IDs from ``messages.list`` (optionally for one label), following ``nextPageToken``."""
    page_token = None
    while True:
        params = {"maxResults": page_size}
        if label_id:
            params["labelIds"] = label_id
        if page_token:
            params["pageToken"] = page_token
        data = api_get(f"/users/{user}/messages", token, params)
        messages = data.get("messages", [])
        if not messages:
            return
        for msg_ref in messages:
            yield msg_ref["id"]
        page_token = data.get("nextPageToken")
        if not page_token:
            return
def sync_messages_incremental(token: str, user: str, history_id: str, max_results: int):
    """Incremental sync: apply mailbox changes recorded since ``history_id``.

    Pages through ``history.list`` and, for each history record:

    * fetches and emits newly added messages (skipping any that no longer
      exist);
    * emits deletes for removed messages;
    * re-links the current labels of messages whose labels changed.

    If Gmail reports the start history ID as expired (HTTP 404), a full sync
    is run instead.

    Args:
        token: OAuth 2.0 bearer token.
        user: Gmail user ID (``"me"`` for the authenticated account).
        history_id: History ID saved as the cursor by the previous run.
        max_results: Page-size cap, and the message cap for the fallback full
            sync.

    Returns:
        A ``(change_count, new_history_id)`` tuple in every case. Normally
        ``new_history_id`` is the ``historyId`` from the last
        ``history.list`` page. After a full-sync fallback, it is the profile
        history ID read before that sync started.
    """
    log("info", f"Incremental sync from history ID {history_id}...")

    # historyTypes is a repeated query parameter, so each type is sent as its
    # own key rather than as one comma-joined value. urllib.parse.urlencode
    # (used by api_get) accepts a sequence of (key, value) pairs for this.
    base_params = [
        ("startHistoryId", history_id),
        ("maxResults", min(BATCH_SIZE, max_results)),
    ] + [
        ("historyTypes", history_type)
        for history_type in ("messageAdded", "messageDeleted", "labelAdded", "labelRemoved")
    ]

    total_changes = 0
    threads_seen = set()
    new_history_id = history_id
    page_token = None
    while True:
        params = base_params + ([("pageToken", page_token)] if page_token else [])
        try:
            data = api_get(f"/users/{user}/history", token, params)
        except urllib.error.HTTPError as e:
            if e.code == 404:
                # History ID too old: Gmail requires a full sync.
                log("warn", "History ID expired, falling back to full sync")
                return _full_sync_with_cursor(token, user, history_id, max_results)
            raise

        for record in data.get("history", []):
            total_changes += _apply_history_record(token, user, record, threads_seen)

        new_history_id = data.get("historyId", new_history_id)
        page_token = data.get("nextPageToken")
        if not page_token:
            break

    log("info", f"Incremental sync complete: {total_changes} changes")
    return total_changes, new_history_id


def _get_message_or_none(token: str, user: str, msg_id: str, params: dict):
    """Fetch one message; return None if Gmail reports it no longer exists (HTTP 404)."""
    try:
        return api_get(f"/users/{user}/messages/{msg_id}", token, params)
    except urllib.error.HTTPError as e:
        if e.code == 404:
            return None
        raise


def _apply_history_record(token: str, user: str, record: dict, threads_seen: set) -> int:
    """Emit operations for one history record.

    Returns the number of added/deleted messages handled. Label changes are
    not counted.
    """
    changes = 0

    for added in record.get("messagesAdded", []):
        msg_id = added.get("message", {}).get("id")
        if not msg_id:
            continue
        msg = _get_message_or_none(token, user, msg_id, {"format": "full"})
        if msg is None:
            # The message was added and then removed before this sync ran.
            # Skip it so the whole sync (and the cursor update) isn't aborted.
            log("info", f"Message {msg_id} no longer exists; skipping")
            continue
        process_message(msg, user, threads_seen)
        changes += 1

    for deleted in record.get("messagesDeleted", []):
        msg_id = deleted.get("message", {}).get("id")
        if msg_id:
            emit({"delete": "message", "external_id": msg_id})
            changes += 1

    # Label changes: re-fetch the message's current labels and re-link them.
    # NOTE(review): no unlink is emitted for removed labels, so stale links may
    # persist. Confirm how the consumer handles this.
    for label_change in record.get("labelsAdded", []) + record.get("labelsRemoved", []):
        msg_id = label_change.get("message", {}).get("id")
        if not msg_id:
            continue
        try:
            msg = _get_message_or_none(
                token, user, msg_id, {"format": "metadata", "metadataHeaders": ""}
            )
        except Exception as e:
            # Best effort: a failed label refresh must not abort the sync.
            log("warn", f"Could not refresh labels for message {msg_id}: {e}")
            continue
        if msg is None:
            continue  # message deleted since the label change was recorded
        for label_id in msg.get("labelIds", []):
            emit({
                "link": "message",
                "id": msg_id,
                "to": "label",
                "to_id": label_id,
            })
    return changes


def _full_sync_with_cursor(token: str, user: str, stale_history_id: str, max_results: int):
    """Run a full sync after the history cursor expired and return ``(count, history_id)``.

    The profile's current history ID is read *before* the full sync, so that
    changes made while it runs are picked up by the next incremental sync. If
    the profile omits it, the stale ID is returned unchanged.
    """
    profile = api_get(f"/users/{user}/profile", token)
    current_history_id = profile.get("historyId") or stale_history_id
    total = sync_messages_full(token, user, max_results)
    return total, current_history_id
def process_message(msg: dict, user: str, threads_seen: set):
    """Emit the thread, message, attachment, and label-link records for one message.

    Records are emitted in this order:

    1. a ``thread`` upsert, only the first time ``thread_id`` is seen in
       ``threads_seen`` during this run;
    2. the ``message`` upsert;
    3. one ``attachment`` upsert per attachment;
    4. one message→label link per label ID.

    Args:
        msg: A Gmail message resource fetched with ``format=full``.
        user: Gmail user ID. Not used by this function.
        threads_seen: Thread IDs already emitted during this run. Mutated.
    """
    msg_id = msg["id"]
    thread_id = msg.get("threadId", msg_id)
    payload = msg.get("payload", {})
    headers = payload.get("headers", [])

    subject = decode_header(get_header(headers, "Subject"))
    # From/To/Cc can carry RFC 2047 encoded-words in display names, just as
    # Subject can. Decode them the same way so the stored values are readable.
    from_addr = decode_header(get_header(headers, "From"))
    to_addr = decode_header(get_header(headers, "To"))
    cc_addr = decode_header(get_header(headers, "Cc"))
    date_iso = parse_date(get_header(headers, "Date"))

    body = extract_body(payload)
    # Cap the stored body size. Longer bodies are cut and marked with "...".
    if len(body) > 50000:
        body = body[:50000] + "..."

    label_ids = msg.get("labelIds", [])
    is_read = "UNREAD" not in label_ids
    is_starred = "STARRED" in label_ids

    if thread_id not in threads_seen:
        threads_seen.add(thread_id)
        # NOTE(review): message_count is always 1. subject, last_date and
        # snippet come from whichever message of the thread is processed first
        # in this run; later messages of the same thread do not update them.
        # Confirm whether the consumer recomputes these fields.
        emit({
            "upsert": "thread",
            "external_id": thread_id,
            "fields": {
                "subject": subject,
                "last_date": date_iso,
                "message_count": 1,
                "snippet": msg.get("snippet", ""),
            }
        })

    emit({
        "upsert": "message",
        "external_id": msg_id,
        "fields": {
            "subject": subject,
            "body": body,
            "from": from_addr,
            "to": to_addr,
            "cc": cc_addr,
            "date": date_iso,
            "is_read": is_read,
            "is_starred": is_starred,
            "thread_id": thread_id,
        }
    })

    # Attachment IDs are positional: "<message id>_att_<index>".
    for i, att in enumerate(extract_attachments(payload, msg_id)):
        emit({
            "upsert": "attachment",
            "external_id": f"{msg_id}_att_{i}",
            "fields": {
                "filename": att["filename"],
                "mime_type": att["mime_type"],
                "size": att["size"],
                "message_id": msg_id,
            }
        })

    for label_id in label_ids:
        emit({
            "link": "message",
            "id": msg_id,
            "to": "label",
            "to_id": label_id,
        })
# ── Main ─────────────────────────────────────────────────────────────────────
def main():
    """Run one sync: read Spacedrive's JSON input from stdin and write JSONL operations to stdout.

    The input is a JSON object with these keys:

    * ``config`` (object): ``oauth_token`` is required. ``max_results``
      defaults to 500. ``labels`` may be a comma-separated string or a list
      of label names.
    * ``cursor`` (optional): the Gmail history ID saved by the previous run.

    With a cursor, an incremental sync runs. Without one, a full sync runs,
    and the profile's history ID is emitted as the next cursor.

    Exit codes: 2 for invalid input or config; 1 when the sync fails part-way,
    after some records may already have been written.
    """
    try:
        input_data = json.loads(sys.stdin.read())
    except json.JSONDecodeError as e:
        log("error", f"Invalid input JSON: {e}")
        sys.exit(2)
    if not isinstance(input_data, dict):
        log("error", "Invalid input JSON: expected a JSON object")
        sys.exit(2)

    config = input_data.get("config") or {}
    if not isinstance(config, dict):
        log("error", "Invalid input: config must be a JSON object")
        sys.exit(2)
    cursor = input_data.get("cursor")

    oauth_token = config.get("oauth_token")
    if not oauth_token:
        log("error", "Missing required config: oauth_token")
        sys.exit(2)

    try:
        max_results = int(config.get("max_results", 500))
    except (TypeError, ValueError):
        log("error", f"Invalid config: max_results must be an integer, got {config.get('max_results')!r}")
        sys.exit(2)

    # Optional label filter: accept "A, B" or ["A", "B"]; blanks are ignored.
    raw_labels = config.get("labels") or ""
    if not isinstance(raw_labels, (list, tuple)):
        raw_labels = str(raw_labels).split(",")
    stripped = (str(name).strip() for name in raw_labels)
    label_filter = [name for name in stripped if name] or None

    # Always address the authenticated account as "me"; config["email"] is not
    # needed for API calls.
    user = "me"

    try:
        # Step 1: labels are always synced in full.
        sync_labels(oauth_token, user, label_filter)

        # Step 2: messages.
        if cursor:
            result = sync_messages_incremental(oauth_token, user, cursor, max_results)
            # Defensive: accept either (count, history_id) or a bare count, so
            # a fallback path that reports only a count cannot crash the run.
            if isinstance(result, tuple):
                total, new_history_id = result
            else:
                total, new_history_id = result, None
            if new_history_id:
                emit({"cursor": str(new_history_id)})
            else:
                log("warn", "No new history ID was returned; cursor not updated")
        else:
            # Read the history ID before the full sync, so changes made while
            # it runs are picked up by the next incremental sync.
            profile = api_get(f"/users/{user}/profile", oauth_token)
            current_history_id = profile.get("historyId", "")
            total = sync_messages_full(oauth_token, user, max_results, label_filter)
            if current_history_id:
                emit({"cursor": str(current_history_id)})

        log("info", f"Sync complete: {total} messages processed")
    except urllib.error.HTTPError as e:
        body = e.read().decode("utf-8", errors="replace") if hasattr(e, "read") else str(e)
        log("error", f"Gmail API error {e.code}: {body}")
        sys.exit(1)  # Partial failure — some records may have been written
    except Exception as e:
        log("error", f"Unexpected error: {e}")
        sys.exit(1)
if __name__ == "__main__":
    # Run the sync only when executed as a script, not when imported.
    main()