diff --git a/Cargo.lock b/Cargo.lock index 3989e138c..2df75e10d 100644 Binary files a/Cargo.lock and b/Cargo.lock differ diff --git a/core/Cargo.toml b/core/Cargo.toml index 95e5f7546..6e6640a65 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -24,10 +24,10 @@ ring = "0.17.0-alpha.10" int-enum = "0.4.0" # Project dependencies -ts-rs = { version = "6.1", features = ["chrono-impl", "uuid-impl"] } +ts-rs = { version = "6.2", features = ["chrono-impl", "uuid-impl", "serde-compat"] } prisma-client-rust = { git = "https://github.com/Brendonovich/prisma-client-rust.git", tag = "0.5.0" } walkdir = "^2.3.2" -uuid = "0.8" +uuid = { version = "^0.8.2", features = ["v4", "serde"]} sysinfo = "0.23.9" thiserror = "1.0.30" core-derive = { path = "./derive" } diff --git a/core/src/encode/metadata.rs b/core/src/encode/metadata.rs index cde41fffe..9c86229c2 100644 --- a/core/src/encode/metadata.rs +++ b/core/src/encode/metadata.rs @@ -1,5 +1,4 @@ -extern crate ffmpeg_next as ffmpeg; -use ffmpeg::format; +use ffmpeg_next::format; #[derive(Default, Debug)] pub struct MediaItem { @@ -125,10 +124,10 @@ pub struct AudioStream { // } // media_item.steams.push(stream_item); // } -// println!("{:#?}", media_item); +// info!("{:#?}", media_item); // } -// Err(error) => println!("error: {}", error), +// Err(error) => error!("error: {}", error), // } // Ok(()) // } diff --git a/core/src/encode/thumb.rs b/core/src/encode/thumb.rs index f4665551e..065ca20d5 100644 --- a/core/src/encode/thumb.rs +++ b/core/src/encode/thumb.rs @@ -1,21 +1,21 @@ -use crate::job::{JobReportUpdate, JobResult}; -use crate::library::LibraryContext; use crate::{ - job::{Job, WorkerContext}, + job::{Job, JobReportUpdate, JobResult, WorkerContext}, + library::LibraryContext, prisma::file_path, + sys, CoreEvent, }; -use crate::{sys, CoreEvent}; -use futures::executor::block_on; -use image::*; -use log::{error, info}; -use std::fs; +use image::{self, imageops, DynamicImage, GenericImageView}; +use log::{debug, error, info}; +use std::error::Error; +use std::ops::Deref; use std::path::{Path, PathBuf}; -use webp::*; +use tokio::fs; +use webp::Encoder; #[derive(Debug, Clone)] pub struct ThumbnailJob { pub location_id: i32, - pub path: String, + pub path: PathBuf, pub background: bool, } @@ -30,116 +30,113 @@ impl Job for ThumbnailJob { } async fn run(&self, ctx: WorkerContext) -> JobResult { let library_ctx = ctx.library_ctx(); - let thumbnail_dir = Path::new(&library_ctx.config().data_directory()) + let thumbnail_dir = library_ctx + .config() + .data_directory() .join(THUMBNAIL_CACHE_DIR_NAME) - .join(format!("{}", self.location_id)); + .join(self.location_id.to_string()); let location = sys::get_location(&library_ctx, self.location_id).await?; info!( - "Searching for images in location {} at path {}", - location.id, self.path - ); - - info!( - "Searching for images in location {} at path {}", + "Searching for images in location {} at path {:#?}", location.id, self.path ); // create all necessary directories if they don't exist - fs::create_dir_all(&thumbnail_dir)?; + fs::create_dir_all(&thumbnail_dir).await?; let root_path = location.path.unwrap(); // query database for all files in this location that need thumbnails let image_files = get_images(&library_ctx, self.location_id, &self.path).await?; info!("Found {:?} files", image_files.len()); - let is_background = self.background.clone(); - tokio::task::spawn_blocking(move || { - ctx.progress(vec![ - JobReportUpdate::TaskCount(image_files.len()), - JobReportUpdate::Message(format!( - "Preparing to process {} files", - image_files.len() - )), - ]); + ctx.progress(vec![ + JobReportUpdate::TaskCount(image_files.len()), + JobReportUpdate::Message(format!("Preparing to process {} files", image_files.len())), + ]); - for (i, image_file) in image_files.iter().enumerate() { - ctx.progress(vec![JobReportUpdate::Message(format!( - "Processing {}", - image_file.materialized_path.clone() - ))]); + for (i, image_file) in image_files.iter().enumerate() { + ctx.progress(vec![JobReportUpdate::Message(format!( + "Processing {}", + image_file.materialized_path + ))]); - // assemble the file path - let path = Path::new(&root_path).join(&image_file.materialized_path); - error!("image_file {:?}", image_file); + // assemble the file path + let path = Path::new(&root_path).join(&image_file.materialized_path); + debug!("image_file {:?}", image_file); - // get cas_id, if none found skip - let cas_id = match image_file.file() { - Ok(file) => { - if let Some(f) = file { - f.cas_id.clone() - } else { - continue; - } - } - Err(_) => { - error!("Error getting cas_id {:?}", image_file.materialized_path); + // get cas_id, if none found skip + let cas_id = match image_file.file() { + Ok(file) => { + if let Some(f) = file { + f.cas_id.clone() + } else { + info!( + "skipping thumbnail generation for {}", + image_file.materialized_path + ); continue; } - }; - - // Define and write the WebP-encoded file to a given path - let output_path = Path::new(&thumbnail_dir) - .join(&cas_id) - .with_extension("webp"); - - // check if file exists at output path - if !output_path.exists() { - info!("Writing {:?} to {:?}", path, output_path); - generate_thumbnail(&path, &output_path) - .map_err(|e| { - info!("Error generating thumb {:?}", e); - }) - .unwrap_or(()); - - ctx.progress(vec![JobReportUpdate::CompletedTaskCount(i + 1)]); - - if !is_background { - block_on(ctx.library_ctx().emit(CoreEvent::NewThumbnail { cas_id })); - }; - } else { - info!("Thumb exists, skipping... {}", output_path.display()); } + Err(_) => { + error!("Error getting cas_id {:?}", image_file.materialized_path); + continue; + } + }; + + // Define and write the WebP-encoded file to a given path + let output_path = thumbnail_dir.join(&cas_id).with_extension("webp"); + + // check if file exists at output path + if !output_path.exists() { + info!("Writing {:?} to {:?}", path, output_path); + tokio::spawn(async move { + if let Err(e) = generate_thumbnail(&path, &output_path).await { + error!("Error generating thumb {:?}", e); + } + }); + + ctx.progress(vec![JobReportUpdate::CompletedTaskCount(i + 1)]); + + if !self.background { + ctx.library_ctx() + .emit(CoreEvent::NewThumbnail { cas_id }) + .await; + }; + } else { + info!("Thumb exists, skipping... {}", output_path.display()); } - }) - .await?; + } Ok(()) } } -pub fn generate_thumbnail( - file_path: &PathBuf, - output_path: &PathBuf, -) -> Result<(), Box> { +pub async fn generate_thumbnail>( + file_path: P, + output_path: P, +) -> Result<(), Box> { // Using `image` crate, open the included .jpg file let img = image::open(file_path)?; let (w, h) = img.dimensions(); // Optionally, resize the existing photo and convert back into DynamicImage - let img: DynamicImage = image::DynamicImage::ImageRgba8(imageops::resize( + let img = DynamicImage::ImageRgba8(imageops::resize( &img, (w as f32 * THUMBNAIL_SIZE_FACTOR) as u32, (h as f32 * THUMBNAIL_SIZE_FACTOR) as u32, imageops::FilterType::Triangle, )); // Create the WebP encoder for the above image - let encoder: Encoder = Encoder::from_image(&img)?; + let encoder = Encoder::from_image(&img)?; // Encode the image at a specified quality 0-100 - let webp: WebPMemory = encoder.encode(THUMBNAIL_QUALITY); - std::fs::write(&output_path, &*webp)?; + // Type WebPMemory is !Send, which makes the Future in this function !Send, + // this make us `deref` to have a `&[u8]` and then `to_owned` to make a Vec + // which implies on a unwanted clone... + let webp = encoder.encode(THUMBNAIL_QUALITY).deref().to_owned(); + fs::write(output_path, &webp).await?; Ok(()) } @@ -147,7 +144,7 @@ pub fn generate_thumbnail( pub async fn get_images( ctx: &LibraryContext, location_id: i32, - path: &str, + path: impl AsRef, ) -> Result, std::io::Error> { let mut params = vec![ file_path::location_id::equals(Some(location_id)), @@ -160,8 +157,10 @@ pub async fn get_images( ]), ]; - if !path.is_empty() { - params.push(file_path::materialized_path::starts_with(path.to_string())) + let path_str = path.as_ref().to_string_lossy().to_string(); + + if !path_str.is_empty() { + params.push(file_path::materialized_path::starts_with(path_str)) } let image_files = ctx diff --git a/core/src/file/cas/checksum.rs b/core/src/file/cas/checksum.rs index 78bd09aca..08c12675c 100644 --- a/core/src/file/cas/checksum.rs +++ b/core/src/file/cas/checksum.rs @@ -1,36 +1,27 @@ use data_encoding::HEXLOWER; use ring::digest::{Context, SHA256}; -use std::convert::TryInto; -use std::fs::File; - -use std::io; use std::path::PathBuf; +use tokio::{ + fs::File, + io::{self, AsyncReadExt, AsyncSeekExt, SeekFrom}, +}; static SAMPLE_COUNT: u64 = 4; static SAMPLE_SIZE: u64 = 10000; -fn read_at(file: &File, offset: u64, size: u64) -> Result, io::Error> { +async fn read_at(file: &mut File, offset: u64, size: u64) -> Result, io::Error> { let mut buf = vec![0u8; size as usize]; - #[cfg(target_family = "unix")] - { - use std::os::unix::prelude::*; - file.read_exact_at(&mut buf, offset)?; - } - - #[cfg(target_family = "windows")] - { - use std::os::windows::prelude::*; - file.seek_read(&mut buf, offset)?; - } + file.seek(SeekFrom::Start(offset)).await?; + file.read_exact(&mut buf).await?; Ok(buf) } -pub fn generate_cas_id(path: PathBuf, size: u64) -> Result { +pub async fn generate_cas_id(path: PathBuf, size: u64) -> Result { // open file reference - let file = File::open(path)?; + let mut file = File::open(path).await?; let mut context = Context::new(&SHA256); @@ -39,20 +30,16 @@ pub fn generate_cas_id(path: PathBuf, size: u64) -> Result { // if size is small enough, just read the whole thing if SAMPLE_COUNT * SAMPLE_SIZE > size { - let buf = read_at(&file, 0, size.try_into().unwrap())?; + let buf = read_at(&mut file, 0, size).await?; context.update(&buf); } else { // loop over samples for i in 0..SAMPLE_COUNT { - let buf = read_at( - &file, - (size / SAMPLE_COUNT) * i, - SAMPLE_SIZE.try_into().unwrap(), - )?; + let buf = read_at(&mut file, (size / SAMPLE_COUNT) * i, SAMPLE_SIZE).await?; context.update(&buf); } // sample end of file - let buf = read_at(&file, size - SAMPLE_SIZE, SAMPLE_SIZE.try_into().unwrap())?; + let buf = read_at(&mut file, size - SAMPLE_SIZE, SAMPLE_SIZE).await?; context.update(&buf); } diff --git a/core/src/file/cas/identifier.rs b/core/src/file/cas/identifier.rs index 251ae4120..a05684eda 100644 --- a/core/src/file/cas/identifier.rs +++ b/core/src/file/cas/identifier.rs @@ -8,13 +8,13 @@ use crate::{ sys::get_location, }; use chrono::{DateTime, FixedOffset}; -use futures::executor::block_on; -use log::info; +use futures::future::join_all; +use log::{error, info}; use prisma_client_rust::{prisma_models::PrismaValue, raw, raw::Raw, Direction}; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::path::Path; -use std::{fs, io}; +use std::collections::{HashMap, HashSet}; +use std::path::{Path, PathBuf}; +use tokio::{fs, io}; // FileIdentifierJob takes file_paths without a file_id and uniquely identifies them // first: generating the cas_id and extracting metadata @@ -22,7 +22,7 @@ use std::{fs, io}; #[derive(Debug)] pub struct FileIdentifierJob { pub location_id: i32, - pub path: String, + pub path: PathBuf, } // we break this job into chunks of 100 to improve performance @@ -38,7 +38,7 @@ impl Job for FileIdentifierJob { info!("Identifying orphan file paths..."); let location = get_location(&ctx.library_ctx(), self.location_id).await?; - let location_path = location.path.unwrap_or("".to_string()); + let location_path = location.path.unwrap_or_else(|| "".to_string()); let total_count = count_orphan_file_paths(&ctx.library_ctx(), location.id.into()).await?; info!("Found {} orphan file paths", total_count); @@ -49,138 +49,155 @@ impl Job for FileIdentifierJob { // update job with total task count based on orphan file_paths count ctx.progress(vec![JobReportUpdate::TaskCount(task_count)]); - // dedicated tokio thread for task - let _ctx = tokio::task::spawn_blocking(move || { - let db = ctx.library_ctx().db; - let mut completed: usize = 0; - let mut cursor: i32 = 1; - // loop until task count is complete - while completed < task_count { - // link file_path ids to a CreateFile struct containing unique file data - let mut chunk: HashMap = HashMap::new(); - let mut cas_lookup: HashMap = HashMap::new(); + let mut completed: usize = 0; + let mut cursor: i32 = 1; + // loop until task count is complete + while completed < task_count { + // link file_path ids to a CreateFile struct containing unique file data + let mut chunk: HashMap = HashMap::new(); + let mut cas_lookup: HashMap = HashMap::new(); - // get chunk of orphans to process - let file_paths = match block_on(get_orphan_file_paths(&ctx.library_ctx(), cursor)) { - Ok(file_paths) => file_paths, + // get chunk of orphans to process + let file_paths = match get_orphan_file_paths(&ctx.library_ctx(), cursor).await { + Ok(file_paths) => file_paths, + Err(e) => { + info!("Error getting orphan file paths: {:#?}", e); + continue; + } + }; + info!( + "Processing {:?} orphan files. ({} completed of {})", + file_paths.len(), + completed, + task_count + ); + + // analyze each file_path + for file_path in &file_paths { + // get the cas_id and extract metadata + match prepare_file(&location_path, file_path).await { + Ok(file) => { + let cas_id = file.cas_id.clone(); + // create entry into chunks for created file data + chunk.insert(file_path.id, file); + cas_lookup.insert(cas_id, file_path.id); + } Err(e) => { - info!("Error getting orphan file paths: {}", e); + info!("Error processing file: {:#?}", e); continue; } }; - info!( - "Processing {:?} orphan files. ({} completed of {})", - file_paths.len(), - completed, - task_count - ); + } - // analyze each file_path - for file_path in file_paths.iter() { - // get the cas_id and extract metadata - match prepare_file(&location_path, file_path) { - Ok(file) => { - let cas_id = file.cas_id.clone(); - // create entry into chunks for created file data - chunk.insert(file_path.id, file); - cas_lookup.insert(cas_id, file_path.id); - } - Err(e) => { - info!("Error processing file: {}", e); - continue; - } - }; + // find all existing files by cas id + let generated_cas_ids = chunk.values().map(|c| c.cas_id.clone()).collect(); + let existing_files = ctx + .library_ctx() + .db + .file() + .find_many(vec![file::cas_id::in_vec(generated_cas_ids)]) + .exec() + .await?; + + info!("Found {} existing files", existing_files.len()); + + // link those existing files to their file paths + // Had to put the file_path in a variable outside of the closure, to satisfy the borrow checker + let library_ctx = ctx.library_ctx(); + let prisma_file_path = library_ctx.db.file_path(); + for result in join_all(existing_files.iter().map(|file| { + prisma_file_path + .find_unique(file_path::id::equals( + *cas_lookup.get(&file.cas_id).unwrap(), + )) + .update(vec![file_path::file_id::set(Some(file.id))]) + .exec() + })) + .await + { + if let Err(e) = result { + error!("Error linking file: {:#?}", e); } + } - // find all existing files by cas id - let generated_cas_ids = chunk.values().map(|c| c.cas_id.clone()).collect(); - let existing_files: Vec = block_on( - db.file() - .find_many(vec![file::cas_id::in_vec(generated_cas_ids)]) - .exec(), - ) - .unwrap(); - info!("Found {} existing files", existing_files.len()); + let existing_files_cas_ids = existing_files + .iter() + .map(|file| file.cas_id.clone()) + .collect::>(); - // link those existing files to their file paths - for file in existing_files.iter() { - let file_path_id = cas_lookup.get(&file.cas_id).unwrap(); - block_on( - db.file_path() - .find_unique(file_path::id::equals(file_path_id.clone())) - .update(vec![file_path::file_id::set(Some(file.id.clone()))]) - .exec(), - ) - .unwrap(); - } + // extract files that don't already exist in the database + let new_files = chunk + .iter() + .map(|(_id, create_file)| create_file) + .filter(|create_file| !existing_files_cas_ids.contains(&create_file.cas_id)) + .collect::>(); - // extract files that don't already exist in the database - let new_files: Vec<&CreateFile> = chunk - .iter() - .map(|(_, c)| c) - .filter(|c| !existing_files.iter().any(|d| d.cas_id == c.cas_id)) - .collect(); + // assemble prisma values for new unique files + let mut values = Vec::with_capacity(new_files.len() * 3); + for file in &new_files { + values.extend([ + PrismaValue::String(file.cas_id.clone()), + PrismaValue::Int(file.size_in_bytes), + PrismaValue::DateTime(file.date_created), + ]); + } - // assemble prisma values for new unique files - let mut values: Vec = Vec::new(); - for file in new_files.iter() { - values.extend([ - PrismaValue::String(file.cas_id.clone()), - PrismaValue::Int(file.size_in_bytes.clone()), - PrismaValue::DateTime(file.date_created.clone()), - ]); - } - - // create new file records with assembled values - let created_files: Vec = block_on(db._query_raw(Raw::new( + // create new file records with assembled values + let created_files: Vec = ctx + .library_ctx() + .db + ._query_raw(Raw::new( &format!( "INSERT INTO files (cas_id, size_in_bytes, date_created) VALUES {} ON CONFLICT (cas_id) DO NOTHING RETURNING id, cas_id", vec!["({}, {}, {})"; new_files.len()].join(",") ), values, - ))) + )) + .await .unwrap_or_else(|e| { - info!("Error inserting files: {}", e); + error!("Error inserting files: {:#?}", e); Vec::new() }); + // This code is duplicates, is this right? + for result in join_all(created_files.iter().map(|file| { // associate newly created files with their respective file_paths - for file in created_files.iter() { - // TODO: this is potentially bottle necking the chunk system, individually linking file_path to file, 100 queries per chunk - // - insert many could work, but I couldn't find a good way to do this in a single SQL query - let file_path_id = cas_lookup.get(&file.cas_id).unwrap(); - block_on( - db.file_path() - .find_unique(file_path::id::equals(file_path_id.clone())) - .update(vec![file_path::file_id::set(Some(file.id.clone()))]) - .exec(), - ) - .unwrap(); + // TODO: this is potentially bottle necking the chunk system, individually linking file_path to file, 100 queries per chunk + // - insert many could work, but I couldn't find a good way to do this in a single SQL query + prisma_file_path + .find_unique(file_path::id::equals( + *cas_lookup.get(&file.cas_id).unwrap(), + )) + .update(vec![file_path::file_id::set(Some(file.id))]) + .exec() + })) + .await + { + if let Err(e) = result { + error!("Error linking file: {:#?}", e); } - - // handle loop end - let last_row = match file_paths.last() { - Some(l) => l, - None => { - break; - } - }; - cursor = last_row.id; - completed += 1; - - ctx.progress(vec![ - JobReportUpdate::CompletedTaskCount(completed), - JobReportUpdate::Message(format!( - "Processed {} of {} orphan files", - completed * CHUNK_SIZE, - total_count - )), - ]); } - ctx - }) - .await?; + + // handle loop end + let last_row = match file_paths.last() { + Some(l) => l, + None => { + break; + } + }; + cursor = last_row.id; + completed += 1; + + ctx.progress(vec![ + JobReportUpdate::CompletedTaskCount(completed), + JobReportUpdate::Message(format!( + "Processed {} of {} orphan files", + completed * CHUNK_SIZE, + total_count + )), + ]); + } // let _remaining = count_orphan_file_paths(&ctx.core_ctx, location.id.into()).await?; Ok(()) @@ -209,12 +226,11 @@ pub async fn get_orphan_file_paths( ctx: &LibraryContext, cursor: i32, ) -> Result, FileError> { - let db = &ctx.db; info!( "discovering {} orphan file paths at cursor: {:?}", CHUNK_SIZE, cursor ); - let files = db + ctx.db .file_path() .find_many(vec![ file_path::file_id::equals(None), @@ -224,9 +240,8 @@ pub async fn get_orphan_file_paths( .cursor(file_path::id::cursor(cursor)) .take(CHUNK_SIZE as i64) .exec() - .await?; - - Ok(files) + .await + .map_err(|e| e.into()) } #[derive(Deserialize, Serialize, Debug)] @@ -242,13 +257,15 @@ pub struct FileCreated { pub cas_id: String, } -pub fn prepare_file( - location_path: &str, +pub async fn prepare_file( + location_path: impl AsRef, file_path: &file_path::Data, ) -> Result { - let path = Path::new(&location_path).join(Path::new(file_path.materialized_path.as_str())); + let path = location_path + .as_ref() + .join(file_path.materialized_path.as_str()); - let metadata = fs::metadata(&path)?; + let metadata = fs::metadata(&path).await?; // let date_created: DateTime = metadata.created().unwrap().into(); @@ -256,7 +273,7 @@ pub fn prepare_file( let cas_id = { if !file_path.is_dir { - let mut ret = generate_cas_id(path.clone(), size.clone()).unwrap(); + let mut ret = generate_cas_id(path, size).await?; ret.truncate(16); ret } else { diff --git a/core/src/file/explorer/open.rs b/core/src/file/explorer/open.rs index d4e5f484d..966a27d83 100644 --- a/core/src/file/explorer/open.rs +++ b/core/src/file/explorer/open.rs @@ -6,29 +6,32 @@ use crate::{ sys::get_location, tag::{Tag, TagError, TagOnFile, TagWithFiles}, }; +use log::info; use std::path::Path; pub async fn open_dir( ctx: &LibraryContext, - location_id: &i32, - path: &str, + location_id: i32, + path: impl AsRef, ) -> Result { // get location - let location = get_location(ctx, location_id.clone()).await?; + let location = get_location(ctx, location_id).await?; + + let path_str = path.as_ref().to_string_lossy().to_string(); let directory = ctx .db .file_path() .find_first(vec![ file_path::location_id::equals(Some(location.id)), - file_path::materialized_path::equals(path.into()), + file_path::materialized_path::equals(path_str), file_path::is_dir::equals(true), ]) .exec() .await? - .ok_or(FileError::DirectoryNotFound(path.to_string()))?; + .ok_or_else(|| FileError::DirectoryNotFound(path.as_ref().to_path_buf()))?; - println!("DIRECTORY: {:?}", directory); + info!("DIRECTORY: {:?}", directory); let mut file_paths: Vec = ctx .db @@ -46,10 +49,12 @@ pub async fn open_dir( for file_path in &mut file_paths { if let Some(file) = &mut file_path.file { - let thumb_path = Path::new(&ctx.config().data_directory()) + let thumb_path = ctx + .config() + .data_directory() .join(THUMBNAIL_CACHE_DIR_NAME) - .join(format!("{}", location.id)) - .join(file.cas_id.clone()) + .join(location.id.to_string()) + .join(&file.cas_id) .with_extension("webp"); file.has_thumbnail = thumb_path.exists(); @@ -69,7 +74,7 @@ pub async fn open_tag(ctx: &LibraryContext, tag_id: i32) -> Result = ctx diff --git a/core/src/file/indexer/mod.rs b/core/src/file/indexer/mod.rs index c5e54d036..942a1e3b8 100644 --- a/core/src/file/indexer/mod.rs +++ b/core/src/file/indexer/mod.rs @@ -1,15 +1,17 @@ use crate::job::{Job, JobReportUpdate, JobResult, WorkerContext}; +use std::path::PathBuf; use self::scan::ScanProgress; mod scan; +// Re-exporting pub use scan::*; -pub use scan::scan_path; +use scan::scan_path; #[derive(Debug)] pub struct IndexerJob { - pub path: String, + pub path: PathBuf, } #[async_trait::async_trait] @@ -18,7 +20,7 @@ impl Job for IndexerJob { "indexer" } async fn run(&self, ctx: WorkerContext) -> JobResult { - scan_path(&ctx.library_ctx(), self.path.as_str(), move |p| { + scan_path(&ctx.library_ctx(), &self.path, move |p| { ctx.progress( p.iter() .map(|p| match p.clone() { diff --git a/core/src/file/indexer/scan.rs b/core/src/file/indexer/scan.rs index 0651b7b40..87c830f6a 100644 --- a/core/src/file/indexer/scan.rs +++ b/core/src/file/indexer/scan.rs @@ -8,7 +8,13 @@ use prisma_client_rust::raw; use prisma_client_rust::raw::Raw; use serde::{Deserialize, Serialize}; use std::ffi::OsStr; -use std::{collections::HashMap, fs, path::Path, path::PathBuf, time::Instant}; +use std::fmt::Debug; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + time::Instant, +}; +use tokio::fs; use walkdir::{DirEntry, WalkDir}; #[derive(Clone)] @@ -23,12 +29,10 @@ static BATCH_SIZE: usize = 100; // creates a vector of valid path buffers from a directory pub async fn scan_path( ctx: &LibraryContext, - path: &str, + path: impl AsRef + Debug, on_progress: impl Fn(Vec) + Send + Sync + 'static, ) -> JobResult { - let path = path.to_string(); - - let location = create_location(&ctx, &path).await?; + let location = create_location(ctx, &path).await?; // query db to highers id, so we can increment it for the new files indexed #[derive(Deserialize, Serialize, Debug)] @@ -42,15 +46,16 @@ pub async fn scan_path( .await { Ok(rows) => rows[0].id.unwrap_or(0), - Err(e) => panic!("Error querying for next file id: {}", e), + Err(e) => panic!("Error querying for next file id: {:#?}", e), }; //check is path is a directory - if !PathBuf::from(&path).is_dir() { + if !path.as_ref().is_dir() { // return Err(anyhow::anyhow!("{} is not a directory", &path)); - panic!("{} is not a directory", &path); + panic!("{:#?} is not a directory", path); } - let dir_path = path.clone(); + + let path_buf = path.as_ref().to_path_buf(); // spawn a dedicated thread to scan the directory for performance let (paths, scan_start, on_progress) = tokio::task::spawn_blocking(move || { @@ -67,10 +72,9 @@ pub async fn scan_path( next_file_id }; // walk through directory recursively - for entry in WalkDir::new(&dir_path).into_iter().filter_entry(|dir| { - let approved = - !is_hidden(dir) && !is_app_bundle(dir) && !is_node_modules(dir) && !is_library(dir); - approved + for entry in WalkDir::new(path_buf).into_iter().filter_entry(|dir| { + // check if entry is approved + !is_hidden(dir) && !is_app_bundle(dir) && !is_node_modules(dir) && !is_library(dir) }) { // extract directory entry or log and continue if failed let entry = match entry { @@ -86,7 +90,7 @@ pub async fn scan_path( let parent_path = path .parent() - .unwrap_or(Path::new("")) + .unwrap_or_else(|| Path::new("")) .to_str() .unwrap_or(""); let parent_dir_id = dirs.get(&*parent_path); @@ -100,7 +104,7 @@ pub async fn scan_path( }; on_progress(vec![ - ScanProgress::Message(format!("{}", path_str)), + ScanProgress::Message(format!("Scanning {}", path_str)), ScanProgress::ChunkCount(paths.len() / BATCH_SIZE), ]); @@ -122,8 +126,7 @@ pub async fn scan_path( } (paths, scan_start, on_progress) }) - .await - .unwrap(); + .await?; let db_write_start = Instant::now(); let scan_read_time = scan_start.elapsed(); @@ -143,7 +146,7 @@ pub async fn scan_path( for (file_path, file_id, parent_dir_id, is_dir) in chunk { files.extend( - match prepare_values(&file_path, *file_id, &location, parent_dir_id, *is_dir) { + match prepare_values(file_path, *file_id, &location, parent_dir_id, *is_dir).await { Ok(values) => values.to_vec(), Err(e) => { error!("Error creating file model from path {:?}: {}", file_path, e); @@ -178,14 +181,14 @@ pub async fn scan_path( } // reads a file at a path and creates an ActiveModel with metadata -fn prepare_values( +async fn prepare_values( file_path: &PathBuf, id: i32, location: &LocationResource, parent_id: &Option, is_dir: bool, ) -> Result<[PrismaValue; 8], std::io::Error> { - let metadata = fs::metadata(&file_path)?; + let metadata = fs::metadata(&file_path).await?; let location_path = Path::new(location.path.as_ref().unwrap().as_str()); // let size = metadata.len(); let name; @@ -215,7 +218,6 @@ fn prepare_values( PrismaValue::String(name), PrismaValue::String(extension.to_lowercase()), parent_id - .clone() .map(|id| PrismaValue::Int(id as i64)) .unwrap_or(PrismaValue::Null), PrismaValue::DateTime(date_created.into()), @@ -237,7 +239,7 @@ fn is_hidden(entry: &DirEntry) -> bool { entry .file_name() .to_str() - .map(|s| s.starts_with(".")) + .map(|s| s.starts_with('.')) .unwrap_or(false) } @@ -266,7 +268,7 @@ fn is_app_bundle(entry: &DirEntry) -> bool { .map(|s| s.contains(".app") | s.contains(".bundle")) .unwrap_or(false); - let is_app_bundle = is_dir && contains_dot; + // let is_app_bundle = is_dir && contains_dot; // if is_app_bundle { // let path_buff = entry.path(); // let path = path_buff.to_str().unwrap(); @@ -274,5 +276,5 @@ fn is_app_bundle(entry: &DirEntry) -> bool { // self::path(&path, ); // } - is_app_bundle + is_dir && contains_dot } diff --git a/core/src/file/mod.rs b/core/src/file/mod.rs index 5bfa9423e..6def304ad 100644 --- a/core/src/file/mod.rs +++ b/core/src/file/mod.rs @@ -1,6 +1,7 @@ use chrono::{DateTime, Utc}; use int_enum::IntEnum; use serde::{Deserialize, Serialize}; +use std::path::PathBuf; use thiserror::Error; use ts_rs::TS; @@ -79,46 +80,52 @@ pub enum FileKind { Alias = 8, } -impl Into for file::Data { - fn into(self) -> File { - File { - id: self.id, - cas_id: self.cas_id, - integrity_checksum: self.integrity_checksum, - kind: IntEnum::from_int(self.kind).unwrap(), - size_in_bytes: self.size_in_bytes.to_string(), - // encryption: EncryptionAlgorithm::from_int(self.encryption).unwrap(), - ipfs_id: self.ipfs_id, - hidden: self.hidden, - favorite: self.favorite, - important: self.important, - has_thumbnail: self.has_thumbnail, - has_thumbstrip: self.has_thumbstrip, - has_video_preview: self.has_video_preview, - note: self.note, - date_created: self.date_created.into(), - date_modified: self.date_modified.into(), - date_indexed: self.date_indexed.into(), +impl From for File { + fn from(data: file::Data) -> Self { + Self { + id: data.id, + cas_id: data.cas_id, + integrity_checksum: data.integrity_checksum, + kind: IntEnum::from_int(data.kind).unwrap(), + size_in_bytes: data.size_in_bytes.to_string(), + // encryption: EncryptionAlgorithm::from_int(data.encryption).unwrap(), + ipfs_id: data.ipfs_id, + hidden: data.hidden, + favorite: data.favorite, + important: data.important, + has_thumbnail: data.has_thumbnail, + has_thumbstrip: data.has_thumbstrip, + has_video_preview: data.has_video_preview, + note: data.note, + date_created: data.date_created.into(), + date_modified: data.date_modified.into(), + date_indexed: data.date_indexed.into(), paths: vec![], } } } -impl Into for file_path::Data { - fn into(mut self) -> FilePath { - FilePath { - id: self.id, - is_dir: self.is_dir, - materialized_path: self.materialized_path, - file_id: self.file_id, - parent_id: self.parent_id, - location_id: self.location_id.unwrap_or(0), - date_indexed: self.date_indexed.into(), - name: self.name, - extension: self.extension, - date_created: self.date_created.into(), - date_modified: self.date_modified.into(), - file: self.file.take().unwrap_or(None).map(|file| (*file).into()), +impl From> for File { + fn from(data: Box) -> Self { + Self::from(*data) + } +} + +impl From for FilePath { + fn from(data: file_path::Data) -> Self { + Self { + id: data.id, + is_dir: data.is_dir, + materialized_path: data.materialized_path, + file_id: data.file_id, + parent_id: data.parent_id, + location_id: data.location_id.unwrap_or(0), + date_indexed: data.date_indexed.into(), + name: data.name, + extension: data.extension, + date_created: data.date_created.into(), + date_modified: data.date_modified.into(), + file: data.file.unwrap_or(None).map(Into::into), } } } @@ -133,9 +140,9 @@ pub struct DirectoryWithContents { #[derive(Error, Debug)] pub enum FileError { #[error("Directory not found (path: {0:?})")] - DirectoryNotFound(String), + DirectoryNotFound(PathBuf), #[error("File not found (path: {0:?})")] - FileNotFound(String), + FileNotFound(PathBuf), #[error("Database error")] DatabaseError(#[from] prisma::QueryError), #[error("System error")] @@ -160,7 +167,7 @@ pub async fn set_note( library_id: ctx.id.to_string(), query: LibraryQuery::GetExplorerDir { limit: 0, - path: "".to_string(), + path: PathBuf::new(), location_id: 0, }, })) @@ -187,7 +194,7 @@ pub async fn favorite( library_id: ctx.id.to_string(), query: LibraryQuery::GetExplorerDir { limit: 0, - path: "".to_string(), + path: PathBuf::new(), location_id: 0, }, })) diff --git a/core/src/job/jobs.rs b/core/src/job/jobs.rs index cb4f42cab..0cca9b8ad 100644 --- a/core/src/job/jobs.rs +++ b/core/src/job/jobs.rs @@ -7,7 +7,7 @@ use crate::{ prisma::{job, node}, }; use int_enum::IntEnum; -use log::info; +use log::{error, info}; use serde::{Deserialize, Serialize}; use std::{ collections::{HashMap, VecDeque}, @@ -25,8 +25,8 @@ pub type JobResult = Result<(), Box>; #[async_trait::async_trait] pub trait Job: Send + Sync + Debug { - async fn run(&self, ctx: WorkerContext) -> JobResult; fn name(&self) -> &'static str; + async fn run(&self, ctx: WorkerContext) -> JobResult; } pub enum JobManagerEvent { @@ -43,7 +43,7 @@ pub struct JobManager { impl JobManager { pub fn new() -> Arc { - let (internal_sender, mut internal_reciever) = mpsc::unbounded_channel(); + let (internal_sender, mut internal_receiver) = mpsc::unbounded_channel(); let this = Arc::new(Self { job_queue: RwLock::new(VecDeque::new()), running_workers: RwLock::new(HashMap::new()), @@ -52,7 +52,7 @@ impl JobManager { let this2 = this.clone(); tokio::spawn(async move { - while let Some(event) = internal_reciever.recv().await { + while let Some(event) = internal_receiver.recv().await { match event { JobManagerEvent::IngestJob(ctx, job) => this2.clone().ingest(&ctx, job).await, } @@ -73,7 +73,7 @@ impl JobManager { let wrapped_worker = Arc::new(Mutex::new(worker)); - Worker::spawn(self.clone(), wrapped_worker.clone(), ctx).await; + Worker::spawn(Arc::clone(&self), Arc::clone(&wrapped_worker), ctx.clone()).await; running_workers.insert(id, wrapped_worker); } else { @@ -95,7 +95,7 @@ impl JobManager { self.internal_sender .send(JobManagerEvent::IngestJob(ctx.clone(), job)) .unwrap_or_else(|_| { - println!("Failed to ingest job!"); + error!("Failed to ingest job!"); }); } } @@ -111,9 +111,8 @@ impl JobManager { } // pub async fn queue_pending_job(ctx: &LibraryContext) -> Result<(), JobError> { - // let db = &ctx.db; - - // let _next_job = db + // let _next_job = ctx + // .db // .job() // .find_first(vec![job::status::equals(JobStatus::Queued.int_value())]) // .exec() @@ -123,14 +122,14 @@ impl JobManager { // } pub async fn get_history(ctx: &LibraryContext) -> Result, JobError> { - let db = &ctx.db; - let jobs = db + let jobs = ctx + .db .job() .find_many(vec![job::status::not(JobStatus::Running.int_value())]) .exec() .await?; - Ok(jobs.into_iter().map(|j| j.into()).collect()) + Ok(jobs.into_iter().map(Into::into).collect()) } } @@ -165,20 +164,20 @@ pub struct JobReport { } // convert database struct into a resource struct -impl Into for job::Data { - fn into(self) -> JobReport { +impl From for JobReport { + fn from(data: job::Data) -> JobReport { JobReport { - id: self.id, - name: self.name, - // client_id: self.client_id, - status: JobStatus::from_int(self.status).unwrap(), - task_count: self.task_count, - completed_task_count: self.completed_task_count, - date_created: self.date_created.into(), - date_modified: self.date_modified.into(), - data: self.data, + id: data.id, + name: data.name, + // client_id: data.client_id, + status: JobStatus::from_int(data.status).unwrap(), + task_count: data.task_count, + completed_task_count: data.completed_task_count, + date_created: data.date_created.into(), + date_modified: data.date_modified.into(), + data: data.data, message: String::new(), - seconds_elapsed: self.seconds_elapsed, + seconds_elapsed: data.seconds_elapsed, } } } @@ -203,7 +202,7 @@ impl JobReport { pub async fn create(&self, ctx: &LibraryContext) -> Result<(), JobError> { let mut params = Vec::new(); - if let Some(_) = &self.data { + if self.data.is_some() { params.push(job::data::set(self.data.clone())) } diff --git a/core/src/job/worker.rs b/core/src/job/worker.rs index e0893e2ad..143335786 100644 --- a/core/src/job/worker.rs +++ b/core/src/job/worker.rs @@ -3,7 +3,7 @@ use super::{ Job, JobManager, }; use crate::{library::LibraryContext, ClientQuery, CoreEvent, LibraryQuery}; - +use log::error; use std::{sync::Arc, time::Duration}; use tokio::{ sync::{ @@ -12,6 +12,8 @@ use tokio::{ }, time::{sleep, Instant}, }; +use uuid::Uuid; + // used to update the worker state from inside the worker thread pub enum WorkerEvent { Progressed(Vec), @@ -58,7 +60,7 @@ pub struct Worker { impl Worker { pub fn new(job: Box) -> Self { let (worker_sender, worker_receiver) = unbounded_channel(); - let uuid = uuid::Uuid::new_v4().to_string(); + let uuid = Uuid::new_v4().to_string(); let name = job.name(); Self { @@ -71,7 +73,7 @@ impl Worker { pub async fn spawn( job_manager: Arc, worker: Arc>, - ctx: &LibraryContext, + ctx: LibraryContext, ) { // we capture the worker receiver channel so state can be updated from inside the worker let mut worker_mut = worker.lock().await; @@ -88,23 +90,23 @@ impl Worker { worker_mut.job_report.status = JobStatus::Running; - let ctx = ctx.clone(); - worker_mut.job_report.create(&ctx).await.unwrap_or(()); // spawn task to handle receiving events from the worker + let library_ctx = ctx.clone(); tokio::spawn(Worker::track_progress( worker.clone(), worker_receiver, - ctx.clone(), + library_ctx.clone(), )); let uuid = worker_mut.job_report.id.clone(); // spawn task to handle running the job + tokio::spawn(async move { let worker_ctx = WorkerContext { uuid, - library_ctx: ctx.clone(), + library_ctx, sender: worker_sender, }; let job_start = Instant::now(); @@ -123,14 +125,12 @@ impl Worker { } }); - match job.run(worker_ctx.clone()).await { - Ok(_) => { - worker_ctx.sender.send(WorkerEvent::Completed).unwrap_or(()); - } - Err(err) => { - println!("job '{}' failed with error: {}", worker_ctx.uuid, err); - worker_ctx.sender.send(WorkerEvent::Failed).unwrap_or(()); - } + if let Err(e) = job.run(worker_ctx.clone()).await { + error!("job '{}' failed with error: {}", worker_ctx.uuid, e); + worker_ctx.sender.send(WorkerEvent::Failed).unwrap_or(()); + } else { + // handle completion + worker_ctx.sender.send(WorkerEvent::Completed).unwrap_or(()); } job_manager.complete(&ctx, worker_ctx.uuid).await; diff --git a/core/src/lib.rs b/core/src/lib.rs index 29dfecd2e..05cda65a0 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,19 +1,22 @@ use crate::{file::cas::FileIdentifierJob, prisma::file as prisma_file, prisma::location}; use job::{JobManager, JobReport}; use library::{LibraryConfig, LibraryConfigWrapped, LibraryManager}; +use log::error; use node::{NodeConfig, NodeConfigManager}; use serde::{Deserialize, Serialize}; use std::{ - fs, path::{Path, PathBuf}, sync::Arc, }; use tag::{Tag, TagWithFiles}; use thiserror::Error; -use tokio::sync::{ - mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender}, - oneshot, +use tokio::{ + fs, + sync::{ + mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender}, + oneshot, + }, }; use ts_rs::TS; @@ -79,7 +82,7 @@ pub struct NodeContext { impl NodeContext { pub async fn emit(&self, event: CoreEvent) { self.event_sender.send(event).await.unwrap_or_else(|e| { - println!("Failed to emit event. {:?}", e); + error!("Failed to emit event. {:#?}", e); }); } } @@ -103,11 +106,15 @@ pub struct Node { impl Node { // create new instance of node, run startup tasks - pub async fn new(data_dir: PathBuf) -> (NodeController, mpsc::Receiver, Node) { - fs::create_dir_all(&data_dir).unwrap(); + pub async fn new( + data_dir: impl AsRef, + ) -> (NodeController, mpsc::Receiver, Node) { + fs::create_dir_all(&data_dir).await.unwrap(); let (event_sender, event_recv) = mpsc::channel(100); - let config = NodeConfigManager::new(data_dir.clone()).await.unwrap(); + let config = NodeConfigManager::new(data_dir.as_ref().to_owned()) + .await + .unwrap(); let jobs = JobManager::new(); let node_ctx = NodeContext { event_sender: event_sender.clone(), @@ -117,7 +124,7 @@ impl Node { let node = Node { config, - library_manager: LibraryManager::new(Path::new(&data_dir).join("libraries"), node_ctx) + library_manager: LibraryManager::new(data_dir.as_ref().join("libraries"), node_ctx) .await .unwrap(), query_channel: unbounded_channel(), @@ -139,8 +146,8 @@ impl Node { pub fn get_context(&self) -> NodeContext { NodeContext { event_sender: self.event_sender.clone(), - config: self.config.clone(), - jobs: self.jobs.clone(), + config: Arc::clone(&self.config), + jobs: Arc::clone(&self.jobs), } } @@ -305,12 +312,12 @@ impl Node { } // return contents of a directory for the explorer LibraryQuery::GetExplorerDir { - path, location_id, + path, limit: _, - } => CoreResponse::GetExplorerDir( - file::explorer::open_dir(&ctx, &location_id, &path).await?, - ), + } => CoreResponse::GetExplorerDir(Box::new( + file::explorer::open_dir(&ctx, location_id, path).await?, + )), LibraryQuery::GetJobHistory => { CoreResponse::GetJobHistory(JobManager::get_history(&ctx).await?) } @@ -390,7 +397,7 @@ pub enum LibraryCommand { }, // Locations LocCreate { - path: String, + path: PathBuf, }, LocUpdate { id: i32, @@ -411,12 +418,12 @@ pub enum LibraryCommand { }, GenerateThumbsForLocation { id: i32, - path: String, + path: PathBuf, }, // PurgeDatabase, IdentifyUniqueFiles { id: i32, - path: String, + path: PathBuf, }, } @@ -448,7 +455,7 @@ pub enum LibraryQuery { GetRunningJobs, GetExplorerDir { location_id: i32, - path: String, + path: PathBuf, limit: i32, }, GetLibraryStatistics, @@ -488,15 +495,15 @@ pub enum CoreResponse { Error(String), GetLibraries(Vec), GetVolumes(Vec), - TagCreateResponse(tag::Tag), - GetTag(Option), - GetTags(Vec), + TagCreateResponse(Tag), + GetTag(Option), + GetTags(Vec), GetLocation(sys::LocationResource), GetLocations(Vec), - GetExplorerDir(file::DirectoryWithContents), + GetExplorerDir(Box), GetNode(NodeState), LocCreate(sys::LocationResource), - OpenTag(Vec), + OpenTag(Vec), GetRunningJobs(Vec), GetJobHistory(Vec), GetLibraryStatistics(library::Statistics), @@ -527,5 +534,5 @@ pub enum CoreResource { Location(sys::LocationResource), File(file::File), Job(JobReport), - Tag(tag::Tag), + Tag(Tag), } diff --git a/core/src/library/library_config.rs b/core/src/library/library_config.rs index 5d9baf65a..f3ab140f1 100644 --- a/core/src/library/library_config.rs +++ b/core/src/library/library_config.rs @@ -41,10 +41,7 @@ impl LibraryConfig { file_dir: PathBuf, config: &LibraryConfig, ) -> Result<(), LibraryManagerError> { - File::create(file_dir) - .map_err(LibraryManagerError::IOError)? - .write_all(serde_json::to_string(config)?.as_bytes()) - .map_err(LibraryManagerError::IOError)?; + File::create(file_dir)?.write_all(serde_json::to_string(config)?.as_bytes())?; Ok(()) } @@ -54,7 +51,7 @@ impl LibraryConfig { config_path: PathBuf, ) -> Result<(), LibraryManagerError> { match current_version { - None => Err(LibraryManagerError::MigrationError(format!( + None => Err(LibraryManagerError::Migration(format!( "Your Spacedrive library at '{}' is missing the `version` field", config_path.display() ))), diff --git a/core/src/library/library_manager.rs b/core/src/library/library_manager.rs index 45e9a2acd..89917a4cf 100644 --- a/core/src/library/library_manager.rs +++ b/core/src/library/library_manager.rs @@ -31,17 +31,17 @@ pub struct LibraryManager { #[derive(Error, Debug)] pub enum LibraryManagerError { #[error("error saving or loading the config from the filesystem")] - IOError(#[from] io::Error), + IO(#[from] io::Error), #[error("error serializing or deserializing the JSON in the config file")] - JsonError(#[from] serde_json::Error), + Json(#[from] serde_json::Error), #[error("Database error")] - DatabaseError(#[from] prisma::QueryError), + Database(#[from] prisma::QueryError), #[error("Library not found error")] - LibraryNotFoundError, + LibraryNotFound, #[error("error migrating the config file")] - MigrationError(String), + Migration(String), #[error("failed to parse uuid")] - UuidError(#[from] uuid::Error), + Uuid(#[from] uuid::Error), } impl LibraryManager { @@ -66,7 +66,7 @@ impl LibraryManager { let config_path = entry.path(); let library_id = match Path::new(&config_path) .file_stem() - .map(|v| v.to_str().map(|v| Uuid::from_str(v))) + .map(|v| v.to_str().map(Uuid::from_str)) { Some(Some(Ok(id))) => id, _ => { @@ -119,17 +119,14 @@ impl LibraryManager { pub(crate) async fn create(&self, config: LibraryConfig) -> Result<(), LibraryManagerError> { let id = Uuid::new_v4(); LibraryConfig::save( - Path::new(&self.libraries_dir).join(format!("{}.sdlibrary", id.to_string())), + Path::new(&self.libraries_dir).join(format!("{id}.sdlibrary")), &config, ) .await?; let library = Self::load( id, - &Path::new(&self.libraries_dir) - .join(format!("{}.db", id.to_string())) - .to_str() - .unwrap(), + self.libraries_dir.join(format!("{id}.db")), config, self.node_context.clone(), ) @@ -167,7 +164,7 @@ impl LibraryManager { let library = libraries .iter_mut() .find(|lib| lib.id == Uuid::from_str(&id).unwrap()) - .ok_or(LibraryManagerError::LibraryNotFoundError)?; + .ok_or(LibraryManagerError::LibraryNotFound)?; // update the library if let Some(name) = name { @@ -178,7 +175,7 @@ impl LibraryManager { } LibraryConfig::save( - Path::new(&self.libraries_dir).join(format!("{}.sdlibrary", id.to_string())), + Path::new(&self.libraries_dir).join(format!("{id}.sdlibrary")), &library.config, ) .await?; @@ -197,14 +194,10 @@ impl LibraryManager { let library = libraries .iter() .find(|l| l.id == id) - .ok_or(LibraryManagerError::LibraryNotFoundError)?; + .ok_or(LibraryManagerError::LibraryNotFound)?; - fs::remove_file( - Path::new(&self.libraries_dir).join(format!("{}.db", library.id.to_string())), - )?; - fs::remove_file( - Path::new(&self.libraries_dir).join(format!("{}.sdlibrary", library.id.to_string())), - )?; + fs::remove_file(Path::new(&self.libraries_dir).join(format!("{}.db", library.id)))?; + fs::remove_file(Path::new(&self.libraries_dir).join(format!("{}.sdlibrary", library.id)))?; libraries.retain(|l| l.id != id); @@ -221,18 +214,18 @@ impl LibraryManager { .await .iter() .find(|lib| lib.id.to_string() == library_id) - .map(|v| v.clone()) + .map(Clone::clone) } /// load the library from a given path pub(crate) async fn load( id: Uuid, - db_path: &str, + db_path: impl AsRef, config: LibraryConfig, node_context: NodeContext, ) -> Result { let db = Arc::new( - load_and_migrate(&format!("file:{}", db_path)) + load_and_migrate(&format!("file:{}", db_path.as_ref().to_string_lossy())) .await .unwrap(), ); diff --git a/core/src/library/loader.rs b/core/src/library/loader.rs new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/core/src/library/loader.rs @@ -0,0 +1 @@ + diff --git a/core/src/library/statistics.rs b/core/src/library/statistics.rs index f1df98dd0..5c078e30b 100644 --- a/core/src/library/statistics.rs +++ b/core/src/library/statistics.rs @@ -1,12 +1,12 @@ use crate::{prisma::statistics::*, sys::Volume}; use fs_extra::dir::get_size; use serde::{Deserialize, Serialize}; -use std::fs; +use tokio::fs; use ts_rs::TS; use super::{LibraryContext, LibraryError}; -#[derive(Debug, Serialize, Deserialize, TS, Clone)] +#[derive(Debug, Serialize, Deserialize, TS, Clone, Default)] #[ts(export)] pub struct Statistics { pub total_file_count: i32, @@ -18,29 +18,15 @@ pub struct Statistics { pub library_db_size: String, } -impl Into for Data { - fn into(self) -> Statistics { - Statistics { - total_file_count: self.total_file_count, - total_bytes_used: self.total_bytes_used, - total_bytes_capacity: self.total_bytes_capacity, - total_bytes_free: self.total_bytes_free, - total_unique_bytes: self.total_unique_bytes, - preview_media_bytes: self.preview_media_bytes, - library_db_size: String::new(), - } - } -} - -impl Default for Statistics { - fn default() -> Self { +impl From for Statistics { + fn from(data: Data) -> Self { Self { - total_file_count: 0, - total_bytes_used: String::new(), - total_bytes_capacity: String::new(), - total_bytes_free: String::new(), - total_unique_bytes: String::new(), - preview_media_bytes: String::new(), + total_file_count: data.total_file_count, + total_bytes_used: data.total_bytes_used, + total_bytes_capacity: data.total_bytes_capacity, + total_bytes_free: data.total_bytes_free, + total_unique_bytes: data.total_unique_bytes, + preview_media_bytes: data.preview_media_bytes, library_db_size: String::new(), } } @@ -48,18 +34,14 @@ impl Default for Statistics { impl Statistics { pub async fn retrieve(ctx: &LibraryContext) -> Result { - let library_statistics_db = match ctx + let library_statistics_db = ctx .db .statistics() .find_unique(id::equals(ctx.node_local_id)) .exec() .await? - { - Some(library_statistics_db) => library_statistics_db.into(), - // create the default values if database has no entry - None => Statistics::default(), - }; - Ok(library_statistics_db.into()) + .map_or_else(Default::default, Into::into); + Ok(library_statistics_db) } pub async fn calculate(ctx: &LibraryContext) -> Result { @@ -72,9 +54,9 @@ impl Statistics { // TODO: get from database, not sys let volumes = Volume::get_volumes(); - Volume::save(&ctx).await?; + Volume::save(ctx).await?; - // println!("{:?}", volumes); + // info!("{:?}", volumes); let mut available_capacity: u64 = 0; let mut total_capacity: u64 = 0; @@ -85,15 +67,12 @@ impl Statistics { } } - let library_db_size = match fs::metadata(ctx.config().data_directory()) { + let library_db_size = match fs::metadata(ctx.config().data_directory()).await { Ok(metadata) => metadata.len(), Err(_) => 0, }; - let mut thumbsnails_dir = ctx.config().data_directory(); - thumbsnails_dir.push("thumbnails"); - - let thumbnail_folder_size = get_size(&thumbsnails_dir); + let thumbnail_folder_size = get_size(ctx.config().data_directory().join("thumbnails")); let statistics = Statistics { library_db_size: library_db_size.to_string(), @@ -109,7 +88,7 @@ impl Statistics { id::equals(1), vec![library_db_size::set(statistics.library_db_size.clone())], vec![ - total_file_count::set(statistics.total_file_count.clone()), + total_file_count::set(statistics.total_file_count), total_bytes_used::set(statistics.total_bytes_used.clone()), total_bytes_capacity::set(statistics.total_bytes_capacity.clone()), total_bytes_free::set(statistics.total_bytes_free.clone()), diff --git a/core/src/node/config.rs b/core/src/node/config.rs index c45833a1c..ea1a09f1a 100644 --- a/core/src/node/config.rs +++ b/core/src/node/config.rs @@ -45,11 +45,11 @@ pub struct NodeConfig { #[derive(Error, Debug)] pub enum NodeConfigError { #[error("error saving or loading the config from the filesystem")] - IOError(#[from] io::Error), + IO(#[from] io::Error), #[error("error serializing or deserializing the JSON in the config file")] - JsonError(#[from] serde_json::Error), + Json(#[from] serde_json::Error), #[error("error migrating the config file")] - MigrationError(String), + Migration(String), } impl NodeConfig { @@ -141,7 +141,7 @@ impl NodeConfigManager { ) -> Result<(), NodeConfigError> { match current_version { None => { - Err(NodeConfigError::MigrationError(format!("Your Spacedrive config file stored at '{}' is missing the `version` field. If you just upgraded please delete the file and restart Spacedrive! Please note this upgrade will stop using your old 'library.db' as the folder structure has changed.", config_path.display()))) + Err(NodeConfigError::Migration(format!("Your Spacedrive config file stored at '{}' is missing the `version` field. If you just upgraded please delete the file and restart Spacedrive! Please note this upgrade will stop using your old 'library.db' as the folder structure has changed.", config_path.display()))) } _ => Ok(()), } diff --git a/core/src/node/mod.rs b/core/src/node/mod.rs index 502a64f15..5276ab015 100644 --- a/core/src/node/mod.rs +++ b/core/src/node/mod.rs @@ -15,17 +15,24 @@ pub struct LibraryNode { pub last_seen: DateTime, } -impl Into for node::Data { - fn into(self) -> LibraryNode { - LibraryNode { - uuid: self.pub_id, - name: self.name, - platform: IntEnum::from_int(self.platform).unwrap(), - last_seen: self.last_seen.into(), +impl From for LibraryNode { + fn from(data: node::Data) -> Self { + Self { + uuid: data.pub_id, + name: data.name, + platform: IntEnum::from_int(data.platform).unwrap(), + last_seen: data.last_seen.into(), } } } +impl From> for LibraryNode { + fn from(data: Box) -> Self { + Self::from(*data) + } +} + +#[allow(clippy::upper_case_acronyms)] #[repr(i32)] #[derive(Debug, Clone, Copy, Serialize, Deserialize, TS, Eq, PartialEq, IntEnum)] #[ts(export)] diff --git a/core/src/node/state.rs b/core/src/node/state.rs new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/core/src/node/state.rs @@ -0,0 +1 @@ + diff --git a/core/src/sys/locations.rs b/core/src/sys/locations.rs index 13ed06fbb..2daa65c3d 100644 --- a/core/src/sys/locations.rs +++ b/core/src/sys/locations.rs @@ -5,10 +5,18 @@ use crate::{ prisma::{file_path, location}, ClientQuery, CoreEvent, LibraryQuery, }; + +use log::info; use serde::{Deserialize, Serialize}; -use std::{fs, io, io::Write, path::Path}; +use std::fmt::Debug; +use std::path::{Path, PathBuf}; use thiserror::Error; +use tokio::{ + fs::{metadata, File}, + io::{self, AsyncWriteExt}, +}; use ts_rs::TS; +use uuid::Uuid; use super::SysError; @@ -27,26 +35,26 @@ pub struct LocationResource { pub date_created: chrono::DateTime, } -impl Into for location::Data { - fn into(mut self) -> LocationResource { +impl From for LocationResource { + fn from(data: location::Data) -> Self { LocationResource { - id: self.id, - name: self.name, - path: self.local_path, - total_capacity: self.total_capacity, - available_capacity: self.available_capacity, - is_removable: self.is_removable, - node: self.node.take().unwrap_or(None).map(|node| (*node).into()), - is_online: self.is_online, - date_created: self.date_created.into(), + id: data.id, + name: data.name, + path: data.local_path, + total_capacity: data.total_capacity, + available_capacity: data.available_capacity, + is_removable: data.is_removable, + node: data.node.unwrap_or(None).map(Into::into), + is_online: data.is_online, + date_created: data.date_created.into(), } } } #[derive(Serialize, Deserialize, Default)] pub struct DotSpacedrive { - pub location_uuid: String, - pub library_uuid: String, + pub location_uuid: Uuid, + pub library_uuid: Uuid, } static DOTFILE_NAME: &str = ".spacedrive"; @@ -69,24 +77,26 @@ pub async fn get_location( location_id: i32, ) -> Result { // get location by location_id from db and include location_paths - let location = match ctx - .db + ctx.db .location() .find_unique(location::id::equals(location_id)) .exec() .await? - { - Some(location) => location, - None => Err(LocationError::NotFound(location_id.to_string()))?, - }; - Ok(location.into()) + .map(Into::into) + .ok_or_else(|| LocationError::IdNotFound(location_id).into()) } -pub async fn scan_location(ctx: &LibraryContext, location_id: i32, path: String) { - ctx.spawn_job(Box::new(IndexerJob { path: path.clone() })) - .await; - ctx.queue_job(Box::new(FileIdentifierJob { location_id, path })) - .await; +pub async fn scan_location(ctx: &LibraryContext, location_id: i32, path: impl AsRef) { + let path_buf = path.as_ref().to_path_buf(); + ctx.spawn_job(Box::new(IndexerJob { + path: path_buf.clone(), + })) + .await; + ctx.queue_job(Box::new(FileIdentifierJob { + location_id, + path: path_buf, + })) + .await; // TODO: make a way to stop jobs so this can be canceled without rebooting app // ctx.queue_job(Box::new(ThumbnailJob { // location_id, @@ -97,19 +107,18 @@ pub async fn scan_location(ctx: &LibraryContext, location_id: i32, path: String) pub async fn new_location_and_scan( ctx: &LibraryContext, - path: &str, + path: impl AsRef + Debug, ) -> Result { - let location = create_location(&ctx, path).await?; + let location = create_location(ctx, &path).await?; - scan_location(&ctx, location.id, path.to_string()).await; + scan_location(ctx, location.id, path).await; Ok(location) } pub async fn get_locations(ctx: &LibraryContext) -> Result, SysError> { - let db = &ctx.db; - - let locations = db + let locations = ctx + .db .location() .find_many(vec![]) .with(location::node::fetch()) @@ -117,121 +126,105 @@ pub async fn get_locations(ctx: &LibraryContext) -> Result .await?; // turn locations into LocationResource - let locations: Vec = locations - .into_iter() - .map(|location| location.into()) - .collect(); - - Ok(locations) + Ok(locations.into_iter().map(LocationResource::from).collect()) } pub async fn create_location( ctx: &LibraryContext, - path: &str, + path: impl AsRef + Debug, ) -> Result { + let path = path.as_ref(); + // check if we have access to this location - if !Path::new(path).exists() { - Err(LocationError::NotFound(path.to_string()))?; + if !path.exists() { + return Err(LocationError::PathNotFound(path.to_owned()).into()); } - // if on windows - if cfg!(target_family = "windows") { - // try and create a dummy file to see if we can write to this location - match fs::File::create(format!("{}/{}", path.clone(), ".spacewrite")) { - Ok(file) => file, - Err(e) => Err(LocationError::DotfileWriteFailure(e, path.to_string()))?, - }; - - match fs::remove_file(format!("{}/{}", path.clone(), ".spacewrite")) { - Ok(_) => (), - Err(e) => Err(LocationError::DotfileWriteFailure(e, path.to_string()))?, - } - } else { - // unix allows us to test this more directly - match fs::File::open(&path) { - Ok(_) => println!("Path is valid, creating location for '{}'", &path), - Err(e) => Err(LocationError::FileReadError(e))?, - } + if metadata(path) + .await + .map_err(|e| LocationError::DotfileReadFailure(e, path.to_owned()))? + .permissions() + .readonly() + { + return Err(LocationError::ReadonlyDotFileLocationFailure(path.to_owned()).into()); } + let path_string = path.to_string_lossy().to_string(); + // check if location already exists - let location = match ctx + let location_resource = if let Some(location) = ctx .db .location() - .find_first(vec![location::local_path::equals(Some(path.to_string()))]) + .find_first(vec![location::local_path::equals(Some( + path_string.clone(), + ))]) .exec() .await? { - Some(location) => location, - None => { - println!( - "Location does not exist, creating new location for '{}'", - &path - ); - let uuid = uuid::Uuid::new_v4(); + location.into() + } else { + info!( + "Location does not exist, creating new location for '{}'", + path_string + ); + let uuid = Uuid::new_v4(); - let p = Path::new(&path); + let location = ctx + .db + .location() + .create( + location::pub_id::set(uuid.to_string()), + vec![ + location::name::set(Some( + path.file_name().unwrap().to_string_lossy().to_string(), + )), + location::is_online::set(true), + location::local_path::set(Some(path_string)), + location::node_id::set(Some(ctx.node_local_id)), + ], + ) + .exec() + .await?; - let location = ctx - .db - .location() - .create( - location::pub_id::set(uuid.to_string()), - vec![ - location::name::set(Some( - p.file_name().unwrap().to_string_lossy().to_string(), - )), - location::is_online::set(true), - location::local_path::set(Some(path.to_string())), - location::node_id::set(Some(ctx.node_local_id)), - ], - ) - .exec() - .await?; + info!("Created location: {:?}", location); - println!("Created location: {:?}", location); + // write a file called .spacedrive to path containing the location id in JSON format + let mut dotfile = File::create(path.with_file_name(DOTFILE_NAME)) + .await + .map_err(|e| LocationError::DotfileWriteFailure(e, path.to_owned()))?; - // write a file called .spacedrive to path containing the location id in JSON format - let mut dotfile = match fs::File::create(format!("{}/{}", path.clone(), DOTFILE_NAME)) { - Ok(file) => file, - Err(e) => Err(LocationError::DotfileWriteFailure(e, path.to_string()))?, - }; + let data = DotSpacedrive { + location_uuid: uuid, + library_uuid: ctx.id, + }; - let data = DotSpacedrive { - location_uuid: uuid.to_string(), - library_uuid: ctx.id.to_string(), - }; + let json_bytes = serde_json::to_vec(&data) + .map_err(|e| LocationError::DotfileSerializeFailure(e, path.to_owned()))?; - let json = match serde_json::to_string(&data) { - Ok(json) => json, - Err(e) => Err(LocationError::DotfileSerializeFailure(e, path.to_string()))?, - }; + dotfile + .write_all(&json_bytes) + .await + .map_err(|e| LocationError::DotfileWriteFailure(e, path.to_owned()))?; - match dotfile.write_all(json.as_bytes()) { - Ok(_) => (), - Err(e) => Err(LocationError::DotfileWriteFailure(e, path.to_string()))?, - } + // ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::GetLocations)) + // .await; - // ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::GetLocations)) - // .await; - - location - } + location.into() }; - Ok(location.into()) + Ok(location_resource) } pub async fn delete_location(ctx: &LibraryContext, location_id: i32) -> Result<(), SysError> { - let db = &ctx.db; - - db.file_path() + ctx.db + .file_path() .find_many(vec![file_path::location_id::equals(Some(location_id))]) .delete() .exec() .await?; - db.location() + ctx.db + .location() .find_unique(location::id::equals(location_id)) .delete() .exec() @@ -243,7 +236,7 @@ pub async fn delete_location(ctx: &LibraryContext, location_id: i32) -> Result<( })) .await; - println!("Location {} deleted", location_id); + info!("Location {} deleted", location_id); Ok(()) } @@ -251,15 +244,21 @@ pub async fn delete_location(ctx: &LibraryContext, location_id: i32) -> Result<( #[derive(Error, Debug)] pub enum LocationError { #[error("Failed to create location (uuid {uuid:?})")] - CreateFailure { uuid: String }, - #[error("Failed to read location dotfile")] - DotfileReadFailure(io::Error), + CreateFailure { uuid: Uuid }, + #[error("Failed to read location dotfile (path: {1:?})")] + DotfileReadFailure(io::Error, PathBuf), #[error("Failed to serialize dotfile for location (at path: {1:?})")] - DotfileSerializeFailure(serde_json::Error, String), - #[error("Location not found (uuid: {1:?})")] - DotfileWriteFailure(io::Error, String), - #[error("Location not found (uuid: {0:?})")] - NotFound(String), + DotfileSerializeFailure(serde_json::Error, PathBuf), + #[error("Dotfile location is read only (at path: {0:?})")] + ReadonlyDotFileLocationFailure(PathBuf), + #[error("Failed to write dotfile (path: {1:?})")] + DotfileWriteFailure(io::Error, PathBuf), + #[error("Location not found (path: {0:?})")] + PathNotFound(PathBuf), + #[error("Location not found (uuid: {0})")] + UuidNotFound(Uuid), + #[error("Location not found (id: {0})")] + IdNotFound(i32), #[error("Failed to open file from local os")] FileReadError(io::Error), #[error("Failed to read mounted volumes from local os")] diff --git a/core/src/sys/mod.rs b/core/src/sys/mod.rs index d9565e399..8eafbf28b 100644 --- a/core/src/sys/mod.rs +++ b/core/src/sys/mod.rs @@ -11,11 +11,11 @@ use crate::{job, prisma}; #[derive(Error, Debug)] pub enum SysError { #[error("Location error")] - LocationError(#[from] LocationError), + Location(#[from] LocationError), #[error("Error with system volumes")] - VolumeError(String), + Volume(String), #[error("Error from job runner")] - JobError(#[from] job::JobError), + Job(#[from] job::JobError), #[error("Database error")] - DatabaseError(#[from] prisma::QueryError), + Database(#[from] prisma::QueryError), } diff --git a/core/src/sys/volumes.rs b/core/src/sys/volumes.rs index 3268787a7..ca9666e5c 100644 --- a/core/src/sys/volumes.rs +++ b/core/src/sys/volumes.rs @@ -33,7 +33,7 @@ impl Volume { .volume() .upsert( node_id_mount_point_name( - ctx.node_local_id.clone(), + ctx.node_local_id, volume.mount_point.to_string(), volume.name.to_string(), ), @@ -63,7 +63,7 @@ impl Volume { Ok(()) } pub fn get_volumes() -> Result, SysError> { - let all_volumes: Vec = System::new_all() + Ok(System::new_all() .disks() .iter() .map(|disk| { @@ -119,15 +119,8 @@ impl Volume { is_root_filesystem: mount_point == "/", } }) - .collect(); - - let volumes = all_volumes - .clone() - .into_iter() .filter(|volume| !volume.mount_point.starts_with("/System")) - .collect(); - - Ok(volumes) + .collect()) } } diff --git a/core/src/tag/mod.rs b/core/src/tag/mod.rs index 61d343333..415c1780c 100644 --- a/core/src/tag/mod.rs +++ b/core/src/tag/mod.rs @@ -39,29 +39,29 @@ pub struct TagOnFile { pub date_created: chrono::DateTime, } -impl Into for tag::Data { - fn into(self) -> Tag { - Tag { - id: self.id, - pub_id: self.pub_id, - name: self.name, - color: self.color, - total_files: self.total_files, - redundancy_goal: self.redundancy_goal, - date_created: self.date_created.into(), - date_modified: self.date_modified.into(), +impl From for Tag { + fn from(data: tag::Data) -> Self { + Self { + id: data.id, + pub_id: data.pub_id, + name: data.name, + color: data.color, + total_files: data.total_files, + redundancy_goal: data.redundancy_goal, + date_created: data.date_created.into(), + date_modified: data.date_modified.into(), } } } -impl Into for tag_on_file::Data { - fn into(self) -> TagOnFile { - TagOnFile { - tag_id: self.tag_id, - tag: self.tag.map(|t| (*t).into()), - file_id: self.file_id, - file: self.file.map(|f| (*f).into()), - date_created: self.date_created.into(), +impl From for TagOnFile { + fn from(data: tag_on_file::Data) -> Self { + Self { + tag_id: data.tag_id, + tag: data.tag.map(|t| (*t).into()), + file_id: data.file_id, + file: data.file.map(|f| (*f).into()), + date_created: data.date_created.into(), } } } diff --git a/core/src/util/db.rs b/core/src/util/db.rs index 0d1b3c33b..661e9f8d3 100644 --- a/core/src/util/db.rs +++ b/core/src/util/db.rs @@ -12,10 +12,10 @@ static MIGRATIONS_DIR: Dir = include_dir!("$CARGO_MANIFEST_DIR/prisma/migrations #[derive(Error, Debug)] pub enum MigrationError { #[error("An error occurred while initialising a new database connection")] - DatabaseIntialisation(#[from] NewClientError), + DatabaseInitialization(#[from] NewClientError), #[error("An error occurred with the database while applying migrations")] DatabaseError(#[from] prisma_client_rust::queries::Error), - #[error("An error occured reading the embedded migration files. {0}. Please report to Spacedrive developers!")] + #[error("An error occurred reading the embedded migration files. {0}. Please report to Spacedrive developers!")] InvalidEmbeddedMigration(&'static str), } @@ -28,7 +28,7 @@ pub async fn load_and_migrate(db_url: &str) -> Result Result Result, _>>()?; // We sort the migrations so they are always applied in the correct order - migration_directories.sort_by(|(_, a_time, _), (_, b_time, _)| a_time.cmp(&b_time)); + migration_directories.sort_by(|(_, a_time, _), (_, b_time, _)| a_time.cmp(b_time)); for (name, _, dir) in migration_directories { let migration_file_raw = dir @@ -105,7 +103,7 @@ pub async fn load_and_migrate(db_url: &str) -> Result>(); + let steps = migration_file_raw.split(';').collect::>(); let steps = &steps[0..steps.len() - 1]; for (i, step) in steps.iter().enumerate() { client._execute_raw(raw!(*step)).await?;