diff --git a/apps/graphql/Cargo.toml b/apps/graphql/Cargo.toml deleted file mode 100644 index 92b817793..000000000 --- a/apps/graphql/Cargo.toml +++ /dev/null @@ -1,17 +0,0 @@ -[package] -edition = "2021" -name = "spacedrive-graphql" -version = "0.1.0" - -[dependencies] -anyhow = "1" -async-graphql = "6" -async-graphql-axum = "6" -axum = { version = "0.7", features = ["macros"] } -bincode = "1" -serde = { version = "1", features = ["derive"] } -serde_json = "1" -tokio = { version = "1", features = ["full"] } -uuid = { version = "1", features = ["serde", "v4"] } - -sd-core = { path = "../../core" } diff --git a/apps/graphql/src/main.rs b/apps/graphql/src/main.rs deleted file mode 100644 index cf9f7ea9c..000000000 --- a/apps/graphql/src/main.rs +++ /dev/null @@ -1,107 +0,0 @@ -use std::path::PathBuf; -use std::sync::Arc; - -use anyhow::Result; -use async_graphql::{Context, EmptySubscription, Object, Schema}; -use async_graphql_axum::{GraphQL, GraphQLRequest, GraphQLResponse}; -use axum::{routing::get, Router}; - -use sd_core::client::CoreClient; - -struct AppState { - core: CoreClient, -} - -struct QueryRoot; - -#[Object] -impl QueryRoot { - async fn libraries( - &self, - ctx: &Context<'_>, - ) -> async_graphql::Result> { - let state = ctx.data::>()?; - let q = sd_core::ops::libraries::list::query::ListLibrariesQuery::basic(); - let out: Vec = state - .core - .query(&q) - .await - .map_err(|e| async_graphql::Error::new(e.to_string()))?; - Ok(out) - } - - async fn core_status( - &self, - ctx: &Context<'_>, - ) -> async_graphql::Result { - let state = ctx.data::>()?; - let q = sd_core::ops::core::status::query::CoreStatusQuery; - let out: sd_core::ops::core::status::output::CoreStatus = state - .core - .query(&q) - .await - .map_err(|e| async_graphql::Error::new(e.to_string()))?; - Ok(out) - } -} - -struct MutationRoot; - -#[Object] -impl MutationRoot { - async fn copy( - &self, - ctx: &Context<'_>, - sources: Vec, - destination: String, - ) -> async_graphql::Result { - let state = ctx.data::>()?; - let mut input = sd_core::ops::files::copy::input::FileCopyInput::default(); - input.sources = sources.into_iter().map(Into::into).collect(); - input.destination = destination.into(); - let _bytes = state - .core - .action(&input) - .await - .map_err(|e| async_graphql::Error::new(e.to_string()))?; - Ok(true) - } -} - -#[tokio::main] -async fn main() -> Result<()> { - let data_dir = sd_core::config::default_data_dir()?; - let socket = data_dir.join("daemon/daemon.sock"); - let state = Arc::new(AppState { - core: CoreClient::new(socket), - }); - - let schema = Schema::build(QueryRoot, MutationRoot, EmptySubscription) - .data(state.clone()) - .finish(); - - let app = Router::new() - .route("/graphql", get(graphiql).post(graphql_handler)) - .with_state(schema.clone()); - - axum::Server::bind(&"0.0.0.0:8080".parse().unwrap()) - .serve(app.into_make_service()) - .await?; - - Ok(()) -} - -async fn graphql_handler( - schema: axum::extract::State>, - req: GraphQLRequest, -) -> GraphQLResponse { - schema.execute(req.into_inner()).await.into() -} - -async fn graphiql() -> impl axum::response::IntoResponse { - GraphQL::playground_source( - async_graphql::http::GraphiQLSource::build() - .endpoint("/graphql") - .finish(), - ) -} diff --git a/core/src/infra/sync/FILE_ORGANIZATION.md b/core/src/infra/sync/docs/FILE_ORGANIZATION.md similarity index 100% rename from core/src/infra/sync/FILE_ORGANIZATION.md rename to core/src/infra/sync/docs/FILE_ORGANIZATION.md diff --git a/core/src/infra/sync/NETWORK_INTEGRATION_STATUS.md b/core/src/infra/sync/docs/NETWORK_INTEGRATION_STATUS.md similarity index 100% rename from core/src/infra/sync/NETWORK_INTEGRATION_STATUS.md rename to core/src/infra/sync/docs/NETWORK_INTEGRATION_STATUS.md diff --git a/core/src/infra/sync/SYNC_IMPLEMENTATION_GUIDE.md b/core/src/infra/sync/docs/SYNC_IMPLEMENTATION_GUIDE.md similarity index 99% rename from core/src/infra/sync/SYNC_IMPLEMENTATION_GUIDE.md rename to core/src/infra/sync/docs/SYNC_IMPLEMENTATION_GUIDE.md index 2ff4b412e..80856c712 100644 --- a/core/src/infra/sync/SYNC_IMPLEMENTATION_GUIDE.md +++ b/core/src/infra/sync/docs/SYNC_IMPLEMENTATION_GUIDE.md @@ -44,7 +44,6 @@ location::Entity::insert(active_model).exec(db).await?; - Job System (file operations that create/update entries) - Operations in `src/ops/` (all CRUD actions) - File Watcher (entry creation/updates) -- User-initiated actions (via CLI, GraphQL, UI) **Action Required**: Audit all database write operations and migrate them to use `TransactionManager`. diff --git a/core/src/infra/sync/SYNC_IMPLEMENTATION_ROADMAP.md b/core/src/infra/sync/docs/SYNC_IMPLEMENTATION_ROADMAP.md similarity index 100% rename from core/src/infra/sync/SYNC_IMPLEMENTATION_ROADMAP.md rename to core/src/infra/sync/docs/SYNC_IMPLEMENTATION_ROADMAP.md diff --git a/core/src/service/network/protocol/sync/handler.rs b/core/src/service/network/protocol/sync/handler.rs index d7899cd26..d85f3cca3 100644 --- a/core/src/service/network/protocol/sync/handler.rs +++ b/core/src/service/network/protocol/sync/handler.rs @@ -168,11 +168,31 @@ impl SyncProtocolHandler { checkpoint, batch_size, } => { - warn!("StateRequest handling not yet implemented"); - // TODO: Query local state and return StateResponse - Ok(Some(SyncMessage::Error { + debug!( + model_types = ?model_types, + device_id = ?device_id, + batch_size = batch_size, + "Processing StateRequest" + ); + + // Query local state + let records = peer_sync + .get_device_state(model_types.clone(), device_id, since, batch_size) + .await + .map_err(|e| { + NetworkingError::Protocol(format!("Failed to query device state: {}", e)) + })?; + + let has_more = records.len() >= batch_size; + let model_type = model_types.first().cloned().unwrap_or_default(); + + Ok(Some(SyncMessage::StateResponse { library_id, - message: "StateRequest not yet implemented".to_string(), + model_type, + device_id: device_id.unwrap_or(from_device), + records, + checkpoint: None, // TODO: Implement checkpoint tracking + has_more, })) } @@ -186,11 +206,31 @@ impl SyncProtocolHandler { since_hlc, limit, } => { - warn!("SharedChangeRequest handling not yet implemented"); - // TODO: Query peer log and return SharedChangeResponse - Ok(Some(SyncMessage::Error { + debug!( + since_hlc = ?since_hlc, + limit = limit, + "Processing SharedChangeRequest" + ); + + // Query peer log + let (entries, has_more) = peer_sync + .get_shared_changes(since_hlc, limit) + .await + .map_err(|e| { + NetworkingError::Protocol(format!("Failed to query shared changes: {}", e)) + })?; + + info!( + count = entries.len(), + has_more = has_more, + "Returning shared changes to requester" + ); + + Ok(Some(SyncMessage::SharedChangeResponse { library_id, - message: "SharedChangeRequest not yet implemented".to_string(), + entries, + current_state: None, // TODO: Add fallback for pruned logs + has_more, })) } @@ -224,17 +264,21 @@ impl SyncProtocolHandler { debug!( from_device = %from_device, device_id = %device_id, + peer_state_watermark = ?state_watermark, + peer_shared_watermark = ?shared_watermark, "Received heartbeat" ); - // Send heartbeat response (for now, echo back with current time) - // TODO: Get actual device ID from PeerSync and track watermarks + // Get our current watermarks + let (our_state_watermark, our_shared_watermark) = peer_sync.get_watermarks().await; + + // Send heartbeat response with our current watermarks Ok(Some(SyncMessage::Heartbeat { library_id: self.library_id, - device_id: from_device, // Use the sender's device ID for now + device_id: peer_sync.device_id(), timestamp: chrono::Utc::now(), - state_watermark: None, // TODO: Track watermarks - shared_watermark: None, + state_watermark: our_state_watermark, + shared_watermark: our_shared_watermark, })) } diff --git a/core/src/service/sync/mod.rs b/core/src/service/sync/mod.rs index 113989311..c62b96949 100644 --- a/core/src/service/sync/mod.rs +++ b/core/src/service/sync/mod.rs @@ -8,6 +8,7 @@ pub mod applier; pub mod backfill; pub mod peer; pub mod protocol_handler; +pub mod retry_queue; pub mod state; // No longer need SyncLogDb in leaderless architecture diff --git a/core/src/service/sync/peer.rs b/core/src/service/sync/peer.rs index 3ef26076a..60882656f 100644 --- a/core/src/service/sync/peer.rs +++ b/core/src/service/sync/peer.rs @@ -19,11 +19,14 @@ use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, }; -use tokio::sync::RwLock; +use tokio::sync::{broadcast, RwLock}; use tracing::{debug, error, info, warn}; use uuid::Uuid; -use super::state::{BufferQueue, DeviceSyncState, StateChangeMessage}; +use super::{ + retry_queue::RetryQueue, + state::{BufferQueue, DeviceSyncState, StateChangeMessage}, +}; /// Peer sync service for leaderless architecture /// @@ -56,6 +59,9 @@ pub struct PeerSync { /// Event bus event_bus: Arc, + /// Retry queue for failed messages + retry_queue: Arc, + /// Whether the service is running is_running: Arc, } @@ -86,6 +92,7 @@ impl PeerSync { hlc_generator: Arc::new(tokio::sync::Mutex::new(HLCGenerator::new(device_id))), peer_log, event_bus: library.event_bus().clone(), + retry_queue: Arc::new(RetryQueue::new()), is_running: Arc::new(AtomicBool::new(false)), }) } @@ -95,6 +102,23 @@ impl PeerSync { &self.db } + /// Get this device's ID + pub fn device_id(&self) -> Uuid { + self.device_id + } + + /// Get watermarks for heartbeat + pub async fn get_watermarks(&self) -> (Option>, Option) { + // State watermark: Would need to track last state change timestamp + // For now, return None - this would require adding timestamp tracking + let state_watermark = None; + + // Shared watermark: Get last HLC from generator + let shared_watermark = self.hlc_generator.lock().await.last(); + + (state_watermark, shared_watermark) + } + /// Start the sync service pub async fn start(&self) -> Result<()> { if self.is_running.load(Ordering::SeqCst) { @@ -110,8 +134,10 @@ impl PeerSync { self.is_running.store(true, Ordering::SeqCst); + // Start event listener for TransactionManager events + self.start_event_listener(); + // TODO: Start background tasks for: - // - Listening to network messages // - Processing buffer queue // - Pruning sync log // - Periodic peer health checks @@ -119,6 +145,342 @@ impl PeerSync { Ok(()) } + /// Start event listener for TransactionManager sync events + fn start_event_listener(&self) { + // Clone necessary fields for the spawned task + let library_id = self.library_id; + let network = self.network.clone(); + let state = self.state.clone(); + let buffer = self.buffer.clone(); + let db = self.db.clone(); + let event_bus_for_emit = self.event_bus.clone(); + let retry_queue = self.retry_queue.clone(); + let mut subscriber = self.event_bus.subscribe(); + let is_running = self.is_running.clone(); + + tokio::spawn(async move { + info!("PeerSync event listener started"); + + while is_running.load(Ordering::SeqCst) { + match subscriber.recv().await { + Ok(Event::Custom { event_type, data }) => { + match event_type.as_str() { + "sync:state_change" => { + if let Err(e) = Self::handle_state_change_event_static( + library_id, + data, + &network, + &state, + &buffer, + &retry_queue, + ) + .await + { + warn!(error = %e, "Failed to handle state change event"); + } + } + "sync:shared_change" => { + if let Err(e) = Self::handle_shared_change_event_static( + library_id, + data, + &network, + &state, + &buffer, + &retry_queue, + ) + .await + { + warn!(error = %e, "Failed to handle shared change event"); + } + } + _ => { + // Ignore other custom events + } + } + } + Ok(_) => { + // Ignore non-custom events + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => { + warn!( + skipped = skipped, + "Event listener lagged, some events skipped" + ); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + info!("Event bus closed, stopping event listener"); + break; + } + } + } + + info!("PeerSync event listener stopped"); + }); + } + + /// Handle state change event from TransactionManager (static version for spawned task) + async fn handle_state_change_event_static( + library_id: Uuid, + data: serde_json::Value, + network: &Arc, + state: &Arc>, + buffer: &Arc, + retry_queue: &Arc, + ) -> Result<()> { + let model_type: String = data + .get("model_type") + .and_then(|v| v.as_str()) + .ok_or_else(|| anyhow::anyhow!("Missing model_type in state_change event"))? + .to_string(); + + let record_uuid: Uuid = data + .get("record_uuid") + .and_then(|v| v.as_str()) + .and_then(|s| Uuid::parse_str(s).ok()) + .ok_or_else(|| anyhow::anyhow!("Missing or invalid record_uuid"))?; + + let device_id: Uuid = data + .get("device_id") + .and_then(|v| v.as_str()) + .and_then(|s| Uuid::parse_str(s).ok()) + .ok_or_else(|| anyhow::anyhow!("Missing or invalid device_id"))?; + + let data_value = data + .get("data") + .ok_or_else(|| anyhow::anyhow!("Missing data in state_change event"))? + .clone(); + + let timestamp = data + .get("timestamp") + .and_then(|v| v.as_str()) + .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) + .map(|dt| dt.with_timezone(&chrono::Utc)) + .unwrap_or_else(Utc::now); + + let change = StateChangeMessage { + model_type, + record_uuid, + device_id, + data: data_value, + timestamp, + }; + + debug!( + model_type = %change.model_type, + record_uuid = %change.record_uuid, + "Broadcasting state change from event" + ); + + // Check if we should buffer + let current_state = *state.read().await; + if current_state.should_buffer() { + debug!("Buffering own state change during backfill"); + buffer + .push(super::state::BufferedUpdate::StateChange(change)) + .await; + return Ok(()); + } + + // Get all connected sync partners + let connected_partners = network.get_connected_sync_partners().await.map_err(|e| { + warn!(error = %e, "Failed to get connected partners"); + e + })?; + + if connected_partners.is_empty() { + debug!("No connected sync partners to broadcast to"); + return Ok(()); + } + + // Create sync message + let message = SyncMessage::StateChange { + library_id, + model_type: change.model_type.clone(), + record_uuid: change.record_uuid, + device_id: change.device_id, + data: change.data.clone(), + timestamp: Utc::now(), + }; + + debug!( + model_type = %change.model_type, + record_uuid = %change.record_uuid, + partner_count = connected_partners.len(), + "Broadcasting state change to sync partners" + ); + + // Broadcast to all partners in parallel + use futures::future::join_all; + + let send_futures: Vec<_> = connected_partners + .iter() + .map(|&partner| { + let network = network.clone(); + let msg = message.clone(); + async move { + match tokio::time::timeout( + std::time::Duration::from_secs(30), + network.send_sync_message(partner, msg), + ) + .await + { + Ok(Ok(())) => (partner, Ok(())), + Ok(Err(e)) => (partner, Err(e)), + Err(_) => (partner, Err(anyhow::anyhow!("Send timeout after 30s"))), + } + } + }) + .collect(); + + let results = join_all(send_futures).await; + + // Process results + let mut success_count = 0; + let mut error_count = 0; + + for (partner_uuid, result) in results { + match result { + Ok(()) => { + success_count += 1; + debug!(partner = %partner_uuid, "State change sent successfully"); + } + Err(e) => { + error_count += 1; + warn!( + partner = %partner_uuid, + error = %e, + "Failed to send state change to partner, enqueuing for retry" + ); + // Enqueue for retry + retry_queue.enqueue(partner_uuid, message.clone()).await; + } + } + } + + info!( + model_type = %change.model_type, + success = success_count, + errors = error_count, + "State change broadcast complete" + ); + + Ok(()) + } + + /// Handle shared change event from TransactionManager (static version for spawned task) + async fn handle_shared_change_event_static( + library_id: Uuid, + data: serde_json::Value, + network: &Arc, + state: &Arc>, + buffer: &Arc, + retry_queue: &Arc, + ) -> Result<()> { + let entry: SharedChangeEntry = serde_json::from_value( + data.get("entry") + .ok_or_else(|| anyhow::anyhow!("Missing entry in shared_change event"))? + .clone(), + ) + .map_err(|e| anyhow::anyhow!("Failed to parse SharedChangeEntry: {}", e))?; + + debug!( + hlc = %entry.hlc, + model_type = %entry.model_type, + "Broadcasting shared change from event" + ); + + // Broadcast to peers (entry is already in peer_log via TransactionManager) + let message = SyncMessage::SharedChange { + library_id, + entry: entry.clone(), + }; + + let current_state = *state.read().await; + if current_state.should_buffer() { + debug!("Buffering own shared change during backfill"); + buffer + .push(super::state::BufferedUpdate::SharedChange(entry)) + .await; + return Ok(()); + } + + // Get all connected sync partners + let connected_partners = network.get_connected_sync_partners().await.map_err(|e| { + warn!(error = %e, "Failed to get connected partners"); + e + })?; + + if connected_partners.is_empty() { + debug!("No connected sync partners to broadcast to"); + return Ok(()); + } + + debug!( + hlc = %entry.hlc, + model_type = %entry.model_type, + partner_count = connected_partners.len(), + "Broadcasting shared change to sync partners" + ); + + // Broadcast to all partners in parallel + use futures::future::join_all; + + let send_futures: Vec<_> = connected_partners + .iter() + .map(|&partner| { + let network = network.clone(); + let msg = message.clone(); + async move { + match tokio::time::timeout( + std::time::Duration::from_secs(30), + network.send_sync_message(partner, msg), + ) + .await + { + Ok(Ok(())) => (partner, Ok(())), + Ok(Err(e)) => (partner, Err(e)), + Err(_) => (partner, Err(anyhow::anyhow!("Send timeout after 30s"))), + } + } + }) + .collect(); + + let results = join_all(send_futures).await; + + // Process results + let mut success_count = 0; + let mut error_count = 0; + + for (partner_uuid, result) in results { + match result { + Ok(()) => { + success_count += 1; + debug!(partner = %partner_uuid, "Shared change sent successfully"); + } + Err(e) => { + error_count += 1; + warn!( + partner = %partner_uuid, + error = %e, + "Failed to send shared change to partner, enqueuing for retry" + ); + // Enqueue for retry + retry_queue.enqueue(partner_uuid, message.clone()).await; + } + } + } + + info!( + hlc = %entry.hlc, + model_type = %entry.model_type, + success = success_count, + errors = error_count, + "Shared change broadcast complete" + ); + + Ok(()) + } + /// Stop the sync service pub async fn stop(&self) -> Result<()> { if !self.is_running.load(Ordering::SeqCst) { @@ -226,10 +588,12 @@ impl PeerSync { warn!( partner = %partner_uuid, error = %e, - "Failed to send state change to partner" + "Failed to send state change to partner, enqueuing for retry" ); - // TODO: Enqueue for retry - // self.retry_queue.enqueue(partner_uuid, message.clone()).await; + // Enqueue for retry + self.retry_queue + .enqueue(partner_uuid, message.clone()) + .await; } } } @@ -350,10 +714,12 @@ impl PeerSync { warn!( partner = %partner_uuid, error = %e, - "Failed to send shared change to partner" + "Failed to send shared change to partner, enqueuing for retry" ); - // TODO: Enqueue for retry - // self.retry_queue.enqueue(partner_uuid, message.clone()).await; + // Enqueue for retry + self.retry_queue + .enqueue(partner_uuid, message.clone()) + .await; } } } @@ -469,7 +835,44 @@ impl PeerSync { "Shared change applied successfully" ); - // TODO: Send ACK to sender for pruning + // Send ACK to sender for pruning + let sender_device_id = entry.hlc.device_id; + let up_to_hlc = entry.hlc; + + // Don't send ACK to ourselves + if sender_device_id != self.device_id { + let ack_message = SyncMessage::AckSharedChanges { + library_id: self.library_id, + from_device: self.device_id, + up_to_hlc, + }; + + debug!( + sender = %sender_device_id, + hlc = %up_to_hlc, + "Sending ACK for shared change" + ); + + // Send ACK (don't fail the whole operation if ACK send fails) + if let Err(e) = self + .network + .send_sync_message(sender_device_id, ack_message) + .await + { + warn!( + sender = %sender_device_id, + hlc = %up_to_hlc, + error = %e, + "Failed to send ACK to sender (non-fatal)" + ); + } else { + debug!( + sender = %sender_device_id, + hlc = %up_to_hlc, + "ACK sent successfully" + ); + } + } // Emit event self.event_bus.emit(Event::Custom { @@ -506,6 +909,66 @@ impl PeerSync { Ok(()) } + /// Get device-owned state for backfill (StateRequest) + pub async fn get_device_state( + &self, + model_types: Vec, + device_id: Option, + since: Option>, + batch_size: usize, + ) -> Result> { + use crate::service::network::protocol::sync::messages::StateRecord; + + debug!( + model_types = ?model_types, + device_id = ?device_id, + since = ?since, + batch_size = batch_size, + "Querying device state for backfill" + ); + + // For now, return empty result + // TODO: Query database for each model type + // This requires the Syncable registry to have query methods + warn!("get_device_state not fully implemented - returning empty result"); + + Ok(Vec::new()) + } + + /// Get shared changes from peer log (SharedChangeRequest) + pub async fn get_shared_changes( + &self, + since_hlc: Option, + limit: usize, + ) -> Result<(Vec, bool)> { + debug!( + since_hlc = ?since_hlc, + limit = limit, + "Querying shared changes from peer log" + ); + + // Query peer log (get all since HLC, then limit in memory) + let mut entries = self + .peer_log + .get_since(since_hlc) + .await + .map_err(|e| anyhow::anyhow!("Failed to query peer log: {}", e))?; + + // Check if there are more entries beyond the limit + let has_more = entries.len() > limit; + + // Truncate to limit + entries.truncate(limit); + + info!( + count = entries.len(), + has_more = has_more, + "Retrieved shared changes from peer log" + ); + + Ok((entries, has_more)) + } + /// Transition to ready state (after backfill) pub async fn transition_to_ready(&self) -> Result<()> { let current_state = self.state().await; diff --git a/core/src/service/sync/retry_queue.rs b/core/src/service/sync/retry_queue.rs new file mode 100644 index 000000000..276147b56 --- /dev/null +++ b/core/src/service/sync/retry_queue.rs @@ -0,0 +1,140 @@ +//! Retry queue for failed sync messages +//! +//! Handles automatic retry of failed message sends with exponential backoff. + +use crate::service::network::protocol::sync::messages::SyncMessage; +use chrono::{DateTime, Duration, Utc}; +use std::collections::VecDeque; +use tokio::sync::RwLock; +use uuid::Uuid; + +/// Maximum number of retry attempts before giving up +const MAX_RETRIES: u32 = 5; + +/// Initial retry delay in seconds +const INITIAL_DELAY_SECS: i64 = 5; + +/// Entry in the retry queue +#[derive(Debug, Clone)] +struct RetryEntry { + /// Target device to send to + target_device: Uuid, + + /// Message to send + message: SyncMessage, + + /// Number of attempts made + attempts: u32, + + /// Next retry time + next_retry: DateTime, +} + +/// Retry queue for failed message sends +pub struct RetryQueue { + queue: RwLock>, +} + +impl RetryQueue { + /// Create a new empty retry queue + pub fn new() -> Self { + Self { + queue: RwLock::new(VecDeque::new()), + } + } + + /// Enqueue a message for retry + pub async fn enqueue(&self, target_device: Uuid, message: SyncMessage) { + let entry = RetryEntry { + target_device, + message, + attempts: 0, + next_retry: Utc::now() + Duration::seconds(INITIAL_DELAY_SECS), + }; + + self.queue.write().await.push_back(entry); + } + + /// Get messages that are ready for retry + pub async fn get_ready(&self) -> Vec<(Uuid, SyncMessage)> { + let now = Utc::now(); + let mut queue = self.queue.write().await; + let mut ready = Vec::new(); + let mut to_requeue = Vec::new(); + + // Process all entries + while let Some(mut entry) = queue.pop_front() { + if entry.next_retry <= now { + // This entry is ready for retry + entry.attempts += 1; + + if entry.attempts >= MAX_RETRIES { + // Max retries reached, drop it + tracing::warn!( + target_device = %entry.target_device, + attempts = entry.attempts, + "Max retries reached, dropping message" + ); + continue; + } + + // Add to ready list + ready.push((entry.target_device, entry.message.clone())); + + // Calculate exponential backoff delay + let delay_secs = INITIAL_DELAY_SECS * (2_i64.pow(entry.attempts)); + entry.next_retry = Utc::now() + Duration::seconds(delay_secs); + + // Re-queue for next attempt + to_requeue.push(entry); + } else { + // Not ready yet, put it back + to_requeue.push(entry); + } + } + + // Put back entries that aren't ready yet + for entry in to_requeue { + queue.push_back(entry); + } + + ready + } + + /// Get current queue size + pub async fn len(&self) -> usize { + self.queue.read().await.len() + } + + /// Check if queue is empty + pub async fn is_empty(&self) -> bool { + self.queue.read().await.is_empty() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_retry_queue() { + let queue = RetryQueue::new(); + let device_id = Uuid::new_v4(); + let message = SyncMessage::Error { + library_id: Uuid::new_v4(), + message: "test".to_string(), + }; + + // Enqueue a message + queue.enqueue(device_id, message.clone()).await; + assert_eq!(queue.len().await, 1); + + // Should not be ready immediately + let ready = queue.get_ready().await; + assert_eq!(ready.len(), 0); + + // Still in queue + assert_eq!(queue.len().await, 1); + } +} +