From 7cffba2fe518becf3ecf757d2b0bb7f43eb2ef76 Mon Sep 17 00:00:00 2001 From: Jamie Pine <32987599+jamiepine@users.noreply.github.com> Date: Tue, 21 Jun 2022 07:39:31 -0700 Subject: [PATCH] Identifier Exponential Lag (#269) A little refactor of the identifier job to remove memory leak and improve performance. --- core/src/file/cas/identifier.rs | 201 +++++++++++++++++++------------- 1 file changed, 119 insertions(+), 82 deletions(-) diff --git a/core/src/file/cas/identifier.rs b/core/src/file/cas/identifier.rs index e4b701283..8cebc857b 100644 --- a/core/src/file/cas/identifier.rs +++ b/core/src/file/cas/identifier.rs @@ -1,16 +1,16 @@ use std::collections::HashMap; -use std::{fs, io}; use std::path::Path; +use std::{fs, io}; use crate::job::JobReportUpdate; -use crate::prisma::file; use crate::sys::get_location; use crate::{ file::FileError, job::{Job, WorkerContext}, - prisma::file_path, + prisma::{file, file_path}, CoreContext, }; +use chrono::{DateTime, FixedOffset, Utc}; use futures::executor::block_on; use prisma_client_rust::prisma_models::PrismaValue; use prisma_client_rust::raw::Raw; @@ -30,6 +30,9 @@ pub struct FileIdentifierJob { pub path: String, } +// we break this job into chunks of 100 to improve performance +static CHUNK_SIZE: usize = 100; + #[async_trait::async_trait] impl Job for FileIdentifierJob { fn name(&self) -> &'static str { @@ -41,11 +44,9 @@ impl Job for FileIdentifierJob { let location_path = location.path.unwrap_or("".to_string()); let total_count = count_orphan_file_paths(&ctx.core_ctx, location.id.into()).await?; - println!("Found {} orphan file paths", total_count); - let task_count = (total_count as f64 / 100f64).ceil() as usize; - + let task_count = (total_count as f64 / CHUNK_SIZE as f64).ceil() as usize; println!("Will process {} tasks", task_count); // update job with total task count based on orphan file_paths count @@ -53,14 +54,23 @@ impl Job for FileIdentifierJob { let db = ctx.core_ctx.database.clone(); - let ctx = tokio::task::spawn_blocking(move || { + let _ctx = tokio::task::spawn_blocking(move || { let mut completed: usize = 0; let mut cursor: i32 = 1; - // map cas_id to file_path ids - let mut cas_id_lookup: HashMap = HashMap::new(); while completed < task_count { - let file_paths = block_on(get_orphan_file_paths(&ctx.core_ctx, cursor)).unwrap(); + // link file_path ids to a CreateFile struct containing unique file data + let mut chunk: HashMap = HashMap::new(); + let mut cas_lookup: HashMap = HashMap::new(); + + // get chunk of orphans to process + let file_paths = match block_on(get_orphan_file_paths(&ctx.core_ctx, cursor)) { + Ok(file_paths) => file_paths, + Err(e) => { + println!("Error getting orphan file paths: {}", e); + continue; + } + }; println!( "Processing {:?} orphan files. ({} completed of {})", file_paths.len(), @@ -68,15 +78,15 @@ impl Job for FileIdentifierJob { task_count ); - // raw values to be inserted into the database - let mut values: Vec = Vec::new(); - - // only rows that have a valid cas_id to be inserted + // analyze each file_path for file_path in file_paths.iter() { - match prepare_file_values(&location_path, file_path) { - Ok((cas_id, data)) => { - cas_id_lookup.insert(file_path.id, cas_id); - values.extend(data); + // get the cas_id and extract metadata + match prepare_file(&location_path, file_path) { + Ok(file) => { + let cas_id = file.cas_id.clone(); + // create entry into chunks for created file data + chunk.insert(file_path.id, file); + cas_lookup.insert(cas_id, file_path.id); } Err(e) => { println!("Error processing file: {}", e); @@ -84,80 +94,93 @@ impl Job for FileIdentifierJob { } }; } - if values.len() == 0 { - println!("No orphan files to process, finishing..."); - break; + + // find all existing files by cas id + let generated_cas_ids = chunk.values().map(|c| c.cas_id.clone()).collect(); + let existing_files: Vec = block_on( + db.file() + .find_many(vec![file::cas_id::in_vec(generated_cas_ids)]) + .exec(), + ) + .unwrap(); + println!("Found {} existing files", existing_files.len()); + + // TODO: link existing files to file_paths + for file in existing_files.iter() { + let file_path_id = cas_lookup.get(&file.cas_id).unwrap(); + block_on( + db.file_path() + .find_unique(file_path::id::equals(file_path_id.clone())) + .update(vec![file_path::file_id::set(Some(file.id.clone()))]) + .exec(), + ) + .unwrap(); } - println!("Inserting {} unique file records ({:?} values)", file_paths.len(), values.len()); - - let files: Vec = block_on(db._query_raw(Raw::new( - &format!( - "INSERT INTO files (cas_id, size_in_bytes) VALUES {} ON CONFLICT (cas_id) DO NOTHING RETURNING id, cas_id", - vec!["({}, {})"; file_paths.len()].join(",") - ), - values - ))).unwrap_or_else(|e| { + // extract files that don't already exist in the database + let new_files: Vec<&CreateFile> = chunk + .iter() + .map(|(_, c)| c) + .filter(|c| !existing_files.iter().any(|d| d.cas_id == c.cas_id)) + .collect(); + + // assemble prisma values + let mut values: Vec = Vec::new(); + for file in new_files.iter() { + values.extend([ + PrismaValue::String(file.cas_id.clone()), + PrismaValue::Int(file.size_in_bytes.clone()), + PrismaValue::DateTime(file.date_created.clone()), + ]); + } + + // create new files + let created_files: Vec = block_on(db._query_raw(Raw::new( + &format!( + "INSERT INTO files (cas_id, size_in_bytes, date_created) VALUES {} + ON CONFLICT (cas_id) DO NOTHING RETURNING id, cas_id", + vec!["({}, {}, {})"; new_files.len()].join(",") + ), + values, + ))) + .unwrap_or_else(|e| { println!("Error inserting files: {}", e); Vec::new() }); - println!("Unique files: {:?}" , files); - - // assign unique file to file path - println!("Assigning {} unique file ids to origin file_paths", files.len()); - for (file_path_id, cas_id) in cas_id_lookup.iter() { - // get the cas id from the lookup table - let file = files.iter().find(|f| &f.cas_id == cas_id); - let file_id: i32; - if let Some(file) = file { - file_id = file.id; - } else { - let unique_file = match block_on(db.file().find_unique(file::cas_id::equals(cas_id.clone())).exec()) { - Ok(f) => match f { - Some(f) => f, - None => { - println!("Unique file does not exist, this shouldn't happen: {}", cas_id); - continue; - } - }, - Err(e) => { - println!("Error finding unique file: {}", e); - continue; - } - }; - file_id = unique_file.id; - } - - block_on( - db.file_path() - .find_unique(file_path::id::equals(file_path_id.clone())) - .update(vec![ - file_path::file_id::set(Some(file_id)) - ]) - .exec() - ).unwrap(); + // associate newly created files with their respective file_paths + for file in created_files.iter() { + // TODO: This is bottle necking the chunk system, individually linking file_path to file, 100 queries per chunk. + // Maybe an insert many could work? not sure. + let file_path_id = cas_lookup.get(&file.cas_id).unwrap(); + block_on( + db.file_path() + .find_unique(file_path::id::equals(file_path_id.clone())) + .update(vec![file_path::file_id::set(Some(file.id.clone()))]) + .exec(), + ) + .unwrap(); } + // handle loop end let last_row = file_paths.last().unwrap(); - cursor = last_row.id; completed += 1; + ctx.progress(vec![ - JobReportUpdate::CompletedTaskCount(completed), - JobReportUpdate::Message(format!( - "Processed {} of {} orphan files", - completed, - task_count - )), + JobReportUpdate::CompletedTaskCount(completed), + JobReportUpdate::Message(format!( + "Processed {} of {} orphan files", + completed * CHUNK_SIZE, + total_count + )), ]); } ctx }) .await?; - let _remaining = count_orphan_file_paths(&ctx.core_ctx, location.id.into()).await?; - + // let _remaining = count_orphan_file_paths(&ctx.core_ctx, location.id.into()).await?; Ok(()) } } @@ -195,22 +218,34 @@ pub async fn get_orphan_file_paths( ]) .order_by(file_path::id::order(Direction::Asc)) .cursor(file_path::id::cursor(cursor)) - .take(100) + .take(CHUNK_SIZE as i64) .exec() .await?; Ok(files) } -pub fn prepare_file_values( +#[derive(Deserialize, Serialize, Debug)] +pub struct CreateFile { + pub cas_id: String, + pub size_in_bytes: i64, + pub date_created: DateTime, +} + +pub fn prepare_file( location_path: &str, file_path: &file_path::Data, -) -> Result<(String, [PrismaValue; 2]), io::Error> { +) -> Result { let path = Path::new(&location_path).join(Path::new(file_path.materialized_path.as_str())); - // println!("Processing file: {:?}", path); + let metadata = fs::metadata(&path)?; + + let date_created: DateTime = metadata.created().unwrap().into(); + + let size = metadata.len(); + let cas_id = { if !file_path.is_dir { - let mut ret = generate_cas_id(path.clone(), metadata.len()).unwrap(); + let mut ret = generate_cas_id(path.clone(), size.clone()).unwrap(); ret.truncate(16); ret } else { @@ -218,7 +253,9 @@ pub fn prepare_file_values( } }; - println!("cas id for path {:?} is {:?}", path, cas_id); - - Ok((cas_id.clone(), [PrismaValue::String(cas_id), PrismaValue::Int(0)])) + Ok(CreateFile { + cas_id, + size_in_bytes: size as i64, + date_created: date_created.into(), + }) }