From 54f757bf1fb20dcd17fddff92d92535a901fbd09 Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Sat, 15 Apr 2023 14:43:04 +0800 Subject: [PATCH] Job system improvements v2 (#707) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add `JobInitData` trait + change `StatefulJob::name` to a constant * `StatefulJob::new` * Move `invalidate_query` into `StatefulJob::finalize` * Cleanup `spawn_job` signature * Remove `queue_job` and run jobs from the `StatefulJob::finalize` method * `StatefulJob::queue_jobs` * `JobManager::ingest` return `Result` * Remove `jobs.isRunning` * Invalidation system direct push + batching invalidate * Look ma, only a single clippy warning!!!! * Error handling for JobManager * Rust fmt * Introducing Job hierarchy to enable job enqueuing * Rust fmt again 🙄 * core.ts --------- Co-authored-by: Ericson Soares --- Cargo.lock | Bin 229421 -> 229665 bytes core/Cargo.toml | 9 +- .../20230210031123_init/migration.sql | 4 +- core/prisma/schema.prisma | 13 +- core/src/api/files.rs | 118 +++---- core/src/api/jobs.rs | 72 ++-- core/src/api/mod.rs | 29 +- core/src/api/utils/invalidate.rs | 155 ++++++-- core/src/job/job_manager.rs | 333 ++++++++++-------- core/src/job/mod.rs | 245 +++++++++++-- core/src/job/worker.rs | 23 +- core/src/lib.rs | 35 -- core/src/library/library.rs | 21 +- core/src/library/manager.rs | 49 ++- core/src/location/file_path_helper.rs | 6 +- core/src/location/indexer/indexer_job.rs | 14 +- .../location/indexer/shallow_indexer_job.rs | 13 +- core/src/location/manager/mod.rs | 4 +- core/src/location/manager/watcher/utils.rs | 2 +- core/src/location/mod.rs | 147 +++----- .../file_identifier/file_identifier_job.rs | 20 +- .../shallow_file_identifier_job.rs | 20 +- core/src/object/fs/copy.rs | 32 +- core/src/object/fs/cut.rs | 21 +- core/src/object/fs/decrypt.rs | 23 +- core/src/object/fs/delete.rs | 21 +- core/src/object/fs/encrypt.rs | 30 +- core/src/object/fs/erase.rs | 21 +- .../thumbnail/shallow_thumbnailer_job.rs | 16 +- .../preview/thumbnail/thumbnailer_job.rs | 16 +- core/src/object/validation/validator_job.rs | 16 +- core/src/sync/manager.rs | 84 ++--- .../Layout/Sidebar/IsRunningJob.tsx | 3 +- packages/client/src/core.ts | 7 +- packages/client/src/rspc.ts | 19 +- 35 files changed, 995 insertions(+), 646 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 924122f6e8438f42a6dd149211e3090e4e398f2b..cd2dd2fa8a78633f992f1e2639f040a10315b2ad 100644 GIT binary patch delta 132 zcmZ46z_+l8uc3u;3)A!@4&98z;*9Ln%BdHbq_!_lVmc$j1`(P5Qj9UlKH1PX(b6Er zIN8`RH7Ug)$-+3**uu=fAlbsu(A)sXOEyV1Ni|F~Pc%qQOfohzNj0}jHMBH0FiT8H gH87d%XeTy(!W|}#>H5`7GVMj>OxugfnRjyl0E~z#wEzGB delta 39 vcmZ43#J9GAuc3u;3)A$Z=?(3S+|vX8GqP^qnZ$HPq&>ZYX?uDF^DYhmHx>^r diff --git a/core/Cargo.toml b/core/Cargo.toml index 069810bd6..6adf9383d 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -10,12 +10,8 @@ rust-version = "1.68.1" [features] default = [] -mobile = [ -] # This feature allows features to be disabled when the Core is running on mobile. -ffmpeg = [ - "dep:ffmpeg-next", - "dep:sd-ffmpeg", -] # This feature controls whether the Spacedrive Core contains functionality which requires FFmpeg. +mobile = [] # This feature allows features to be disabled when the Core is running on mobile. +ffmpeg = ["dep:ffmpeg-next", "dep:sd-ffmpeg"] # This feature controls whether the Spacedrive Core contains functionality which requires FFmpeg. location-watcher = ["dep:notify"] sync-messages = [] @@ -77,6 +73,7 @@ notify = { version = "5.0.0", default-features = false, features = [ "macos_fsevent", ], optional = true } static_assertions = "1.1.0" +serde-hashkey = "0.4.5" [target.'cfg(windows)'.dependencies.winapi-util] version = "0.1.5" diff --git a/core/prisma/migrations/20230210031123_init/migration.sql b/core/prisma/migrations/20230210031123_init/migration.sql index 72f9a7ed4..6e552b740 100644 --- a/core/prisma/migrations/20230210031123_init/migration.sql +++ b/core/prisma/migrations/20230210031123_init/migration.sql @@ -236,12 +236,14 @@ CREATE TABLE "job" ( "status" INTEGER NOT NULL DEFAULT 0, "data" BLOB, "metadata" BLOB, + "parent_id" BLOB, "task_count" INTEGER NOT NULL DEFAULT 1, "completed_task_count" INTEGER NOT NULL DEFAULT 0, "date_created" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, "date_modified" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, "seconds_elapsed" INTEGER NOT NULL DEFAULT 0, - CONSTRAINT "job_node_id_fkey" FOREIGN KEY ("node_id") REFERENCES "node" ("id") ON DELETE CASCADE ON UPDATE CASCADE + CONSTRAINT "job_node_id_fkey" FOREIGN KEY ("node_id") REFERENCES "node" ("id") ON DELETE CASCADE ON UPDATE CASCADE, + CONSTRAINT "job_parent_id_fkey" FOREIGN KEY ("parent_id") REFERENCES "job" ("id") ON DELETE CASCADE ON UPDATE CASCADE ); -- CreateTable diff --git a/core/prisma/schema.prisma b/core/prisma/schema.prisma index ba05531ed..4ffb54106 100644 --- a/core/prisma/schema.prisma +++ b/core/prisma/schema.prisma @@ -134,7 +134,7 @@ model FilePath { name String extension String - size_in_bytes String @default("0") + size_in_bytes String @default("0") inode Bytes // This is actually an unsigned 64 bit integer, but we don't have this type in SQLite device Bytes // This is actually an unsigned 64 bit integer, but we don't have this type in SQLite @@ -167,9 +167,9 @@ model FilePath { /// @shared(id: pub_id) model Object { - id Int @id @default(autoincrement()) - pub_id Bytes @unique - kind Int @default(0) + id Int @id @default(autoincrement()) + pub_id Bytes @unique + kind Int @default(0) key_id Int? // handy ways to mark an object @@ -362,6 +362,8 @@ model Job { data Bytes? metadata Bytes? + parent_id Bytes? + task_count Int @default(1) completed_task_count Int @default(0) date_created DateTime @default(now()) @@ -370,6 +372,9 @@ model Job { nodes Node @relation(fields: [node_id], references: [id], onDelete: Cascade, onUpdate: Cascade) + parent Job? @relation("jobs_dependency", fields: [parent_id], references: [id], onDelete: Cascade, onUpdate: Cascade) + children Job[] @relation("jobs_dependency") + @@map("job") } diff --git a/core/src/api/files.rs b/core/src/api/files.rs index 63b726eb1..d09101df2 100644 --- a/core/src/api/files.rs +++ b/core/src/api/files.rs @@ -1,15 +1,10 @@ use crate::{ invalidate_query, - job::Job, library::Library, - location::{find_location, LocationError}, + location::{file_path_helper::MaterializedPath, find_location, LocationError}, object::fs::{ - copy::{FileCopierJob, FileCopierJobInit}, - cut::{FileCutterJob, FileCutterJobInit}, - decrypt::{FileDecryptorJob, FileDecryptorJobInit}, - delete::{FileDeleterJob, FileDeleterJobInit}, - encrypt::{FileEncryptorJob, FileEncryptorJobInit}, - erase::{FileEraserJob, FileEraserJobInit}, + copy::FileCopierJobInit, cut::FileCutterJobInit, decrypt::FileDecryptorJobInit, + delete::FileDeleterJobInit, encrypt::FileEncryptorJobInit, erase::FileEraserJobInit, }, prisma::{location, object}, }; @@ -17,7 +12,7 @@ use crate::{ use rspc::{ErrorCode, Type}; use serde::Deserialize; use std::path::Path; -use tokio::{fs, sync::oneshot}; +use tokio::fs; use super::{utils::LibraryRequest, RouterBuilder}; @@ -102,83 +97,40 @@ pub(crate) fn mount() -> RouterBuilder { .library_mutation("encryptFiles", |t| { t( |_, args: FileEncryptorJobInit, library: Library| async move { - library.spawn_job(Job::new(args, FileEncryptorJob {})).await; - invalidate_query!(library, "locations.getExplorerData"); - - Ok(()) + library.spawn_job(args).await.map_err(Into::into) }, ) }) .library_mutation("decryptFiles", |t| { t( |_, args: FileDecryptorJobInit, library: Library| async move { - library.spawn_job(Job::new(args, FileDecryptorJob {})).await; - invalidate_query!(library, "locations.getExplorerData"); - - Ok(()) + library.spawn_job(args).await.map_err(Into::into) }, ) }) .library_mutation("deleteFiles", |t| { t(|_, args: FileDeleterJobInit, library: Library| async move { - library.spawn_job(Job::new(args, FileDeleterJob {})).await; - invalidate_query!(library, "locations.getExplorerData"); - - Ok(()) + library.spawn_job(args).await.map_err(Into::into) }) }) .library_mutation("eraseFiles", |t| { t(|_, args: FileEraserJobInit, library: Library| async move { - library.spawn_job(Job::new(args, FileEraserJob {})).await; - invalidate_query!(library, "locations.getExplorerData"); - - Ok(()) + library.spawn_job(args).await.map_err(Into::into) }) }) .library_mutation("duplicateFiles", |t| { t(|_, args: FileCopierJobInit, library: Library| async move { - let (done_tx, done_rx) = oneshot::channel(); - - library - .spawn_job(Job::new( - args, - FileCopierJob { - done_tx: Some(done_tx), - }, - )) - .await; - - let _ = done_rx.await; - invalidate_query!(library, "locations.getExplorerData"); - - Ok(()) + library.spawn_job(args).await.map_err(Into::into) }) }) .library_mutation("copyFiles", |t| { t(|_, args: FileCopierJobInit, library: Library| async move { - let (done_tx, done_rx) = oneshot::channel(); - - library - .spawn_job(Job::new( - args, - FileCopierJob { - done_tx: Some(done_tx), - }, - )) - .await; - - let _ = done_rx.await; - invalidate_query!(library, "locations.getExplorerData"); - - Ok(()) + library.spawn_job(args).await.map_err(Into::into) }) }) .library_mutation("cutFiles", |t| { t(|_, args: FileCutterJobInit, library: Library| async move { - library.spawn_job(Job::new(args, FileCutterJob {})).await; - invalidate_query!(library, "locations.getExplorerData"); - - Ok(()) + library.spawn_job(args).await.map_err(Into::into) }) }) .library_mutation("renameFile", |t| { @@ -189,26 +141,38 @@ pub(crate) fn mount() -> RouterBuilder { pub new_file_name: String, } - t(|_, args: RenameFileArgs, library: Library| async move { - let location = find_location(&library, args.location_id) - .select(location::select!({ path })) - .exec() - .await? - .ok_or(LocationError::IdNotFound(args.location_id))?; + t( + |_, + RenameFileArgs { + location_id, + file_name, + new_file_name, + }: RenameFileArgs, + library: Library| async move { + let location = find_location(&library, location_id) + .select(location::select!({ path })) + .exec() + .await? + .ok_or(LocationError::IdNotFound(location_id))?; - let location_path = Path::new(&location.path); - fs::rename( - location_path.join(&args.file_name), - location_path.join(&args.new_file_name), - ) - .await - .map_err(|e| { - rspc::Error::new(ErrorCode::Conflict, format!("Failed to rename file: {e}")) - })?; + let location_path = Path::new(&location.path); + fs::rename( + location_path.join(&MaterializedPath::from((location_id, &file_name))), + location_path.join(&MaterializedPath::from((location_id, &new_file_name))), + ) + .await + .map_err(|e| { + rspc::Error::with_cause( + ErrorCode::Conflict, + "Failed to rename file".to_string(), + e, + ) + })?; - invalidate_query!(library, "tags.getExplorerData"); + invalidate_query!(library, "tags.getExplorerData"); - Ok(()) - }) + Ok(()) + }, + ) }) } diff --git a/core/src/api/jobs.rs b/core/src/api/jobs.rs index 8a27561e5..c9df9852a 100644 --- a/core/src/api/jobs.rs +++ b/core/src/api/jobs.rs @@ -1,10 +1,10 @@ use crate::{ - job::{Job, JobManager}, + job::JobManager, location::{find_location, LocationError}, object::{ - file_identifier::file_identifier_job::{FileIdentifierJob, FileIdentifierJobInit}, - preview::thumbnailer_job::{ThumbnailerJob, ThumbnailerJobInit}, - validation::validator_job::{ObjectValidatorJob, ObjectValidatorJobInit}, + file_identifier::file_identifier_job::FileIdentifierJobInit, + preview::thumbnailer_job::ThumbnailerJobInit, + validation::validator_job::ObjectValidatorJobInit, }, }; @@ -19,16 +19,16 @@ pub(crate) fn mount() -> RouterBuilder { .library_query("getRunning", |t| { t(|ctx, _: (), _| async move { Ok(ctx.jobs.get_running().await) }) }) - .library_query("isRunning", |t| { - t(|ctx, _: (), _| async move { Ok(!ctx.jobs.get_running().await.is_empty()) }) - }) .library_query("getHistory", |t| { - t(|_, _: (), library| async move { Ok(JobManager::get_history(&library).await?) }) + t(|_, _: (), library| async move { + JobManager::get_history(&library).await.map_err(Into::into) + }) }) .library_mutation("clearAll", |t| { t(|_, _: (), library| async move { - JobManager::clear_all_jobs(&library).await?; - Ok(()) + JobManager::clear_all_jobs(&library) + .await + .map_err(Into::into) }) }) .library_mutation("generateThumbsForLocation", |t| { @@ -45,17 +45,13 @@ pub(crate) fn mount() -> RouterBuilder { }; library - .spawn_job(Job::new( - ThumbnailerJobInit { - location, - sub_path: Some(args.path), - background: false, - }, - ThumbnailerJob {}, - )) - .await; - - Ok(()) + .spawn_job(ThumbnailerJobInit { + location, + sub_path: Some(args.path), + background: false, + }) + .await + .map_err(Into::into) }, ) }) @@ -72,17 +68,13 @@ pub(crate) fn mount() -> RouterBuilder { } library - .spawn_job(Job::new( - ObjectValidatorJobInit { - location_id: args.id, - path: args.path, - background: true, - }, - ObjectValidatorJob {}, - )) - .await; - - Ok(()) + .spawn_job(ObjectValidatorJobInit { + location_id: args.id, + path: args.path, + background: true, + }) + .await + .map_err(Into::into) }) }) .library_mutation("identifyUniqueFiles", |t| { @@ -98,16 +90,12 @@ pub(crate) fn mount() -> RouterBuilder { }; library - .spawn_job(Job::new( - FileIdentifierJobInit { - location, - sub_path: Some(args.path), - }, - FileIdentifierJob {}, - )) - .await; - - Ok(()) + .spawn_job(FileIdentifierJobInit { + location, + sub_path: Some(args.path), + }) + .await + .map_err(Into::into) }) }) .library_subscription("newThumbnail", |t| { diff --git a/core/src/api/mod.rs b/core/src/api/mod.rs index 08b65c56a..b7064c552 100644 --- a/core/src/api/mod.rs +++ b/core/src/api/mod.rs @@ -1,9 +1,6 @@ use rspc::{Config, Type}; use serde::{Deserialize, Serialize}; -use std::{ - sync::Arc, - time::{Duration, Instant}, -}; +use std::sync::Arc; use crate::{node::NodeConfig, Node}; @@ -18,7 +15,6 @@ pub(crate) type RouterBuilder = rspc::RouterBuilder; pub enum CoreEvent { NewThumbnail { cas_id: String }, InvalidateOperation(InvalidateOperationEvent), - InvalidateOperationDebounced(InvalidateOperationEvent), } mod files; @@ -86,28 +82,7 @@ pub(crate) fn mount() -> Arc { .yolo_merge("jobs.", jobs::mount()) .yolo_merge("p2p.", p2p::mount()) .yolo_merge("sync.", sync::mount()) - // TODO: Scope the invalidate queries to a specific library (filtered server side) - .subscription("invalidateQuery", |t| { - t(|ctx, _: ()| { - let mut event_bus_rx = ctx.event_bus.0.subscribe(); - let mut last = Instant::now(); - async_stream::stream! { - while let Ok(event) = event_bus_rx.recv().await { - match event { - CoreEvent::InvalidateOperation(op) => yield op, - CoreEvent::InvalidateOperationDebounced(op) => { - let current = Instant::now(); - if current.duration_since(last) > Duration::from_millis(1000 / 10) { - last = current; - yield op; - } - }, - _ => {} - } - } - } - }) - }) + .yolo_merge("invalidation.", utils::mount_invalidate()) .build() .arced(); InvalidRequests::validate(r.clone()); // This validates all invalidation calls. diff --git a/core/src/api/utils/invalidate.rs b/core/src/api/utils/invalidate.rs index 7d4d63b48..497387aaf 100644 --- a/core/src/api/utils/invalidate.rs +++ b/core/src/api/utils/invalidate.rs @@ -1,9 +1,20 @@ -use crate::api::Router; +use crate::api::{CoreEvent, Router, RouterBuilder}; +use async_stream::stream; use rspc::{internal::specta::DataType, Type}; use serde::Serialize; +use serde_hashkey::to_key; use serde_json::Value; -use std::sync::Arc; +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; +use tokio::sync::broadcast; +use tracing::warn; #[cfg(debug_assertions)] use std::sync::Mutex; @@ -18,12 +29,13 @@ pub struct InvalidateOperationEvent { /// This fields are intentionally private. key: &'static str, arg: Value, + result: Option, } impl InvalidateOperationEvent { /// If you are using this function, your doing it wrong. - pub fn dangerously_create(key: &'static str, arg: Value) -> Self { - Self { key, arg } + pub fn dangerously_create(key: &'static str, arg: Value, result: Option) -> Self { + Self { key, arg, result } } } @@ -32,7 +44,8 @@ impl InvalidateOperationEvent { #[allow(dead_code)] pub(crate) struct InvalidationRequest { pub key: &'static str, - pub input_ty: Option, + pub arg_ty: Option, + pub result_ty: Option, pub macro_src: &'static str, } @@ -60,11 +73,20 @@ impl InvalidRequests { let queries = r.queries(); for req in &invalidate_requests.queries { if let Some(query_ty) = queries.get(req.key) { - if let Some(input) = &req.input_ty { - if &query_ty.ty.input != input { + if let Some(arg) = &req.arg_ty { + if &query_ty.ty.input != arg { panic!( "Error at '{}': Attempted to invalid query '{}' but the argument type does not match the type defined on the router.", req.macro_src, req.key + ); + } + } + + if let Some(result) = &req.result_ty { + if &query_ty.ty.result != result { + panic!( + "Error at '{}': Attempted to invalid query '{}' but the data type does not match the type defined on the router.", + req.macro_src, req.key ); } } @@ -91,8 +113,8 @@ impl InvalidRequests { #[macro_export] #[allow(clippy::crate_in_macro_def)] macro_rules! invalidate_query { - ($library:expr, $key:literal) => {{ - let library: &crate::library::Library = &$library; // Assert the library is the correct type + ($ctx:expr, $key:literal) => {{ + let ctx: &crate::library::Library = &$ctx; // Assert the context is the correct type #[cfg(debug_assertions)] { @@ -104,20 +126,21 @@ macro_rules! invalidate_query { .queries .push(crate::api::utils::InvalidationRequest { key: $key, - input_ty: None, + arg_ty: None, + result_ty: None, macro_src: concat!(file!(), ":", line!()), }) } } // The error are ignored here because they aren't mission critical. If they fail the UI might be outdated for a bit. - library.emit(crate::api::CoreEvent::InvalidateOperation( - crate::api::utils::InvalidateOperationEvent::dangerously_create($key, serde_json::Value::Null) + ctx.emit(crate::api::CoreEvent::InvalidateOperation( + crate::api::utils::InvalidateOperationEvent::dangerously_create($key, serde_json::Value::Null, None) )) }}; - ($library:expr, $key:literal: $input_ty:ty, $input:expr $(,)?) => {{ - let _: $input_ty = $input; // Assert the type the user provided is correct - let library: &crate::library::Library = &$library; // Assert the library is the correct type + ($ctx:expr, $key:literal: $arg_ty:ty, $arg:expr $(,)?) => {{ + let _: $arg_ty = $arg; // Assert the type the user provided is correct + let ctx: &crate::library::Library = &$ctx; // Assert the context is the correct type #[cfg(debug_assertions)] { @@ -129,7 +152,46 @@ macro_rules! invalidate_query { .queries .push(crate::api::utils::InvalidationRequest { key: $key, - input_ty: Some(<$input_ty as rspc::internal::specta::Type>::reference(rspc::internal::specta::DefOpts { + arg_ty: Some(<$arg_ty as rspc::internal::specta::Type>::reference(rspc::internal::specta::DefOpts { + parent_inline: false, + type_map: &mut rspc::internal::specta::TypeDefs::new(), + }, &[])), + result_ty: None, + macro_src: concat!(file!(), ":", line!()), + }) + } + } + + // The error are ignored here because they aren't mission critical. If they fail the UI might be outdated for a bit. + let _ = serde_json::to_value($arg) + .map(|v| + ctx.emit(crate::api::CoreEvent::InvalidateOperation( + crate::api::utils::InvalidateOperationEvent::dangerously_create($key, v, None), + )) + ) + .map_err(|_| { + tracing::warn!("Failed to serialize invalidate query event!"); + }); + }}; + ($ctx:expr, $key:literal: $arg_ty:ty, $arg:expr, $result_ty:ty: $result:expr $(,)?) => {{ + let _: $arg_ty = $arg; // Assert the type the user provided is correct + let ctx: &crate::library::Library = &$ctx; // Assert the context is the correct type + + #[cfg(debug_assertions)] + { + #[ctor::ctor] + fn invalidate() { + crate::api::utils::INVALIDATION_REQUESTS + .lock() + .unwrap() + .queries + .push(crate::api::utils::InvalidationRequest { + key: $key, + arg_ty: Some(<$arg_ty as rspc::internal::specta::Type>::reference(rspc::internal::specta::DefOpts { + parent_inline: false, + type_map: &mut rspc::internal::specta::TypeDefs::new(), + }, &[])), + result_ty: Some(<$result_ty as rspc::internal::specta::Type>::reference(rspc::internal::specta::DefOpts { parent_inline: false, type_map: &mut rspc::internal::specta::TypeDefs::new(), }, &[])), @@ -139,14 +201,65 @@ macro_rules! invalidate_query { } // The error are ignored here because they aren't mission critical. If they fail the UI might be outdated for a bit. - let _ = serde_json::to_value($input) - .map(|v| - library.emit(crate::api::CoreEvent::InvalidateOperation( - crate::api::utils::InvalidateOperationEvent::dangerously_create($key, v), - )) + let _ = serde_json::to_value($arg) + .and_then(|arg| + serde_json::to_value($result) + .map(|result| + ctx.emit(crate::api::CoreEvent::InvalidateOperation( + crate::api::utils::InvalidateOperationEvent::dangerously_create($key, arg, Some(result)), + )) + ) ) .map_err(|_| { tracing::warn!("Failed to serialize invalidate query event!"); }); }}; } + +pub fn mount_invalidate() -> RouterBuilder { + let (tx, _) = broadcast::channel(100); + let manager_thread_active = AtomicBool::new(false); + + // TODO: Scope the invalidate queries to a specific library (filtered server side) + RouterBuilder::new().subscription("listen", move |t| { + t(move |ctx, _: ()| { + // This thread is used to deal with batching and deduplication. + // Their is only ever one of these management threads per Node but we spawn it like this so we can steal the event bus from the rspc context. + // Batching is important because when refetching data on the frontend rspc can fetch all invalidated queries in a single round trip. + if !manager_thread_active.swap(true, Ordering::Relaxed) { + let mut event_bus_rx = ctx.event_bus.0.subscribe(); + let tx = tx.clone(); + tokio::spawn(async move { + let mut buf = HashMap::with_capacity(100); + + tokio::select! { + event = event_bus_rx.recv() => { + if let Ok(event) = event { + if let CoreEvent::InvalidateOperation(op) = event { + // Newer data replaces older data in the buffer + buf.insert(to_key(&(op.key, &op.arg)).unwrap(), op); + } + } else { + warn!("Shutting down invalidation manager thread due to the core event bus being droppped!"); + } + }, + // Given human reaction time of ~250 milli this should be a good ballance. + _ = tokio::time::sleep(Duration::from_millis(200)) => { + match tx.send(buf.drain().map(|(_k, v)| v).collect::>()) { + Ok(_) => {}, + Err(_) => warn!("Error emitting invalidation manager events!"), + } + } + } + }); + } + + let mut rx = tx.subscribe(); + stream! { + while let Ok(msg) = rx.recv().await { + yield msg; + } + } + }) + }) +} diff --git a/core/src/job/job_manager.rs b/core/src/job/job_manager.rs index 7efed330c..fe9106bea 100644 --- a/core/src/job/job_manager.rs +++ b/core/src/job/job_manager.rs @@ -1,29 +1,20 @@ use crate::{ invalidate_query, - job::{worker::Worker, DynJob, Job, JobError}, + job::{worker::Worker, DynJob, Job, JobError, StatefulJob}, library::Library, - location::indexer::{ - indexer_job::{IndexerJob, INDEXER_JOB_NAME}, - shallow_indexer_job::{ShallowIndexerJob, SHALLOW_INDEXER_JOB_NAME}, - }, + location::indexer::{indexer_job::IndexerJob, shallow_indexer_job::ShallowIndexerJob}, object::{ file_identifier::{ - file_identifier_job::{FileIdentifierJob, FILE_IDENTIFIER_JOB_NAME}, - shallow_file_identifier_job::{ - ShallowFileIdentifierJob, SHALLOW_FILE_IDENTIFIER_JOB_NAME, - }, + file_identifier_job::FileIdentifierJob, + shallow_file_identifier_job::ShallowFileIdentifierJob, }, fs::{ - copy::{FileCopierJob, COPY_JOB_NAME}, - cut::{FileCutterJob, CUT_JOB_NAME}, - delete::{FileDeleterJob, DELETE_JOB_NAME}, - erase::{FileEraserJob, ERASE_JOB_NAME}, + copy::FileCopierJob, cut::FileCutterJob, delete::FileDeleterJob, erase::FileEraserJob, }, preview::{ - shallow_thumbnailer_job::{ShallowThumbnailerJob, SHALLOW_THUMBNAILER_JOB_NAME}, - thumbnailer_job::{ThumbnailerJob, THUMBNAILER_JOB_NAME}, + shallow_thumbnailer_job::ShallowThumbnailerJob, thumbnailer_job::ThumbnailerJob, }, - validation::validator_job::{ObjectValidatorJob, VALIDATOR_JOB_NAME}, + validation::validator_job::ObjectValidatorJob, }, prisma::{job, node}, }; @@ -36,10 +27,13 @@ use std::{ time::Duration, }; +use chrono::{DateTime, Utc}; +use futures::future::BoxFuture; use int_enum::IntEnum; use prisma_client_rust::Direction; use rspc::Type; use serde::{Deserialize, Serialize}; +use thiserror::Error; use tokio::{ sync::{broadcast, mpsc, Mutex, RwLock}, time::sleep, @@ -54,6 +48,40 @@ pub enum JobManagerEvent { IngestJob(Library, Box), } +#[derive(Error, Debug)] +pub enum JobManagerError { + #[error("Tried to dispatch a job that is already running: Job ")] + AlreadyRunningJob { name: &'static str, hash: u64 }, + + #[error("Failed to fetch job data from database: {0}")] + Database(#[from] prisma_client_rust::QueryError), + + #[error("Job error: {0}")] + Job(#[from] JobError), +} + +impl From for rspc::Error { + fn from(value: JobManagerError) -> Self { + match value { + JobManagerError::AlreadyRunningJob { .. } => Self::with_cause( + rspc::ErrorCode::BadRequest, + "Tried to spawn a job that is already running!".to_string(), + value, + ), + JobManagerError::Database(_) => Self::with_cause( + rspc::ErrorCode::InternalServerError, + "Error accessing the database".to_string(), + value, + ), + JobManagerError::Job(_) => Self::with_cause( + rspc::ErrorCode::InternalServerError, + "Job error".to_string(), + value, + ), + } + } +} + /// JobManager handles queueing and executing jobs using the `DynJob` /// Handling persisting JobReports to the database, pause/resuming, and /// @@ -94,40 +122,29 @@ impl JobManager { this } - pub async fn ingest(self: Arc, library: &Library, job: Box) { + pub async fn ingest( + self: Arc, + library: &Library, + job: Box, + ) -> Result<(), JobManagerError> { let job_hash = job.hash(); + + if self.current_jobs_hashes.read().await.contains(&job_hash) { + return Err(JobManagerError::AlreadyRunningJob { + name: job.name(), + hash: job_hash, + }); + } + debug!( "Ingesting job: ", job.name(), job_hash ); - if !self.current_jobs_hashes.read().await.contains(&job_hash) { - self.current_jobs_hashes.write().await.insert(job_hash); - self.dispatch_job(library, job).await; - } else { - debug!( - "Job already in queue: ", - job.name(), - job_hash - ); - } - } - - pub async fn ingest_queue(&self, job: Box) { - let job_hash = job.hash(); - debug!("Queueing job: ", job.name(), job_hash); - - if !self.current_jobs_hashes.read().await.contains(&job_hash) { - self.current_jobs_hashes.write().await.insert(job_hash); - self.job_queue.write().await.push_back(job); - } else { - debug!( - "Job already in queue: ", - job.name(), - job_hash - ); - } + self.current_jobs_hashes.write().await.insert(job_hash); + self.dispatch_job(library, job).await; + Ok(()) } pub async fn complete(self: Arc, library: &Library, job_id: Uuid, job_hash: u64) { @@ -156,9 +173,7 @@ impl JobManager { ret } - pub async fn get_history( - library: &Library, - ) -> Result, prisma_client_rust::QueryError> { + pub async fn get_history(library: &Library) -> Result, JobManagerError> { Ok(library .db .job() @@ -172,7 +187,7 @@ impl JobManager { .collect()) } - pub async fn clear_all_jobs(library: &Library) -> Result<(), prisma_client_rust::QueryError> { + pub async fn clear_all_jobs(library: &Library) -> Result<(), JobManagerError> { library.db.job().delete_many(vec![]).exec().await?; invalidate_query!(library, "jobs.getHistory"); @@ -184,14 +199,11 @@ impl JobManager { } pub async fn pause(&self) { - let running_workers_read_guard = self.running_workers.read().await; - if !running_workers_read_guard.is_empty() { + if !self.running_workers.read().await.is_empty() { self.shutdown_tx .send(()) .expect("Failed to send shutdown signal"); } - // Dropping our handle so jobs can finish - drop(running_workers_read_guard); loop { sleep(Duration::from_millis(50)).await; @@ -201,93 +213,107 @@ impl JobManager { } } - pub async fn resume_jobs(self: Arc, library: &Library) -> Result<(), JobError> { - let paused_jobs = library + pub async fn resume_jobs(self: Arc, library: &Library) -> Result<(), JobManagerError> { + for root_paused_job_report in library .db .job() - .find_many(vec![job::status::equals(JobStatus::Paused.int_value())]) + .find_many(vec![ + job::status::equals(JobStatus::Paused.int_value()), + job::parent_id::equals(None), // only fetch top-level jobs, they will resume their children + ]) .exec() - .await?; - - for paused_job_data in paused_jobs { - let paused_job = JobReport::from(paused_job_data); - - info!("Resuming job: {}, id: {}", paused_job.name, paused_job.id); - match paused_job.name.as_str() { - THUMBNAILER_JOB_NAME => { - Arc::clone(&self) - .dispatch_job(library, Job::resume(paused_job, ThumbnailerJob {})?) - .await; - } - SHALLOW_THUMBNAILER_JOB_NAME => { - Arc::clone(&self) - .dispatch_job(library, Job::resume(paused_job, ShallowThumbnailerJob {})?) - .await; - } - INDEXER_JOB_NAME => { - Arc::clone(&self) - .dispatch_job(library, Job::resume(paused_job, IndexerJob {})?) - .await; - } - SHALLOW_INDEXER_JOB_NAME => { - Arc::clone(&self) - .dispatch_job(library, Job::resume(paused_job, ShallowIndexerJob {})?) - .await; - } - FILE_IDENTIFIER_JOB_NAME => { - Arc::clone(&self) - .dispatch_job(library, Job::resume(paused_job, FileIdentifierJob {})?) - .await; - } - SHALLOW_FILE_IDENTIFIER_JOB_NAME => { - Arc::clone(&self) - .dispatch_job( - library, - Job::resume(paused_job, ShallowFileIdentifierJob {})?, - ) - .await; - } - VALIDATOR_JOB_NAME => { - Arc::clone(&self) - .dispatch_job(library, Job::resume(paused_job, ObjectValidatorJob {})?) - .await; - } - CUT_JOB_NAME => { - Arc::clone(&self) - .dispatch_job(library, Job::resume(paused_job, FileCutterJob {})?) - .await; - } - COPY_JOB_NAME => { - Arc::clone(&self) - .dispatch_job( - library, - Job::resume(paused_job, FileCopierJob { done_tx: None })?, - ) - .await; - } - DELETE_JOB_NAME => { - Arc::clone(&self) - .dispatch_job(library, Job::resume(paused_job, FileDeleterJob {})?) - .await; - } - ERASE_JOB_NAME => { - Arc::clone(&self) - .dispatch_job(library, Job::resume(paused_job, FileEraserJob {})?) - .await; - } - _ => { - error!( - "Unknown job type: {}, id: {}", - paused_job.name, paused_job.id - ); - return Err(JobError::UnknownJobName(paused_job.id, paused_job.name)); - } - }; + .await? + .into_iter() + .map(JobReport::from) + { + Arc::clone(&self) + .dispatch_job( + library, + Self::recursive_resume_job(root_paused_job_report, library).await?, + ) + .await; } Ok(()) } + fn recursive_resume_job( + parent: JobReport, + library: &Library, + ) -> BoxFuture, JobManagerError>> { + // Recursive async functions must return boxed futures + Box::pin(async move { + info!( + "Trying to resume Job ", + parent.name, parent.id + ); + + let maybe_children_job = if let Some(children_job_report) = library + .db + .job() + .find_first(vec![job::parent_id::equals(Some( + parent.id.as_bytes().to_vec(), + ))]) + .exec() + .await? + .map(JobReport::from) + { + Some(Self::recursive_resume_job(children_job_report, library).await?) + } else { + None + }; + + Self::get_resumable_job(parent, maybe_children_job) + }) + } + + fn get_resumable_job( + job_report: JobReport, + next_job: Option>, + ) -> Result, JobManagerError> { + match job_report.name.as_str() { + ::NAME => { + Job::resume(job_report, ThumbnailerJob {}, next_job) + } + ::NAME => { + Job::resume(job_report, ShallowThumbnailerJob {}, next_job) + } + ::NAME => Job::resume(job_report, IndexerJob {}, next_job), + ::NAME => { + Job::resume(job_report, ShallowIndexerJob {}, next_job) + } + ::NAME => { + Job::resume(job_report, FileIdentifierJob {}, next_job) + } + ::NAME => { + Job::resume(job_report, ShallowFileIdentifierJob {}, next_job) + } + ::NAME => { + Job::resume(job_report, ObjectValidatorJob {}, next_job) + } + ::NAME => { + Job::resume(job_report, FileCutterJob {}, next_job) + } + ::NAME => { + Job::resume(job_report, FileCopierJob {}, next_job) + } + ::NAME => { + Job::resume(job_report, FileDeleterJob {}, next_job) + } + ::NAME => { + Job::resume(job_report, FileEraserJob {}, next_job) + } + _ => { + error!( + "Unknown job type: {}, id: {}", + job_report.name, job_report.id + ); + Err(JobError::UnknownJobName(job_report.id, job_report.name)) + } + } + .map_err(Into::into) + } + async fn dispatch_job(self: Arc, library: &Library, mut job: Box) { // create worker to process job let mut running_workers = self.running_workers.write().await; @@ -295,7 +321,7 @@ impl JobManager { info!("Running job: {:?}", job.name()); let job_report = job - .report() + .report_mut() .take() .expect("critical error: missing job on worker"); @@ -341,9 +367,10 @@ pub struct JobReport { pub name: String, pub data: Option>, pub metadata: Option, - // client_id: i32, - pub date_created: chrono::DateTime, - pub date_modified: chrono::DateTime, + pub created_at: Option>, + pub updated_at: Option>, + + pub parent_id: Option, pub status: JobStatus, pub task_count: i32, @@ -371,12 +398,11 @@ impl From for JobReport { JobReport { id: Uuid::from_slice(&data.id).unwrap(), name: data.name, - // client_id: data.client_id, status: JobStatus::from_int(data.status).unwrap(), task_count: data.task_count, completed_task_count: data.completed_task_count, - date_created: data.date_created.into(), - date_modified: data.date_modified.into(), + created_at: Some(data.date_created.into()), + updated_at: Some(data.date_modified.into()), data: data.data, metadata: data.metadata.and_then(|m| { serde_json::from_slice(&m).unwrap_or_else(|e| -> Option { @@ -386,6 +412,8 @@ impl From for JobReport { }), message: String::new(), seconds_elapsed: data.seconds_elapsed, + // SAFETY: We created this uuid before + parent_id: data.parent_id.map(|id| Uuid::from_slice(&id).unwrap()), } } } @@ -395,20 +423,30 @@ impl JobReport { Self { id: uuid, name, - // client_id: 0, - date_created: chrono::Utc::now(), - date_modified: chrono::Utc::now(), + created_at: None, + updated_at: None, status: JobStatus::Queued, task_count: 0, data: None, metadata: None, + parent_id: None, completed_task_count: 0, message: String::new(), seconds_elapsed: 0, } } - pub async fn create(&self, library: &Library) -> Result<(), JobError> { + pub fn new_with_parent(uuid: Uuid, name: String, parent_id: Uuid) -> Self { + let mut report = Self::new(uuid, name); + report.parent_id = Some(parent_id); + report + } + + pub async fn create(&mut self, library: &Library) -> Result<(), JobError> { + let now = Utc::now(); + self.created_at = Some(now); + self.updated_at = Some(now); + library .db .job() @@ -417,13 +455,20 @@ impl JobReport { self.name.clone(), JobStatus::Running as i32, node::id::equals(library.node_local_id), - vec![job::data::set(self.data.clone())], + vec![ + job::data::set(self.data.clone()), + job::parent_id::set(self.parent_id.map(|id| id.as_bytes().to_vec())), + job::date_created::set(now.into()), + job::date_modified::set(now.into()), + ], ) .exec() .await?; Ok(()) } - pub async fn update(&self, library: &Library) -> Result<(), JobError> { + pub async fn update(&mut self, library: &Library) -> Result<(), JobError> { + let now = Utc::now(); + self.updated_at = Some(now); library .db .job() @@ -435,7 +480,7 @@ impl JobReport { job::metadata::set(serde_json::to_vec(&self.metadata).ok()), job::task_count::set(self.task_count), job::completed_task_count::set(self.completed_task_count), - job::date_modified::set(chrono::Utc::now().into()), + job::date_modified::set(now.into()), job::seconds_elapsed::set(self.seconds_elapsed), ], ) diff --git a/core/src/job/mod.rs b/core/src/job/mod.rs index 2884ba550..5868b3268 100644 --- a/core/src/job/mod.rs +++ b/core/src/job/mod.rs @@ -1,5 +1,6 @@ use crate::{ - location::{indexer::IndexerError, LocationError, LocationManagerError}, + library::Library, + location::indexer::IndexerError, object::{file_identifier::FileIdentifierJobError, preview::ThumbnailerError}, }; @@ -7,13 +8,14 @@ use std::{ collections::{hash_map::DefaultHasher, VecDeque}, fmt::Debug, hash::{Hash, Hasher}, + sync::Arc, }; use rmp_serde::{decode::Error as DecodeError, encode::Error as EncodeError}; use sd_crypto::Error as CryptoError; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use thiserror::Error; -use tracing::info; +use tracing::{debug, error, info}; use uuid::Uuid; mod job_manager; @@ -45,8 +47,6 @@ pub enum JobError { MissingJobDataState(Uuid, String), #[error("missing some job data: '{value}'")] MissingData { value: String }, - #[error("Location manager error: {0}")] - LocationManager(#[from] LocationManagerError), #[error("error converting/handling OS strings")] OsStr, #[error("error converting/handling paths")] @@ -55,8 +55,6 @@ pub enum JobError { // Specific job errors #[error("Indexer error: {0}")] IndexerError(#[from] IndexerError), - #[error("Location error: {0}")] - LocationError(#[from] LocationError), #[error("Thumbnailer error: {0}")] ThumbnailError(#[from] ThumbnailerError), #[error("Identifier error: {0}")] @@ -76,56 +74,139 @@ pub enum JobError { pub type JobResult = Result; pub type JobMetadata = Option; +/// `JobInitData` is a trait to represent the data being passed to initialize a `Job` +pub trait JobInitData: Serialize + DeserializeOwned + Send + Sync + Hash { + type Job: StatefulJob; + + fn hash(&self) -> u64 { + let mut s = DefaultHasher::new(); + ::NAME.hash(&mut s); + ::hash(self, &mut s); + s.finish() + } +} + #[async_trait::async_trait] pub trait StatefulJob: Send + Sync + Sized { - type Init: Serialize + DeserializeOwned + Send + Sync + Hash; + type Init: JobInitData; type Data: Serialize + DeserializeOwned + Send + Sync; type Step: Serialize + DeserializeOwned + Send + Sync; - fn name(&self) -> &'static str; + /// The name of the job is a unique human readable identifier for the job. + const NAME: &'static str; + + /// Construct a new instance of the job. This is used so the user can pass `Self::Init` into the `spawn_job` function and we can still run the job. + /// This does remove the flexibility of being able to pass arguments into the job's struct but with resumable jobs I view that as an anti-pattern anyway. + fn new() -> Self; + + /// initialize the steps for the job async fn init(&self, ctx: WorkerContext, state: &mut JobState) -> Result<(), JobError>; + /// is called for each step in the job. These steps are created in the `Self::init` method. async fn execute_step( &self, ctx: WorkerContext, state: &mut JobState, ) -> Result<(), JobError>; + /// is called after all steps have been executed async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState) -> JobResult; } #[async_trait::async_trait] pub trait DynJob: Send + Sync { - fn report(&mut self) -> &mut Option; + fn id(&self) -> Uuid; + fn parent_id(&self) -> Option; + fn report(&self) -> &Option; + fn report_mut(&mut self) -> &mut Option; fn name(&self) -> &'static str; - async fn run(&mut self, ctx: WorkerContext) -> JobResult; + async fn run(&mut self, job_manager: Arc, ctx: WorkerContext) -> JobResult; fn hash(&self) -> u64; + fn queue_next(&mut self, next_job: Box); + fn serialize_state(&self) -> Result, JobError>; + async fn register_children(&mut self, library: &Library) -> Result<(), JobError>; + async fn pause_children(&mut self, library: &Library) -> Result<(), JobError>; + async fn cancel_children(&mut self, library: &Library) -> Result<(), JobError>; } pub struct Job { report: Option, state: JobState, stateful_job: SJob, + next_job: Option>, } -impl Job { - pub fn new(init: SJob::Init, stateful_job: SJob) -> Box { +pub trait IntoJob { + fn into_job(self) -> Box; +} + +impl IntoJob for Init +where + SJob: StatefulJob + 'static, + Init: JobInitData, +{ + fn into_job(self) -> Box { + Job::new(self) + } +} + +impl IntoJob for Box> +where + SJob: StatefulJob + 'static, + Init: JobInitData, +{ + fn into_job(self) -> Box { + self + } +} + +impl Job +where + SJob: StatefulJob + 'static, + Init: JobInitData, +{ + pub fn new(init: Init) -> Box { Box::new(Self { - report: Some(JobReport::new( - Uuid::new_v4(), - stateful_job.name().to_string(), - )), + report: Some(JobReport::new(Uuid::new_v4(), SJob::NAME.to_string())), state: JobState { init, data: None, steps: VecDeque::new(), step_number: 0, }, - stateful_job, + stateful_job: SJob::new(), + next_job: None, }) } - pub fn resume(mut report: JobReport, stateful_job: SJob) -> Result, JobError> { + pub fn queue_next(mut self: Box, init: NextInit) -> Box + where + NextSJob: StatefulJob + 'static, + NextInit: JobInitData, + { + let last_job = Job::new_dependent( + init, + self.next_job + .as_ref() + .map(|job| job.id()) + // SAFETY: If we're queueing a next job then we should have a report yet + .unwrap_or(self.report.as_ref().unwrap().id), + ); + + if let Some(ref mut next) = self.next_job { + next.queue_next(last_job); + } else { + self.next_job = Some(last_job); + } + + self + } + + pub fn resume( + mut report: JobReport, + stateful_job: SJob, + next_job: Option>, + ) -> Result, JobError> { let job_state_data = if let Some(data) = report.data.take() { data } else { @@ -136,14 +217,26 @@ impl Job { report: Some(report), state: rmp_serde::from_slice(&job_state_data)?, stateful_job, + next_job, })) } -} -impl Hash for Job { - fn hash(&self, state: &mut H) { - self.name().hash(state); - self.state.hash(state); + fn new_dependent(init: Init, parent_id: Uuid) -> Box { + Box::new(Self { + report: Some(JobReport::new_with_parent( + Uuid::new_v4(), + SJob::NAME.to_string(), + parent_id, + )), + state: JobState { + init, + data: None, + steps: VecDeque::new(), + step_number: 0, + }, + stateful_job: SJob::new(), + next_job: None, + }) } } @@ -155,23 +248,30 @@ pub struct JobState { pub step_number: usize, } -impl Hash for JobState { - fn hash(&self, state: &mut H) { - self.init.hash(state); - } -} - #[async_trait::async_trait] -impl DynJob for Job { - fn report(&mut self) -> &mut Option { +impl DynJob for Job { + fn id(&self) -> Uuid { + // SAFETY: This method is using during queueing, so we still have a report + self.report().as_ref().unwrap().id + } + + fn parent_id(&self) -> Option { + self.report.as_ref().and_then(|r| r.parent_id) + } + + fn report(&self) -> &Option { + &self.report + } + + fn report_mut(&mut self) -> &mut Option { &mut self.report } fn name(&self) -> &'static str { - self.stateful_job.name() + ::NAME } - async fn run(&mut self, ctx: WorkerContext) -> JobResult { + async fn run(&mut self, job_manager: Arc, ctx: WorkerContext) -> JobResult { let mut job_should_run = true; // Checking if we have a brand new job, or if we are resuming an old one. @@ -215,14 +315,83 @@ impl DynJob for Job { self.state.step_number += 1; } - self.stateful_job + let metadata = self + .stateful_job .finalize(ctx.clone(), &mut self.state) - .await + .await?; + + if let Some(next_job) = self.next_job.take() { + debug!( + "Job '{}' requested to spawn '{}' now that it's complete!", + self.name(), + next_job.name() + ); + + if let Err(e) = job_manager.clone().ingest(&ctx.library, next_job).await { + error!("Failed to ingest next job: {e}"); + } + } + + Ok(metadata) } fn hash(&self) -> u64 { - let mut hasher = DefaultHasher::new(); - Hash::hash(self, &mut hasher); - hasher.finish() + ::hash(&self.state.init) + } + + fn queue_next(&mut self, next_job: Box) { + if let Some(ref mut next) = self.next_job { + next.queue_next(next_job); + } else { + self.next_job = Some(next_job); + } + } + + fn serialize_state(&self) -> Result, JobError> { + rmp_serde::to_vec_named(&self.state).map_err(Into::into) + } + + async fn register_children(&mut self, library: &Library) -> Result<(), JobError> { + if let Some(ref mut next_job) = self.next_job { + // SAFETY: As these children jobs haven't been run yet, they still have their report field + let next_job_report = next_job.report_mut().as_mut().unwrap(); + if next_job_report.created_at.is_none() { + next_job_report.create(library).await? + } + + next_job.register_children(library).await?; + } + + Ok(()) + } + + async fn pause_children(&mut self, library: &Library) -> Result<(), JobError> { + if let Some(ref mut next_job) = self.next_job { + let state = next_job.serialize_state()?; + + // SAFETY: As these children jobs haven't been run yet, they still have their report field + let mut report = next_job.report_mut().as_mut().unwrap(); + report.status = JobStatus::Paused; + report.data = Some(state); + report.update(library).await?; + next_job.pause_children(library).await?; + } + + Ok(()) + } + + async fn cancel_children(&mut self, library: &Library) -> Result<(), JobError> { + if let Some(ref mut next_job) = self.next_job { + let state = next_job.serialize_state()?; + + // SAFETY: As these children jobs haven't been run yet, they still have their report field + let mut report = next_job.report_mut().as_mut().unwrap(); + report.status = JobStatus::Canceled; + report.data = Some(state); + report.update(library).await?; + next_job.cancel_children(library).await?; + } + + Ok(()) } } diff --git a/core/src/job/worker.rs b/core/src/job/worker.rs index 1e4849111..49450f40b 100644 --- a/core/src/job/worker.rs +++ b/core/src/job/worker.rs @@ -102,18 +102,21 @@ impl Worker { let job_hash = job.hash(); let job_id = worker.report.id; - let old_status = worker.report.status; worker.report.status = JobStatus::Running; - if matches!(old_status, JobStatus::Queued) { + // If the report doesn't have a created_at date, it's a new report + if worker.report.created_at.is_none() { worker.report.create(&library).await?; } else { + // Otherwise it can be a job being resumed or a children job that was already been created worker.report.update(&library).await?; } drop(worker); - invalidate_query!(library, "jobs.isRunning"); + job.register_children(&library).await?; + + invalidate_query!(library, "jobs.getRunning"); // spawn task to handle receiving events from the worker tokio::spawn(Worker::track_progress( @@ -153,7 +156,7 @@ impl Worker { let (done_tx, done_rx) = oneshot::channel(); - match job.run(worker_ctx.clone()).await { + match job.run(job_manager.clone(), worker_ctx.clone()).await { Ok(metadata) => { // handle completion worker_ctx @@ -162,13 +165,22 @@ impl Worker { .expect("critical error: failed to send worker complete event"); } Err(JobError::Paused(state)) => { + info!("Job paused, we will pause all children jobs"); + if let Err(e) = job.pause_children(&library).await { + error!("Failed to pause children jobs: {e:#?}"); + } + worker_ctx .events_tx .send(WorkerEvent::Paused(state, done_tx)) .expect("critical error: failed to send worker pause event"); } Err(e) => { - error!("job '{}' failed with error: {:#?}", job_id, e); + error!("Job failed with error: {e:#?}; We will cancel all children jobs"); + if let Err(e) = job.cancel_children(&library).await { + error!("Failed to cancel children jobs: {e:#?}"); + } + worker_ctx .events_tx .send(WorkerEvent::Failed(done_tx)) @@ -236,7 +248,6 @@ impl Worker { error!("failed to update job report: {:#?}", e); } - invalidate_query!(library, "jobs.isRunning"); invalidate_query!(library, "jobs.getRunning"); invalidate_query!(library, "jobs.getHistory"); diff --git a/core/src/lib.rs b/core/src/lib.rs index 225f669a5..42b1d4d42 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -144,43 +144,8 @@ impl Node { ) .await?; - // Adding already existing locations for location management - for library in library_manager.get_all_libraries().await { - for location in library - .db - .location() - .find_many(vec![]) - .exec() - .await - .unwrap_or_else(|e| { - error!( - "Failed to get locations from database for location manager: {:#?}", - e - ); - vec![] - }) { - if let Err(e) = location_manager.add(location.id, library.clone()).await { - error!("Failed to add location to location manager: {:#?}", e); - } - } - } - debug!("Watching locations"); - // Trying to resume possible paused jobs - tokio::spawn({ - let library_manager = library_manager.clone(); - let jobs = jobs.clone(); - - async move { - for library in library_manager.get_all_libraries().await { - if let Err(e) = jobs.clone().resume_jobs(&library).await { - error!("Failed to resume jobs for library. {:#?}", e); - } - } - } - }); - tokio::spawn({ let library_manager = library_manager.clone(); diff --git a/core/src/library/library.rs b/core/src/library/library.rs index 749fa959e..2ee22b408 100644 --- a/core/src/library/library.rs +++ b/core/src/library/library.rs @@ -1,6 +1,6 @@ use crate::{ api::CoreEvent, - job::DynJob, + job::{IntoJob, JobInitData, JobManagerError, StatefulJob}, location::{file_path_helper::LastFilePathIdManager, LocationManager}, node::NodeConfigManager, object::preview::THUMBNAIL_CACHE_DIR_NAME, @@ -56,12 +56,19 @@ impl Debug for Library { } impl Library { - pub(crate) async fn spawn_job(&self, job: Box) { - self.node_context.jobs.clone().ingest(self, job).await; - } - - pub(crate) async fn queue_job(&self, job: Box) { - self.node_context.jobs.ingest_queue(job).await; + pub(crate) async fn spawn_job( + &self, + jobable: impl IntoJob, + ) -> Result<(), JobManagerError> + where + SJob: StatefulJob + 'static, + Init: JobInitData + 'static, + { + self.node_context + .jobs + .clone() + .ingest(self, jobable.into_job()) + .await } pub(crate) fn emit(&self, event: CoreEvent) { diff --git a/core/src/library/manager.rs b/core/src/library/manager.rs index eb3a8259d..c8ddafa27 100644 --- a/core/src/library/manager.rs +++ b/core/src/library/manager.rs @@ -23,7 +23,7 @@ use std::{ }; use thiserror::Error; use tokio::sync::RwLock; -use tracing::debug; +use tracing::{debug, error}; use uuid::Uuid; use super::{Library, LibraryConfig, LibraryConfigWrapped}; @@ -227,9 +227,9 @@ impl LibraryManager { .collect() } - pub(crate) async fn get_all_libraries(&self) -> Vec { - self.libraries.read().await.clone() - } + // pub(crate) async fn get_all_libraries(&self) -> Vec { + // self.libraries.read().await.clone() + // } pub(crate) async fn edit( &self, @@ -260,6 +260,31 @@ impl LibraryManager { invalidate_query!(library, "library.list"); + for library in self.libraries.read().await.iter() { + for location in library + .db + .location() + .find_many(vec![]) + .exec() + .await + .unwrap_or_else(|e| { + error!( + "Failed to get locations from database for location manager: {:#?}", + e + ); + vec![] + }) { + if let Err(e) = self + .node_context + .location_manager + .add(location.id, library.clone()) + .await + { + error!("Failed to add location to location manager: {:#?}", e); + } + } + } + Ok(()) } @@ -351,7 +376,7 @@ impl LibraryManager { } }); - Ok(Library { + let library = Library { id, local_id: node_data.id, config, @@ -361,6 +386,18 @@ impl LibraryManager { last_file_path_id_manager: Arc::new(LastFilePathIdManager::new()), node_local_id: node_data.id, node_context, - }) + }; + + if let Err(e) = library + .node_context + .jobs + .clone() + .resume_jobs(&library) + .await + { + error!("Failed to resume jobs for library. {:#?}", e); + } + + Ok(library) } } diff --git a/core/src/location/file_path_helper.rs b/core/src/location/file_path_helper.rs index 71f5d2ec2..3d2ac4ff3 100644 --- a/core/src/location/file_path_helper.rs +++ b/core/src/location/file_path_helper.rs @@ -1,4 +1,3 @@ -use crate::location::Library; use crate::prisma::{file_path, location, PrismaClient}; use std::{ @@ -13,7 +12,6 @@ use dashmap::{mapref::entry::Entry, DashMap}; use futures::future::try_join_all; use prisma_client_rust::{Direction, QueryError}; use serde::{Deserialize, Serialize}; -use serde_json::json; use thiserror::Error; use tokio::{fs, io}; use tracing::error; @@ -318,7 +316,7 @@ impl LastFilePathIdManager { #[cfg(feature = "location-watcher")] pub async fn create_file_path( &self, - Library { db, sync, .. }: &Library, + crate::location::Library { db, sync, .. }: &crate::location::Library, MaterializedPath { materialized_path, is_dir, @@ -333,6 +331,7 @@ impl LastFilePathIdManager { // Keeping a reference in that map for the entire duration of the function, so we keep it locked use crate::sync; + use serde_json::json; let mut last_id_ref = match self.last_id_by_location.entry(location_id) { Entry::Occupied(ocupied) => ocupied.into_ref(), @@ -513,6 +512,7 @@ pub fn filter_existing_file_path_params( /// With this function we try to do a loose filtering of file paths, to avoid having to do check /// twice for directories and for files. This is because directories have a trailing `/` or `\` in /// the materialized path +#[allow(unused)] pub fn loose_find_existing_file_path_params( MaterializedPath { materialized_path, diff --git a/core/src/location/indexer/indexer_job.rs b/core/src/location/indexer/indexer_job.rs index 01ea2c9e4..6f13dac59 100644 --- a/core/src/location/indexer/indexer_job.rs +++ b/core/src/location/indexer/indexer_job.rs @@ -1,5 +1,5 @@ use crate::{ - job::{JobError, JobResult, JobState, StatefulJob, WorkerContext}, + job::{JobError, JobInitData, JobResult, JobState, StatefulJob, WorkerContext}, library::Library, location::file_path_helper::{ ensure_sub_path_is_directory, ensure_sub_path_is_in_location, @@ -27,21 +27,26 @@ use super::{ /// BATCH_SIZE is the number of files to index at each step, writing the chunk of files metadata in the database. const BATCH_SIZE: usize = 1000; -pub const INDEXER_JOB_NAME: &str = "indexer"; /// A `IndexerJob` is a stateful job that walks a directory and indexes all files. /// First it walks the directory and generates a list of files to index, chunked into /// batches of [`BATCH_SIZE`]. Then for each chunk it write the file metadata to the database. pub struct IndexerJob; +impl JobInitData for IndexerJobInit { + type Job = IndexerJob; +} + #[async_trait::async_trait] impl StatefulJob for IndexerJob { type Init = IndexerJobInit; type Data = IndexerJobData; type Step = IndexerJobStep; - fn name(&self) -> &'static str { - INDEXER_JOB_NAME + const NAME: &'static str = "indexer"; + + fn new() -> Self { + Self {} } /// Creates a vector of valid path buffers from a directory, chunked into batches of `BATCH_SIZE`. @@ -272,7 +277,6 @@ impl StatefulJob for IndexerJob { }) } - /// Logs some metadata about the indexer job async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState) -> JobResult { finalize_indexer(&state.init.location.path, state, ctx) } diff --git a/core/src/location/indexer/shallow_indexer_job.rs b/core/src/location/indexer/shallow_indexer_job.rs index c9a5eefaf..418b08f6d 100644 --- a/core/src/location/indexer/shallow_indexer_job.rs +++ b/core/src/location/indexer/shallow_indexer_job.rs @@ -1,5 +1,5 @@ use crate::{ - job::{JobError, JobResult, JobState, StatefulJob, WorkerContext}, + job::{JobError, JobInitData, JobResult, JobState, StatefulJob, WorkerContext}, library::Library, location::file_path_helper::{ ensure_sub_path_is_directory, ensure_sub_path_is_in_location, @@ -31,7 +31,6 @@ use super::{ /// BATCH_SIZE is the number of files to index at each step, writing the chunk of files metadata in the database. const BATCH_SIZE: usize = 1000; -pub const SHALLOW_INDEXER_JOB_NAME: &str = "shallow_indexer"; /// `ShallowIndexerJobInit` receives a `location::Data` object to be indexed /// and possibly a `sub_path` to be indexed. The `sub_path` is used when @@ -54,14 +53,20 @@ impl Hash for ShallowIndexerJobInit { /// batches of [`BATCH_SIZE`]. Then for each chunk it write the file metadata to the database. pub struct ShallowIndexerJob; +impl JobInitData for ShallowIndexerJobInit { + type Job = ShallowIndexerJob; +} + #[async_trait::async_trait] impl StatefulJob for ShallowIndexerJob { type Init = ShallowIndexerJobInit; type Data = IndexerJobData; type Step = IndexerJobStep; - fn name(&self) -> &'static str { - SHALLOW_INDEXER_JOB_NAME + const NAME: &'static str = "shallow_indexer"; + + fn new() -> Self { + Self {} } /// Creates a vector of valid path buffers from a directory, chunked into batches of `BATCH_SIZE`. diff --git a/core/src/location/manager/mod.rs b/core/src/location/manager/mod.rs index 75e00fd64..ae1e9c306 100644 --- a/core/src/location/manager/mod.rs +++ b/core/src/location/manager/mod.rs @@ -1,4 +1,4 @@ -use crate::library::Library; +use crate::{job::JobManagerError, library::Library}; use std::{ collections::BTreeSet, @@ -102,6 +102,8 @@ pub enum LocationManagerError { FilePathError(#[from] FilePathError), #[error("Corrupted location pub_id on database: (error: {0})")] CorruptedLocationPubId(#[from] uuid::Error), + #[error("Job Manager error: (error: {0})")] + JobManager(#[from] JobManagerError), } type OnlineLocations = BTreeSet>; diff --git a/core/src/location/manager/watcher/utils.rs b/core/src/location/manager/watcher/utils.rs index 5f347635b..910508287 100644 --- a/core/src/location/manager/watcher/utils.rs +++ b/core/src/location/manager/watcher/utils.rs @@ -126,7 +126,7 @@ pub(super) async fn create_dir( info!("Created path: {}", created_path.materialized_path); // scan the new directory - scan_location_sub_path(library, location, &created_path.materialized_path).await; + scan_location_sub_path(library, location, &created_path.materialized_path).await?; invalidate_query!(library, "locations.getExplorerData"); diff --git a/core/src/location/mod.rs b/core/src/location/mod.rs index e85ce48b0..286ee3560 100644 --- a/core/src/location/mod.rs +++ b/core/src/location/mod.rs @@ -1,15 +1,14 @@ use crate::{ invalidate_query, - job::Job, + job::{Job, JobManagerError}, library::Library, object::{ file_identifier::{ - file_identifier_job::{FileIdentifierJob, FileIdentifierJobInit}, - shallow_file_identifier_job::{ShallowFileIdentifierJob, ShallowFileIdentifierJobInit}, + file_identifier_job::FileIdentifierJobInit, + shallow_file_identifier_job::ShallowFileIdentifierJobInit, }, preview::{ - shallow_thumbnailer_job::{ShallowThumbnailerJob, ShallowThumbnailerJobInit}, - thumbnailer_job::{ThumbnailerJob, ThumbnailerJobInit}, + shallow_thumbnailer_job::ShallowThumbnailerJobInit, thumbnailer_job::ThumbnailerJobInit, }, }, prisma::{file_path, indexer_rules_in_location, location, node, object}, @@ -39,11 +38,7 @@ mod metadata; pub use error::LocationError; use file_path_helper::file_path_just_object_id; -use indexer::{ - indexer_job::IndexerJob, - shallow_indexer_job::{ShallowIndexerJob, ShallowIndexerJobInit}, - IndexerJobInit, -}; +use indexer::{shallow_indexer_job::ShallowIndexerJobInit, IndexerJobInit}; pub use manager::{LocationManager, LocationManagerError}; use metadata::SpacedriveLocationMetadataFile; @@ -323,43 +318,30 @@ async fn link_location_and_indexer_rules( pub async fn scan_location( library: &Library, location: location_with_indexer_rules::Data, -) -> Result<(), LocationError> { +) -> Result<(), JobManagerError> { if location.node_id != library.node_local_id { return Ok(()); } - library - .queue_job(Job::new( - FileIdentifierJobInit { - location: location::Data::from(&location), - sub_path: None, - }, - FileIdentifierJob {}, - )) - .await; + let location_base_data = location::Data::from(&location); library - .queue_job(Job::new( - ThumbnailerJobInit { - location: location::Data::from(&location), - sub_path: None, - background: true, - }, - ThumbnailerJob {}, - )) - .await; - - library - .spawn_job(Job::new( - IndexerJobInit { + .spawn_job( + Job::new(IndexerJobInit { location, sub_path: None, - }, - IndexerJob {}, - )) - .await; - - Ok(()) + }) + .queue_next(FileIdentifierJobInit { + location: location_base_data.clone(), + sub_path: None, + }) + .queue_next(ThumbnailerJobInit { + location: location_base_data, + sub_path: None, + background: true, + }), + ) + .await } #[cfg(feature = "location-watcher")] @@ -367,82 +349,61 @@ pub async fn scan_location_sub_path( library: &Library, location: location_with_indexer_rules::Data, sub_path: impl AsRef, -) { +) -> Result<(), JobManagerError> { let sub_path = sub_path.as_ref().to_path_buf(); if location.node_id != library.node_local_id { - return; + return Ok(()); } - library - .queue_job(Job::new( - FileIdentifierJobInit { - location: location::Data::from(&location), - sub_path: Some(sub_path.clone()), - }, - FileIdentifierJob {}, - )) - .await; + let location_base_data = location::Data::from(&location); library - .queue_job(Job::new( - ThumbnailerJobInit { - location: location::Data::from(&location), - sub_path: Some(sub_path.clone()), - background: true, - }, - ThumbnailerJob {}, - )) - .await; - - library - .spawn_job(Job::new( - IndexerJobInit { + .spawn_job( + Job::new(IndexerJobInit { location, + sub_path: Some(sub_path.clone()), + }) + .queue_next(FileIdentifierJobInit { + location: location_base_data.clone(), + sub_path: Some(sub_path.clone()), + }) + .queue_next(ThumbnailerJobInit { + location: location_base_data, sub_path: Some(sub_path), - }, - IndexerJob {}, - )) - .await; + background: true, + }), + ) + .await } pub async fn light_scan_location( library: &Library, location: location_with_indexer_rules::Data, sub_path: impl AsRef, -) -> Result<(), LocationError> { +) -> Result<(), JobManagerError> { let sub_path = sub_path.as_ref().to_path_buf(); if location.node_id != library.node_local_id { return Ok(()); } + let location_base_data = location::Data::from(&location); + library - .queue_job(Job::new( - ShallowFileIdentifierJobInit { - location: location::Data::from(&location), + .spawn_job( + Job::new(ShallowIndexerJobInit { + location, sub_path: sub_path.clone(), - }, - ShallowFileIdentifierJob {}, - )) - .await; - - library - .queue_job(Job::new( - ShallowThumbnailerJobInit { - location: location::Data::from(&location), + }) + .queue_next(ShallowFileIdentifierJobInit { + location: location_base_data.clone(), sub_path: sub_path.clone(), - }, - ShallowThumbnailerJob {}, - )) - .await; - - library - .spawn_job(Job::new( - ShallowIndexerJobInit { location, sub_path }, - ShallowIndexerJob {}, - )) - .await; - - Ok(()) + }) + .queue_next(ShallowThumbnailerJobInit { + location: location_base_data, + sub_path, + }), + ) + .await } pub async fn relink_location( diff --git a/core/src/object/file_identifier/file_identifier_job.rs b/core/src/object/file_identifier/file_identifier_job.rs index 643f2e2de..60951f84f 100644 --- a/core/src/object/file_identifier/file_identifier_job.rs +++ b/core/src/object/file_identifier/file_identifier_job.rs @@ -1,5 +1,7 @@ use crate::{ - job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext}, + job::{ + JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, + }, library::Library, location::file_path_helper::{ ensure_sub_path_is_directory, ensure_sub_path_is_in_location, @@ -22,8 +24,6 @@ use super::{ FileIdentifierReport, FilePathIdAndLocationIdCursor, CHUNK_SIZE, }; -pub const FILE_IDENTIFIER_JOB_NAME: &str = "file_identifier"; - pub struct FileIdentifierJob {} /// `FileIdentifierJobInit` takes file_paths without a file_id from an entire location @@ -53,14 +53,20 @@ pub struct FileIdentifierJobState { maybe_sub_materialized_path: Option>, } +impl JobInitData for FileIdentifierJobInit { + type Job = FileIdentifierJob; +} + #[async_trait::async_trait] impl StatefulJob for FileIdentifierJob { type Init = FileIdentifierJobInit; type Data = FileIdentifierJobState; type Step = (); - fn name(&self) -> &'static str { - FILE_IDENTIFIER_JOB_NAME + const NAME: &'static str = "file_identifier"; + + fn new() -> Self { + Self {} } async fn init(&self, ctx: WorkerContext, state: &mut JobState) -> Result<(), JobError> { @@ -108,7 +114,7 @@ impl StatefulJob for FileIdentifierJob { if orphan_count == 0 { return Err(JobError::EarlyFinish { - name: self.name().to_string(), + name: ::NAME.to_string(), reason: "Found no orphan file paths to process".to_string(), }); } @@ -166,7 +172,7 @@ impl StatefulJob for FileIdentifierJob { get_orphan_file_paths(&ctx.library.db, cursor, maybe_sub_materialized_path).await?; process_identifier_file_paths( - self.name(), + ::NAME, location, &file_paths, state.step_number, diff --git a/core/src/object/file_identifier/shallow_file_identifier_job.rs b/core/src/object/file_identifier/shallow_file_identifier_job.rs index 607a41ad3..696401488 100644 --- a/core/src/object/file_identifier/shallow_file_identifier_job.rs +++ b/core/src/object/file_identifier/shallow_file_identifier_job.rs @@ -1,5 +1,7 @@ use crate::{ - job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext}, + job::{ + JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, + }, library::Library, location::file_path_helper::{ ensure_sub_path_is_directory, ensure_sub_path_is_in_location, @@ -22,8 +24,6 @@ use super::{ FileIdentifierReport, FilePathIdAndLocationIdCursor, CHUNK_SIZE, }; -pub const SHALLOW_FILE_IDENTIFIER_JOB_NAME: &str = "shallow_file_identifier"; - pub struct ShallowFileIdentifierJob {} /// `ShallowFileIdentifierJobInit` takes file_paths without a file_id from a specific path @@ -50,14 +50,20 @@ pub struct ShallowFileIdentifierJobState { sub_path_id: i32, } +impl JobInitData for ShallowFileIdentifierJobInit { + type Job = ShallowFileIdentifierJob; +} + #[async_trait::async_trait] impl StatefulJob for ShallowFileIdentifierJob { type Init = ShallowFileIdentifierJobInit; type Data = ShallowFileIdentifierJobState; type Step = (); - fn name(&self) -> &'static str { - SHALLOW_FILE_IDENTIFIER_JOB_NAME + const NAME: &'static str = "shallow_file_identifier"; + + fn new() -> Self { + Self {} } async fn init(&self, ctx: WorkerContext, state: &mut JobState) -> Result<(), JobError> { @@ -113,7 +119,7 @@ impl StatefulJob for ShallowFileIdentifierJob { if orphan_count == 0 { return Err(JobError::EarlyFinish { - name: self.name().to_string(), + name: ::NAME.to_string(), reason: "Found no orphan file paths to process".to_string(), }); } @@ -167,7 +173,7 @@ impl StatefulJob for ShallowFileIdentifierJob { let file_paths = get_orphan_file_paths(&ctx.library.db, cursor, *sub_path_id).await?; process_identifier_file_paths( - self.name(), + ::NAME, location, &file_paths, state.step_number, diff --git a/core/src/object/fs/copy.rs b/core/src/object/fs/copy.rs index f0b9380c2..ae800ecf7 100644 --- a/core/src/object/fs/copy.rs +++ b/core/src/object/fs/copy.rs @@ -1,17 +1,19 @@ -use crate::job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext}; +use crate::{ + invalidate_query, + job::{ + JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, + }, +}; use std::{hash::Hash, path::PathBuf}; use serde::{Deserialize, Serialize}; use specta::Type; -use tokio::sync::oneshot; -use tracing::{error, trace}; +use tracing::trace; use super::{context_menu_fs_info, get_path_from_location_id, osstr_to_string, FsInfo}; -pub struct FileCopierJob { - pub done_tx: Option>, -} +pub struct FileCopierJob {} #[derive(Serialize, Deserialize, Debug, Clone)] pub struct FileCopierJobState { @@ -48,7 +50,9 @@ impl From for FileCopierJobStep { } } -pub const COPY_JOB_NAME: &str = "file_copier"; +impl JobInitData for FileCopierJobInit { + type Job = FileCopierJob; +} #[async_trait::async_trait] impl StatefulJob for FileCopierJob { @@ -56,8 +60,10 @@ impl StatefulJob for FileCopierJob { type Data = FileCopierJobState; type Step = FileCopierJobStep; - fn name(&self) -> &'static str { - COPY_JOB_NAME + const NAME: &'static str = "file_copier"; + + fn new() -> Self { + Self {} } async fn init(&self, ctx: WorkerContext, state: &mut JobState) -> Result<(), JobError> { @@ -179,12 +185,8 @@ impl StatefulJob for FileCopierJob { Ok(()) } - async fn finalize(&mut self, _ctx: WorkerContext, state: &mut JobState) -> JobResult { - if let Some(done_tx) = self.done_tx.take() { - if done_tx.send(()).is_err() { - error!("Failed to send done signal on FileCopierJob"); - } - } + async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState) -> JobResult { + invalidate_query!(ctx.library, "locations.getExplorerData"); Ok(Some(serde_json::to_value(&state.init)?)) } diff --git a/core/src/object/fs/cut.rs b/core/src/object/fs/cut.rs index 20e58307e..a4db3e32e 100644 --- a/core/src/object/fs/cut.rs +++ b/core/src/object/fs/cut.rs @@ -1,4 +1,9 @@ -use crate::job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext}; +use crate::{ + invalidate_query, + job::{ + JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, + }, +}; use std::{hash::Hash, path::PathBuf}; @@ -27,7 +32,9 @@ pub struct FileCutterJobStep { pub target_directory: PathBuf, } -pub const CUT_JOB_NAME: &str = "file_cutter"; +impl JobInitData for FileCutterJobInit { + type Job = FileCutterJob; +} #[async_trait::async_trait] impl StatefulJob for FileCutterJob { @@ -35,8 +42,10 @@ impl StatefulJob for FileCutterJob { type Data = FileCutterJobState; type Step = FileCutterJobStep; - fn name(&self) -> &'static str { - CUT_JOB_NAME + const NAME: &'static str = "file_cutter"; + + fn new() -> Self { + Self {} } async fn init(&self, ctx: WorkerContext, state: &mut JobState) -> Result<(), JobError> { @@ -85,7 +94,9 @@ impl StatefulJob for FileCutterJob { Ok(()) } - async fn finalize(&mut self, _ctx: WorkerContext, state: &mut JobState) -> JobResult { + async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState) -> JobResult { + invalidate_query!(ctx.library, "locations.getExplorerData"); + Ok(Some(serde_json::to_value(&state.init)?)) } } diff --git a/core/src/object/fs/decrypt.rs b/core/src/object/fs/decrypt.rs index 7fab131ef..327fc6bc2 100644 --- a/core/src/object/fs/decrypt.rs +++ b/core/src/object/fs/decrypt.rs @@ -4,7 +4,12 @@ use specta::Type; use std::{collections::VecDeque, path::PathBuf}; use tokio::fs::File; -use crate::job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext}; +use crate::{ + invalidate_query, + job::{ + JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, + }, +}; use super::{context_menu_fs_info, FsInfo, BYTES_EXT}; pub struct FileDecryptorJob; @@ -27,16 +32,20 @@ pub struct FileDecryptorJobStep { pub fs_info: FsInfo, } -const JOB_NAME: &str = "file_decryptor"; +impl JobInitData for FileDecryptorJobInit { + type Job = FileDecryptorJob; +} #[async_trait::async_trait] impl StatefulJob for FileDecryptorJob { - type Data = FileDecryptorJobState; type Init = FileDecryptorJobInit; + type Data = FileDecryptorJobState; type Step = FileDecryptorJobStep; - fn name(&self) -> &'static str { - JOB_NAME + const NAME: &'static str = "file_decryptor"; + + fn new() -> Self { + Self {} } async fn init(&self, ctx: WorkerContext, state: &mut JobState) -> Result<(), JobError> { @@ -145,7 +154,9 @@ impl StatefulJob for FileDecryptorJob { Ok(()) } - async fn finalize(&mut self, _ctx: WorkerContext, state: &mut JobState) -> JobResult { + async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState) -> JobResult { + invalidate_query!(ctx.library, "locations.getExplorerData"); + // mark job as successful Ok(Some(serde_json::to_value(&state.init)?)) } diff --git a/core/src/object/fs/delete.rs b/core/src/object/fs/delete.rs index dc1daf0b5..22f696ba3 100644 --- a/core/src/object/fs/delete.rs +++ b/core/src/object/fs/delete.rs @@ -1,4 +1,9 @@ -use crate::job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext}; +use crate::{ + invalidate_query, + job::{ + JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, + }, +}; use std::hash::Hash; @@ -18,7 +23,9 @@ pub struct FileDeleterJobInit { pub path_id: i32, } -pub const DELETE_JOB_NAME: &str = "file_deleter"; +impl JobInitData for FileDeleterJobInit { + type Job = FileDeleterJob; +} #[async_trait::async_trait] impl StatefulJob for FileDeleterJob { @@ -26,8 +33,10 @@ impl StatefulJob for FileDeleterJob { type Data = FileDeleterJobState; type Step = FsInfo; - fn name(&self) -> &'static str { - DELETE_JOB_NAME + const NAME: &'static str = "file_deleter"; + + fn new() -> Self { + Self {} } async fn init(&self, ctx: WorkerContext, state: &mut JobState) -> Result<(), JobError> { @@ -64,7 +73,9 @@ impl StatefulJob for FileDeleterJob { Ok(()) } - async fn finalize(&mut self, _ctx: WorkerContext, state: &mut JobState) -> JobResult { + async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState) -> JobResult { + invalidate_query!(ctx.library, "locations.getExplorerData"); + Ok(Some(serde_json::to_value(&state.init)?)) } } diff --git a/core/src/object/fs/encrypt.rs b/core/src/object/fs/encrypt.rs index 88e262cd5..5b265fd4c 100644 --- a/core/src/object/fs/encrypt.rs +++ b/core/src/object/fs/encrypt.rs @@ -1,4 +1,4 @@ -use crate::{job::*, library::Library}; +use crate::{invalidate_query, job::*, library::Library}; use std::path::PathBuf; @@ -12,7 +12,7 @@ use sd_crypto::{ use serde::{Deserialize, Serialize}; use specta::Type; use tokio::{fs::File, io::AsyncReadExt}; -use tracing::warn; +use tracing::{error, warn}; use super::{context_menu_fs_info, FsInfo, BYTES_EXT}; @@ -43,7 +43,9 @@ pub struct Metadata { pub date_created: chrono::DateTime, } -const JOB_NAME: &str = "file_encryptor"; +impl JobInitData for FileEncryptorJobInit { + type Job = FileEncryptorJob; +} #[async_trait::async_trait] impl StatefulJob for FileEncryptorJob { @@ -51,8 +53,10 @@ impl StatefulJob for FileEncryptorJob { type Data = FileEncryptorJobState; type Step = FsInfo; - fn name(&self) -> &'static str { - JOB_NAME + const NAME: &'static str = "file_encryptor"; + + fn new() -> Self { + Self {} } async fn init(&self, ctx: WorkerContext, state: &mut JobState) -> Result<(), JobError> { @@ -122,7 +126,17 @@ impl StatefulJob for FileEncryptorJob { ctx.library.clone(), &output_path, ) - .await?; + .await + .map_or_else( + |e| { + error!( + "Failed to make location manager ignore the path {}; Error: {e:#?}", + output_path.display() + ); + None + }, + Some, + ); let mut reader = File::open(&info.fs_path).await?; let mut writer = File::create(output_path).await?; @@ -223,7 +237,9 @@ impl StatefulJob for FileEncryptorJob { Ok(()) } - async fn finalize(&mut self, _ctx: WorkerContext, state: &mut JobState) -> JobResult { + async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState) -> JobResult { + invalidate_query!(ctx.library, "locations.getExplorerData"); + // mark job as successful Ok(Some(serde_json::to_value(&state.init)?)) } diff --git a/core/src/object/fs/erase.rs b/core/src/object/fs/erase.rs index a5e799d67..09043727c 100644 --- a/core/src/object/fs/erase.rs +++ b/core/src/object/fs/erase.rs @@ -1,4 +1,9 @@ -use crate::job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext}; +use crate::{ + invalidate_query, + job::{ + JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, + }, +}; use std::{hash::Hash, path::PathBuf}; @@ -42,7 +47,9 @@ impl From for FileEraserJobStep { } } -pub const ERASE_JOB_NAME: &str = "file_eraser"; +impl JobInitData for FileEraserJobInit { + type Job = FileEraserJob; +} #[async_trait::async_trait] impl StatefulJob for FileEraserJob { @@ -50,8 +57,10 @@ impl StatefulJob for FileEraserJob { type Data = FsInfo; type Step = FileEraserJobStep; - fn name(&self) -> &'static str { - ERASE_JOB_NAME + const NAME: &'static str = "file_eraser"; + + fn new() -> Self { + Self {} } async fn init(&self, ctx: WorkerContext, state: &mut JobState) -> Result<(), JobError> { @@ -118,7 +127,7 @@ impl StatefulJob for FileEraserJob { Ok(()) } - async fn finalize(&mut self, _ctx: WorkerContext, state: &mut JobState) -> JobResult { + async fn finalize(&mut self, ctx: WorkerContext, state: &mut JobState) -> JobResult { if let Some(ref info) = state.data { if info.path_data.is_dir { tokio::fs::remove_dir_all(&info.fs_path).await?; @@ -127,6 +136,8 @@ impl StatefulJob for FileEraserJob { warn!("missing job state, unable to fully finalise erase job"); } + invalidate_query!(ctx.library, "locations.getExplorerData"); + Ok(Some(serde_json::to_value(&state.init)?)) } } diff --git a/core/src/object/preview/thumbnail/shallow_thumbnailer_job.rs b/core/src/object/preview/thumbnail/shallow_thumbnailer_job.rs index 97b4f99e4..abea12151 100644 --- a/core/src/object/preview/thumbnail/shallow_thumbnailer_job.rs +++ b/core/src/object/preview/thumbnail/shallow_thumbnailer_job.rs @@ -1,5 +1,7 @@ use crate::{ - job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext}, + job::{ + JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, + }, library::Library, location::{ file_path_helper::{ @@ -32,8 +34,6 @@ use super::{ #[cfg(feature = "ffmpeg")] use super::FILTERED_VIDEO_EXTENSIONS; -pub const SHALLOW_THUMBNAILER_JOB_NAME: &str = "shallow_thumbnailer"; - pub struct ShallowThumbnailerJob {} #[derive(Serialize, Deserialize, Clone)] @@ -49,14 +49,20 @@ impl Hash for ShallowThumbnailerJobInit { } } +impl JobInitData for ShallowThumbnailerJobInit { + type Job = ShallowThumbnailerJob; +} + #[async_trait::async_trait] impl StatefulJob for ShallowThumbnailerJob { type Init = ShallowThumbnailerJobInit; type Data = ThumbnailerJobState; type Step = ThumbnailerJobStep; - fn name(&self) -> &'static str { - SHALLOW_THUMBNAILER_JOB_NAME + const NAME: &'static str = "shallow_thumbnailer"; + + fn new() -> Self { + Self {} } async fn init(&self, ctx: WorkerContext, state: &mut JobState) -> Result<(), JobError> { diff --git a/core/src/object/preview/thumbnail/thumbnailer_job.rs b/core/src/object/preview/thumbnail/thumbnailer_job.rs index 15e3389c4..87648ebb8 100644 --- a/core/src/object/preview/thumbnail/thumbnailer_job.rs +++ b/core/src/object/preview/thumbnail/thumbnailer_job.rs @@ -1,5 +1,7 @@ use crate::{ - job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext}, + job::{ + JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, + }, library::Library, location::file_path_helper::{ ensure_sub_path_is_directory, ensure_sub_path_is_in_location, @@ -25,8 +27,6 @@ use super::{ #[cfg(feature = "ffmpeg")] use super::FILTERED_VIDEO_EXTENSIONS; -pub const THUMBNAILER_JOB_NAME: &str = "thumbnailer"; - pub struct ThumbnailerJob {} #[derive(Serialize, Deserialize, Clone)] @@ -45,14 +45,20 @@ impl Hash for ThumbnailerJobInit { } } +impl JobInitData for ThumbnailerJobInit { + type Job = ThumbnailerJob; +} + #[async_trait::async_trait] impl StatefulJob for ThumbnailerJob { type Init = ThumbnailerJobInit; type Data = ThumbnailerJobState; type Step = ThumbnailerJobStep; - fn name(&self) -> &'static str { - THUMBNAILER_JOB_NAME + const NAME: &'static str = "thumbnailer"; + + fn new() -> Self { + Self {} } async fn init(&self, ctx: WorkerContext, state: &mut JobState) -> Result<(), JobError> { diff --git a/core/src/object/validation/validator_job.rs b/core/src/object/validation/validator_job.rs index a9f13863b..2a0f3c526 100644 --- a/core/src/object/validation/validator_job.rs +++ b/core/src/object/validation/validator_job.rs @@ -1,5 +1,7 @@ use crate::{ - job::{JobError, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext}, + job::{ + JobError, JobInitData, JobReportUpdate, JobResult, JobState, StatefulJob, WorkerContext, + }, library::Library, location::file_path_helper::{file_path_for_object_validator, MaterializedPath}, prisma::{file_path, location}, @@ -14,8 +16,6 @@ use tracing::info; use super::hash::file_checksum; -pub const VALIDATOR_JOB_NAME: &str = "object_validator"; - // The Validator is able to: // - generate a full byte checksum for Objects in a Location // - generate checksums for all Objects missing without one @@ -36,14 +36,20 @@ pub struct ObjectValidatorJobInit { pub background: bool, } +impl JobInitData for ObjectValidatorJobInit { + type Job = ObjectValidatorJob; +} + #[async_trait::async_trait] impl StatefulJob for ObjectValidatorJob { type Init = ObjectValidatorJobInit; type Data = ObjectValidatorJobState; type Step = file_path_for_object_validator::Data; - fn name(&self) -> &'static str { - VALIDATOR_JOB_NAME + const NAME: &'static str = "object_validator"; + + fn new() -> Self { + Self {} } async fn init(&self, ctx: WorkerContext, state: &mut JobState) -> Result<(), JobError> { diff --git a/core/src/sync/manager.rs b/core/src/sync/manager.rs index 31c346819..fe745e23a 100644 --- a/core/src/sync/manager.rs +++ b/core/src/sync/manager.rs @@ -44,53 +44,53 @@ impl SyncManager { pub async fn write_ops<'item, I: prisma_client_rust::BatchItem<'item>>( &self, tx: &PrismaClient, - (ops, queries): (Vec, I), + (_ops, queries): (Vec, I), ) -> prisma_client_rust::Result<::ReturnValue> { - let owned = ops - .iter() - .filter_map(|op| match &op.typ { - CRDTOperationType::Owned(owned_op) => Some(tx.owned_operation().create( - op.id.as_bytes().to_vec(), - op.timestamp.0 as i64, - to_vec(&owned_op.items).unwrap(), - owned_op.model.clone(), - node::pub_id::equals(op.node.as_bytes().to_vec()), - vec![], - )), - _ => None, - }) - .collect::>(); - - let shared = ops - .iter() - .filter_map(|op| match &op.typ { - CRDTOperationType::Shared(shared_op) => { - let kind = match &shared_op.data { - SharedOperationData::Create(_) => "c", - SharedOperationData::Update { .. } => "u", - SharedOperationData::Delete => "d", - }; - - Some(tx.shared_operation().create( - op.id.as_bytes().to_vec(), - op.timestamp.0 as i64, - shared_op.model.to_string(), - to_vec(&shared_op.record_id).unwrap(), - kind.to_string(), - to_vec(&shared_op.data).unwrap(), - node::pub_id::equals(op.node.as_bytes().to_vec()), - vec![], - )) - } - _ => None, - }) - .collect::>(); - #[cfg(feature = "sync-messages")] let res = { + let owned = _ops + .iter() + .filter_map(|op| match &op.typ { + CRDTOperationType::Owned(owned_op) => Some(tx.owned_operation().create( + op.id.as_bytes().to_vec(), + op.timestamp.0 as i64, + to_vec(&owned_op.items).unwrap(), + owned_op.model.clone(), + node::pub_id::equals(op.node.as_bytes().to_vec()), + vec![], + )), + _ => None, + }) + .collect::>(); + + let shared = _ops + .iter() + .filter_map(|op| match &op.typ { + CRDTOperationType::Shared(shared_op) => { + let kind = match &shared_op.data { + SharedOperationData::Create(_) => "c", + SharedOperationData::Update { .. } => "u", + SharedOperationData::Delete => "d", + }; + + Some(tx.shared_operation().create( + op.id.as_bytes().to_vec(), + op.timestamp.0 as i64, + shared_op.model.to_string(), + to_vec(&shared_op.record_id).unwrap(), + kind.to_string(), + to_vec(&shared_op.data).unwrap(), + node::pub_id::equals(op.node.as_bytes().to_vec()), + vec![], + )) + } + _ => None, + }) + .collect::>(); + let (res, _) = tx._batch((queries, (owned, shared))).await?; - for op in ops { + for op in _ops { self.tx.send(SyncMessage::Created(op)).ok(); } diff --git a/interface/app/$libraryId/Layout/Sidebar/IsRunningJob.tsx b/interface/app/$libraryId/Layout/Sidebar/IsRunningJob.tsx index 03e657803..9177ba882 100644 --- a/interface/app/$libraryId/Layout/Sidebar/IsRunningJob.tsx +++ b/interface/app/$libraryId/Layout/Sidebar/IsRunningJob.tsx @@ -3,7 +3,8 @@ import { Loader } from '@sd/ui'; import { useLibraryQuery } from '~/../packages/client/src'; export default () => { - const { data: isRunningJob } = useLibraryQuery(['jobs.isRunning']); + const { data: runningJobs } = useLibraryQuery(['jobs.getRunning']); + const isRunningJob = runningJobs?.length !== undefined && runningJobs?.length > 0; return isRunningJob ? ( diff --git a/packages/client/src/core.ts b/packages/client/src/core.ts index 7a3127419..269d0440a 100644 --- a/packages/client/src/core.ts +++ b/packages/client/src/core.ts @@ -7,7 +7,6 @@ export type Procedures = { { key: "files.get", input: LibraryArgs, result: { id: number, pub_id: number[], kind: number, key_id: number | null, hidden: boolean, favorite: boolean, important: boolean, has_thumbnail: boolean, has_thumbstrip: boolean, has_video_preview: boolean, ipfs_id: string | null, note: string | null, date_created: string, file_paths: FilePath[], media_data: MediaData | null } | null } | { key: "jobs.getHistory", input: LibraryArgs, result: JobReport[] } | { key: "jobs.getRunning", input: LibraryArgs, result: JobReport[] } | - { key: "jobs.isRunning", input: LibraryArgs, result: boolean } | { key: "keys.getDefault", input: LibraryArgs, result: string | null } | { key: "keys.getKey", input: LibraryArgs, result: string } | { key: "keys.getSecretKey", input: LibraryArgs, result: string | null } | @@ -78,7 +77,7 @@ export type Procedures = { { key: "tags.delete", input: LibraryArgs, result: null } | { key: "tags.update", input: LibraryArgs, result: null }, subscriptions: - { key: "invalidateQuery", input: never, result: InvalidateOperationEvent } | + { key: "invalidation.listen", input: never, result: InvalidateOperationEvent[] } | { key: "jobs.newThumbnail", input: LibraryArgs, result: string } | { key: "locations.online", input: never, result: number[][] } | { key: "p2p.events", input: never, result: P2PEvent } | @@ -162,9 +161,9 @@ export type IndexerRule = { id: number, kind: number, name: string, parameters: */ export type IndexerRuleCreateArgs = { kind: RuleKind, name: string, parameters: number[] } -export type InvalidateOperationEvent = { key: string, arg: any } +export type InvalidateOperationEvent = { key: string, arg: any, result: any | null } -export type JobReport = { id: string, name: string, data: number[] | null, metadata: any | null, date_created: string, date_modified: string, status: JobStatus, task_count: number, completed_task_count: number, message: string, seconds_elapsed: number } +export type JobReport = { id: string, name: string, data: number[] | null, metadata: any | null, created_at: string | null, updated_at: string | null, parent_id: string | null, status: JobStatus, task_count: number, completed_task_count: number, message: string, seconds_elapsed: number } export type JobStatus = "Queued" | "Running" | "Completed" | "Canceled" | "Failed" | "Paused" diff --git a/packages/client/src/rspc.ts b/packages/client/src/rspc.ts index 8a336aabf..5488355da 100644 --- a/packages/client/src/rspc.ts +++ b/packages/client/src/rspc.ts @@ -82,13 +82,20 @@ export const useLibraryMutation = libraryHooks.useMutation; export function useInvalidateQuery() { const context = rspc.useContext(); - rspc.useSubscription(['invalidateQuery'], { - onData: (invalidateOperation) => { - const key = [invalidateOperation.key]; - if (invalidateOperation.arg !== null) { - key.concat(invalidateOperation.arg); + rspc.useSubscription(['invalidation.listen'], { + onData: (ops) => { + for (const op of ops) { + const key = [op.key]; + if (op.arg !== null) { + key.concat(op.arg); + } + + if (op.result !== null) { + context.queryClient.setQueryData(key, op.result); + } else { + context.queryClient.invalidateQueries(key); + } } - context.queryClient.invalidateQueries(key); } }); }