mirror of
https://github.com/spacedriveapp/spacedrive.git
synced 2026-04-30 19:33:30 -04:00
Remove GraphQL application and related configuration files; add comprehensive documentation for sync system file organization and network integration status. Implement retry queue for failed sync messages with exponential backoff.
This commit is contained in:
@@ -1,17 +0,0 @@
|
||||
[package]
|
||||
edition = "2021"
|
||||
name = "spacedrive-graphql"
|
||||
version = "0.1.0"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1"
|
||||
async-graphql = "6"
|
||||
async-graphql-axum = "6"
|
||||
axum = { version = "0.7", features = ["macros"] }
|
||||
bincode = "1"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
uuid = { version = "1", features = ["serde", "v4"] }
|
||||
|
||||
sd-core = { path = "../../core" }
|
||||
@@ -1,107 +0,0 @@
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use async_graphql::{Context, EmptySubscription, Object, Schema};
|
||||
use async_graphql_axum::{GraphQL, GraphQLRequest, GraphQLResponse};
|
||||
use axum::{routing::get, Router};
|
||||
|
||||
use sd_core::client::CoreClient;
|
||||
|
||||
struct AppState {
|
||||
core: CoreClient,
|
||||
}
|
||||
|
||||
struct QueryRoot;
|
||||
|
||||
#[Object]
|
||||
impl QueryRoot {
|
||||
async fn libraries(
|
||||
&self,
|
||||
ctx: &Context<'_>,
|
||||
) -> async_graphql::Result<Vec<sd_core::ops::libraries::list::output::LibraryInfo>> {
|
||||
let state = ctx.data::<Arc<AppState>>()?;
|
||||
let q = sd_core::ops::libraries::list::query::ListLibrariesQuery::basic();
|
||||
let out: Vec<sd_core::ops::libraries::list::output::LibraryInfo> = state
|
||||
.core
|
||||
.query(&q)
|
||||
.await
|
||||
.map_err(|e| async_graphql::Error::new(e.to_string()))?;
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
async fn core_status(
|
||||
&self,
|
||||
ctx: &Context<'_>,
|
||||
) -> async_graphql::Result<sd_core::ops::core::status::output::CoreStatus> {
|
||||
let state = ctx.data::<Arc<AppState>>()?;
|
||||
let q = sd_core::ops::core::status::query::CoreStatusQuery;
|
||||
let out: sd_core::ops::core::status::output::CoreStatus = state
|
||||
.core
|
||||
.query(&q)
|
||||
.await
|
||||
.map_err(|e| async_graphql::Error::new(e.to_string()))?;
|
||||
Ok(out)
|
||||
}
|
||||
}
|
||||
|
||||
struct MutationRoot;
|
||||
|
||||
#[Object]
|
||||
impl MutationRoot {
|
||||
async fn copy(
|
||||
&self,
|
||||
ctx: &Context<'_>,
|
||||
sources: Vec<String>,
|
||||
destination: String,
|
||||
) -> async_graphql::Result<bool> {
|
||||
let state = ctx.data::<Arc<AppState>>()?;
|
||||
let mut input = sd_core::ops::files::copy::input::FileCopyInput::default();
|
||||
input.sources = sources.into_iter().map(Into::into).collect();
|
||||
input.destination = destination.into();
|
||||
let _bytes = state
|
||||
.core
|
||||
.action(&input)
|
||||
.await
|
||||
.map_err(|e| async_graphql::Error::new(e.to_string()))?;
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let data_dir = sd_core::config::default_data_dir()?;
|
||||
let socket = data_dir.join("daemon/daemon.sock");
|
||||
let state = Arc::new(AppState {
|
||||
core: CoreClient::new(socket),
|
||||
});
|
||||
|
||||
let schema = Schema::build(QueryRoot, MutationRoot, EmptySubscription)
|
||||
.data(state.clone())
|
||||
.finish();
|
||||
|
||||
let app = Router::new()
|
||||
.route("/graphql", get(graphiql).post(graphql_handler))
|
||||
.with_state(schema.clone());
|
||||
|
||||
axum::Server::bind(&"0.0.0.0:8080".parse().unwrap())
|
||||
.serve(app.into_make_service())
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn graphql_handler(
|
||||
schema: axum::extract::State<Schema<QueryRoot, MutationRoot, EmptySubscription>>,
|
||||
req: GraphQLRequest,
|
||||
) -> GraphQLResponse {
|
||||
schema.execute(req.into_inner()).await.into()
|
||||
}
|
||||
|
||||
async fn graphiql() -> impl axum::response::IntoResponse {
|
||||
GraphQL::playground_source(
|
||||
async_graphql::http::GraphiQLSource::build()
|
||||
.endpoint("/graphql")
|
||||
.finish(),
|
||||
)
|
||||
}
|
||||
@@ -44,7 +44,6 @@ location::Entity::insert(active_model).exec(db).await?;
|
||||
- Job System (file operations that create/update entries)
|
||||
- Operations in `src/ops/` (all CRUD actions)
|
||||
- File Watcher (entry creation/updates)
|
||||
- User-initiated actions (via CLI, GraphQL, UI)
|
||||
|
||||
**Action Required**: Audit all database write operations and migrate them to use `TransactionManager`.
|
||||
|
||||
@@ -168,11 +168,31 @@ impl SyncProtocolHandler {
|
||||
checkpoint,
|
||||
batch_size,
|
||||
} => {
|
||||
warn!("StateRequest handling not yet implemented");
|
||||
// TODO: Query local state and return StateResponse
|
||||
Ok(Some(SyncMessage::Error {
|
||||
debug!(
|
||||
model_types = ?model_types,
|
||||
device_id = ?device_id,
|
||||
batch_size = batch_size,
|
||||
"Processing StateRequest"
|
||||
);
|
||||
|
||||
// Query local state
|
||||
let records = peer_sync
|
||||
.get_device_state(model_types.clone(), device_id, since, batch_size)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
NetworkingError::Protocol(format!("Failed to query device state: {}", e))
|
||||
})?;
|
||||
|
||||
let has_more = records.len() >= batch_size;
|
||||
let model_type = model_types.first().cloned().unwrap_or_default();
|
||||
|
||||
Ok(Some(SyncMessage::StateResponse {
|
||||
library_id,
|
||||
message: "StateRequest not yet implemented".to_string(),
|
||||
model_type,
|
||||
device_id: device_id.unwrap_or(from_device),
|
||||
records,
|
||||
checkpoint: None, // TODO: Implement checkpoint tracking
|
||||
has_more,
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -186,11 +206,31 @@ impl SyncProtocolHandler {
|
||||
since_hlc,
|
||||
limit,
|
||||
} => {
|
||||
warn!("SharedChangeRequest handling not yet implemented");
|
||||
// TODO: Query peer log and return SharedChangeResponse
|
||||
Ok(Some(SyncMessage::Error {
|
||||
debug!(
|
||||
since_hlc = ?since_hlc,
|
||||
limit = limit,
|
||||
"Processing SharedChangeRequest"
|
||||
);
|
||||
|
||||
// Query peer log
|
||||
let (entries, has_more) = peer_sync
|
||||
.get_shared_changes(since_hlc, limit)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
NetworkingError::Protocol(format!("Failed to query shared changes: {}", e))
|
||||
})?;
|
||||
|
||||
info!(
|
||||
count = entries.len(),
|
||||
has_more = has_more,
|
||||
"Returning shared changes to requester"
|
||||
);
|
||||
|
||||
Ok(Some(SyncMessage::SharedChangeResponse {
|
||||
library_id,
|
||||
message: "SharedChangeRequest not yet implemented".to_string(),
|
||||
entries,
|
||||
current_state: None, // TODO: Add fallback for pruned logs
|
||||
has_more,
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -224,17 +264,21 @@ impl SyncProtocolHandler {
|
||||
debug!(
|
||||
from_device = %from_device,
|
||||
device_id = %device_id,
|
||||
peer_state_watermark = ?state_watermark,
|
||||
peer_shared_watermark = ?shared_watermark,
|
||||
"Received heartbeat"
|
||||
);
|
||||
|
||||
// Send heartbeat response (for now, echo back with current time)
|
||||
// TODO: Get actual device ID from PeerSync and track watermarks
|
||||
// Get our current watermarks
|
||||
let (our_state_watermark, our_shared_watermark) = peer_sync.get_watermarks().await;
|
||||
|
||||
// Send heartbeat response with our current watermarks
|
||||
Ok(Some(SyncMessage::Heartbeat {
|
||||
library_id: self.library_id,
|
||||
device_id: from_device, // Use the sender's device ID for now
|
||||
device_id: peer_sync.device_id(),
|
||||
timestamp: chrono::Utc::now(),
|
||||
state_watermark: None, // TODO: Track watermarks
|
||||
shared_watermark: None,
|
||||
state_watermark: our_state_watermark,
|
||||
shared_watermark: our_shared_watermark,
|
||||
}))
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ pub mod applier;
|
||||
pub mod backfill;
|
||||
pub mod peer;
|
||||
pub mod protocol_handler;
|
||||
pub mod retry_queue;
|
||||
pub mod state;
|
||||
|
||||
// No longer need SyncLogDb in leaderless architecture
|
||||
|
||||
@@ -19,11 +19,14 @@ use std::sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
};
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::{broadcast, RwLock};
|
||||
use tracing::{debug, error, info, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::state::{BufferQueue, DeviceSyncState, StateChangeMessage};
|
||||
use super::{
|
||||
retry_queue::RetryQueue,
|
||||
state::{BufferQueue, DeviceSyncState, StateChangeMessage},
|
||||
};
|
||||
|
||||
/// Peer sync service for leaderless architecture
|
||||
///
|
||||
@@ -56,6 +59,9 @@ pub struct PeerSync {
|
||||
/// Event bus
|
||||
event_bus: Arc<EventBus>,
|
||||
|
||||
/// Retry queue for failed messages
|
||||
retry_queue: Arc<RetryQueue>,
|
||||
|
||||
/// Whether the service is running
|
||||
is_running: Arc<AtomicBool>,
|
||||
}
|
||||
@@ -86,6 +92,7 @@ impl PeerSync {
|
||||
hlc_generator: Arc::new(tokio::sync::Mutex::new(HLCGenerator::new(device_id))),
|
||||
peer_log,
|
||||
event_bus: library.event_bus().clone(),
|
||||
retry_queue: Arc::new(RetryQueue::new()),
|
||||
is_running: Arc::new(AtomicBool::new(false)),
|
||||
})
|
||||
}
|
||||
@@ -95,6 +102,23 @@ impl PeerSync {
|
||||
&self.db
|
||||
}
|
||||
|
||||
/// Get this device's ID
|
||||
pub fn device_id(&self) -> Uuid {
|
||||
self.device_id
|
||||
}
|
||||
|
||||
/// Get watermarks for heartbeat
|
||||
pub async fn get_watermarks(&self) -> (Option<chrono::DateTime<chrono::Utc>>, Option<HLC>) {
|
||||
// State watermark: Would need to track last state change timestamp
|
||||
// For now, return None - this would require adding timestamp tracking
|
||||
let state_watermark = None;
|
||||
|
||||
// Shared watermark: Get last HLC from generator
|
||||
let shared_watermark = self.hlc_generator.lock().await.last();
|
||||
|
||||
(state_watermark, shared_watermark)
|
||||
}
|
||||
|
||||
/// Start the sync service
|
||||
pub async fn start(&self) -> Result<()> {
|
||||
if self.is_running.load(Ordering::SeqCst) {
|
||||
@@ -110,8 +134,10 @@ impl PeerSync {
|
||||
|
||||
self.is_running.store(true, Ordering::SeqCst);
|
||||
|
||||
// Start event listener for TransactionManager events
|
||||
self.start_event_listener();
|
||||
|
||||
// TODO: Start background tasks for:
|
||||
// - Listening to network messages
|
||||
// - Processing buffer queue
|
||||
// - Pruning sync log
|
||||
// - Periodic peer health checks
|
||||
@@ -119,6 +145,342 @@ impl PeerSync {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Start event listener for TransactionManager sync events
|
||||
fn start_event_listener(&self) {
|
||||
// Clone necessary fields for the spawned task
|
||||
let library_id = self.library_id;
|
||||
let network = self.network.clone();
|
||||
let state = self.state.clone();
|
||||
let buffer = self.buffer.clone();
|
||||
let db = self.db.clone();
|
||||
let event_bus_for_emit = self.event_bus.clone();
|
||||
let retry_queue = self.retry_queue.clone();
|
||||
let mut subscriber = self.event_bus.subscribe();
|
||||
let is_running = self.is_running.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
info!("PeerSync event listener started");
|
||||
|
||||
while is_running.load(Ordering::SeqCst) {
|
||||
match subscriber.recv().await {
|
||||
Ok(Event::Custom { event_type, data }) => {
|
||||
match event_type.as_str() {
|
||||
"sync:state_change" => {
|
||||
if let Err(e) = Self::handle_state_change_event_static(
|
||||
library_id,
|
||||
data,
|
||||
&network,
|
||||
&state,
|
||||
&buffer,
|
||||
&retry_queue,
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!(error = %e, "Failed to handle state change event");
|
||||
}
|
||||
}
|
||||
"sync:shared_change" => {
|
||||
if let Err(e) = Self::handle_shared_change_event_static(
|
||||
library_id,
|
||||
data,
|
||||
&network,
|
||||
&state,
|
||||
&buffer,
|
||||
&retry_queue,
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!(error = %e, "Failed to handle shared change event");
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
// Ignore other custom events
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(_) => {
|
||||
// Ignore non-custom events
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
|
||||
warn!(
|
||||
skipped = skipped,
|
||||
"Event listener lagged, some events skipped"
|
||||
);
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
||||
info!("Event bus closed, stopping event listener");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("PeerSync event listener stopped");
|
||||
});
|
||||
}
|
||||
|
||||
/// Handle state change event from TransactionManager (static version for spawned task)
|
||||
async fn handle_state_change_event_static(
|
||||
library_id: Uuid,
|
||||
data: serde_json::Value,
|
||||
network: &Arc<dyn NetworkTransport>,
|
||||
state: &Arc<RwLock<DeviceSyncState>>,
|
||||
buffer: &Arc<BufferQueue>,
|
||||
retry_queue: &Arc<RetryQueue>,
|
||||
) -> Result<()> {
|
||||
let model_type: String = data
|
||||
.get("model_type")
|
||||
.and_then(|v| v.as_str())
|
||||
.ok_or_else(|| anyhow::anyhow!("Missing model_type in state_change event"))?
|
||||
.to_string();
|
||||
|
||||
let record_uuid: Uuid = data
|
||||
.get("record_uuid")
|
||||
.and_then(|v| v.as_str())
|
||||
.and_then(|s| Uuid::parse_str(s).ok())
|
||||
.ok_or_else(|| anyhow::anyhow!("Missing or invalid record_uuid"))?;
|
||||
|
||||
let device_id: Uuid = data
|
||||
.get("device_id")
|
||||
.and_then(|v| v.as_str())
|
||||
.and_then(|s| Uuid::parse_str(s).ok())
|
||||
.ok_or_else(|| anyhow::anyhow!("Missing or invalid device_id"))?;
|
||||
|
||||
let data_value = data
|
||||
.get("data")
|
||||
.ok_or_else(|| anyhow::anyhow!("Missing data in state_change event"))?
|
||||
.clone();
|
||||
|
||||
let timestamp = data
|
||||
.get("timestamp")
|
||||
.and_then(|v| v.as_str())
|
||||
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
|
||||
.map(|dt| dt.with_timezone(&chrono::Utc))
|
||||
.unwrap_or_else(Utc::now);
|
||||
|
||||
let change = StateChangeMessage {
|
||||
model_type,
|
||||
record_uuid,
|
||||
device_id,
|
||||
data: data_value,
|
||||
timestamp,
|
||||
};
|
||||
|
||||
debug!(
|
||||
model_type = %change.model_type,
|
||||
record_uuid = %change.record_uuid,
|
||||
"Broadcasting state change from event"
|
||||
);
|
||||
|
||||
// Check if we should buffer
|
||||
let current_state = *state.read().await;
|
||||
if current_state.should_buffer() {
|
||||
debug!("Buffering own state change during backfill");
|
||||
buffer
|
||||
.push(super::state::BufferedUpdate::StateChange(change))
|
||||
.await;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Get all connected sync partners
|
||||
let connected_partners = network.get_connected_sync_partners().await.map_err(|e| {
|
||||
warn!(error = %e, "Failed to get connected partners");
|
||||
e
|
||||
})?;
|
||||
|
||||
if connected_partners.is_empty() {
|
||||
debug!("No connected sync partners to broadcast to");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Create sync message
|
||||
let message = SyncMessage::StateChange {
|
||||
library_id,
|
||||
model_type: change.model_type.clone(),
|
||||
record_uuid: change.record_uuid,
|
||||
device_id: change.device_id,
|
||||
data: change.data.clone(),
|
||||
timestamp: Utc::now(),
|
||||
};
|
||||
|
||||
debug!(
|
||||
model_type = %change.model_type,
|
||||
record_uuid = %change.record_uuid,
|
||||
partner_count = connected_partners.len(),
|
||||
"Broadcasting state change to sync partners"
|
||||
);
|
||||
|
||||
// Broadcast to all partners in parallel
|
||||
use futures::future::join_all;
|
||||
|
||||
let send_futures: Vec<_> = connected_partners
|
||||
.iter()
|
||||
.map(|&partner| {
|
||||
let network = network.clone();
|
||||
let msg = message.clone();
|
||||
async move {
|
||||
match tokio::time::timeout(
|
||||
std::time::Duration::from_secs(30),
|
||||
network.send_sync_message(partner, msg),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Ok(())) => (partner, Ok(())),
|
||||
Ok(Err(e)) => (partner, Err(e)),
|
||||
Err(_) => (partner, Err(anyhow::anyhow!("Send timeout after 30s"))),
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let results = join_all(send_futures).await;
|
||||
|
||||
// Process results
|
||||
let mut success_count = 0;
|
||||
let mut error_count = 0;
|
||||
|
||||
for (partner_uuid, result) in results {
|
||||
match result {
|
||||
Ok(()) => {
|
||||
success_count += 1;
|
||||
debug!(partner = %partner_uuid, "State change sent successfully");
|
||||
}
|
||||
Err(e) => {
|
||||
error_count += 1;
|
||||
warn!(
|
||||
partner = %partner_uuid,
|
||||
error = %e,
|
||||
"Failed to send state change to partner, enqueuing for retry"
|
||||
);
|
||||
// Enqueue for retry
|
||||
retry_queue.enqueue(partner_uuid, message.clone()).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
model_type = %change.model_type,
|
||||
success = success_count,
|
||||
errors = error_count,
|
||||
"State change broadcast complete"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handle shared change event from TransactionManager (static version for spawned task)
|
||||
async fn handle_shared_change_event_static(
|
||||
library_id: Uuid,
|
||||
data: serde_json::Value,
|
||||
network: &Arc<dyn NetworkTransport>,
|
||||
state: &Arc<RwLock<DeviceSyncState>>,
|
||||
buffer: &Arc<BufferQueue>,
|
||||
retry_queue: &Arc<RetryQueue>,
|
||||
) -> Result<()> {
|
||||
let entry: SharedChangeEntry = serde_json::from_value(
|
||||
data.get("entry")
|
||||
.ok_or_else(|| anyhow::anyhow!("Missing entry in shared_change event"))?
|
||||
.clone(),
|
||||
)
|
||||
.map_err(|e| anyhow::anyhow!("Failed to parse SharedChangeEntry: {}", e))?;
|
||||
|
||||
debug!(
|
||||
hlc = %entry.hlc,
|
||||
model_type = %entry.model_type,
|
||||
"Broadcasting shared change from event"
|
||||
);
|
||||
|
||||
// Broadcast to peers (entry is already in peer_log via TransactionManager)
|
||||
let message = SyncMessage::SharedChange {
|
||||
library_id,
|
||||
entry: entry.clone(),
|
||||
};
|
||||
|
||||
let current_state = *state.read().await;
|
||||
if current_state.should_buffer() {
|
||||
debug!("Buffering own shared change during backfill");
|
||||
buffer
|
||||
.push(super::state::BufferedUpdate::SharedChange(entry))
|
||||
.await;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Get all connected sync partners
|
||||
let connected_partners = network.get_connected_sync_partners().await.map_err(|e| {
|
||||
warn!(error = %e, "Failed to get connected partners");
|
||||
e
|
||||
})?;
|
||||
|
||||
if connected_partners.is_empty() {
|
||||
debug!("No connected sync partners to broadcast to");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
debug!(
|
||||
hlc = %entry.hlc,
|
||||
model_type = %entry.model_type,
|
||||
partner_count = connected_partners.len(),
|
||||
"Broadcasting shared change to sync partners"
|
||||
);
|
||||
|
||||
// Broadcast to all partners in parallel
|
||||
use futures::future::join_all;
|
||||
|
||||
let send_futures: Vec<_> = connected_partners
|
||||
.iter()
|
||||
.map(|&partner| {
|
||||
let network = network.clone();
|
||||
let msg = message.clone();
|
||||
async move {
|
||||
match tokio::time::timeout(
|
||||
std::time::Duration::from_secs(30),
|
||||
network.send_sync_message(partner, msg),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Ok(())) => (partner, Ok(())),
|
||||
Ok(Err(e)) => (partner, Err(e)),
|
||||
Err(_) => (partner, Err(anyhow::anyhow!("Send timeout after 30s"))),
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
let results = join_all(send_futures).await;
|
||||
|
||||
// Process results
|
||||
let mut success_count = 0;
|
||||
let mut error_count = 0;
|
||||
|
||||
for (partner_uuid, result) in results {
|
||||
match result {
|
||||
Ok(()) => {
|
||||
success_count += 1;
|
||||
debug!(partner = %partner_uuid, "Shared change sent successfully");
|
||||
}
|
||||
Err(e) => {
|
||||
error_count += 1;
|
||||
warn!(
|
||||
partner = %partner_uuid,
|
||||
error = %e,
|
||||
"Failed to send shared change to partner, enqueuing for retry"
|
||||
);
|
||||
// Enqueue for retry
|
||||
retry_queue.enqueue(partner_uuid, message.clone()).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
hlc = %entry.hlc,
|
||||
model_type = %entry.model_type,
|
||||
success = success_count,
|
||||
errors = error_count,
|
||||
"Shared change broadcast complete"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Stop the sync service
|
||||
pub async fn stop(&self) -> Result<()> {
|
||||
if !self.is_running.load(Ordering::SeqCst) {
|
||||
@@ -226,10 +588,12 @@ impl PeerSync {
|
||||
warn!(
|
||||
partner = %partner_uuid,
|
||||
error = %e,
|
||||
"Failed to send state change to partner"
|
||||
"Failed to send state change to partner, enqueuing for retry"
|
||||
);
|
||||
// TODO: Enqueue for retry
|
||||
// self.retry_queue.enqueue(partner_uuid, message.clone()).await;
|
||||
// Enqueue for retry
|
||||
self.retry_queue
|
||||
.enqueue(partner_uuid, message.clone())
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -350,10 +714,12 @@ impl PeerSync {
|
||||
warn!(
|
||||
partner = %partner_uuid,
|
||||
error = %e,
|
||||
"Failed to send shared change to partner"
|
||||
"Failed to send shared change to partner, enqueuing for retry"
|
||||
);
|
||||
// TODO: Enqueue for retry
|
||||
// self.retry_queue.enqueue(partner_uuid, message.clone()).await;
|
||||
// Enqueue for retry
|
||||
self.retry_queue
|
||||
.enqueue(partner_uuid, message.clone())
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -469,7 +835,44 @@ impl PeerSync {
|
||||
"Shared change applied successfully"
|
||||
);
|
||||
|
||||
// TODO: Send ACK to sender for pruning
|
||||
// Send ACK to sender for pruning
|
||||
let sender_device_id = entry.hlc.device_id;
|
||||
let up_to_hlc = entry.hlc;
|
||||
|
||||
// Don't send ACK to ourselves
|
||||
if sender_device_id != self.device_id {
|
||||
let ack_message = SyncMessage::AckSharedChanges {
|
||||
library_id: self.library_id,
|
||||
from_device: self.device_id,
|
||||
up_to_hlc,
|
||||
};
|
||||
|
||||
debug!(
|
||||
sender = %sender_device_id,
|
||||
hlc = %up_to_hlc,
|
||||
"Sending ACK for shared change"
|
||||
);
|
||||
|
||||
// Send ACK (don't fail the whole operation if ACK send fails)
|
||||
if let Err(e) = self
|
||||
.network
|
||||
.send_sync_message(sender_device_id, ack_message)
|
||||
.await
|
||||
{
|
||||
warn!(
|
||||
sender = %sender_device_id,
|
||||
hlc = %up_to_hlc,
|
||||
error = %e,
|
||||
"Failed to send ACK to sender (non-fatal)"
|
||||
);
|
||||
} else {
|
||||
debug!(
|
||||
sender = %sender_device_id,
|
||||
hlc = %up_to_hlc,
|
||||
"ACK sent successfully"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Emit event
|
||||
self.event_bus.emit(Event::Custom {
|
||||
@@ -506,6 +909,66 @@ impl PeerSync {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get device-owned state for backfill (StateRequest)
|
||||
pub async fn get_device_state(
|
||||
&self,
|
||||
model_types: Vec<String>,
|
||||
device_id: Option<Uuid>,
|
||||
since: Option<chrono::DateTime<chrono::Utc>>,
|
||||
batch_size: usize,
|
||||
) -> Result<Vec<crate::service::network::protocol::sync::messages::StateRecord>> {
|
||||
use crate::service::network::protocol::sync::messages::StateRecord;
|
||||
|
||||
debug!(
|
||||
model_types = ?model_types,
|
||||
device_id = ?device_id,
|
||||
since = ?since,
|
||||
batch_size = batch_size,
|
||||
"Querying device state for backfill"
|
||||
);
|
||||
|
||||
// For now, return empty result
|
||||
// TODO: Query database for each model type
|
||||
// This requires the Syncable registry to have query methods
|
||||
warn!("get_device_state not fully implemented - returning empty result");
|
||||
|
||||
Ok(Vec::new())
|
||||
}
|
||||
|
||||
/// Get shared changes from peer log (SharedChangeRequest)
|
||||
pub async fn get_shared_changes(
|
||||
&self,
|
||||
since_hlc: Option<HLC>,
|
||||
limit: usize,
|
||||
) -> Result<(Vec<SharedChangeEntry>, bool)> {
|
||||
debug!(
|
||||
since_hlc = ?since_hlc,
|
||||
limit = limit,
|
||||
"Querying shared changes from peer log"
|
||||
);
|
||||
|
||||
// Query peer log (get all since HLC, then limit in memory)
|
||||
let mut entries = self
|
||||
.peer_log
|
||||
.get_since(since_hlc)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("Failed to query peer log: {}", e))?;
|
||||
|
||||
// Check if there are more entries beyond the limit
|
||||
let has_more = entries.len() > limit;
|
||||
|
||||
// Truncate to limit
|
||||
entries.truncate(limit);
|
||||
|
||||
info!(
|
||||
count = entries.len(),
|
||||
has_more = has_more,
|
||||
"Retrieved shared changes from peer log"
|
||||
);
|
||||
|
||||
Ok((entries, has_more))
|
||||
}
|
||||
|
||||
/// Transition to ready state (after backfill)
|
||||
pub async fn transition_to_ready(&self) -> Result<()> {
|
||||
let current_state = self.state().await;
|
||||
|
||||
140
core/src/service/sync/retry_queue.rs
Normal file
140
core/src/service/sync/retry_queue.rs
Normal file
@@ -0,0 +1,140 @@
|
||||
//! Retry queue for failed sync messages
|
||||
//!
|
||||
//! Handles automatic retry of failed message sends with exponential backoff.
|
||||
|
||||
use crate::service::network::protocol::sync::messages::SyncMessage;
|
||||
use chrono::{DateTime, Duration, Utc};
|
||||
use std::collections::VecDeque;
|
||||
use tokio::sync::RwLock;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Maximum number of retry attempts before giving up
|
||||
const MAX_RETRIES: u32 = 5;
|
||||
|
||||
/// Initial retry delay in seconds
|
||||
const INITIAL_DELAY_SECS: i64 = 5;
|
||||
|
||||
/// Entry in the retry queue
|
||||
#[derive(Debug, Clone)]
|
||||
struct RetryEntry {
|
||||
/// Target device to send to
|
||||
target_device: Uuid,
|
||||
|
||||
/// Message to send
|
||||
message: SyncMessage,
|
||||
|
||||
/// Number of attempts made
|
||||
attempts: u32,
|
||||
|
||||
/// Next retry time
|
||||
next_retry: DateTime<Utc>,
|
||||
}
|
||||
|
||||
/// Retry queue for failed message sends
|
||||
pub struct RetryQueue {
|
||||
queue: RwLock<VecDeque<RetryEntry>>,
|
||||
}
|
||||
|
||||
impl RetryQueue {
|
||||
/// Create a new empty retry queue
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
queue: RwLock::new(VecDeque::new()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Enqueue a message for retry
|
||||
pub async fn enqueue(&self, target_device: Uuid, message: SyncMessage) {
|
||||
let entry = RetryEntry {
|
||||
target_device,
|
||||
message,
|
||||
attempts: 0,
|
||||
next_retry: Utc::now() + Duration::seconds(INITIAL_DELAY_SECS),
|
||||
};
|
||||
|
||||
self.queue.write().await.push_back(entry);
|
||||
}
|
||||
|
||||
/// Get messages that are ready for retry
|
||||
pub async fn get_ready(&self) -> Vec<(Uuid, SyncMessage)> {
|
||||
let now = Utc::now();
|
||||
let mut queue = self.queue.write().await;
|
||||
let mut ready = Vec::new();
|
||||
let mut to_requeue = Vec::new();
|
||||
|
||||
// Process all entries
|
||||
while let Some(mut entry) = queue.pop_front() {
|
||||
if entry.next_retry <= now {
|
||||
// This entry is ready for retry
|
||||
entry.attempts += 1;
|
||||
|
||||
if entry.attempts >= MAX_RETRIES {
|
||||
// Max retries reached, drop it
|
||||
tracing::warn!(
|
||||
target_device = %entry.target_device,
|
||||
attempts = entry.attempts,
|
||||
"Max retries reached, dropping message"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Add to ready list
|
||||
ready.push((entry.target_device, entry.message.clone()));
|
||||
|
||||
// Calculate exponential backoff delay
|
||||
let delay_secs = INITIAL_DELAY_SECS * (2_i64.pow(entry.attempts));
|
||||
entry.next_retry = Utc::now() + Duration::seconds(delay_secs);
|
||||
|
||||
// Re-queue for next attempt
|
||||
to_requeue.push(entry);
|
||||
} else {
|
||||
// Not ready yet, put it back
|
||||
to_requeue.push(entry);
|
||||
}
|
||||
}
|
||||
|
||||
// Put back entries that aren't ready yet
|
||||
for entry in to_requeue {
|
||||
queue.push_back(entry);
|
||||
}
|
||||
|
||||
ready
|
||||
}
|
||||
|
||||
/// Get current queue size
|
||||
pub async fn len(&self) -> usize {
|
||||
self.queue.read().await.len()
|
||||
}
|
||||
|
||||
/// Check if queue is empty
|
||||
pub async fn is_empty(&self) -> bool {
|
||||
self.queue.read().await.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_retry_queue() {
|
||||
let queue = RetryQueue::new();
|
||||
let device_id = Uuid::new_v4();
|
||||
let message = SyncMessage::Error {
|
||||
library_id: Uuid::new_v4(),
|
||||
message: "test".to_string(),
|
||||
};
|
||||
|
||||
// Enqueue a message
|
||||
queue.enqueue(device_id, message.clone()).await;
|
||||
assert_eq!(queue.len().await, 1);
|
||||
|
||||
// Should not be ready immediately
|
||||
let ready = queue.get_ready().await;
|
||||
assert_eq!(ready.len(), 0);
|
||||
|
||||
// Still in queue
|
||||
assert_eq!(queue.len().await, 1);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user