(fix) identifier job: better error handling

This commit is contained in:
Jamie Pine
2022-09-06 23:21:58 -07:00
parent 90afd62b73
commit 6b5a6fd39c
2 changed files with 65 additions and 67 deletions

View File

@@ -5,7 +5,7 @@ use crate::{
};
use chrono::{DateTime, FixedOffset};
use prisma_client_rust::{prisma_models::PrismaValue, raw, raw::Raw, Direction};
use prisma_client_rust::{prisma_models::PrismaValue, raw::Raw, Direction};
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
@@ -67,7 +67,7 @@ impl StatefulJob for FileIdentifierJob {
ctx: WorkerContext,
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
) -> JobResult {
info!("Identifying orphan file paths...");
info!("Identifying orphan Paths...");
let library = ctx.library_ctx();
@@ -94,10 +94,11 @@ impl StatefulJob for FileIdentifierJob {
.exec()
.await? as usize;
info!("Found {} orphan file paths", total_count);
let task_count = (total_count as f64 / CHUNK_SIZE as f64).ceil() as usize;
info!("Will process {} tasks", task_count);
info!(
"Found {} orphan Paths. Will execute {} tasks...",
total_count, task_count
);
// update job with total task count based on orphan file_paths count
ctx.progress(vec![JobReportUpdate::TaskCount(task_count)]);
@@ -123,7 +124,6 @@ impl StatefulJob for FileIdentifierJob {
});
state.steps = (0..task_count).map(|_| ()).collect();
Ok(())
}
@@ -139,19 +139,21 @@ impl StatefulJob for FileIdentifierJob {
let data = state
.data
.as_mut()
.expect("critical error: missing data on job state");
.expect("Critical error: missing data on job state");
// get chunk of orphans to process
let file_paths =
match get_orphan_file_paths(&ctx.library_ctx(), &data.cursor, data.location.id).await {
Ok(file_paths) => file_paths,
Err(e) => {
info!("Error getting orphan file paths: {:#?}", e);
return Ok(());
}
};
get_orphan_file_paths(&ctx.library_ctx(), &data.cursor, data.location.id).await?;
// if no file paths found, abort entire job early
if file_paths.is_empty() {
return Err(JobError::JobDataNotFound(
"Expected orphan Paths not returned from database query for this chunk".to_string(),
));
}
info!(
"Processing {:?} orphan files. ({} completed of {})",
"Processing {:?} orphan Paths. ({} completed of {})",
file_paths.len(),
state.step_number,
data.task_count
@@ -218,74 +220,68 @@ impl StatefulJob for FileIdentifierJob {
.filter(|create_file| !existing_files_cas_ids.contains(&create_file.cas_id))
.collect::<Vec<_>>();
if new_files.is_empty() {
error!("This shouldn't happen?");
return Ok(());
}
if !new_files.is_empty() {
// assemble prisma values for new unique files
let mut values = Vec::with_capacity(new_files.len() * 3);
for file in &new_files {
values.extend([
PrismaValue::String(file.cas_id.clone()),
PrismaValue::Int(file.size_in_bytes),
PrismaValue::DateTime(file.date_created),
]);
}
// assemble prisma values for new unique files
let mut values = Vec::with_capacity(new_files.len() * 3);
for file in &new_files {
values.extend([
PrismaValue::String(file.cas_id.clone()),
PrismaValue::Int(file.size_in_bytes),
PrismaValue::DateTime(file.date_created),
]);
}
// create new file records with assembled values
let created_files: Vec<FileCreated> = ctx
.library_ctx()
.db
._query_raw(Raw::new(
&format!(
"INSERT INTO files (cas_id, size_in_bytes, date_created) VALUES {}
ON CONFLICT (cas_id) DO NOTHING RETURNING id, cas_id",
vec!["({}, {}, {})"; new_files.len()].join(",")
),
values,
))
.exec()
.await
.unwrap_or_else(|e| {
error!("Error inserting files: {:#?}", e);
Vec::new()
});
for created_file in created_files {
// associate newly created files with their respective file_paths
// TODO: this is potentially bottle necking the chunk system, individually linking file_path to file, 100 queries per chunk
// - insert many could work, but I couldn't find a good way to do this in a single SQL query
if let Err(e) = ctx
// create new file records with assembled values
let created_files: Vec<FileCreated> = ctx
.library_ctx()
.db
.file_path()
.update(
file_path::location_id_id(
state.init.location.id,
*cas_lookup.get(&created_file.cas_id).unwrap(),
._query_raw(Raw::new(
&format!(
"INSERT INTO files (cas_id, size_in_bytes, date_created) VALUES {}
ON CONFLICT (cas_id) DO NOTHING RETURNING id, cas_id",
vec!["({}, {}, {})"; new_files.len()].join(",")
),
vec![file_path::file_id::set(Some(created_file.id))],
)
values,
))
.exec()
.await
{
info!("Error updating file_id: {:#?}", e);
.unwrap_or_else(|e| {
error!("Error inserting files: {:#?}", e);
Vec::new()
});
for created_file in created_files {
// associate newly created files with their respective file_paths
// TODO: this is potentially bottle necking the chunk system, individually linking file_path to file, 100 queries per chunk
// - insert many could work, but I couldn't find a good way to do this in a single SQL query
if let Err(e) = ctx
.library_ctx()
.db
.file_path()
.update(
file_path::location_id_id(
state.init.location.id,
*cas_lookup.get(&created_file.cas_id).unwrap(),
),
vec![file_path::file_id::set(Some(created_file.id))],
)
.exec()
.await
{
info!("Error updating file_id: {:#?}", e);
}
}
}
// set the step data cursor to the last row of this chunk
if let Some(last_row) = file_paths.last() {
data.cursor.file_path_id = last_row.id;
} else {
return Ok(());
}
// }
ctx.progress(vec![
JobReportUpdate::CompletedTaskCount(state.step_number),
JobReportUpdate::Message(format!(
"Processed {} of {} orphan files",
"Processed {} of {} orphan Paths",
state.step_number * CHUNK_SIZE,
data.total_count
)),
@@ -333,7 +329,7 @@ async fn get_orphan_file_paths(
location_id: i32,
) -> Result<Vec<file_path::Data>, prisma_client_rust::QueryError> {
info!(
"discovering {} orphan file paths at cursor: {:?}",
"Querying {} orphan Paths at cursor: {:?}",
CHUNK_SIZE, cursor
);
ctx.db

View File

@@ -34,6 +34,8 @@ pub enum JobError {
MissingJobDataState(Uuid, String),
#[error("Indexer error: {0}")]
IndexerError(#[from] IndexerError),
#[error("Data needed for job execution not found: job <name='{0}'>")]
JobDataNotFound(String),
#[error("Job paused")]
Paused(Vec<u8>),
}