mirror of
https://github.com/spacedriveapp/spacedrive.git
synced 2026-04-20 14:38:58 -04:00
Identifier Exponential Lag (#269)
A little refactor of the identifier job to remove memory leak and improve performance.
This commit is contained in:
@@ -1,16 +1,16 @@
|
||||
use std::collections::HashMap;
|
||||
use std::{fs, io};
|
||||
use std::path::Path;
|
||||
use std::{fs, io};
|
||||
|
||||
use crate::job::JobReportUpdate;
|
||||
use crate::prisma::file;
|
||||
use crate::sys::get_location;
|
||||
use crate::{
|
||||
file::FileError,
|
||||
job::{Job, WorkerContext},
|
||||
prisma::file_path,
|
||||
prisma::{file, file_path},
|
||||
CoreContext,
|
||||
};
|
||||
use chrono::{DateTime, FixedOffset, Utc};
|
||||
use futures::executor::block_on;
|
||||
use prisma_client_rust::prisma_models::PrismaValue;
|
||||
use prisma_client_rust::raw::Raw;
|
||||
@@ -30,6 +30,9 @@ pub struct FileIdentifierJob {
|
||||
pub path: String,
|
||||
}
|
||||
|
||||
// we break this job into chunks of 100 to improve performance
|
||||
static CHUNK_SIZE: usize = 100;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Job for FileIdentifierJob {
|
||||
fn name(&self) -> &'static str {
|
||||
@@ -41,11 +44,9 @@ impl Job for FileIdentifierJob {
|
||||
let location_path = location.path.unwrap_or("".to_string());
|
||||
|
||||
let total_count = count_orphan_file_paths(&ctx.core_ctx, location.id.into()).await?;
|
||||
|
||||
println!("Found {} orphan file paths", total_count);
|
||||
|
||||
let task_count = (total_count as f64 / 100f64).ceil() as usize;
|
||||
|
||||
let task_count = (total_count as f64 / CHUNK_SIZE as f64).ceil() as usize;
|
||||
println!("Will process {} tasks", task_count);
|
||||
|
||||
// update job with total task count based on orphan file_paths count
|
||||
@@ -53,14 +54,23 @@ impl Job for FileIdentifierJob {
|
||||
|
||||
let db = ctx.core_ctx.database.clone();
|
||||
|
||||
let ctx = tokio::task::spawn_blocking(move || {
|
||||
let _ctx = tokio::task::spawn_blocking(move || {
|
||||
let mut completed: usize = 0;
|
||||
let mut cursor: i32 = 1;
|
||||
// map cas_id to file_path ids
|
||||
let mut cas_id_lookup: HashMap<i32, String> = HashMap::new();
|
||||
|
||||
while completed < task_count {
|
||||
let file_paths = block_on(get_orphan_file_paths(&ctx.core_ctx, cursor)).unwrap();
|
||||
// link file_path ids to a CreateFile struct containing unique file data
|
||||
let mut chunk: HashMap<i32, CreateFile> = HashMap::new();
|
||||
let mut cas_lookup: HashMap<String, i32> = HashMap::new();
|
||||
|
||||
// get chunk of orphans to process
|
||||
let file_paths = match block_on(get_orphan_file_paths(&ctx.core_ctx, cursor)) {
|
||||
Ok(file_paths) => file_paths,
|
||||
Err(e) => {
|
||||
println!("Error getting orphan file paths: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
println!(
|
||||
"Processing {:?} orphan files. ({} completed of {})",
|
||||
file_paths.len(),
|
||||
@@ -68,15 +78,15 @@ impl Job for FileIdentifierJob {
|
||||
task_count
|
||||
);
|
||||
|
||||
// raw values to be inserted into the database
|
||||
let mut values: Vec<PrismaValue> = Vec::new();
|
||||
|
||||
// only rows that have a valid cas_id to be inserted
|
||||
// analyze each file_path
|
||||
for file_path in file_paths.iter() {
|
||||
match prepare_file_values(&location_path, file_path) {
|
||||
Ok((cas_id, data)) => {
|
||||
cas_id_lookup.insert(file_path.id, cas_id);
|
||||
values.extend(data);
|
||||
// get the cas_id and extract metadata
|
||||
match prepare_file(&location_path, file_path) {
|
||||
Ok(file) => {
|
||||
let cas_id = file.cas_id.clone();
|
||||
// create entry into chunks for created file data
|
||||
chunk.insert(file_path.id, file);
|
||||
cas_lookup.insert(cas_id, file_path.id);
|
||||
}
|
||||
Err(e) => {
|
||||
println!("Error processing file: {}", e);
|
||||
@@ -84,80 +94,93 @@ impl Job for FileIdentifierJob {
|
||||
}
|
||||
};
|
||||
}
|
||||
if values.len() == 0 {
|
||||
println!("No orphan files to process, finishing...");
|
||||
break;
|
||||
|
||||
// find all existing files by cas id
|
||||
let generated_cas_ids = chunk.values().map(|c| c.cas_id.clone()).collect();
|
||||
let existing_files: Vec<file::Data> = block_on(
|
||||
db.file()
|
||||
.find_many(vec![file::cas_id::in_vec(generated_cas_ids)])
|
||||
.exec(),
|
||||
)
|
||||
.unwrap();
|
||||
println!("Found {} existing files", existing_files.len());
|
||||
|
||||
// TODO: link existing files to file_paths
|
||||
for file in existing_files.iter() {
|
||||
let file_path_id = cas_lookup.get(&file.cas_id).unwrap();
|
||||
block_on(
|
||||
db.file_path()
|
||||
.find_unique(file_path::id::equals(file_path_id.clone()))
|
||||
.update(vec![file_path::file_id::set(Some(file.id.clone()))])
|
||||
.exec(),
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
println!("Inserting {} unique file records ({:?} values)", file_paths.len(), values.len());
|
||||
|
||||
let files: Vec<FileCreated> = block_on(db._query_raw(Raw::new(
|
||||
&format!(
|
||||
"INSERT INTO files (cas_id, size_in_bytes) VALUES {} ON CONFLICT (cas_id) DO NOTHING RETURNING id, cas_id",
|
||||
vec!["({}, {})"; file_paths.len()].join(",")
|
||||
),
|
||||
values
|
||||
))).unwrap_or_else(|e| {
|
||||
// extract files that don't already exist in the database
|
||||
let new_files: Vec<&CreateFile> = chunk
|
||||
.iter()
|
||||
.map(|(_, c)| c)
|
||||
.filter(|c| !existing_files.iter().any(|d| d.cas_id == c.cas_id))
|
||||
.collect();
|
||||
|
||||
// assemble prisma values
|
||||
let mut values: Vec<PrismaValue> = Vec::new();
|
||||
for file in new_files.iter() {
|
||||
values.extend([
|
||||
PrismaValue::String(file.cas_id.clone()),
|
||||
PrismaValue::Int(file.size_in_bytes.clone()),
|
||||
PrismaValue::DateTime(file.date_created.clone()),
|
||||
]);
|
||||
}
|
||||
|
||||
// create new files
|
||||
let created_files: Vec<FileCreated> = block_on(db._query_raw(Raw::new(
|
||||
&format!(
|
||||
"INSERT INTO files (cas_id, size_in_bytes, date_created) VALUES {}
|
||||
ON CONFLICT (cas_id) DO NOTHING RETURNING id, cas_id",
|
||||
vec!["({}, {}, {})"; new_files.len()].join(",")
|
||||
),
|
||||
values,
|
||||
)))
|
||||
.unwrap_or_else(|e| {
|
||||
println!("Error inserting files: {}", e);
|
||||
Vec::new()
|
||||
});
|
||||
|
||||
println!("Unique files: {:?}" , files);
|
||||
|
||||
// assign unique file to file path
|
||||
println!("Assigning {} unique file ids to origin file_paths", files.len());
|
||||
for (file_path_id, cas_id) in cas_id_lookup.iter() {
|
||||
// get the cas id from the lookup table
|
||||
let file = files.iter().find(|f| &f.cas_id == cas_id);
|
||||
let file_id: i32;
|
||||
if let Some(file) = file {
|
||||
file_id = file.id;
|
||||
} else {
|
||||
let unique_file = match block_on(db.file().find_unique(file::cas_id::equals(cas_id.clone())).exec()) {
|
||||
Ok(f) => match f {
|
||||
Some(f) => f,
|
||||
None => {
|
||||
println!("Unique file does not exist, this shouldn't happen: {}", cas_id);
|
||||
continue;
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
println!("Error finding unique file: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
file_id = unique_file.id;
|
||||
}
|
||||
|
||||
block_on(
|
||||
db.file_path()
|
||||
.find_unique(file_path::id::equals(file_path_id.clone()))
|
||||
.update(vec![
|
||||
file_path::file_id::set(Some(file_id))
|
||||
])
|
||||
.exec()
|
||||
).unwrap();
|
||||
// associate newly created files with their respective file_paths
|
||||
for file in created_files.iter() {
|
||||
// TODO: This is bottle necking the chunk system, individually linking file_path to file, 100 queries per chunk.
|
||||
// Maybe an insert many could work? not sure.
|
||||
let file_path_id = cas_lookup.get(&file.cas_id).unwrap();
|
||||
block_on(
|
||||
db.file_path()
|
||||
.find_unique(file_path::id::equals(file_path_id.clone()))
|
||||
.update(vec![file_path::file_id::set(Some(file.id.clone()))])
|
||||
.exec(),
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
// handle loop end
|
||||
let last_row = file_paths.last().unwrap();
|
||||
|
||||
cursor = last_row.id;
|
||||
completed += 1;
|
||||
|
||||
ctx.progress(vec![
|
||||
JobReportUpdate::CompletedTaskCount(completed),
|
||||
JobReportUpdate::Message(format!(
|
||||
"Processed {} of {} orphan files",
|
||||
completed,
|
||||
task_count
|
||||
)),
|
||||
JobReportUpdate::CompletedTaskCount(completed),
|
||||
JobReportUpdate::Message(format!(
|
||||
"Processed {} of {} orphan files",
|
||||
completed * CHUNK_SIZE,
|
||||
total_count
|
||||
)),
|
||||
]);
|
||||
}
|
||||
ctx
|
||||
})
|
||||
.await?;
|
||||
|
||||
let _remaining = count_orphan_file_paths(&ctx.core_ctx, location.id.into()).await?;
|
||||
|
||||
// let _remaining = count_orphan_file_paths(&ctx.core_ctx, location.id.into()).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -195,22 +218,34 @@ pub async fn get_orphan_file_paths(
|
||||
])
|
||||
.order_by(file_path::id::order(Direction::Asc))
|
||||
.cursor(file_path::id::cursor(cursor))
|
||||
.take(100)
|
||||
.take(CHUNK_SIZE as i64)
|
||||
.exec()
|
||||
.await?;
|
||||
Ok(files)
|
||||
}
|
||||
|
||||
pub fn prepare_file_values(
|
||||
#[derive(Deserialize, Serialize, Debug)]
|
||||
pub struct CreateFile {
|
||||
pub cas_id: String,
|
||||
pub size_in_bytes: i64,
|
||||
pub date_created: DateTime<FixedOffset>,
|
||||
}
|
||||
|
||||
pub fn prepare_file(
|
||||
location_path: &str,
|
||||
file_path: &file_path::Data,
|
||||
) -> Result<(String, [PrismaValue; 2]), io::Error> {
|
||||
) -> Result<CreateFile, io::Error> {
|
||||
let path = Path::new(&location_path).join(Path::new(file_path.materialized_path.as_str()));
|
||||
// println!("Processing file: {:?}", path);
|
||||
|
||||
let metadata = fs::metadata(&path)?;
|
||||
|
||||
let date_created: DateTime<Utc> = metadata.created().unwrap().into();
|
||||
|
||||
let size = metadata.len();
|
||||
|
||||
let cas_id = {
|
||||
if !file_path.is_dir {
|
||||
let mut ret = generate_cas_id(path.clone(), metadata.len()).unwrap();
|
||||
let mut ret = generate_cas_id(path.clone(), size.clone()).unwrap();
|
||||
ret.truncate(16);
|
||||
ret
|
||||
} else {
|
||||
@@ -218,7 +253,9 @@ pub fn prepare_file_values(
|
||||
}
|
||||
};
|
||||
|
||||
println!("cas id for path {:?} is {:?}", path, cas_id);
|
||||
|
||||
Ok((cas_id.clone(), [PrismaValue::String(cas_id), PrismaValue::Int(0)]))
|
||||
Ok(CreateFile {
|
||||
cas_id,
|
||||
size_in_bytes: size as i64,
|
||||
date_created: date_created.into(),
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user