Removing tokio::spawn_blocking from identifier job

* Some small optimizations to await many queries concurrently
This commit is contained in:
Ericson Fogo Soares
2022-06-30 00:04:37 -03:00
parent db1cbf0ffe
commit 149a2e8d2c

View File

@@ -8,11 +8,11 @@ use crate::{
CoreContext,
};
use chrono::{DateTime, FixedOffset};
use futures::executor::block_on;
use log::info;
use futures::future::join_all;
use log::{error, info};
use prisma_client_rust::{prisma_models::PrismaValue, raw, raw::Raw, Direction};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::path::{Path, PathBuf};
use tokio::{fs, io};
@@ -50,138 +50,154 @@ impl Job for FileIdentifierJob {
// update job with total task count based on orphan file_paths count
ctx.progress(vec![JobReportUpdate::TaskCount(task_count)]);
let db = ctx.core_ctx.database.clone();
// dedicated tokio thread for task
let _ctx = tokio::task::spawn_blocking(move || {
let mut completed: usize = 0;
let mut cursor: i32 = 1;
// loop until task count is complete
while completed < task_count {
// link file_path ids to a CreateFile struct containing unique file data
let mut chunk: HashMap<i32, CreateFile> = HashMap::new();
let mut cas_lookup: HashMap<String, i32> = HashMap::new();
let mut completed: usize = 0;
let mut cursor: i32 = 1;
// loop until task count is complete
while completed < task_count {
// link file_path ids to a CreateFile struct containing unique file data
let mut chunk: HashMap<i32, CreateFile> = HashMap::new();
let mut cas_lookup: HashMap<String, i32> = HashMap::new();
// get chunk of orphans to process
let file_paths = match block_on(get_orphan_file_paths(&ctx.core_ctx, cursor)) {
Ok(file_paths) => file_paths,
// get chunk of orphans to process
let file_paths = match get_orphan_file_paths(&ctx.core_ctx, cursor).await {
Ok(file_paths) => file_paths,
Err(e) => {
info!("Error getting orphan file paths: {:#?}", e);
continue;
}
};
info!(
"Processing {:?} orphan files. ({} completed of {})",
file_paths.len(),
completed,
task_count
);
// analyze each file_path
for file_path in &file_paths {
// get the cas_id and extract metadata
match prepare_file(&location_path, file_path).await {
Ok(file) => {
let cas_id = file.cas_id.clone();
// create entry into chunks for created file data
chunk.insert(file_path.id, file);
cas_lookup.insert(cas_id, file_path.id);
}
Err(e) => {
info!("Error getting orphan file paths: {:#?}", e);
info!("Error processing file: {:#?}", e);
continue;
}
};
info!(
"Processing {:?} orphan files. ({} completed of {})",
file_paths.len(),
completed,
task_count
);
}
// analyze each file_path
for file_path in file_paths.iter() {
// get the cas_id and extract metadata
match block_on(prepare_file(&location_path, file_path)) {
Ok(file) => {
let cas_id = file.cas_id.clone();
// create entry into chunks for created file data
chunk.insert(file_path.id, file);
cas_lookup.insert(cas_id, file_path.id);
}
Err(e) => {
info!("Error processing file: {:#?}", e);
continue;
}
};
// find all existing files by cas id
let generated_cas_ids = chunk.values().map(|c| c.cas_id.clone()).collect();
let existing_files = ctx
.core_ctx
.database
.file()
.find_many(vec![file::cas_id::in_vec(generated_cas_ids)])
.exec()
.await?;
info!("Found {} existing files", existing_files.len());
// link those existing files to their file paths
// Had to put the file_path in a variable outside of the closure, to satisfy the borrow checker
let prisma_file_path = ctx.core_ctx.database.file_path();
for result in join_all(existing_files.iter().map(|file| {
prisma_file_path
.find_unique(file_path::id::equals(
*cas_lookup.get(&file.cas_id).unwrap(),
))
.update(vec![file_path::file_id::set(Some(file.id))])
.exec()
}))
.await
{
if let Err(e) = result {
error!("Error linking file: {:#?}", e);
}
}
// find all existing files by cas id
let generated_cas_ids = chunk.values().map(|c| c.cas_id.clone()).collect();
let existing_files: Vec<file::Data> = block_on(
db.file()
.find_many(vec![file::cas_id::in_vec(generated_cas_ids)])
.exec(),
)
.unwrap();
info!("Found {} existing files", existing_files.len());
let existing_files_cas_ids = existing_files
.iter()
.map(|file| file.cas_id.clone())
.collect::<HashSet<_>>();
// link those existing files to their file paths
for file in existing_files.iter() {
let file_path_id = cas_lookup.get(&file.cas_id).unwrap();
block_on(
db.file_path()
.find_unique(file_path::id::equals(*file_path_id))
.update(vec![file_path::file_id::set(Some(file.id))])
.exec(),
)
.unwrap();
}
// extract files that don't already exist in the database
let new_files = chunk
.iter()
.map(|(_id, create_file)| create_file)
.filter(|create_file| !existing_files_cas_ids.contains(&create_file.cas_id))
.collect::<Vec<_>>();
// extract files that don't already exist in the database
let new_files: Vec<&CreateFile> = chunk
.iter()
.map(|(_, c)| c)
.filter(|c| !existing_files.iter().any(|d| d.cas_id == c.cas_id))
.collect();
// assemble prisma values for new unique files
let mut values: Vec<PrismaValue> = Vec::new();
for file in &new_files {
values.extend([
PrismaValue::String(file.cas_id.clone()),
PrismaValue::Int(file.size_in_bytes),
PrismaValue::DateTime(file.date_created),
]);
}
// assemble prisma values for new unique files
let mut values: Vec<PrismaValue> = Vec::new();
for file in new_files.iter() {
values.extend([
PrismaValue::String(file.cas_id.clone()),
PrismaValue::Int(file.size_in_bytes),
PrismaValue::DateTime(file.date_created),
]);
}
// create new file records with assembled values
let created_files: Vec<FileCreated> = block_on(db._query_raw(Raw::new(
// create new file records with assembled values
let created_files: Vec<FileCreated> = ctx
.core_ctx
.database
._query_raw(Raw::new(
&format!(
"INSERT INTO files (cas_id, size_in_bytes, date_created) VALUES {}
ON CONFLICT (cas_id) DO NOTHING RETURNING id, cas_id",
vec!["({}, {}, {})"; new_files.len()].join(",")
),
values,
)))
))
.await
.unwrap_or_else(|e| {
info!("Error inserting files: {:#?}", e);
error!("Error inserting files: {:#?}", e);
Vec::new()
});
// This code is duplicated, is this right?
for result in join_all(created_files.iter().map(|file| {
// associate newly created files with their respective file_paths
for file in created_files.iter() {
// TODO: this is potentially bottlenecking the chunk system, individually linking file_path to file, 100 queries per chunk
// - insert many could work, but I couldn't find a good way to do this in a single SQL query
let file_path_id = cas_lookup.get(&file.cas_id).unwrap();
block_on(
db.file_path()
.find_unique(file_path::id::equals(*file_path_id))
.update(vec![file_path::file_id::set(Some(file.id))])
.exec(),
)
.unwrap();
// TODO: this is potentially bottlenecking the chunk system, individually linking file_path to file, 100 queries per chunk
// - insert many could work, but I couldn't find a good way to do this in a single SQL query
prisma_file_path
.find_unique(file_path::id::equals(
*cas_lookup.get(&file.cas_id).unwrap(),
))
.update(vec![file_path::file_id::set(Some(file.id))])
.exec()
}))
.await
{
if let Err(e) = result {
error!("Error linking file: {:#?}", e);
}
// handle loop end
let last_row = match file_paths.last() {
Some(l) => l,
None => {
break;
}
};
cursor = last_row.id;
completed += 1;
ctx.progress(vec![
JobReportUpdate::CompletedTaskCount(completed),
JobReportUpdate::Message(format!(
"Processed {} of {} orphan files",
completed * CHUNK_SIZE,
total_count
)),
]);
}
ctx
})
.await?;
// handle loop end
let last_row = match file_paths.last() {
Some(l) => l,
None => {
break;
}
};
cursor = last_row.id;
completed += 1;
ctx.progress(vec![
JobReportUpdate::CompletedTaskCount(completed),
JobReportUpdate::Message(format!(
"Processed {} of {} orphan files",
completed * CHUNK_SIZE,
total_count
)),
]);
}
// let _remaining = count_orphan_file_paths(&ctx.core_ctx, location.id.into()).await?;
Ok(())
@@ -197,8 +213,7 @@ pub async fn count_orphan_file_paths(
ctx: &CoreContext,
location_id: i64,
) -> Result<usize, FileError> {
let db = &ctx.database;
let files_count = db
let files_count = ctx.database
._query_raw::<CountRes>(raw!(
"SELECT COUNT(*) AS count FROM file_paths WHERE file_id IS NULL AND is_dir IS FALSE AND location_id = {}",
PrismaValue::Int(location_id)
@@ -211,12 +226,11 @@ pub async fn get_orphan_file_paths(
ctx: &CoreContext,
cursor: i32,
) -> Result<Vec<file_path::Data>, FileError> {
let db = &ctx.database;
info!(
"discovering {} orphan file paths at cursor: {:?}",
CHUNK_SIZE, cursor
);
let files = db
ctx.database
.file_path()
.find_many(vec![
file_path::file_id::equals(None),
@@ -226,8 +240,8 @@ pub async fn get_orphan_file_paths(
.cursor(file_path::id::cursor(cursor))
.take(CHUNK_SIZE as i64)
.exec()
.await?;
Ok(files)
.await
.map_err(|e| e.into())
}
#[derive(Deserialize, Serialize, Debug)]