mirror of
https://github.com/spacedriveapp/spacedrive.git
synced 2026-04-20 06:28:14 -04:00
Merge remote-tracking branch 'origin/eng-237-explorer-performance-2' into eng-237-explorer-performance-2
This commit is contained in:
@@ -1,9 +1,7 @@
|
||||
use crate::{
|
||||
api::{locations::LocationExplorerArgs, CoreEvent, LibraryArgs},
|
||||
invalidate_query,
|
||||
job::{
|
||||
JobError, JobMetadata, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext,
|
||||
},
|
||||
job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
|
||||
library::LibraryContext,
|
||||
prisma::{file_path, location},
|
||||
};
|
||||
@@ -95,7 +93,7 @@ impl StatefulJob for ThumbnailJob {
|
||||
});
|
||||
state.steps = image_files.into_iter().collect();
|
||||
|
||||
Ok(())
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
async fn execute_step(
|
||||
@@ -128,12 +126,12 @@ impl StatefulJob for ThumbnailJob {
|
||||
"skipping thumbnail generation for {}",
|
||||
step.materialized_path
|
||||
);
|
||||
return Ok(());
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
error!("Error getting cas_id {:?}", step.materialized_path);
|
||||
return Ok(());
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -175,14 +173,14 @@ impl StatefulJob for ThumbnailJob {
|
||||
state.step_number + 1,
|
||||
)]);
|
||||
|
||||
Ok(())
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
async fn finalize(
|
||||
&self,
|
||||
_ctx: WorkerContext,
|
||||
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
|
||||
) -> Result<JobMetadata, JobError> {
|
||||
) -> JobResult {
|
||||
let data = state
|
||||
.data
|
||||
.as_ref()
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
use crate::{
|
||||
job::{
|
||||
JobError, JobMetadata, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext,
|
||||
},
|
||||
job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
|
||||
library::LibraryContext,
|
||||
prisma::{file, file_path, location},
|
||||
};
|
||||
@@ -126,7 +124,7 @@ impl StatefulJob for FileIdentifierJob {
|
||||
});
|
||||
|
||||
state.steps = (0..task_count).map(|_| ()).collect();
|
||||
Ok(())
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
async fn execute_step(
|
||||
@@ -290,14 +288,14 @@ impl StatefulJob for FileIdentifierJob {
|
||||
]);
|
||||
|
||||
// let _remaining = count_orphan_file_paths(&ctx.core_ctx, location.id.into()).await?;
|
||||
Ok(())
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
async fn finalize(
|
||||
&self,
|
||||
_ctx: WorkerContext,
|
||||
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
|
||||
) -> Result<JobMetadata, JobError> {
|
||||
) -> JobResult {
|
||||
let data = state
|
||||
.data
|
||||
.as_ref()
|
||||
|
||||
@@ -42,7 +42,7 @@ pub enum JobError {
|
||||
Paused(Vec<u8>),
|
||||
}
|
||||
|
||||
pub type JobResult = Result<(), JobError>;
|
||||
pub type JobResult = Result<JobMetadata, JobError>;
|
||||
pub type JobMetadata = Option<Vec<u8>>;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -68,7 +68,7 @@ pub trait StatefulJob: Send + Sync {
|
||||
&self,
|
||||
ctx: WorkerContext,
|
||||
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
|
||||
) -> Result<JobMetadata, JobError>;
|
||||
) -> JobResult;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
@@ -184,12 +184,8 @@ where
|
||||
self.state.step_number += 1;
|
||||
}
|
||||
|
||||
// It is ok to unwrap here, a running job will always have a report.
|
||||
self.report.as_mut().unwrap().metadata = self
|
||||
.stateful_job
|
||||
self.stateful_job
|
||||
.finalize(ctx.clone(), &mut self.state)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::job::{DynJob, JobError, JobManager, JobReportUpdate, JobStatus};
|
||||
use crate::job::{DynJob, JobError, JobManager, JobMetadata, JobReportUpdate, JobStatus};
|
||||
use crate::library::LibraryContext;
|
||||
use crate::{api::LibraryArgs, invalidate_query};
|
||||
use std::{sync::Arc, time::Duration};
|
||||
@@ -22,7 +22,7 @@ pub enum WorkerEvent {
|
||||
updates: Vec<JobReportUpdate>,
|
||||
debounce: bool,
|
||||
},
|
||||
Completed(oneshot::Sender<()>),
|
||||
Completed(oneshot::Sender<()>, JobMetadata),
|
||||
Failed(oneshot::Sender<()>),
|
||||
Paused(Vec<u8>, oneshot::Sender<()>),
|
||||
}
|
||||
@@ -151,25 +151,27 @@ impl Worker {
|
||||
|
||||
let (done_tx, done_rx) = oneshot::channel();
|
||||
|
||||
if let Err(e) = job.run(worker_ctx.clone()).await {
|
||||
if let JobError::Paused(state) = e {
|
||||
match job.run(worker_ctx.clone()).await {
|
||||
Ok(metadata) => {
|
||||
// handle completion
|
||||
worker_ctx
|
||||
.events_tx
|
||||
.send(WorkerEvent::Completed(done_tx, metadata))
|
||||
.expect("critical error: failed to send worker complete event");
|
||||
}
|
||||
Err(JobError::Paused(state)) => {
|
||||
worker_ctx
|
||||
.events_tx
|
||||
.send(WorkerEvent::Paused(state, done_tx))
|
||||
.expect("critical error: failed to send worker pause event");
|
||||
} else {
|
||||
}
|
||||
Err(e) => {
|
||||
error!("job '{}' failed with error: {:#?}", job_id, e);
|
||||
worker_ctx
|
||||
.events_tx
|
||||
.send(WorkerEvent::Failed(done_tx))
|
||||
.expect("critical error: failed to send worker fail event");
|
||||
}
|
||||
} else {
|
||||
// handle completion
|
||||
worker_ctx
|
||||
.events_tx
|
||||
.send(WorkerEvent::Completed(done_tx))
|
||||
.expect("critical error: failed to send worker complete event");
|
||||
}
|
||||
|
||||
if let Err(e) = done_rx.await {
|
||||
@@ -228,9 +230,10 @@ impl Worker {
|
||||
LibraryArgs::new(library.id, ())
|
||||
);
|
||||
}
|
||||
WorkerEvent::Completed(done_tx) => {
|
||||
WorkerEvent::Completed(done_tx, metadata) => {
|
||||
worker.report.status = JobStatus::Completed;
|
||||
worker.report.data = None;
|
||||
worker.report.metadata = metadata;
|
||||
if let Err(e) = worker.report.update(&library).await {
|
||||
error!("failed to update job report: {:#?}", e);
|
||||
}
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
use crate::{
|
||||
job::{
|
||||
JobError, JobMetadata, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext,
|
||||
},
|
||||
job::{JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext},
|
||||
prisma::{file_path, location},
|
||||
};
|
||||
|
||||
@@ -221,7 +219,7 @@ impl StatefulJob for IndexerJob {
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(())
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Process each chunk of entries in the indexer job, writing to the `file_path` table
|
||||
@@ -284,7 +282,7 @@ impl StatefulJob for IndexerJob {
|
||||
|
||||
info!("Inserted {count} records");
|
||||
|
||||
Ok(())
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Logs some metadata about the indexer job
|
||||
@@ -292,7 +290,7 @@ impl StatefulJob for IndexerJob {
|
||||
&self,
|
||||
_ctx: WorkerContext,
|
||||
state: &mut JobState<Self::Init, Self::Data, Self::Step>,
|
||||
) -> Result<JobMetadata, JobError> {
|
||||
) -> JobResult {
|
||||
let data = state
|
||||
.data
|
||||
.as_ref()
|
||||
|
||||
Reference in New Issue
Block a user