From 149a2e8d2cd12bdb62c680e555199f715ffecaeb Mon Sep 17 00:00:00 2001 From: Ericson Fogo Soares Date: Thu, 30 Jun 2022 00:04:37 -0300 Subject: [PATCH] Removing tokio::spawn_blocking from identifier job * Some small optimizations to await many queries concurrently --- core/src/file/cas/identifier.rs | 248 +++++++++++++++++--------------- 1 file changed, 131 insertions(+), 117 deletions(-) diff --git a/core/src/file/cas/identifier.rs b/core/src/file/cas/identifier.rs index de9a54483..855352385 100644 --- a/core/src/file/cas/identifier.rs +++ b/core/src/file/cas/identifier.rs @@ -8,11 +8,11 @@ use crate::{ CoreContext, }; use chrono::{DateTime, FixedOffset}; -use futures::executor::block_on; -use log::info; +use futures::future::join_all; +use log::{error, info}; use prisma_client_rust::{prisma_models::PrismaValue, raw, raw::Raw, Direction}; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::error::Error; use std::path::{Path, PathBuf}; use tokio::{fs, io}; @@ -50,138 +50,154 @@ impl Job for FileIdentifierJob { // update job with total task count based on orphan file_paths count ctx.progress(vec![JobReportUpdate::TaskCount(task_count)]); - let db = ctx.core_ctx.database.clone(); - // dedicated tokio thread for task - let _ctx = tokio::task::spawn_blocking(move || { - let mut completed: usize = 0; - let mut cursor: i32 = 1; - // loop until task count is complete - while completed < task_count { - // link file_path ids to a CreateFile struct containing unique file data - let mut chunk: HashMap = HashMap::new(); - let mut cas_lookup: HashMap = HashMap::new(); + let mut completed: usize = 0; + let mut cursor: i32 = 1; + // loop until task count is complete + while completed < task_count { + // link file_path ids to a CreateFile struct containing unique file data + let mut chunk: HashMap = HashMap::new(); + let mut cas_lookup: HashMap = HashMap::new(); - // get chunk of orphans to process - let file_paths = match block_on(get_orphan_file_paths(&ctx.core_ctx, cursor)) { - Ok(file_paths) => file_paths, + // get chunk of orphans to process + let file_paths = match get_orphan_file_paths(&ctx.core_ctx, cursor).await { + Ok(file_paths) => file_paths, + Err(e) => { + info!("Error getting orphan file paths: {:#?}", e); + continue; + } + }; + info!( + "Processing {:?} orphan files. ({} completed of {})", + file_paths.len(), + completed, + task_count + ); + + // analyze each file_path + for file_path in &file_paths { + // get the cas_id and extract metadata + match prepare_file(&location_path, file_path).await { + Ok(file) => { + let cas_id = file.cas_id.clone(); + // create entry into chunks for created file data + chunk.insert(file_path.id, file); + cas_lookup.insert(cas_id, file_path.id); + } Err(e) => { - info!("Error getting orphan file paths: {:#?}", e); + info!("Error processing file: {:#?}", e); continue; } }; - info!( - "Processing {:?} orphan files. ({} completed of {})", - file_paths.len(), - completed, - task_count - ); + } - // analyze each file_path - for file_path in file_paths.iter() { - // get the cas_id and extract metadata - match block_on(prepare_file(&location_path, file_path)) { - Ok(file) => { - let cas_id = file.cas_id.clone(); - // create entry into chunks for created file data - chunk.insert(file_path.id, file); - cas_lookup.insert(cas_id, file_path.id); - } - Err(e) => { - info!("Error processing file: {:#?}", e); - continue; - } - }; + // find all existing files by cas id + let generated_cas_ids = chunk.values().map(|c| c.cas_id.clone()).collect(); + let existing_files = ctx + .core_ctx + .database + .file() + .find_many(vec![file::cas_id::in_vec(generated_cas_ids)]) + .exec() + .await?; + + info!("Found {} existing files", existing_files.len()); + + // link those existing files to their file paths + // Had to put the file_path in a variable outside of the closure, to satisfy the borrow checker + let prisma_file_path = ctx.core_ctx.database.file_path(); + for result in join_all(existing_files.iter().map(|file| { + prisma_file_path + .find_unique(file_path::id::equals( + *cas_lookup.get(&file.cas_id).unwrap(), + )) + .update(vec![file_path::file_id::set(Some(file.id))]) + .exec() + })) + .await + { + if let Err(e) = result { + error!("Error linking file: {:#?}", e); } + } - // find all existing files by cas id - let generated_cas_ids = chunk.values().map(|c| c.cas_id.clone()).collect(); - let existing_files: Vec = block_on( - db.file() - .find_many(vec![file::cas_id::in_vec(generated_cas_ids)]) - .exec(), - ) - .unwrap(); - info!("Found {} existing files", existing_files.len()); + let existing_files_cas_ids = existing_files + .iter() + .map(|file| file.cas_id.clone()) + .collect::>(); - // link those existing files to their file paths - for file in existing_files.iter() { - let file_path_id = cas_lookup.get(&file.cas_id).unwrap(); - block_on( - db.file_path() - .find_unique(file_path::id::equals(*file_path_id)) - .update(vec![file_path::file_id::set(Some(file.id))]) - .exec(), - ) - .unwrap(); - } + // extract files that don't already exist in the database + let new_files = chunk + .iter() + .map(|(_id, create_file)| create_file) + .filter(|create_file| !existing_files_cas_ids.contains(&create_file.cas_id)) + .collect::>(); - // extract files that don't already exist in the database - let new_files: Vec<&CreateFile> = chunk - .iter() - .map(|(_, c)| c) - .filter(|c| !existing_files.iter().any(|d| d.cas_id == c.cas_id)) - .collect(); + // assemble prisma values for new unique files + let mut values: Vec = Vec::new(); + for file in &new_files { + values.extend([ + PrismaValue::String(file.cas_id.clone()), + PrismaValue::Int(file.size_in_bytes), + PrismaValue::DateTime(file.date_created), + ]); + } - // assemble prisma values for new unique files - let mut values: Vec = Vec::new(); - for file in new_files.iter() { - values.extend([ - PrismaValue::String(file.cas_id.clone()), - PrismaValue::Int(file.size_in_bytes), - PrismaValue::DateTime(file.date_created), - ]); - } - - // create new file records with assembled values - let created_files: Vec = block_on(db._query_raw(Raw::new( + // create new file records with assembled values + let created_files: Vec = ctx + .core_ctx + .database + ._query_raw(Raw::new( &format!( "INSERT INTO files (cas_id, size_in_bytes, date_created) VALUES {} ON CONFLICT (cas_id) DO NOTHING RETURNING id, cas_id", vec!["({}, {}, {})"; new_files.len()].join(",") ), values, - ))) + )) + .await .unwrap_or_else(|e| { - info!("Error inserting files: {:#?}", e); + error!("Error inserting files: {:#?}", e); Vec::new() }); + // This code is duplicates, is this right? + for result in join_all(created_files.iter().map(|file| { // associate newly created files with their respective file_paths - for file in created_files.iter() { - // TODO: this is potentially bottle necking the chunk system, individually linking file_path to file, 100 queries per chunk - // - insert many could work, but I couldn't find a good way to do this in a single SQL query - let file_path_id = cas_lookup.get(&file.cas_id).unwrap(); - block_on( - db.file_path() - .find_unique(file_path::id::equals(*file_path_id)) - .update(vec![file_path::file_id::set(Some(file.id))]) - .exec(), - ) - .unwrap(); + // TODO: this is potentially bottle necking the chunk system, individually linking file_path to file, 100 queries per chunk + // - insert many could work, but I couldn't find a good way to do this in a single SQL query + prisma_file_path + .find_unique(file_path::id::equals( + *cas_lookup.get(&file.cas_id).unwrap(), + )) + .update(vec![file_path::file_id::set(Some(file.id))]) + .exec() + })) + .await + { + if let Err(e) = result { + error!("Error linking file: {:#?}", e); } - - // handle loop end - let last_row = match file_paths.last() { - Some(l) => l, - None => { - break; - } - }; - cursor = last_row.id; - completed += 1; - - ctx.progress(vec![ - JobReportUpdate::CompletedTaskCount(completed), - JobReportUpdate::Message(format!( - "Processed {} of {} orphan files", - completed * CHUNK_SIZE, - total_count - )), - ]); } - ctx - }) - .await?; + + // handle loop end + let last_row = match file_paths.last() { + Some(l) => l, + None => { + break; + } + }; + cursor = last_row.id; + completed += 1; + + ctx.progress(vec![ + JobReportUpdate::CompletedTaskCount(completed), + JobReportUpdate::Message(format!( + "Processed {} of {} orphan files", + completed * CHUNK_SIZE, + total_count + )), + ]); + } // let _remaining = count_orphan_file_paths(&ctx.core_ctx, location.id.into()).await?; Ok(()) @@ -197,8 +213,7 @@ pub async fn count_orphan_file_paths( ctx: &CoreContext, location_id: i64, ) -> Result { - let db = &ctx.database; - let files_count = db + let files_count = ctx.database ._query_raw::(raw!( "SELECT COUNT(*) AS count FROM file_paths WHERE file_id IS NULL AND is_dir IS FALSE AND location_id = {}", PrismaValue::Int(location_id) @@ -211,12 +226,11 @@ pub async fn get_orphan_file_paths( ctx: &CoreContext, cursor: i32, ) -> Result, FileError> { - let db = &ctx.database; info!( "discovering {} orphan file paths at cursor: {:?}", CHUNK_SIZE, cursor ); - let files = db + ctx.database .file_path() .find_many(vec![ file_path::file_id::equals(None), @@ -226,8 +240,8 @@ pub async fn get_orphan_file_paths( .cursor(file_path::id::cursor(cursor)) .take(CHUNK_SIZE as i64) .exec() - .await?; - Ok(files) + .await + .map_err(|e| e.into()) } #[derive(Deserialize, Serialize, Debug)]