diff --git a/core/src/file/cas/identifier.rs b/core/src/file/cas/identifier.rs index fdb78c060..e262ce2c2 100644 --- a/core/src/file/cas/identifier.rs +++ b/core/src/file/cas/identifier.rs @@ -5,7 +5,7 @@ use crate::{ }; use chrono::{DateTime, FixedOffset}; -use prisma_client_rust::{prisma_models::PrismaValue, raw, raw::Raw, Direction}; +use prisma_client_rust::{prisma_models::PrismaValue, raw::Raw, Direction}; use serde::{Deserialize, Serialize}; use std::{ collections::{HashMap, HashSet}, @@ -67,7 +67,7 @@ impl StatefulJob for FileIdentifierJob { ctx: WorkerContext, state: &mut JobState, ) -> JobResult { - info!("Identifying orphan file paths..."); + info!("Identifying orphan Paths..."); let library = ctx.library_ctx(); @@ -94,10 +94,11 @@ impl StatefulJob for FileIdentifierJob { .exec() .await? as usize; - info!("Found {} orphan file paths", total_count); - let task_count = (total_count as f64 / CHUNK_SIZE as f64).ceil() as usize; - info!("Will process {} tasks", task_count); + info!( + "Found {} orphan Paths. 
Will execute {} tasks...", + total_count, task_count + ); // update job with total task count based on orphan file_paths count ctx.progress(vec![JobReportUpdate::TaskCount(task_count)]); @@ -123,7 +124,6 @@ impl StatefulJob for FileIdentifierJob { }); state.steps = (0..task_count).map(|_| ()).collect(); - Ok(()) } @@ -139,19 +139,21 @@ impl StatefulJob for FileIdentifierJob { let data = state .data .as_mut() - .expect("critical error: missing data on job state"); + .expect("Critical error: missing data on job state"); // get chunk of orphans to process let file_paths = - match get_orphan_file_paths(&ctx.library_ctx(), &data.cursor, data.location.id).await { - Ok(file_paths) => file_paths, - Err(e) => { - info!("Error getting orphan file paths: {:#?}", e); - return Ok(()); - } - }; + get_orphan_file_paths(&ctx.library_ctx(), &data.cursor, data.location.id).await?; + + // if no file paths found, abort entire job early + if file_paths.is_empty() { + return Err(JobError::JobDataNotFound( + "Expected orphan Paths not returned from database query for this chunk".to_string(), + )); + } + info!( - "Processing {:?} orphan files. ({} completed of {})", + "Processing {:?} orphan Paths. 
({} completed of {})", file_paths.len(), state.step_number, data.task_count @@ -218,74 +220,68 @@ impl StatefulJob for FileIdentifierJob { .filter(|create_file| !existing_files_cas_ids.contains(&create_file.cas_id)) .collect::>(); - if new_files.is_empty() { - error!("This shouldn't happen?"); - return Ok(()); - } + if !new_files.is_empty() { + // assemble prisma values for new unique files + let mut values = Vec::with_capacity(new_files.len() * 3); + for file in &new_files { + values.extend([ + PrismaValue::String(file.cas_id.clone()), + PrismaValue::Int(file.size_in_bytes), + PrismaValue::DateTime(file.date_created), + ]); + } - // assemble prisma values for new unique files - let mut values = Vec::with_capacity(new_files.len() * 3); - for file in &new_files { - values.extend([ - PrismaValue::String(file.cas_id.clone()), - PrismaValue::Int(file.size_in_bytes), - PrismaValue::DateTime(file.date_created), - ]); - } - - // create new file records with assembled values - let created_files: Vec = ctx - .library_ctx() - .db - ._query_raw(Raw::new( - &format!( - "INSERT INTO files (cas_id, size_in_bytes, date_created) VALUES {} - ON CONFLICT (cas_id) DO NOTHING RETURNING id, cas_id", - vec!["({}, {}, {})"; new_files.len()].join(",") - ), - values, - )) - .exec() - .await - .unwrap_or_else(|e| { - error!("Error inserting files: {:#?}", e); - Vec::new() - }); - - for created_file in created_files { - // associate newly created files with their respective file_paths - // TODO: this is potentially bottle necking the chunk system, individually linking file_path to file, 100 queries per chunk - // - insert many could work, but I couldn't find a good way to do this in a single SQL query - if let Err(e) = ctx + // create new file records with assembled values + let created_files: Vec = ctx .library_ctx() .db - .file_path() - .update( - file_path::location_id_id( - state.init.location.id, - *cas_lookup.get(&created_file.cas_id).unwrap(), + ._query_raw(Raw::new( + &format!( + 
"INSERT INTO files (cas_id, size_in_bytes, date_created) VALUES {} + ON CONFLICT (cas_id) DO NOTHING RETURNING id, cas_id", + vec!["({}, {}, {})"; new_files.len()].join(",") ), - vec![file_path::file_id::set(Some(created_file.id))], - ) + values, + )) .exec() .await - { - info!("Error updating file_id: {:#?}", e); + .unwrap_or_else(|e| { + error!("Error inserting files: {:#?}", e); + Vec::new() + }); + + for created_file in created_files { + // associate newly created files with their respective file_paths + // TODO: this is potentially bottle necking the chunk system, individually linking file_path to file, 100 queries per chunk + // - insert many could work, but I couldn't find a good way to do this in a single SQL query + if let Err(e) = ctx + .library_ctx() + .db + .file_path() + .update( + file_path::location_id_id( + state.init.location.id, + *cas_lookup.get(&created_file.cas_id).unwrap(), + ), + vec![file_path::file_id::set(Some(created_file.id))], + ) + .exec() + .await + { + info!("Error updating file_id: {:#?}", e); + } } } // set the step data cursor to the last row of this chunk if let Some(last_row) = file_paths.last() { data.cursor.file_path_id = last_row.id; - } else { - return Ok(()); } - // } ctx.progress(vec![ JobReportUpdate::CompletedTaskCount(state.step_number), JobReportUpdate::Message(format!( - "Processed {} of {} orphan files", + "Processed {} of {} orphan Paths", state.step_number * CHUNK_SIZE, data.total_count )), @@ -333,7 +329,7 @@ async fn get_orphan_file_paths( location_id: i32, ) -> Result, prisma_client_rust::QueryError> { info!( - "discovering {} orphan file paths at cursor: {:?}", + "Querying {} orphan Paths at cursor: {:?}", CHUNK_SIZE, cursor ); ctx.db diff --git a/core/src/job/mod.rs b/core/src/job/mod.rs index 68526acad..96f47c1bb 100644 --- a/core/src/job/mod.rs +++ b/core/src/job/mod.rs @@ -34,6 +34,8 @@ pub enum JobError { MissingJobDataState(Uuid, String), #[error("Indexer error: {0}")] IndexerError(#[from] IndexerError), 
+ #[error("Data needed for job execution not found: job <name='{0}'>")]
+ JobDataNotFound(String),
 #[error("Job paused")]
 Paused(Vec<u8>),
 }