mirror of
https://github.com/spacedriveapp/spacedrive.git
synced 2026-04-18 05:27:38 -04:00
499 lines
16 KiB
Python
Executable File
499 lines
16 KiB
Python
Executable File
#!/usr/bin/env python3
"""
Gmail adapter for Spacedrive.

Syncs threads, messages, labels, and attachments from Gmail via the Gmail API.
Uses only Python stdlib (no pip dependencies).

Protocol: reads JSON config from stdin, writes JSONL operations to stdout.

Incremental sync: uses Gmail history ID as cursor. First sync fetches all
messages; subsequent syncs fetch only changes since the last history ID.
"""
|
|
|
|
import json
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
import urllib.error
|
|
import urllib.parse
|
|
import base64
|
|
import email
|
|
import email.utils
|
|
import email.header
|
|
from datetime import datetime, timezone
|
|
|
|
# ── Constants ────────────────────────────────────────────────────────────────
|
|
|
|
GMAIL_API = "https://gmail.googleapis.com/gmail/v1"
|
|
MAX_RETRIES = 3
|
|
RETRY_DELAY = 2 # seconds, doubled on each retry
|
|
BATCH_SIZE = 100 # messages per page
|
|
|
|
# ── Helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def log(level: str, message: str):
    """Write a structured log record to stdout as one JSONL line."""
    record = {"log": level, "message": message}
    # Flush per line so the host process sees logs in real time.
    sys.stdout.write(json.dumps(record) + "\n")
    sys.stdout.flush()
|
|
|
|
|
|
def emit(operation: dict):
    """Serialize one protocol operation to stdout as a JSONL record."""
    # Flush per line so the host process can stream operations as they arrive.
    sys.stdout.write(json.dumps(operation) + "\n")
    sys.stdout.flush()
|
|
|
|
|
|
def api_get(path: str, token: str, params: dict = None) -> dict:
    """GET a Gmail API endpoint with bearer auth and bounded retries.

    Retries with exponential backoff on 429/5xx responses and transient
    network errors. Per the adapter protocol, exits the process with code 2
    on auth failures (401/403) or when all retries are exhausted; any other
    HTTP error is re-raised for the caller to handle.

    Args:
        path: API path appended to GMAIL_API (e.g. "/users/me/labels").
        token: OAuth bearer token.
        params: optional query parameters.

    Returns:
        Parsed JSON response body as a dict.
    """
    url = f"{GMAIL_API}{path}"
    if params:
        url += "?" + urllib.parse.urlencode(params)

    headers = {"Authorization": f"Bearer {token}"}
    req = urllib.request.Request(url, headers=headers)

    for attempt in range(MAX_RETRIES):
        last_attempt = attempt == MAX_RETRIES - 1
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return json.loads(resp.read().decode("utf-8"))
        except urllib.error.HTTPError as e:
            if e.code == 429 or e.code >= 500:
                # Bug fix: the old code slept (and logged "retrying") even on
                # the final attempt before giving up. Skip the useless sleep.
                if last_attempt:
                    break
                delay = RETRY_DELAY * (2 ** attempt)
                log("warn", f"API returned {e.code}, retrying in {delay}s...")
                time.sleep(delay)
                continue
            elif e.code == 401:
                log("error", "OAuth token expired or invalid (401 Unauthorized)")
                sys.exit(2)
            elif e.code == 403:
                log("error", f"Access denied (403 Forbidden): {e.read().decode('utf-8', errors='replace')}")
                sys.exit(2)
            else:
                # e.g. 404 — caller may want to handle it (history expiry)
                raise
        except urllib.error.URLError as e:
            if not last_attempt:
                delay = RETRY_DELAY * (2 ** attempt)
                log("warn", f"Network error: {e}, retrying in {delay}s...")
                time.sleep(delay)
                continue
            raise

    log("error", f"Failed after {MAX_RETRIES} retries")
    sys.exit(2)
|
|
|
|
|
|
def decode_header(header_value: str) -> str:
    """Decode a MIME-encoded email header into a plain unicode string.

    Handles multi-part encoded words. Bug fix: a charset name Python does
    not recognize (e.g. "unknown-8bit", common in spam) used to raise
    LookupError and abort the sync; such parts now fall back to UTF-8 with
    replacement characters.

    Args:
        header_value: raw header value, possibly RFC 2047 encoded.

    Returns:
        Decoded text, or "" for a missing/empty header.
    """
    if not header_value:
        return ""
    decoded_parts = email.header.decode_header(header_value)
    result = []
    for part, charset in decoded_parts:
        if isinstance(part, bytes):
            try:
                result.append(part.decode(charset or "utf-8", errors="replace"))
            except LookupError:
                # Charset not known to Python — best-effort UTF-8 fallback.
                result.append(part.decode("utf-8", errors="replace"))
        else:
            result.append(part)
    return " ".join(result)
|
|
|
|
|
|
def get_header(headers: list, name: str) -> str:
    """Return the first header value matching *name* (case-insensitive).

    *headers* is Gmail's list of {"name": ..., "value": ...} dicts.
    Returns "" when no header matches.
    """
    wanted = name.lower()
    matches = (h.get("value", "") for h in headers
               if h.get("name", "").lower() == wanted)
    return next(matches, "")
|
|
|
|
|
|
def extract_body(payload: dict) -> str:
    """Extract a plain-text body from a Gmail message payload.

    Preference order:
      1. the payload's own text/plain body,
      2. a direct text/plain child part,
      3. the payload's own text/html body (tags stripped),
      4. first non-empty body found by recursing into child parts.

    Returns "" when nothing decodable is found. Bodies are base64url
    encoded per the Gmail API; decode errors are replaced, not raised.
    """
    # Hoisted out of the html branch (was re-executed on every recursion);
    # kept function-local so module-level imports are unchanged.
    import re

    mime_type = payload.get("mimeType", "")

    # 1. Direct text/plain
    if mime_type == "text/plain":
        data = payload.get("body", {}).get("data", "")
        if data:
            return base64.urlsafe_b64decode(data).decode("utf-8", errors="replace")

    # 2. Prefer a text/plain part one level down over any html
    parts = payload.get("parts", [])
    for part in parts:
        if part.get("mimeType", "") == "text/plain":
            data = part.get("body", {}).get("data", "")
            if data:
                return base64.urlsafe_b64decode(data).decode("utf-8", errors="replace")

    # 3. Fallback: this payload's own text/html, with tags stripped (basic)
    if mime_type == "text/html":
        data = payload.get("body", {}).get("data", "")
        if data:
            html = base64.urlsafe_b64decode(data).decode("utf-8", errors="replace")
            return re.sub(r"<[^>]+>", "", html).strip()

    # 4. Recurse into nested multiparts
    for part in parts:
        body = extract_body(part)
        if body:
            return body

    return ""
|
|
|
|
|
|
def extract_attachments(payload: dict, message_id: str) -> list:
    """Collect attachment metadata from a message payload, depth-first.

    Any part carrying a filename is treated as an attachment. Returns a
    list of {"filename", "mime_type", "size"} dicts. (*message_id* is only
    threaded through the recursion.)
    """
    found = []
    for part in payload.get("parts", []):
        name = part.get("filename", "")
        if name:
            info = part.get("body", {})
            found.append({
                "filename": name,
                "mime_type": part.get("mimeType", "application/octet-stream"),
                "size": info.get("size", 0),
            })
        # Nested multiparts can carry their own attachments.
        found += extract_attachments(part, message_id)
    return found
|
|
|
|
|
|
def parse_date(date_str: str) -> str:
    """Convert an RFC 2822 date header into an ISO 8601 UTC timestamp.

    An empty or unparseable date falls back to the current UTC time so a
    malformed header never aborts the sync.
    """
    if date_str:
        try:
            return email.utils.parsedate_to_datetime(date_str).astimezone(timezone.utc).isoformat()
        except Exception:
            pass  # fall through to the "now" fallback
    return datetime.now(timezone.utc).isoformat()
|
|
|
|
|
|
# ── Sync Logic ───────────────────────────────────────────────────────────────
|
|
|
|
|
|
def sync_labels(token: str, user: str, label_filter: list = None):
    """Fetch all Gmail labels and emit one upsert per label.

    When *label_filter* is given, only labels whose display name appears in
    the list are synced. Each label is fetched individually because the
    list endpoint does not include color details. Returns the number of
    labels emitted.
    """
    listing = api_get(f"/users/{user}/labels", token)

    synced = 0
    for entry in listing.get("labels", []):
        # Skip labels excluded by the configured name filter.
        if label_filter and entry["name"] not in label_filter:
            continue

        lid = entry["id"]
        detail = api_get(f"/users/{user}/labels/{lid}", token)

        background = detail.get("color", {}).get("backgroundColor", "")
        kind = detail.get("type", "user").lower()

        emit({
            "upsert": "label",
            "external_id": lid,
            "fields": {
                "name": detail.get("name", lid),
                "color": background,
                "type": kind,
            }
        })
        synced += 1

    log("info", f"Synced {synced} labels")
    return synced
|
|
|
|
|
|
def sync_messages_full(token: str, user: str, max_results: int, label_filter: list = None):
    """Full initial sync: page through the mailbox and process each message.

    Fetches up to *max_results* messages in full format and emits thread,
    message, attachment, and label-link operations via process_message.
    Returns the number of messages processed.
    """
    log("info", "Starting full sync...")

    params = {}
    if label_filter:
        # NOTE(review): the Gmail messages.list API documents labelIds as a
        # repeated query parameter of label *IDs*, while this passes the
        # configured label *names* comma-joined — confirm filtering actually
        # works against the live API (sync_labels filters by name too).
        params["labelIds"] = ",".join(label_filter)

    total_fetched = 0
    threads_seen = set()
    page_token = None

    while total_fetched < max_results:
        # Bug fix: cap each page at the remaining budget instead of always
        # requesting BATCH_SIZE — avoids listing refs we will never process.
        params["maxResults"] = min(BATCH_SIZE, max_results - total_fetched)
        if page_token:
            params["pageToken"] = page_token

        # List message references for this page.
        data = api_get(f"/users/{user}/messages", token, params)
        messages = data.get("messages", [])
        if not messages:
            break

        for msg_ref in messages:
            if total_fetched >= max_results:
                break

            # Fetch full message content.
            msg = api_get(
                f"/users/{user}/messages/{msg_ref['id']}",
                token,
                {"format": "full"}
            )
            process_message(msg, user, threads_seen)
            total_fetched += 1

            if total_fetched % 50 == 0:
                log("info", f"Processed {total_fetched} messages...")

        page_token = data.get("nextPageToken")
        if not page_token:
            break

    log("info", f"Full sync complete: {total_fetched} messages, {len(threads_seen)} threads")
    return total_fetched
|
|
|
|
|
|
def sync_messages_incremental(token: str, user: str, history_id: str, max_results: int):
    """Incremental sync: replay Gmail history records since *history_id*.

    Emits upserts for added messages, deletes for removed ones, and
    re-links labels for label changes. Returns (total_changes,
    new_history_id) for the caller to persist as the next cursor.

    If Gmail reports the stored history ID as expired (HTTP 404), falls
    back to a full sync and returns a fresh history ID from the profile.
    """
    log("info", f"Incremental sync from history ID {history_id}...")

    params = {
        "startHistoryId": history_id,
        "maxResults": min(BATCH_SIZE, max_results),
        "historyTypes": "messageAdded,messageDeleted,labelAdded,labelRemoved",
    }

    total_changes = 0
    threads_seen = set()
    page_token = None

    while True:
        if page_token:
            params["pageToken"] = page_token

        try:
            data = api_get(f"/users/{user}/history", token, params)
        except urllib.error.HTTPError as e:
            if e.code == 404:
                # History ID too old — Gmail keeps history for a limited
                # window only. Bug fix: this path previously returned the
                # bare int from sync_messages_full, which broke the caller's
                # tuple unpacking. Grab a fresh history ID *before* the full
                # sync (so changes made during it land in the next run) and
                # return a proper (count, cursor) tuple.
                log("warn", "History ID expired, falling back to full sync")
                profile = api_get(f"/users/{user}/profile", token)
                fresh_history_id = profile.get("historyId", history_id)
                total = sync_messages_full(token, user, max_results)
                return total, fresh_history_id
            raise

        history = data.get("history", [])

        for record in history:
            # Messages added — fetch full content and emit upserts.
            for added in record.get("messagesAdded", []):
                msg_id = added.get("message", {}).get("id")
                if msg_id:
                    msg = api_get(
                        f"/users/{user}/messages/{msg_id}",
                        token,
                        {"format": "full"}
                    )
                    process_message(msg, user, threads_seen)
                    total_changes += 1

            # Messages deleted.
            for deleted in record.get("messagesDeleted", []):
                msg_id = deleted.get("message", {}).get("id")
                if msg_id:
                    emit({"delete": "message", "external_id": msg_id})
                    total_changes += 1

            # Label changes — re-fetch the message to refresh its links.
            # NOTE(review): only current labels are (re)linked; removed
            # labels are never unlinked — confirm downstream reconciles.
            for label_change in record.get("labelsAdded", []) + record.get("labelsRemoved", []):
                msg_id = label_change.get("message", {}).get("id")
                if msg_id:
                    try:
                        msg = api_get(
                            f"/users/{user}/messages/{msg_id}",
                            token,
                            {"format": "metadata", "metadataHeaders": ""}
                        )
                        for label_id in msg.get("labelIds", []):
                            emit({
                                "link": "message",
                                "id": msg_id,
                                "to": "label",
                                "to_id": label_id,
                            })
                    except Exception:
                        pass  # best-effort: message may have been deleted

        page_token = data.get("nextPageToken")
        if not page_token:
            break

    # Each history page carries the mailbox's current historyId.
    new_history_id = data.get("historyId", history_id)
    log("info", f"Incremental sync complete: {total_changes} changes")

    return total_changes, new_history_id
|
|
|
|
|
|
def process_message(msg: dict, user: str, threads_seen: set):
    """Emit all operations for one Gmail message.

    Emits the thread upsert the first time its thread is seen (tracked in
    *threads_seen*), then the message upsert, one upsert per attachment,
    and a link operation per Gmail label on the message.
    """
    msg_id = msg["id"]
    thread_id = msg.get("threadId", msg_id)
    payload = msg.get("payload", {})
    headers = payload.get("headers", [])

    # Header-derived fields.
    subject = decode_header(get_header(headers, "Subject"))
    sender = get_header(headers, "From")
    recipients = get_header(headers, "To")
    cc_list = get_header(headers, "Cc")
    date_iso = parse_date(get_header(headers, "Date"))

    body = extract_body(payload)
    if len(body) > 50000:
        # Keep stored bodies bounded.
        body = body[:50000] + "..."

    label_ids = msg.get("labelIds", [])

    # Thread upsert — first message encountered supplies the subject/snippet.
    if thread_id not in threads_seen:
        threads_seen.add(thread_id)
        emit({
            "upsert": "thread",
            "external_id": thread_id,
            "fields": {
                "subject": subject,
                "last_date": date_iso,
                "message_count": 1,
                "snippet": msg.get("snippet", ""),
            }
        })

    # Message upsert.
    emit({
        "upsert": "message",
        "external_id": msg_id,
        "fields": {
            "subject": subject,
            "body": body,
            "from": sender,
            "to": recipients,
            "cc": cc_list,
            "date": date_iso,
            "is_read": "UNREAD" not in label_ids,
            "is_starred": "STARRED" in label_ids,
            "thread_id": thread_id,
        }
    })

    # Attachment upserts, keyed by position within the message.
    for idx, att in enumerate(extract_attachments(payload, msg_id)):
        emit({
            "upsert": "attachment",
            "external_id": f"{msg_id}_att_{idx}",
            "fields": {
                "filename": att["filename"],
                "mime_type": att["mime_type"],
                "size": att["size"],
                "message_id": msg_id,
            }
        })

    # Label links.
    for label_id in label_ids:
        emit({
            "link": "message",
            "id": msg_id,
            "to": "label",
            "to_id": label_id,
        })
|
|
|
|
|
|
# ── Main ─────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def main():
    """Adapter entry point.

    Reads {"config": {...}, "cursor": ...} JSON from stdin, syncs labels
    and messages, and writes JSONL operations (plus a new cursor) to
    stdout. Exit codes: 2 for config/auth errors, 1 for runtime failures
    (some operations may already have been emitted).
    """
    try:
        input_data = json.loads(sys.stdin.read())
    except json.JSONDecodeError as e:
        log("error", f"Invalid input JSON: {e}")
        sys.exit(2)

    config = input_data.get("config", {})
    cursor = input_data.get("cursor")

    # Required config.
    oauth_token = config.get("oauth_token")
    if not oauth_token:
        log("error", "Missing required config: oauth_token")
        sys.exit(2)

    # Bug fix: a non-numeric max_results used to raise inside the generic
    # handler below (exit 1, "Unexpected error"); treat it as a config error.
    try:
        max_results = int(config.get("max_results", 500))
    except (TypeError, ValueError):
        log("error", "Invalid config: max_results must be an integer")
        sys.exit(2)

    # Optional comma-separated label name filter.
    labels_str = config.get("labels", "")
    label_filter = [name.strip() for name in labels_str.split(",") if name.strip()] if labels_str else None

    # Gmail API alias for the authenticated user. (The "email" config key
    # is accepted but unused — the token identifies the account.)
    user = "me"

    try:
        # Step 1: labels are always synced in full.
        sync_labels(oauth_token, user, label_filter)

        # Step 2: messages — incremental when a cursor exists.
        if cursor:
            total, new_history_id = sync_messages_incremental(
                oauth_token, user, cursor, max_results
            )
            emit({"cursor": str(new_history_id)})
        else:
            # Capture the history ID *before* the full sync so changes made
            # while syncing are picked up by the next incremental run.
            profile = api_get(f"/users/{user}/profile", oauth_token)
            current_history_id = profile.get("historyId", "")

            total = sync_messages_full(
                oauth_token, user, max_results, label_filter
            )

            if current_history_id:
                emit({"cursor": str(current_history_id)})

        log("info", f"Sync complete: {total} messages processed")

    except urllib.error.HTTPError as e:
        body = e.read().decode("utf-8", errors="replace") if hasattr(e, "read") else str(e)
        log("error", f"Gmail API error {e.code}: {body}")
        sys.exit(1)  # Partial failure — some records may have been written
    except Exception as e:
        log("error", f"Unexpected error: {e}")
        sys.exit(1)
|
|
|
|
|
|
# Script entry point: run the adapter when executed directly.
if __name__ == "__main__":
    main()
|