From ef2f60ff33de0ff5b03dd1de937b2c7eef645171 Mon Sep 17 00:00:00 2001 From: Ericson Fogo Soares Date: Tue, 28 Jun 2022 22:56:49 -0300 Subject: [PATCH 1/4] Fixing Clippy warnings * Using tokio on all filesystem operations * Some minor tweaks to be more consistent on paths between &str, AsRef and PathBuf * Using logging instead of println --- Cargo.lock | Bin 167334 -> 167624 bytes core/Cargo.toml | 2 +- core/src/encode/metadata.rs | 7 +- core/src/encode/thumb.rs | 70 +++++---- core/src/file/cas/checksum.rs | 37 ++--- core/src/file/cas/identifier.rs | 44 +++--- core/src/file/explorer/open.rs | 40 +++--- core/src/file/indexer/mod.rs | 12 +- core/src/file/indexer/scan.rs | 65 +++++---- core/src/file/mod.rs | 83 ++++++----- core/src/job/jobs.rs | 41 +++--- core/src/job/worker.rs | 9 +- core/src/lib.rs | 65 +++++---- core/src/library/loader.rs | 45 +++--- core/src/library/statistics.rs | 71 ++++----- core/src/node/mod.rs | 57 ++++---- core/src/node/state.rs | 64 +++++---- core/src/sys/locations.rs | 245 ++++++++++++++++---------------- core/src/sys/mod.rs | 8 +- core/src/sys/volumes.rs | 21 +-- core/src/util/db.rs | 30 ++-- 21 files changed, 518 insertions(+), 498 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ca71bf05e6c80fe1745988f1e5a2a2837b8930c0..d0af4fd42471cf581af24e09047db2aa2e810e8a 100644 GIT binary patch delta 150 zcmZ2Bo9o0}u7)j)vyM&w%*`aSecds}#3nHXrQ+10lvE`hE(N91(##ay+{EOf{OSKo z8I`$^__}3`64S#xn1bw0O)S#P4J`~4O)V`AQqvMm4UCM<6O)q6QxmuuN println!("error: {}", error), +// Err(error) => error!("error: {}", error), // } // Ok(()) // } diff --git a/core/src/encode/thumb.rs b/core/src/encode/thumb.rs index bb148d46b..d9d23d587 100644 --- a/core/src/encode/thumb.rs +++ b/core/src/encode/thumb.rs @@ -1,22 +1,21 @@ -use crate::job::JobReportUpdate; -use crate::node::get_nodestate; use crate::{ - job::{Job, WorkerContext}, + job::{Job, JobReportUpdate, WorkerContext}, + node::get_nodestate, prisma::file_path, - CoreContext, + sys, CoreContext, CoreEvent, }; -use crate::{sys, CoreEvent}; use futures::executor::block_on; -use image::*; +use image::{self, imageops, DynamicImage, GenericImageView}; use log::{error, info}; -use std::fs; +use std::error::Error; use std::path::{Path, PathBuf}; -use webp::*; +use tokio::fs; +use webp::Encoder; #[derive(Debug, Clone)] pub struct ThumbnailJob { pub location_id: i32, - pub path: String, + pub path: PathBuf, pub background: bool, } @@ -29,30 +28,34 @@ impl Job for ThumbnailJob { fn name(&self) -> &'static str { "thumbnailer" } - async fn run(&self, ctx: WorkerContext) -> Result<(), Box> { - let config = get_nodestate(); - let core_ctx = ctx.core_ctx.clone(); - let location = sys::get_location(&core_ctx, self.location_id).await?; + async fn run(&self, ctx: WorkerContext) -> Result<(), Box> { + let config = get_nodestate(); + + let location = sys::get_location(&ctx.core_ctx, self.location_id).await?; info!( - "Searching for images in location {} at path {}", + "Searching for images in location {} at path {:#?}", location.id, self.path ); // create all necessary directories if they don't exist fs::create_dir_all( - Path::new(&config.data_path) + config + .data_path + .as_ref() + .unwrap() .join(THUMBNAIL_CACHE_DIR_NAME) .join(format!("{}", self.location_id)), - )?; + ) + .await?; let root_path = location.path.unwrap(); // query database for all files in this location that need thumbnails - let image_files = get_images(&core_ctx, self.location_id, &self.path).await?; + let image_files = get_images(&ctx.core_ctx, self.location_id, &self.path).await?; 
info!("Found {:?} files", image_files.len()); - let is_background = self.background.clone(); + let is_background = self.background; tokio::task::spawn_blocking(move || { ctx.progress(vec![ @@ -89,7 +92,10 @@ impl Job for ThumbnailJob { }; // Define and write the WebP-encoded file to a given path - let output_path = Path::new(&config.data_path) + let output_path = config + .data_path + .as_ref() + .unwrap() .join(THUMBNAIL_CACHE_DIR_NAME) .join(format!("{}", location.id)) .join(&cas_id) @@ -98,7 +104,7 @@ impl Job for ThumbnailJob { // check if file exists at output path if !output_path.exists() { info!("Writing {:?} to {:?}", path, output_path); - generate_thumbnail(&path, &output_path) + block_on(generate_thumbnail(&path, &output_path)) .map_err(|e| { info!("Error generating thumb {:?}", e); }) @@ -120,27 +126,27 @@ impl Job for ThumbnailJob { } } -pub fn generate_thumbnail( - file_path: &PathBuf, - output_path: &PathBuf, -) -> Result<(), Box> { +pub async fn generate_thumbnail>( + file_path: P, + output_path: P, +) -> Result<(), Box> { // Using `image` crate, open the included .jpg file let img = image::open(file_path)?; let (w, h) = img.dimensions(); // Optionally, resize the existing photo and convert back into DynamicImage - let img: DynamicImage = image::DynamicImage::ImageRgba8(imageops::resize( + let img = DynamicImage::ImageRgba8(imageops::resize( &img, (w as f32 * THUMBNAIL_SIZE_FACTOR) as u32, (h as f32 * THUMBNAIL_SIZE_FACTOR) as u32, imageops::FilterType::Triangle, )); // Create the WebP encoder for the above image - let encoder: Encoder = Encoder::from_image(&img)?; + let encoder = Encoder::from_image(&img)?; // Encode the image at a specified quality 0-100 - let webp: WebPMemory = encoder.encode(THUMBNAIL_QUALITY); + let webp = encoder.encode(THUMBNAIL_QUALITY); - std::fs::write(&output_path, &*webp)?; + fs::write(output_path, &*webp).await?; Ok(()) } @@ -148,7 +154,7 @@ pub fn generate_thumbnail( pub async fn get_images( ctx: &CoreContext, location_id: i32, - path: &str, + path: impl AsRef, ) -> Result, std::io::Error> { let mut params = vec![ file_path::location_id::equals(Some(location_id)), @@ -161,8 +167,10 @@ pub async fn get_images( ]), ]; - if !path.is_empty() { - params.push(file_path::materialized_path::starts_with(path.to_string())) + let path_str = path.as_ref().to_string_lossy().to_string(); + + if !path_str.is_empty() { + params.push(file_path::materialized_path::starts_with(path_str)) } let image_files = ctx diff --git a/core/src/file/cas/checksum.rs b/core/src/file/cas/checksum.rs index 78bd09aca..08c12675c 100644 --- a/core/src/file/cas/checksum.rs +++ b/core/src/file/cas/checksum.rs @@ -1,36 +1,27 @@ use data_encoding::HEXLOWER; use ring::digest::{Context, SHA256}; -use std::convert::TryInto; -use std::fs::File; - -use std::io; use std::path::PathBuf; +use tokio::{ + fs::File, + io::{self, AsyncReadExt, AsyncSeekExt, SeekFrom}, +}; static SAMPLE_COUNT: u64 = 4; static SAMPLE_SIZE: u64 = 10000; -fn read_at(file: &File, offset: u64, size: u64) -> Result, io::Error> { +async fn read_at(file: &mut File, offset: u64, size: u64) -> Result, io::Error> { let mut buf = vec![0u8; size as usize]; - #[cfg(target_family = "unix")] - { - use std::os::unix::prelude::*; - file.read_exact_at(&mut buf, offset)?; - } - - #[cfg(target_family = "windows")] - { - use std::os::windows::prelude::*; - file.seek_read(&mut buf, offset)?; - } + file.seek(SeekFrom::Start(offset)).await?; + file.read_exact(&mut buf).await?; Ok(buf) } -pub fn generate_cas_id(path: PathBuf, size: u64) -> 
Result { +pub async fn generate_cas_id(path: PathBuf, size: u64) -> Result { // open file reference - let file = File::open(path)?; + let mut file = File::open(path).await?; let mut context = Context::new(&SHA256); @@ -39,20 +30,16 @@ pub fn generate_cas_id(path: PathBuf, size: u64) -> Result { // if size is small enough, just read the whole thing if SAMPLE_COUNT * SAMPLE_SIZE > size { - let buf = read_at(&file, 0, size.try_into().unwrap())?; + let buf = read_at(&mut file, 0, size).await?; context.update(&buf); } else { // loop over samples for i in 0..SAMPLE_COUNT { - let buf = read_at( - &file, - (size / SAMPLE_COUNT) * i, - SAMPLE_SIZE.try_into().unwrap(), - )?; + let buf = read_at(&mut file, (size / SAMPLE_COUNT) * i, SAMPLE_SIZE).await?; context.update(&buf); } // sample end of file - let buf = read_at(&file, size - SAMPLE_SIZE, SAMPLE_SIZE.try_into().unwrap())?; + let buf = read_at(&mut file, size - SAMPLE_SIZE, SAMPLE_SIZE).await?; context.update(&buf); } diff --git a/core/src/file/cas/identifier.rs b/core/src/file/cas/identifier.rs index 94e5c5aaf..de9a54483 100644 --- a/core/src/file/cas/identifier.rs +++ b/core/src/file/cas/identifier.rs @@ -13,8 +13,9 @@ use log::info; use prisma_client_rust::{prisma_models::PrismaValue, raw, raw::Raw, Direction}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use std::path::Path; -use std::{fs, io}; +use std::error::Error; +use std::path::{Path, PathBuf}; +use tokio::{fs, io}; // FileIdentifierJob takes file_paths without a file_id and uniquely identifies them // first: generating the cas_id and extracting metadata @@ -22,7 +23,7 @@ use std::{fs, io}; #[derive(Debug)] pub struct FileIdentifierJob { pub location_id: i32, - pub path: String, + pub path: PathBuf, } // we break this job into chunks of 100 to improve performance @@ -33,11 +34,12 @@ impl Job for FileIdentifierJob { fn name(&self) -> &'static str { "file_identifier" } - async fn run(&self, ctx: WorkerContext) -> Result<(), Box> { + + async fn run(&self, ctx: WorkerContext) -> Result<(), Box> { info!("Identifying orphan file paths..."); let location = get_location(&ctx.core_ctx, self.location_id).await?; - let location_path = location.path.unwrap_or("".to_string()); + let location_path = location.path.unwrap_or_else(|| "".to_string()); let total_count = count_orphan_file_paths(&ctx.core_ctx, location.id.into()).await?; info!("Found {} orphan file paths", total_count); @@ -63,7 +65,7 @@ impl Job for FileIdentifierJob { let file_paths = match block_on(get_orphan_file_paths(&ctx.core_ctx, cursor)) { Ok(file_paths) => file_paths, Err(e) => { - info!("Error getting orphan file paths: {}", e); + info!("Error getting orphan file paths: {:#?}", e); continue; } }; @@ -77,7 +79,7 @@ impl Job for FileIdentifierJob { // analyze each file_path for file_path in file_paths.iter() { // get the cas_id and extract metadata - match prepare_file(&location_path, file_path) { + match block_on(prepare_file(&location_path, file_path)) { Ok(file) => { let cas_id = file.cas_id.clone(); // create entry into chunks for created file data @@ -85,7 +87,7 @@ impl Job for FileIdentifierJob { cas_lookup.insert(cas_id, file_path.id); } Err(e) => { - info!("Error processing file: {}", e); + info!("Error processing file: {:#?}", e); continue; } }; @@ -106,8 +108,8 @@ impl Job for FileIdentifierJob { let file_path_id = cas_lookup.get(&file.cas_id).unwrap(); block_on( db.file_path() - .find_unique(file_path::id::equals(file_path_id.clone())) - 
.update(vec![file_path::file_id::set(Some(file.id.clone()))]) + .find_unique(file_path::id::equals(*file_path_id)) + .update(vec![file_path::file_id::set(Some(file.id))]) .exec(), ) .unwrap(); @@ -125,8 +127,8 @@ impl Job for FileIdentifierJob { for file in new_files.iter() { values.extend([ PrismaValue::String(file.cas_id.clone()), - PrismaValue::Int(file.size_in_bytes.clone()), - PrismaValue::DateTime(file.date_created.clone()), + PrismaValue::Int(file.size_in_bytes), + PrismaValue::DateTime(file.date_created), ]); } @@ -140,7 +142,7 @@ impl Job for FileIdentifierJob { values, ))) .unwrap_or_else(|e| { - info!("Error inserting files: {}", e); + info!("Error inserting files: {:#?}", e); Vec::new() }); @@ -151,8 +153,8 @@ impl Job for FileIdentifierJob { let file_path_id = cas_lookup.get(&file.cas_id).unwrap(); block_on( db.file_path() - .find_unique(file_path::id::equals(file_path_id.clone())) - .update(vec![file_path::file_id::set(Some(file.id.clone()))]) + .find_unique(file_path::id::equals(*file_path_id)) + .update(vec![file_path::file_id::set(Some(file.id))]) .exec(), ) .unwrap(); @@ -241,13 +243,15 @@ pub struct FileCreated { pub cas_id: String, } -pub fn prepare_file( - location_path: &str, +pub async fn prepare_file( + location_path: impl AsRef, file_path: &file_path::Data, ) -> Result { - let path = Path::new(&location_path).join(Path::new(file_path.materialized_path.as_str())); + let path = location_path + .as_ref() + .join(file_path.materialized_path.as_str()); - let metadata = fs::metadata(&path)?; + let metadata = fs::metadata(&path).await?; // let date_created: DateTime = metadata.created().unwrap().into(); @@ -255,7 +259,7 @@ pub fn prepare_file( let cas_id = { if !file_path.is_dir { - let mut ret = generate_cas_id(path.clone(), size.clone()).unwrap(); + let mut ret = generate_cas_id(path, size).await?; ret.truncate(16); ret } else { diff --git a/core/src/file/explorer/open.rs b/core/src/file/explorer/open.rs index bedfba5a0..901fa3589 100644 --- a/core/src/file/explorer/open.rs +++ b/core/src/file/explorer/open.rs @@ -6,33 +6,37 @@ use crate::{ sys::get_location, CoreContext, }; +use log::info; use std::path::Path; pub async fn open_dir( ctx: &CoreContext, - location_id: &i32, - path: &str, + location_id: i32, + path: impl AsRef, ) -> Result { - let db = &ctx.database; let config = get_nodestate(); // get location - let location = get_location(ctx, location_id.clone()).await?; + let location = get_location(ctx, location_id).await?; - let directory = db + let path_str = path.as_ref().to_string_lossy().to_string(); + + let directory = ctx + .database .file_path() .find_first(vec![ file_path::location_id::equals(Some(location.id)), - file_path::materialized_path::equals(path.into()), + file_path::materialized_path::equals(path_str), file_path::is_dir::equals(true), ]) .exec() .await? 
- .ok_or(FileError::DirectoryNotFound(path.to_string()))?; + .ok_or_else(|| FileError::DirectoryNotFound(path.as_ref().to_path_buf()))?; - println!("DIRECTORY: {:?}", directory); + info!("DIRECTORY: {:?}", directory); - let mut file_paths: Vec = db + let mut file_paths: Vec = ctx + .database .file_path() .find_many(vec![ file_path::location_id::equals(Some(location.id)), @@ -45,15 +49,17 @@ pub async fn open_dir( .map(Into::into) .collect(); - for file_path in &mut file_paths { - if let Some(file) = &mut file_path.file { - let thumb_path = Path::new(&config.data_path) - .join(THUMBNAIL_CACHE_DIR_NAME) - .join(format!("{}", location.id)) - .join(file.cas_id.clone()) - .with_extension("webp"); + if let Some(ref data_path) = config.data_path { + for file_path in &mut file_paths { + if let Some(file) = &mut file_path.file { + let thumb_path = data_path + .join(THUMBNAIL_CACHE_DIR_NAME) + .join(location.id.to_string()) + .join(file.cas_id.clone()) + .with_extension("webp"); - file.has_thumbnail = thumb_path.exists(); + file.has_thumbnail = thumb_path.exists(); + } } } diff --git a/core/src/file/indexer/mod.rs b/core/src/file/indexer/mod.rs index 4415b252c..d4e428a4d 100644 --- a/core/src/file/indexer/mod.rs +++ b/core/src/file/indexer/mod.rs @@ -1,15 +1,18 @@ use crate::job::{Job, JobReportUpdate, WorkerContext}; +use std::error::Error; +use std::path::PathBuf; use self::scan::ScanProgress; mod scan; +// Re-exporting pub use scan::*; -pub use scan::scan_path; +use scan::scan_path; #[derive(Debug)] pub struct IndexerJob { - pub path: String, + pub path: PathBuf, } #[async_trait::async_trait] @@ -17,9 +20,8 @@ impl Job for IndexerJob { fn name(&self) -> &'static str { "indexer" } - async fn run(&self, ctx: WorkerContext) -> Result<(), Box> { - let core_ctx = ctx.core_ctx.clone(); - scan_path(&core_ctx, self.path.as_str(), move |p| { + async fn run(&self, ctx: WorkerContext) -> Result<(), Box> { + scan_path(&ctx.core_ctx.clone(), &self.path, move |p| { ctx.progress( p.iter() .map(|p| match p.clone() { diff --git a/core/src/file/indexer/scan.rs b/core/src/file/indexer/scan.rs index 19c3ee53d..6dd7a5bb7 100644 --- a/core/src/file/indexer/scan.rs +++ b/core/src/file/indexer/scan.rs @@ -1,13 +1,22 @@ -use crate::sys::{create_location, LocationResource}; -use crate::CoreContext; -use chrono::{DateTime, FixedOffset, Utc}; +use crate::{ + sys::{create_location, LocationResource}, + CoreContext, +}; + +use chrono::{DateTime, Utc}; use log::{error, info}; use prisma_client_rust::prisma_models::PrismaValue; use prisma_client_rust::raw; use prisma_client_rust::raw::Raw; use serde::{Deserialize, Serialize}; use std::ffi::OsStr; -use std::{collections::HashMap, fs, path::Path, path::PathBuf, time::Instant}; +use std::fmt::Debug; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + time::Instant, +}; +use tokio::fs; use walkdir::{DirEntry, WalkDir}; #[derive(Clone)] @@ -22,13 +31,10 @@ static BATCH_SIZE: usize = 100; // creates a vector of valid path buffers from a directory pub async fn scan_path( ctx: &CoreContext, - path: &str, + path: impl AsRef + Debug, on_progress: impl Fn(Vec) + Send + Sync + 'static, ) -> Result<(), Box> { - let db = &ctx.database; - let path = path.to_string(); - - let location = create_location(&ctx, &path).await?; + let location = create_location(ctx, &path).await?; // query db to highers id, so we can increment it for the new files indexed #[derive(Deserialize, Serialize, Debug)] @@ -36,20 +42,22 @@ pub async fn scan_path( id: Option, } // grab the next id so we can 
increment in memory for batch inserting - let first_file_id = match db + let first_file_id = match ctx + .database ._query_raw::(raw!("SELECT MAX(id) id FROM file_paths")) .await { Ok(rows) => rows[0].id.unwrap_or(0), - Err(e) => panic!("Error querying for next file id: {}", e), + Err(e) => panic!("Error querying for next file id: {:#?}", e), }; //check is path is a directory - if !PathBuf::from(&path).is_dir() { + if !path.as_ref().is_dir() { // return Err(anyhow::anyhow!("{} is not a directory", &path)); - panic!("{} is not a directory", &path); + panic!("{:#?} is not a directory", path); } - let dir_path = path.clone(); + + let path_buf = path.as_ref().to_path_buf(); // spawn a dedicated thread to scan the directory for performance let (paths, scan_start, on_progress) = tokio::task::spawn_blocking(move || { @@ -66,10 +74,9 @@ pub async fn scan_path( next_file_id }; // walk through directory recursively - for entry in WalkDir::new(&dir_path).into_iter().filter_entry(|dir| { - let approved = - !is_hidden(dir) && !is_app_bundle(dir) && !is_node_modules(dir) && !is_library(dir); - approved + for entry in WalkDir::new(path_buf).into_iter().filter_entry(|dir| { + // check if entry is approved + !is_hidden(dir) && !is_app_bundle(dir) && !is_node_modules(dir) && !is_library(dir) }) { // extract directory entry or log and continue if failed let entry = match entry { @@ -85,7 +92,7 @@ pub async fn scan_path( let parent_path = path .parent() - .unwrap_or(Path::new("")) + .unwrap_or_else(|| Path::new("")) .to_str() .unwrap_or(""); let parent_dir_id = dirs.get(&*parent_path); @@ -99,7 +106,7 @@ pub async fn scan_path( }; on_progress(vec![ - ScanProgress::Message(format!("{}", path_str)), + ScanProgress::Message(format!("Scanning {}", path_str)), ScanProgress::ChunkCount(paths.len() / BATCH_SIZE), ]); @@ -121,8 +128,7 @@ pub async fn scan_path( } (paths, scan_start, on_progress) }) - .await - .unwrap(); + .await?; let db_write_start = Instant::now(); let scan_read_time = scan_start.elapsed(); @@ -142,7 +148,7 @@ pub async fn scan_path( for (file_path, file_id, parent_dir_id, is_dir) in chunk { files.extend( - match prepare_values(&file_path, *file_id, &location, parent_dir_id, *is_dir) { + match prepare_values(file_path, *file_id, &location, parent_dir_id, *is_dir).await { Ok(values) => values.to_vec(), Err(e) => { error!("Error creating file model from path {:?}: {}", file_path, e); @@ -162,7 +168,7 @@ pub async fn scan_path( files ); - let count = db._execute_raw(raw).await; + let count = ctx.database._execute_raw(raw).await; info!("Inserted {:?} records", count); } @@ -177,14 +183,14 @@ pub async fn scan_path( } // reads a file at a path and creates an ActiveModel with metadata -fn prepare_values( +async fn prepare_values( file_path: &PathBuf, id: i32, location: &LocationResource, parent_id: &Option, is_dir: bool, ) -> Result<[PrismaValue; 8], std::io::Error> { - let metadata = fs::metadata(&file_path)?; + let metadata = fs::metadata(&file_path).await?; let location_path = Path::new(location.path.as_ref().unwrap().as_str()); // let size = metadata.len(); let name; @@ -214,7 +220,6 @@ fn prepare_values( PrismaValue::String(name), PrismaValue::String(extension.to_lowercase()), parent_id - .clone() .map(|id| PrismaValue::Int(id as i64)) .unwrap_or(PrismaValue::Null), PrismaValue::DateTime(date_created.into()), @@ -236,7 +241,7 @@ fn is_hidden(entry: &DirEntry) -> bool { entry .file_name() .to_str() - .map(|s| s.starts_with(".")) + .map(|s| s.starts_with('.')) .unwrap_or(false) } @@ -265,7 +270,7 @@ 
fn is_app_bundle(entry: &DirEntry) -> bool { .map(|s| s.contains(".app") | s.contains(".bundle")) .unwrap_or(false); - let is_app_bundle = is_dir && contains_dot; + // let is_app_bundle = is_dir && contains_dot; // if is_app_bundle { // let path_buff = entry.path(); // let path = path_buff.to_str().unwrap(); @@ -273,5 +278,5 @@ fn is_app_bundle(entry: &DirEntry) -> bool { // self::path(&path, ); // } - is_app_bundle + is_dir && contains_dot } diff --git a/core/src/file/mod.rs b/core/src/file/mod.rs index bc632ecec..9959af3ed 100644 --- a/core/src/file/mod.rs +++ b/core/src/file/mod.rs @@ -1,3 +1,4 @@ +use std::path::PathBuf; use int_enum::IntEnum; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -77,46 +78,52 @@ pub enum FileKind { Alias = 8, } -impl Into for file::Data { - fn into(self) -> File { - File { - id: self.id, - cas_id: self.cas_id, - integrity_checksum: self.integrity_checksum, - kind: IntEnum::from_int(self.kind).unwrap(), - size_in_bytes: self.size_in_bytes.to_string(), - // encryption: EncryptionAlgorithm::from_int(self.encryption).unwrap(), - ipfs_id: self.ipfs_id, - hidden: self.hidden, - favorite: self.favorite, - important: self.important, - has_thumbnail: self.has_thumbnail, - has_thumbstrip: self.has_thumbstrip, - has_video_preview: self.has_video_preview, - note: self.note, - date_created: self.date_created.into(), - date_modified: self.date_modified.into(), - date_indexed: self.date_indexed.into(), +impl From for File { + fn from(data: file::Data) -> Self { + Self { + id: data.id, + cas_id: data.cas_id, + integrity_checksum: data.integrity_checksum, + kind: IntEnum::from_int(data.kind).unwrap(), + size_in_bytes: data.size_in_bytes.to_string(), + // encryption: EncryptionAlgorithm::from_int(data.encryption).unwrap(), + ipfs_id: data.ipfs_id, + hidden: data.hidden, + favorite: data.favorite, + important: data.important, + has_thumbnail: data.has_thumbnail, + has_thumbstrip: data.has_thumbstrip, + has_video_preview: data.has_video_preview, + note: data.note, + date_created: data.date_created.into(), + date_modified: data.date_modified.into(), + date_indexed: data.date_indexed.into(), paths: vec![], } } } -impl Into for file_path::Data { - fn into(mut self) -> FilePath { - FilePath { - id: self.id, - is_dir: self.is_dir, - materialized_path: self.materialized_path, - file_id: self.file_id, - parent_id: self.parent_id, - location_id: self.location_id.unwrap_or(0), - date_indexed: self.date_indexed.into(), - name: self.name, - extension: self.extension, - date_created: self.date_created.into(), - date_modified: self.date_modified.into(), - file: self.file.take().unwrap_or(None).map(|file| (*file).into()), +impl From> for File { + fn from(data: Box) -> Self { + Self::from(*data) + } +} + +impl From for FilePath { + fn from(data: file_path::Data) -> Self { + Self { + id: data.id, + is_dir: data.is_dir, + materialized_path: data.materialized_path, + file_id: data.file_id, + parent_id: data.parent_id, + location_id: data.location_id.unwrap_or(0), + date_indexed: data.date_indexed.into(), + name: data.name, + extension: data.extension, + date_created: data.date_created.into(), + date_modified: data.date_modified.into(), + file: data.file.unwrap_or(None).map(Into::into), } } } @@ -131,9 +138,9 @@ pub struct DirectoryWithContents { #[derive(Error, Debug)] pub enum FileError { #[error("Directory not found (path: {0:?})")] - DirectoryNotFound(String), + DirectoryNotFound(PathBuf), #[error("File not found (path: {0:?})")] - FileNotFound(String), + FileNotFound(PathBuf), 
#[error("Database error")] DatabaseError(#[from] prisma::QueryError), #[error("System error")] @@ -145,7 +152,7 @@ pub async fn set_note( id: i32, note: Option, ) -> Result { - let response = ctx + let _response = ctx .database .file() .find_unique(file::id::equals(id)) diff --git a/core/src/job/jobs.rs b/core/src/job/jobs.rs index efacd6cb8..3a0b56c5a 100644 --- a/core/src/job/jobs.rs +++ b/core/src/job/jobs.rs @@ -23,8 +23,8 @@ const MAX_WORKERS: usize = 1; #[async_trait::async_trait] pub trait Job: Send + Sync + Debug { - async fn run(&self, ctx: WorkerContext) -> Result<(), Box>; fn name(&self) -> &'static str; + async fn run(&self, ctx: WorkerContext) -> Result<(), Box>; } // jobs struct is maintained by the core @@ -52,7 +52,7 @@ impl Jobs { let wrapped_worker = Arc::new(Mutex::new(worker)); - Worker::spawn(wrapped_worker.clone(), ctx).await; + Worker::spawn(Arc::clone(&wrapped_worker), ctx).await; self.running_workers.insert(id, wrapped_worker); } else { @@ -84,9 +84,8 @@ impl Jobs { } pub async fn queue_pending_job(ctx: &CoreContext) -> Result<(), JobError> { - let db = &ctx.database; - - let next_job = db + let _next_job = ctx + .database .job() .find_first(vec![job::status::equals(JobStatus::Queued.int_value())]) .exec() @@ -96,14 +95,14 @@ impl Jobs { } pub async fn get_history(ctx: &CoreContext) -> Result, JobError> { - let db = &ctx.database; - let jobs = db + let jobs = ctx + .database .job() .find_many(vec![job::status::not(JobStatus::Running.int_value())]) .exec() .await?; - Ok(jobs.into_iter().map(|j| j.into()).collect()) + Ok(jobs.into_iter().map(Into::into).collect()) } } @@ -138,20 +137,20 @@ pub struct JobReport { } // convert database struct into a resource struct -impl Into for job::Data { - fn into(self) -> JobReport { +impl From for JobReport { + fn from(data: job::Data) -> JobReport { JobReport { - id: self.id, - name: self.name, - // client_id: self.client_id, - status: JobStatus::from_int(self.status).unwrap(), - task_count: self.task_count, - completed_task_count: self.completed_task_count, - date_created: self.date_created.into(), - date_modified: self.date_modified.into(), - data: self.data, + id: data.id, + name: data.name, + // client_id: data.client_id, + status: JobStatus::from_int(data.status).unwrap(), + task_count: data.task_count, + completed_task_count: data.completed_task_count, + date_created: data.date_created.into(), + date_modified: data.date_modified.into(), + data: data.data, message: String::new(), - seconds_elapsed: self.seconds_elapsed, + seconds_elapsed: data.seconds_elapsed, } } } @@ -177,7 +176,7 @@ impl JobReport { let mut params = Vec::new(); - if let Some(_) = &self.data { + if self.data.is_some() { params.push(job::data::set(self.data.clone())) } diff --git a/core/src/job/worker.rs b/core/src/job/worker.rs index 6022e603e..1acf4f250 100644 --- a/core/src/job/worker.rs +++ b/core/src/job/worker.rs @@ -3,6 +3,7 @@ use super::{ Job, }; use crate::{ClientQuery, CoreContext, CoreEvent, InternalEvent}; +use log::error; use std::{sync::Arc, time::Duration}; use tokio::{ sync::{ @@ -11,6 +12,8 @@ use tokio::{ }, time::{sleep, Instant}, }; +use uuid::Uuid; + // used to update the worker state from inside the worker thread pub enum WorkerEvent { Progressed(Vec), @@ -53,7 +56,7 @@ pub struct Worker { impl Worker { pub fn new(job: Box) -> Self { let (worker_sender, worker_receiver) = unbounded_channel(); - let uuid = uuid::Uuid::new_v4().to_string(); + let uuid = Uuid::new_v4().to_string(); let name = job.name(); Self { @@ -80,7 +83,7 @@ impl 
Worker { worker_mut.job_report.status = JobStatus::Running; - worker_mut.job_report.create(&ctx).await.unwrap_or(()); + worker_mut.job_report.create(ctx).await.unwrap_or(()); // spawn task to handle receiving events from the worker tokio::spawn(Worker::track_progress( @@ -116,7 +119,7 @@ impl Worker { let result = job.run(worker_ctx.clone()).await; if let Err(e) = result { - println!("job failed {:?}", e); + error!("job failed {:?}", e); worker_ctx.sender.send(WorkerEvent::Failed).unwrap_or(()); } else { // handle completion diff --git a/core/src/lib.rs b/core/src/lib.rs index bb096a666..f0b0833d2 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -3,13 +3,18 @@ use crate::{ prisma::file as prisma_file, prisma::location, util::db::create_connection, }; use job::{Job, JobReport, Jobs}; +use log::{error, info}; use prisma::PrismaClient; use serde::{Deserialize, Serialize}; -use std::{fs, sync::Arc}; +use std::path::PathBuf; +use std::sync::Arc; use thiserror::Error; -use tokio::sync::{ - mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender}, - oneshot, +use tokio::{ + fs, + sync::{ + mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender}, + oneshot, + }, }; use ts_rs::TS; @@ -83,19 +88,19 @@ impl CoreContext { self.internal_sender .send(InternalEvent::JobIngest(job)) .unwrap_or_else(|e| { - println!("Failed to spawn job. {:?}", e); + error!("Failed to spawn job. {:?}", e); }); } pub fn queue_job(&self, job: Box) { self.internal_sender .send(InternalEvent::JobQueue(job)) .unwrap_or_else(|e| { - println!("Failed to queue job. {:?}", e); + error!("Failed to queue job. {:?}", e); }); } pub async fn emit(&self, event: CoreEvent) { self.event_sender.send(event).await.unwrap_or_else(|e| { - println!("Failed to emit event. {:?}", e); + error!("Failed to emit event. 
{:?}", e); }); } } @@ -127,23 +132,23 @@ pub struct Node { impl Node { // create new instance of node, run startup tasks - pub async fn new(mut data_dir: std::path::PathBuf) -> (Node, mpsc::Receiver) { + pub async fn new(mut data_dir: PathBuf) -> (Node, mpsc::Receiver) { let (event_sender, event_recv) = mpsc::channel(100); - data_dir = data_dir.join("spacedrive"); - let data_dir = data_dir.to_str().unwrap(); + data_dir.push("spacedrive"); // create data directory if it doesn't exist - fs::create_dir_all(&data_dir).unwrap(); + fs::create_dir_all(&data_dir).await.unwrap(); // prepare basic client state - let mut state = NodeState::new(data_dir, "diamond-mastering-space-dragon").unwrap(); + let mut state = NodeState::new(data_dir.clone(), "diamond-mastering-space-dragon").unwrap(); // load from disk state .read_disk() - .unwrap_or(println!("Error: No node state found, creating new one...")); + .await + .unwrap_or_else(|_| error!("Error: No node state found, creating new one...")); - state.save(); + state.save().await; - println!("Node State: {:?}", state); + info!("Node State: {:?}", state); // connect to default library let database = Arc::new( @@ -213,32 +218,32 @@ impl Node { } // load library database + initialize client with db pub async fn initializer(&self) { - println!("Initializing..."); + info!("Initializing..."); let ctx = self.get_context(); - if self.state.libraries.len() == 0 { + if self.state.libraries.is_empty() { match library::create(&ctx, None).await { - Ok(library) => println!("Created new library: {:?}", library), - Err(e) => println!("Error creating library: {:?}", e), + Ok(library) => info!("Created new library: {:?}", library), + Err(e) => error!("Error creating library: {:?}", e), } } else { for library in self.state.libraries.iter() { // init database for library match library::load(&ctx, &library.library_path, &library.library_uuid).await { - Ok(library) => println!("Loaded library: {:?}", library), - Err(e) => println!("Error loading library: {:?}", e), + Ok(library) => info!("Loaded library: {:?}", library), + Err(e) => error!("Error loading library: {:?}", e), } } } // init node data within library - match node::LibraryNode::create(&self).await { - Ok(_) => println!("Spacedrive online"), - Err(e) => println!("Error initializing node: {:?}", e), + match node::LibraryNode::create(self).await { + Ok(_) => info!("Spacedrive online"), + Err(e) => error!("Error initializing node: {:?}", e), }; } async fn exec_command(&mut self, cmd: ClientCommand) -> Result { - println!("Core command: {:?}", cmd); + info!("Core command: {:?}", cmd); let ctx = self.get_context(); Ok(match cmd { // CRUD for locations @@ -299,7 +304,7 @@ impl Node { CoreResponse::Success(()) } // ClientCommand::PurgeDatabase => { - // println!("Purging database..."); + // info!("Purging database..."); // fs::remove_file(Path::new(&self.state.data_path).join("library.db")).unwrap(); // CoreResponse::Success(()) // } @@ -334,7 +339,7 @@ impl Node { location_id, limit: _, } => CoreResponse::LibGetExplorerDir( - file::explorer::open_dir(&ctx, &location_id, &path).await?, + file::explorer::open_dir(&ctx, location_id, &path).await?, ), ClientQuery::LibGetTags => todo!(), ClientQuery::JobGetRunning => { @@ -369,15 +374,15 @@ pub enum ClientCommand { TagAssign { file_id: i32, tag_id: i32 }, TagDelete { id: i32 }, // Locations - LocCreate { path: String }, + LocCreate { path: PathBuf }, LocUpdate { id: i32, name: Option }, LocDelete { id: i32 }, LocRescan { id: i32 }, // System SysVolumeUnmount { id: i32 }, - 
GenerateThumbsForLocation { id: i32, path: String }, + GenerateThumbsForLocation { id: i32, path: PathBuf }, // PurgeDatabase, - IdentifyUniqueFiles { id: i32, path: String }, + IdentifyUniqueFiles { id: i32, path: PathBuf }, } // represents an event this library can emit diff --git a/core/src/library/loader.rs b/core/src/library/loader.rs index c2fa47beb..da4826f7b 100644 --- a/core/src/library/loader.rs +++ b/core/src/library/loader.rs @@ -1,16 +1,20 @@ +use log::info; +use std::fmt::Debug; +use std::path::{Path, PathBuf}; use uuid::Uuid; -use crate::node::{get_nodestate, LibraryState}; -use crate::prisma::library; -use crate::util::db::{run_migrations, DatabaseError}; -use crate::CoreContext; +use crate::{ + node::{get_nodestate, LibraryState}, + prisma::library, + util::db::{run_migrations, DatabaseError}, + CoreContext, +}; pub static LIBRARY_DB_NAME: &str = "library.db"; pub static DEFAULT_NAME: &str = "My Library"; -pub fn get_library_path(data_path: &str) -> String { - let path = data_path.to_owned(); - format!("{}/{}", path, LIBRARY_DB_NAME) +pub fn get_library_path(data_path: impl AsRef) -> PathBuf { + data_path.as_ref().join(LIBRARY_DB_NAME) } // pub async fn get(core: &Node) -> Result { @@ -19,7 +23,7 @@ pub fn get_library_path(data_path: &str) -> String { // let library_state = config.get_current_library(); -// println!("{:?}", library_state); +// info!("{:?}", library_state); // // get library from db // let library = match db @@ -42,19 +46,19 @@ pub fn get_library_path(data_path: &str) -> String { pub async fn load( ctx: &CoreContext, - library_path: &str, + library_path: impl AsRef + Debug, library_id: &str, ) -> Result<(), DatabaseError> { let mut config = get_nodestate(); - println!("Initializing library: {} {}", &library_id, library_path); + info!("Initializing library: {} {:#?}", &library_id, library_path); if config.current_library_uuid != library_id { config.current_library_uuid = library_id.to_string(); - config.save(); + config.save().await; } // create connection with library database & run migrations - run_migrations(&ctx).await?; + run_migrations(ctx).await?; // if doesn't exist, mark as offline Ok(()) } @@ -64,36 +68,35 @@ pub async fn create(ctx: &CoreContext, name: Option) -> Result<(), ()> { let uuid = Uuid::new_v4().to_string(); - println!("Creating library {:?}, UUID: {:?}", name, uuid); + info!("Creating library {:?}, UUID: {:?}", name, uuid); let library_state = LibraryState { library_uuid: uuid.clone(), - library_path: get_library_path(&config.data_path), + library_path: get_library_path(config.data_path.as_ref().unwrap()), ..LibraryState::default() }; - run_migrations(&ctx).await.unwrap(); + run_migrations(ctx).await.unwrap(); config.libraries.push(library_state); config.current_library_uuid = uuid; - config.save(); + config.save().await; - let db = &ctx.database; - - let library = db + let library = ctx + .database .library() .create( library::pub_id::set(config.current_library_uuid), - library::name::set(name.unwrap_or(DEFAULT_NAME.into())), + library::name::set(name.unwrap_or_else(|| DEFAULT_NAME.into())), vec![], ) .exec() .await .unwrap(); - println!("library created in database: {:?}", library); + info!("library created in database: {:?}", library); Ok(()) } diff --git a/core/src/library/statistics.rs b/core/src/library/statistics.rs index f866999b5..91157286c 100644 --- a/core/src/library/statistics.rs +++ b/core/src/library/statistics.rs @@ -5,13 +5,14 @@ use crate::{ CoreContext, }; use fs_extra::dir::get_size; +use log::info; use 
serde::{Deserialize, Serialize}; -use std::fs; +use tokio::fs; use ts_rs::TS; use super::LibraryError; -#[derive(Debug, Serialize, Deserialize, TS, Clone)] +#[derive(Debug, Serialize, Deserialize, TS, Clone, Default)] #[ts(export)] pub struct Statistics { pub total_file_count: i32, @@ -23,29 +24,15 @@ pub struct Statistics { pub library_db_size: String, } -impl Into for Data { - fn into(self) -> Statistics { - Statistics { - total_file_count: self.total_file_count, - total_bytes_used: self.total_bytes_used, - total_bytes_capacity: self.total_bytes_capacity, - total_bytes_free: self.total_bytes_free, - total_unique_bytes: self.total_unique_bytes, - preview_media_bytes: self.preview_media_bytes, - library_db_size: String::new(), - } - } -} - -impl Default for Statistics { - fn default() -> Self { +impl From for Statistics { + fn from(data: Data) -> Self { Self { - total_file_count: 0, - total_bytes_used: String::new(), - total_bytes_capacity: String::new(), - total_bytes_free: String::new(), - total_unique_bytes: String::new(), - preview_media_bytes: String::new(), + total_file_count: data.total_file_count, + total_bytes_used: data.total_bytes_used, + total_bytes_capacity: data.total_bytes_capacity, + total_bytes_free: data.total_bytes_free, + total_unique_bytes: data.total_unique_bytes, + preview_media_bytes: data.preview_media_bytes, library_db_size: String::new(), } } @@ -54,33 +41,29 @@ impl Default for Statistics { impl Statistics { pub async fn retrieve(ctx: &CoreContext) -> Result { let config = get_nodestate(); - let db = &ctx.database; let library_data = config.get_current_library(); - let library_statistics_db = match db + let library_statistics_db = ctx + .database .library_statistics() .find_unique(id::equals(library_data.library_id)) .exec() .await? 
- { - Some(library_statistics_db) => library_statistics_db.into(), - // create the default values if database has no entry - None => Statistics::default(), - }; - Ok(library_statistics_db.into()) + .map_or_else(Default::default, Into::into); + Ok(library_statistics_db) } pub async fn calculate(ctx: &CoreContext) -> Result { let config = get_nodestate(); - let db = &ctx.database; // get library from client state let library_data = config.get_current_library(); - println!( + info!( "Calculating library statistics {:?}", library_data.library_uuid ); // get library from db - let library = db + let library = ctx + .database .library() .find_unique(library::pub_id::equals( library_data.library_uuid.to_string(), @@ -92,7 +75,8 @@ impl Statistics { return Err(LibraryError::LibraryNotFound); } - let library_statistics = db + let library_statistics = ctx + .database .library_statistics() .find_unique(id::equals(library_data.library_id)) .exec() @@ -100,9 +84,9 @@ impl Statistics { // TODO: get from database, not sys let volumes = Volume::get_volumes(); - Volume::save(&ctx).await?; + Volume::save(ctx).await?; - // println!("{:?}", volumes); + // info!("{:?}", volumes); let mut available_capacity: u64 = 0; let mut total_capacity: u64 = 0; @@ -113,14 +97,14 @@ impl Statistics { } } - let library_db_size = match fs::metadata(library_data.library_path.as_str()) { + let library_db_size = match fs::metadata(library_data.library_path).await { Ok(metadata) => metadata.len(), Err(_) => 0, }; - println!("{:?}", library_statistics); + info!("{:?}", library_statistics); - let thumbnail_folder_size = get_size(&format!("{}/{}", config.data_path, "thumbnails")); + let thumbnail_folder_size = get_size(config.data_path.unwrap().join("thumbnails")); let statistics = Statistics { library_db_size: library_db_size.to_string(), @@ -135,7 +119,8 @@ impl Statistics { None => library_data.library_id, }; - db.library_statistics() + ctx.database + .library_statistics() .upsert( library_id::equals(library_local_id), ( @@ -143,7 +128,7 @@ impl Statistics { vec![library_db_size::set(statistics.library_db_size.clone())], ), vec![ - total_file_count::set(statistics.total_file_count.clone()), + total_file_count::set(statistics.total_file_count), total_bytes_used::set(statistics.total_bytes_used.clone()), total_bytes_capacity::set(statistics.total_bytes_capacity.clone()), total_bytes_free::set(statistics.total_bytes_free.clone()), diff --git a/core/src/node/mod.rs b/core/src/node/mod.rs index 52bed0ef9..309815c97 100644 --- a/core/src/node/mod.rs +++ b/core/src/node/mod.rs @@ -4,6 +4,7 @@ use crate::{ }; use chrono::{DateTime, Utc}; use int_enum::IntEnum; +use log::info; use serde::{Deserialize, Serialize}; use std::env; use thiserror::Error; @@ -22,17 +23,24 @@ pub struct LibraryNode { pub last_seen: DateTime, } -impl Into for node::Data { - fn into(self) -> LibraryNode { - LibraryNode { - uuid: self.pub_id, - name: self.name, - platform: IntEnum::from_int(self.platform).unwrap(), - last_seen: self.last_seen.into(), +impl From for LibraryNode { + fn from(data: node::Data) -> Self { + Self { + uuid: data.pub_id, + name: data.name, + platform: IntEnum::from_int(data.platform).unwrap(), + last_seen: data.last_seen.into(), } } } +impl From> for LibraryNode { + fn from(data: Box) -> Self { + Self::from(*data) + } +} + +#[allow(clippy::upper_case_acronyms)] #[repr(i32)] #[derive(Debug, Clone, Copy, Serialize, Deserialize, TS, Eq, PartialEq, IntEnum)] #[ts(export)] @@ -47,11 +55,9 @@ pub enum Platform { impl LibraryNode { pub async fn 
create(node: &Node) -> Result<(), NodeError> { - println!("Creating node..."); + info!("Creating node..."); let mut config = state::get_nodestate(); - let db = &node.database; - let hostname = match hostname::get() { Ok(hostname) => hostname.to_str().unwrap_or_default().to_owned(), Err(_) => "unknown".to_owned(), @@ -64,30 +70,31 @@ impl LibraryNode { _ => Platform::Unknown, }; - let _node = match db + let node = if let Some(node) = node + .database .node() .find_unique(node::pub_id::equals(config.node_pub_id.clone())) .exec() .await? { - Some(node) => node, - None => { - db.node() - .create( - node::pub_id::set(config.node_pub_id.clone()), - node::name::set(hostname.clone()), - vec![node::platform::set(platform as i32)], - ) - .exec() - .await? - } + node + } else { + node.database + .node() + .create( + node::pub_id::set(config.node_pub_id.clone()), + node::name::set(hostname.clone()), + vec![node::platform::set(platform as i32)], + ) + .exec() + .await? }; config.node_name = hostname; - config.node_id = _node.id; - config.save(); + config.node_id = node.id; + config.save().await; - println!("node: {:?}", &_node); + info!("node: {:?}", node); Ok(()) } diff --git a/core/src/node/state.rs b/core/src/node/state.rs index b7124686f..6892b0d37 100644 --- a/core/src/node/state.rs +++ b/core/src/node/state.rs @@ -1,8 +1,12 @@ use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; -use std::fs; -use std::io::{BufReader, Write}; +use std::path::PathBuf; use std::sync::RwLock; +use tokio::io::AsyncReadExt; +use tokio::{ + fs, + io::{AsyncWriteExt, BufReader}, +}; use ts_rs::TS; use uuid::Uuid; @@ -13,7 +17,7 @@ pub struct NodeState { pub node_id: i32, pub node_name: String, // config path is stored as struct can exist only in memory during startup and be written to disk later without supplying path - pub data_path: String, + pub data_path: Option, // the port this node uses to listen for incoming connections pub tcp_port: u32, // all the libraries loaded by this node @@ -29,7 +33,7 @@ pub static NODE_STATE_CONFIG_NAME: &str = "node_state.json"; pub struct LibraryState { pub library_uuid: String, pub library_id: i32, - pub library_path: String, + pub library_path: PathBuf, pub offline: bool, } @@ -39,49 +43,50 @@ lazy_static! 
{ } pub fn get_nodestate() -> NodeState { - match CONFIG.read() { - Ok(guard) => guard.clone().unwrap_or(NodeState::default()), - Err(_) => return NodeState::default(), + if let Ok(guard) = CONFIG.read() { + guard.clone().unwrap_or_default() + } else { + NodeState::default() } } impl NodeState { - pub fn new(data_path: &str, node_name: &str) -> Result { + pub fn new(data_path: PathBuf, node_name: &str) -> Result { let uuid = Uuid::new_v4().to_string(); // create struct and assign defaults let config = Self { node_pub_id: uuid, - data_path: data_path.to_string(), + data_path: Some(data_path), node_name: node_name.to_string(), ..Default::default() }; Ok(config) } - pub fn save(&self) { + pub async fn save(&self) { self.write_memory(); // only write to disk if config path is set - if !&self.data_path.is_empty() { - let config_path = format!("{}/{}", &self.data_path, NODE_STATE_CONFIG_NAME); - let mut file = fs::File::create(config_path).unwrap(); + if let Some(ref data_path) = self.data_path { + let config_path = data_path.join(NODE_STATE_CONFIG_NAME); + let mut file = fs::File::create(config_path).await.unwrap(); let json = serde_json::to_string(&self).unwrap(); - file.write_all(json.as_bytes()).unwrap(); + file.write_all(json.as_bytes()).await.unwrap(); } } - pub fn read_disk(&mut self) -> Result<(), ()> { - let config_path = format!("{}/{}", &self.data_path, NODE_STATE_CONFIG_NAME); - - // open the file and parse json - match fs::File::open(config_path) { - Ok(file) => { - let reader = BufReader::new(file); - let data = serde_json::from_reader(reader).unwrap(); + pub async fn read_disk(&mut self) -> Result<(), ()> { + if let Some(ref data_path) = self.data_path { + let config_path = data_path.join(NODE_STATE_CONFIG_NAME); + // open the file and parse json + if let Ok(file) = fs::File::open(config_path).await { + let mut buf = vec![]; + let bytes = BufReader::new(file).read_to_end(&mut buf).await.unwrap(); + let data = serde_json::from_slice(&buf[..bytes]).unwrap(); // assign to self *self = data; } - _ => {} } + Ok(()) } @@ -91,17 +96,14 @@ impl NodeState { } pub fn get_current_library(&self) -> LibraryState { - match self - .libraries + self.libraries .iter() .find(|lib| lib.library_uuid == self.current_library_uuid) - { - Some(lib) => lib.clone(), - None => LibraryState::default(), - } + .cloned() + .unwrap_or_default() } - pub fn get_current_library_db_path(&self) -> String { - format!("{}/library.db", &self.get_current_library().library_path) + pub fn get_current_library_db_path(&self) -> PathBuf { + self.get_current_library().library_path.join("library.db") } } diff --git a/core/src/sys/locations.rs b/core/src/sys/locations.rs index 0b4dafac3..162c80b91 100644 --- a/core/src/sys/locations.rs +++ b/core/src/sys/locations.rs @@ -1,15 +1,22 @@ use crate::{ - encode::ThumbnailJob, file::{cas::FileIdentifierJob, indexer::IndexerJob}, node::{get_nodestate, LibraryNode}, prisma::{file_path, location}, ClientQuery, CoreContext, CoreEvent, }; -use prisma_client_rust::{raw, PrismaValue}; + +use log::info; use serde::{Deserialize, Serialize}; -use std::{fs, io, io::Write, path::Path}; +use std::fmt::Debug; +use std::path::{Path, PathBuf}; use thiserror::Error; +use tokio::io::AsyncWriteExt; +use tokio::{ + fs::{metadata, File}, + io, +}; use ts_rs::TS; +use uuid::Uuid; use super::SysError; @@ -28,25 +35,25 @@ pub struct LocationResource { pub date_created: chrono::DateTime, } -impl Into for location::Data { - fn into(mut self) -> LocationResource { +impl From for LocationResource { + fn 
from(data: location::Data) -> Self { LocationResource { - id: self.id, - name: self.name, - path: self.local_path, - total_capacity: self.total_capacity, - available_capacity: self.available_capacity, - is_removable: self.is_removable, - node: self.node.take().unwrap_or(None).map(|node| (*node).into()), - is_online: self.is_online, - date_created: self.date_created.into(), + id: data.id, + name: data.name, + path: data.local_path, + total_capacity: data.total_capacity, + available_capacity: data.available_capacity, + is_removable: data.is_removable, + node: data.node.unwrap_or(None).map(Into::into), + is_online: data.is_online, + date_created: data.date_created.into(), } } } #[derive(Serialize, Deserialize, Default)] pub struct DotSpacedrive { - pub location_uuid: String, + pub location_uuid: Uuid, pub library_uuid: String, } @@ -69,24 +76,25 @@ pub async fn get_location( ctx: &CoreContext, location_id: i32, ) -> Result { - let db = &ctx.database; - // get location by location_id from db and include location_paths - let location = match db + ctx.database .location() .find_unique(location::id::equals(location_id)) .exec() .await? - { - Some(location) => location, - None => Err(LocationError::NotFound(location_id.to_string()))?, - }; - Ok(location.into()) + .map(Into::into) + .ok_or_else(|| LocationError::IdNotFound(location_id).into()) } -pub fn scan_location(ctx: &CoreContext, location_id: i32, path: String) { - ctx.spawn_job(Box::new(IndexerJob { path: path.clone() })); - ctx.queue_job(Box::new(FileIdentifierJob { location_id, path })); +pub fn scan_location(ctx: &CoreContext, location_id: i32, path: impl AsRef) { + let path_buf = path.as_ref().to_path_buf(); + ctx.spawn_job(Box::new(IndexerJob { + path: path_buf.clone(), + })); + ctx.queue_job(Box::new(FileIdentifierJob { + location_id, + path: path_buf, + })); // TODO: make a way to stop jobs so this can be canceled without rebooting app // ctx.queue_job(Box::new(ThumbnailJob { // location_id, @@ -97,19 +105,18 @@ pub fn scan_location(ctx: &CoreContext, location_id: i32, path: String) { pub async fn new_location_and_scan( ctx: &CoreContext, - path: &str, + path: impl AsRef + Debug, ) -> Result { - let location = create_location(&ctx, path).await?; + let location = create_location(ctx, &path).await?; - scan_location(&ctx, location.id, path.to_string()); + scan_location(ctx, location.id, path); Ok(location) } pub async fn get_locations(ctx: &CoreContext) -> Result, SysError> { - let db = &ctx.database; - - let locations = db + let locations = ctx + .database .location() .find_many(vec![]) .with(location::node::fetch()) @@ -117,119 +124,107 @@ pub async fn get_locations(ctx: &CoreContext) -> Result, S .await?; // turn locations into LocationResource - let locations: Vec = locations - .into_iter() - .map(|location| location.into()) - .collect(); - - Ok(locations) + Ok(locations.into_iter().map(LocationResource::from).collect()) } -pub async fn create_location(ctx: &CoreContext, path: &str) -> Result { - let db = &ctx.database; - let config = get_nodestate(); +pub async fn create_location( + ctx: &CoreContext, + path: impl AsRef + Debug, +) -> Result { + let path = path.as_ref(); // check if we have access to this location - if !Path::new(path).exists() { - Err(LocationError::NotFound(path.to_string()))?; + if !path.exists() { + return Err(LocationError::PathNotFound(path.to_owned()).into()); } - // if on windows - if cfg!(target_family = "windows") { - // try and create a dummy file to see if we can write to this location - match 
fs::File::create(format!("{}/{}", path.clone(), ".spacewrite")) { - Ok(file) => file, - Err(e) => Err(LocationError::DotfileWriteFailure(e, path.to_string()))?, - }; - - match fs::remove_file(format!("{}/{}", path.clone(), ".spacewrite")) { - Ok(_) => (), - Err(e) => Err(LocationError::DotfileWriteFailure(e, path.to_string()))?, - } - } else { - // unix allows us to test this more directly - match fs::File::open(&path) { - Ok(_) => println!("Path is valid, creating location for '{}'", &path), - Err(e) => Err(LocationError::FileReadError(e))?, - } + if metadata(path) + .await + .map_err(|e| LocationError::DotfileReadFailure(e, path.to_owned()))? + .permissions() + .readonly() + { + return Err(LocationError::ReadonlyDotFileLocationFailure(path.to_owned()).into()); } + let path_string = path.to_string_lossy().to_string(); + // check if location already exists - let location = match db + let location_resource = if let Some(location) = ctx + .database .location() - .find_first(vec![location::local_path::equals(Some(path.to_string()))]) + .find_first(vec![location::local_path::equals(Some( + path_string.clone(), + ))]) .exec() .await? { - Some(location) => location, - None => { - println!( - "Location does not exist, creating new location for '{}'", - &path - ); - let uuid = uuid::Uuid::new_v4(); + location.into() + } else { + info!( + "Location does not exist, creating new location for '{:#?}'", + path + ); + let uuid = Uuid::new_v4(); - let p = Path::new(&path); + let config = get_nodestate(); - let location = db - .location() - .create( - location::pub_id::set(uuid.to_string()), - vec![ - location::name::set(Some( - p.file_name().unwrap().to_string_lossy().to_string(), - )), - location::is_online::set(true), - location::local_path::set(Some(path.to_string())), - location::node_id::set(Some(config.node_id)), - ], - ) - .exec() - .await?; + let location = ctx + .database + .location() + .create( + location::pub_id::set(uuid.to_string()), + vec![ + location::name::set(Some( + path.file_name().unwrap().to_string_lossy().to_string(), + )), + location::is_online::set(true), + location::local_path::set(Some(path_string)), + location::node_id::set(Some(config.node_id)), + ], + ) + .exec() + .await?; - println!("Created location: {:?}", location); + info!("Created location: {:?}", location); - // write a file called .spacedrive to path containing the location id in JSON format - let mut dotfile = match fs::File::create(format!("{}/{}", path.clone(), DOTFILE_NAME)) { - Ok(file) => file, - Err(e) => Err(LocationError::DotfileWriteFailure(e, path.to_string()))?, - }; + // write a file called .spacedrive to path containing the location id in JSON format + let mut dotfile = File::create(path.with_file_name(DOTFILE_NAME)) + .await + .map_err(|e| LocationError::DotfileWriteFailure(e, path.to_owned()))?; - let data = DotSpacedrive { - location_uuid: uuid.to_string(), - library_uuid: config.current_library_uuid, - }; + let data = DotSpacedrive { + location_uuid: uuid, + library_uuid: config.current_library_uuid, + }; - let json = match serde_json::to_string(&data) { - Ok(json) => json, - Err(e) => Err(LocationError::DotfileSerializeFailure(e, path.to_string()))?, - }; + let json_bytes = serde_json::to_vec(&data) + .map_err(|e| LocationError::DotfileSerializeFailure(e, path.to_owned()))?; - match dotfile.write_all(json.as_bytes()) { - Ok(_) => (), - Err(e) => Err(LocationError::DotfileWriteFailure(e, path.to_string()))?, - } + dotfile + .write_all(&json_bytes) + .await + .map_err(|e| 
LocationError::DotfileWriteFailure(e, path.to_owned()))?; - ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::SysGetLocations)) - .await; + ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::SysGetLocations)) + .await; - location - } + location.into() }; - Ok(location.into()) + Ok(location_resource) } pub async fn delete_location(ctx: &CoreContext, location_id: i32) -> Result<(), SysError> { - let db = &ctx.database; - - db.file_path() + ctx.database + .file_path() .find_many(vec![file_path::location_id::equals(Some(location_id))]) .delete() .exec() .await?; - db.location() + ctx.database + .location() .find_unique(location::id::equals(location_id)) .delete() .exec() @@ -238,7 +233,7 @@ pub async fn delete_location(ctx: &CoreContext, location_id: i32) -> Result<(), ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::SysGetLocations)) .await; - println!("Location {} deleted", location_id); + info!("Location {} deleted", location_id); Ok(()) } @@ -246,15 +241,21 @@ pub async fn delete_location(ctx: &CoreContext, location_id: i32) -> Result<(), #[derive(Error, Debug)] pub enum LocationError { #[error("Failed to create location (uuid {uuid:?})")] - CreateFailure { uuid: String }, - #[error("Failed to read location dotfile")] - DotfileReadFailure(io::Error), + CreateFailure { uuid: Uuid }, + #[error("Failed to read location dotfile (path: {1:?})")] + DotfileReadFailure(io::Error, PathBuf), #[error("Failed to serialize dotfile for location (at path: {1:?})")] - DotfileSerializeFailure(serde_json::Error, String), - #[error("Location not found (uuid: {1:?})")] - DotfileWriteFailure(io::Error, String), - #[error("Location not found (uuid: {0:?})")] - NotFound(String), + DotfileSerializeFailure(serde_json::Error, PathBuf), + #[error("Dotfile location is read only (at path: {0:?})")] + ReadonlyDotFileLocationFailure(PathBuf), + #[error("Failed to write dotfile (path: {1:?})")] + DotfileWriteFailure(io::Error, PathBuf), + #[error("Location not found (path: {0:?})")] + PathNotFound(PathBuf), + #[error("Location not found (uuid: {0})")] + UuidNotFound(Uuid), + #[error("Location not found (id: {0})")] + IdNotFound(i32), #[error("Failed to open file from local os")] FileReadError(io::Error), #[error("Failed to read mounted volumes from local os")] diff --git a/core/src/sys/mod.rs b/core/src/sys/mod.rs index d9565e399..8eafbf28b 100644 --- a/core/src/sys/mod.rs +++ b/core/src/sys/mod.rs @@ -11,11 +11,11 @@ use crate::{job, prisma}; #[derive(Error, Debug)] pub enum SysError { #[error("Location error")] - LocationError(#[from] LocationError), + Location(#[from] LocationError), #[error("Error with system volumes")] - VolumeError(String), + Volume(String), #[error("Error from job runner")] - JobError(#[from] job::JobError), + Job(#[from] job::JobError), #[error("Database error")] - DatabaseError(#[from] prisma::QueryError), + Database(#[from] prisma::QueryError), } diff --git a/core/src/sys/volumes.rs b/core/src/sys/volumes.rs index 6a043d991..ffb235c05 100644 --- a/core/src/sys/volumes.rs +++ b/core/src/sys/volumes.rs @@ -1,5 +1,5 @@ // use crate::native; -use crate::{node::get_nodestate, prisma::volume::*}; +use crate::{node::get_nodestate, prisma::volume::*, CoreContext}; use serde::{Deserialize, Serialize}; use ts_rs::TS; // #[cfg(not(target_os = "macos"))] @@ -7,8 +7,6 @@ use std::process::Command; // #[cfg(not(target_os = "macos"))] use sysinfo::{DiskExt, System, SystemExt}; -use crate::CoreContext; - use super::SysError; #[derive(Serialize, Deserialize, Debug, Default, Clone, TS)] @@ -27,17 +25,17 @@ pub 
struct Volume { impl Volume { pub async fn save(ctx: &CoreContext) -> Result<(), SysError> { - let db = &ctx.database; let config = get_nodestate(); let volumes = Self::get_volumes()?; // enter all volumes associate with this client add to db for volume in volumes { - db.volume() + ctx.database + .volume() .upsert( node_id_mount_point_name( - config.node_id.clone(), + config.node_id, volume.mount_point.to_string(), volume.name.to_string(), ), @@ -67,7 +65,7 @@ impl Volume { Ok(()) } pub fn get_volumes() -> Result, SysError> { - let all_volumes: Vec = System::new_all() + Ok(System::new_all() .disks() .iter() .map(|disk| { @@ -123,15 +121,8 @@ impl Volume { is_root_filesystem: mount_point == "/", } }) - .collect(); - - let volumes = all_volumes - .clone() - .into_iter() .filter(|volume| !volume.mount_point.starts_with("/System")) - .collect(); - - Ok(volumes) + .collect()) } } diff --git a/core/src/util/db.rs b/core/src/util/db.rs index d0b31b2c8..e299c7b94 100644 --- a/core/src/util/db.rs +++ b/core/src/util/db.rs @@ -2,10 +2,13 @@ use crate::prisma::{self, migration, PrismaClient}; use crate::CoreContext; use data_encoding::HEXLOWER; use include_dir::{include_dir, Dir}; +use log::{error, info}; use prisma_client_rust::raw; use ring::digest::{Context, Digest, SHA256}; use std::ffi::OsStr; +use std::fmt::Debug; use std::io::{self, BufReader, Read}; +use std::path::Path; use thiserror::Error; const INIT_MIGRATION: &str = include_str!("../../prisma/migrations/migration_table/migration.sql"); @@ -17,9 +20,12 @@ pub enum DatabaseError { ClientError(#[from] prisma::NewClientError), } -pub async fn create_connection(path: &str) -> Result { - println!("Creating database connection: {:?}", path); - let client = prisma::new_client_with_url(&format!("file:{}", &path)).await?; +pub async fn create_connection( + path: impl AsRef + Debug, +) -> Result { + info!("Creating database connection: {:?}", path); + let client = + prisma::new_client_with_url(&format!("file:{}", path.as_ref().to_string_lossy())).await?; Ok(client) } @@ -47,12 +53,12 @@ pub async fn run_migrations(ctx: &CoreContext) -> Result<(), DatabaseError> { .await { Ok(data) => { - if data.len() == 0 { + if data.is_empty() { // execute migration match client._execute_raw(raw!(INIT_MIGRATION)).await { Ok(_) => {} Err(e) => { - println!("Failed to create migration table: {}", e); + info!("Failed to create migration table: {}", e); } }; @@ -64,7 +70,7 @@ pub async fn run_migrations(ctx: &CoreContext) -> Result<(), DatabaseError> { .unwrap(); #[cfg(debug_assertions)] - println!("Migration table created: {:?}", value); + info!("Migration table created: {:?}", value); } let mut migration_subdirs = MIGRATIONS_DIR @@ -89,7 +95,7 @@ pub async fn run_migrations(ctx: &CoreContext) -> Result<(), DatabaseError> { }); for subdir in migration_subdirs { - println!("{:?}", subdir.path()); + info!("{:?}", subdir.path()); let migration_file = subdir .get_file(subdir.path().join("./migration.sql")) .unwrap(); @@ -110,9 +116,9 @@ pub async fn run_migrations(ctx: &CoreContext) -> Result<(), DatabaseError> { if existing_migration.is_none() { #[cfg(debug_assertions)] - println!("Running migration: {}", name); + info!("Running migration: {}", name); - let steps = migration_sql.split(";").collect::>(); + let steps = migration_sql.split(';').collect::>(); let steps = &steps[0..steps.len() - 1]; client @@ -138,15 +144,15 @@ pub async fn run_migrations(ctx: &CoreContext) -> Result<(), DatabaseError> { .unwrap(); } Err(e) => { - println!("Error running migration: {}", 
name); - println!("{}", e); + error!("Error running migration: {}", name); + error!("{:?}", e); break; } } } #[cfg(debug_assertions)] - println!("Migration {} recorded successfully", name); + info!("Migration {} recorded successfully", name); } } } From db1cbf0ffef964610aca059c6f26f9eb2dae13ff Mon Sep 17 00:00:00 2001 From: Ericson Fogo Soares Date: Wed, 29 Jun 2022 14:46:47 -0300 Subject: [PATCH 2/4] Removing tokio::spawn_blocking from thumbnail generation --- core/src/encode/thumb.rs | 117 ++++++++++++++++++--------------------- 1 file changed, 55 insertions(+), 62 deletions(-) diff --git a/core/src/encode/thumb.rs b/core/src/encode/thumb.rs index d9d23d587..989859506 100644 --- a/core/src/encode/thumb.rs +++ b/core/src/encode/thumb.rs @@ -4,10 +4,10 @@ use crate::{ prisma::file_path, sys, CoreContext, CoreEvent, }; -use futures::executor::block_on; use image::{self, imageops, DynamicImage, GenericImageView}; use log::{error, info}; use std::error::Error; +use std::ops::Deref; use std::path::{Path, PathBuf}; use tokio::fs; use webp::Encoder; @@ -55,72 +55,62 @@ impl Job for ThumbnailJob { let image_files = get_images(&ctx.core_ctx, self.location_id, &self.path).await?; info!("Found {:?} files", image_files.len()); - let is_background = self.background; + ctx.progress(vec![ + JobReportUpdate::TaskCount(image_files.len()), + JobReportUpdate::Message(format!("Preparing to process {} files", image_files.len())), + ]); - tokio::task::spawn_blocking(move || { - ctx.progress(vec![ - JobReportUpdate::TaskCount(image_files.len()), - JobReportUpdate::Message(format!( - "Preparing to process {} files", - image_files.len() - )), - ]); + for (i, image_file) in image_files.iter().enumerate() { + ctx.progress(vec![JobReportUpdate::Message(format!( + "Processing {}", + image_file.materialized_path.clone() + ))]); - for (i, image_file) in image_files.iter().enumerate() { - ctx.progress(vec![JobReportUpdate::Message(format!( - "Processing {}", - image_file.materialized_path.clone() - ))]); + // assemble the file path + let path = Path::new(&root_path).join(&image_file.materialized_path); + error!("image_file {:?}", image_file); - // assemble the file path - let path = Path::new(&root_path).join(&image_file.materialized_path); - error!("image_file {:?}", image_file); - - // get cas_id, if none found skip - let cas_id = match image_file.file() { - Ok(file) => { - if let Some(f) = file { - f.cas_id.clone() - } else { - continue; - } - } - Err(_) => { - error!("Error getting cas_id {:?}", image_file.materialized_path); + // get cas_id, if none found skip + let cas_id = match image_file.file() { + Ok(file) => { + if let Some(f) = file { + f.cas_id.clone() + } else { continue; } - }; - - // Define and write the WebP-encoded file to a given path - let output_path = config - .data_path - .as_ref() - .unwrap() - .join(THUMBNAIL_CACHE_DIR_NAME) - .join(format!("{}", location.id)) - .join(&cas_id) - .with_extension("webp"); - - // check if file exists at output path - if !output_path.exists() { - info!("Writing {:?} to {:?}", path, output_path); - block_on(generate_thumbnail(&path, &output_path)) - .map_err(|e| { - info!("Error generating thumb {:?}", e); - }) - .unwrap_or(()); - - ctx.progress(vec![JobReportUpdate::CompletedTaskCount(i + 1)]); - - if !is_background { - block_on(ctx.core_ctx.emit(CoreEvent::NewThumbnail { cas_id })); - }; - } else { - info!("Thumb exists, skipping... 
{}", output_path.display()); } + Err(_) => { + error!("Error getting cas_id {:?}", image_file.materialized_path); + continue; + } + }; + + // Define and write the WebP-encoded file to a given path + let output_path = config + .data_path + .as_ref() + .unwrap() + .join(THUMBNAIL_CACHE_DIR_NAME) + .join(format!("{}", location.id)) + .join(&cas_id) + .with_extension("webp"); + + // check if file exists at output path + if !output_path.exists() { + info!("Writing {:?} to {:?}", path, output_path); + if let Err(e) = generate_thumbnail(&path, &output_path).await { + error!("Error generating thumb {:?}", e); + } + + ctx.progress(vec![JobReportUpdate::CompletedTaskCount(i + 1)]); + + if !self.background { + ctx.core_ctx.emit(CoreEvent::NewThumbnail { cas_id }).await; + }; + } else { + info!("Thumb exists, skipping... {}", output_path.display()); } - }) - .await?; + } Ok(()) } @@ -144,9 +134,12 @@ pub async fn generate_thumbnail<P: AsRef<Path>>( let encoder = Encoder::from_image(&img)?; // Encode the image at a specified quality 0-100 - let webp = encoder.encode(THUMBNAIL_QUALITY); - fs::write(output_path, &*webp).await?; + // Type WebPMemory is !Send, which makes the Future in this function !Send, + // this makes us `deref` to get a `&[u8]` and then `to_owned` to make a Vec, + // which implies an unwanted clone (see the note at the end of this series)... + let webp = encoder.encode(THUMBNAIL_QUALITY).deref().to_owned(); + fs::write(output_path, &webp).await?; Ok(()) } From 149a2e8d2cd12bdb62c680e555199f715ffecaeb Mon Sep 17 00:00:00 2001 From: Ericson Fogo Soares Date: Thu, 30 Jun 2022 00:04:37 -0300 Subject: [PATCH 3/4] Removing tokio::spawn_blocking from identifier job * Some small optimizations to await many queries concurrently --- core/src/file/cas/identifier.rs | 248 +++++++++++++++++--------------- 1 file changed, 131 insertions(+), 117 deletions(-) diff --git a/core/src/file/cas/identifier.rs b/core/src/file/cas/identifier.rs index de9a54483..855352385 100644 --- a/core/src/file/cas/identifier.rs +++ b/core/src/file/cas/identifier.rs @@ -8,11 +8,11 @@ use crate::{ CoreContext, }; use chrono::{DateTime, FixedOffset}; -use futures::executor::block_on; -use log::info; +use futures::future::join_all; +use log::{error, info}; use prisma_client_rust::{prisma_models::PrismaValue, raw, raw::Raw, Direction}; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::error::Error; use std::path::{Path, PathBuf}; use tokio::{fs, io}; @@ -50,138 +50,154 @@ impl Job for FileIdentifierJob { // update job with total task count based on orphan file_paths count ctx.progress(vec![JobReportUpdate::TaskCount(task_count)]); - let db = ctx.core_ctx.database.clone(); - // dedicated tokio thread for task - let _ctx = tokio::task::spawn_blocking(move || { - let mut completed: usize = 0; - let mut cursor: i32 = 1; - // loop until task count is complete - while completed < task_count { - // link file_path ids to a CreateFile struct containing unique file data - let mut chunk: HashMap<i32, CreateFile> = HashMap::new(); - let mut cas_lookup: HashMap<String, i32> = HashMap::new(); + let mut completed: usize = 0; + let mut cursor: i32 = 1; + // loop until task count is complete + while completed < task_count { + // link file_path ids to a CreateFile struct containing unique file data + let mut chunk: HashMap<i32, CreateFile> = HashMap::new(); + let mut cas_lookup: HashMap<String, i32> = HashMap::new(); - // get chunk of orphans to process - let file_paths = match block_on(get_orphan_file_paths(&ctx.core_ctx, cursor)) { - Ok(file_paths) => file_paths, + // get chunk of 
orphans to process + let file_paths = match get_orphan_file_paths(&ctx.core_ctx, cursor).await { + Ok(file_paths) => file_paths, + Err(e) => { + info!("Error getting orphan file paths: {:#?}", e); + continue; + } + }; + info!( + "Processing {:?} orphan files. ({} completed of {})", + file_paths.len(), + completed, + task_count + ); + + // analyze each file_path + for file_path in &file_paths { + // get the cas_id and extract metadata + match prepare_file(&location_path, file_path).await { + Ok(file) => { + let cas_id = file.cas_id.clone(); + // create entry into chunks for created file data + chunk.insert(file_path.id, file); + cas_lookup.insert(cas_id, file_path.id); + } Err(e) => { - info!("Error getting orphan file paths: {:#?}", e); + info!("Error processing file: {:#?}", e); continue; } }; - info!( - "Processing {:?} orphan files. ({} completed of {})", - file_paths.len(), - completed, - task_count - ); + } - // analyze each file_path - for file_path in file_paths.iter() { - // get the cas_id and extract metadata - match block_on(prepare_file(&location_path, file_path)) { - Ok(file) => { - let cas_id = file.cas_id.clone(); - // create entry into chunks for created file data - chunk.insert(file_path.id, file); - cas_lookup.insert(cas_id, file_path.id); - } - Err(e) => { - info!("Error processing file: {:#?}", e); - continue; - } - }; + // find all existing files by cas id + let generated_cas_ids = chunk.values().map(|c| c.cas_id.clone()).collect(); + let existing_files = ctx + .core_ctx + .database + .file() + .find_many(vec![file::cas_id::in_vec(generated_cas_ids)]) + .exec() + .await?; + + info!("Found {} existing files", existing_files.len()); + + // link those existing files to their file paths + // Had to put the file_path in a variable outside of the closure, to satisfy the borrow checker + let prisma_file_path = ctx.core_ctx.database.file_path(); + for result in join_all(existing_files.iter().map(|file| { + prisma_file_path + .find_unique(file_path::id::equals( + *cas_lookup.get(&file.cas_id).unwrap(), + )) + .update(vec![file_path::file_id::set(Some(file.id))]) + .exec() + })) + .await + { + if let Err(e) = result { + error!("Error linking file: {:#?}", e); } + } - // find all existing files by cas id - let generated_cas_ids = chunk.values().map(|c| c.cas_id.clone()).collect(); - let existing_files: Vec = block_on( - db.file() - .find_many(vec![file::cas_id::in_vec(generated_cas_ids)]) - .exec(), - ) - .unwrap(); - info!("Found {} existing files", existing_files.len()); + let existing_files_cas_ids = existing_files + .iter() + .map(|file| file.cas_id.clone()) + .collect::>(); - // link those existing files to their file paths - for file in existing_files.iter() { - let file_path_id = cas_lookup.get(&file.cas_id).unwrap(); - block_on( - db.file_path() - .find_unique(file_path::id::equals(*file_path_id)) - .update(vec![file_path::file_id::set(Some(file.id))]) - .exec(), - ) - .unwrap(); - } + // extract files that don't already exist in the database + let new_files = chunk + .iter() + .map(|(_id, create_file)| create_file) + .filter(|create_file| !existing_files_cas_ids.contains(&create_file.cas_id)) + .collect::>(); - // extract files that don't already exist in the database - let new_files: Vec<&CreateFile> = chunk - .iter() - .map(|(_, c)| c) - .filter(|c| !existing_files.iter().any(|d| d.cas_id == c.cas_id)) - .collect(); + // assemble prisma values for new unique files + let mut values: Vec = Vec::new(); + for file in &new_files { + values.extend([ + 
PrismaValue::String(file.cas_id.clone()), + PrismaValue::Int(file.size_in_bytes), + PrismaValue::DateTime(file.date_created), + ]); + } - // assemble prisma values for new unique files - let mut values: Vec = Vec::new(); - for file in new_files.iter() { - values.extend([ - PrismaValue::String(file.cas_id.clone()), - PrismaValue::Int(file.size_in_bytes), - PrismaValue::DateTime(file.date_created), - ]); - } - - // create new file records with assembled values - let created_files: Vec = block_on(db._query_raw(Raw::new( + // create new file records with assembled values + let created_files: Vec = ctx + .core_ctx + .database + ._query_raw(Raw::new( &format!( "INSERT INTO files (cas_id, size_in_bytes, date_created) VALUES {} ON CONFLICT (cas_id) DO NOTHING RETURNING id, cas_id", vec!["({}, {}, {})"; new_files.len()].join(",") ), values, - ))) + )) + .await .unwrap_or_else(|e| { - info!("Error inserting files: {:#?}", e); + error!("Error inserting files: {:#?}", e); Vec::new() }); + // This code is duplicated, is this right? (see the join_all note at the end of this series) + for result in join_all(created_files.iter().map(|file| { // associate newly created files with their respective file_paths - for file in created_files.iter() { - // TODO: this is potentially bottle necking the chunk system, individually linking file_path to file, 100 queries per chunk - // - insert many could work, but I couldn't find a good way to do this in a single SQL query - let file_path_id = cas_lookup.get(&file.cas_id).unwrap(); - block_on( - db.file_path() - .find_unique(file_path::id::equals(*file_path_id)) - .update(vec![file_path::file_id::set(Some(file.id))]) - .exec(), - ) - .unwrap(); + // TODO: this is potentially bottle necking the chunk system, individually linking file_path to file, 100 queries per chunk + // - insert many could work, but I couldn't find a good way to do this in a single SQL query + prisma_file_path + .find_unique(file_path::id::equals( + *cas_lookup.get(&file.cas_id).unwrap(), + )) + .update(vec![file_path::file_id::set(Some(file.id))]) + .exec() + })) + .await + { + if let Err(e) = result { + error!("Error linking file: {:#?}", e); } - - // handle loop end - let last_row = match file_paths.last() { - Some(l) => l, - None => { - break; - } - }; - cursor = last_row.id; - completed += 1; - - ctx.progress(vec![ - JobReportUpdate::CompletedTaskCount(completed), - JobReportUpdate::Message(format!( - "Processed {} of {} orphan files", - completed * CHUNK_SIZE, - total_count - )), - ]); } - ctx - }) - .await?; + + // handle loop end + let last_row = match file_paths.last() { + Some(l) => l, + None => { + break; + } + }; + cursor = last_row.id; + completed += 1; + + ctx.progress(vec![ + JobReportUpdate::CompletedTaskCount(completed), + JobReportUpdate::Message(format!( + "Processed {} of {} orphan files", + completed * CHUNK_SIZE, + total_count )), ]); } // let _remaining = count_orphan_file_paths(&ctx.core_ctx, location.id.into()).await?; Ok(()) } @@ -197,8 +213,7 @@ pub async fn count_orphan_file_paths( ctx: &CoreContext, location_id: i64, ) -> Result { - let db = &ctx.database; - let files_count = db + let files_count = ctx.database ._query_raw::(raw!( "SELECT COUNT(*) AS count FROM file_paths WHERE file_id IS NULL AND is_dir IS FALSE AND location_id = {}", PrismaValue::Int(location_id) @@ -211,12 +226,11 @@ pub async fn get_orphan_file_paths( ctx: &CoreContext, cursor: i32, ) -> Result, FileError> { - let db = &ctx.database; info!( "discovering {} orphan file paths at cursor: {:?}", CHUNK_SIZE, cursor ); - let files = db + ctx.database 
.file_path() .find_many(vec![ file_path::file_id::equals(None), @@ -226,8 +240,8 @@ pub async fn get_orphan_file_paths( .cursor(file_path::id::cursor(cursor)) .take(CHUNK_SIZE as i64) .exec() - .await?; - Ok(files) + .await + .map_err(|e| e.into()) } #[derive(Deserialize, Serialize, Debug)] From d56aa62027ec2d3adaa99110d992f90c30f2d5f5 Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Sun, 17 Jul 2022 19:58:21 +0800 Subject: [PATCH 4/4] fix incorrect import in Typescript --- .../interface/src/components/layout/TopBar.tsx | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/packages/interface/src/components/layout/TopBar.tsx b/packages/interface/src/components/layout/TopBar.tsx index 151a880c3..4e527ca55 100644 --- a/packages/interface/src/components/layout/TopBar.tsx +++ b/packages/interface/src/components/layout/TopBar.tsx @@ -1,5 +1,5 @@ import { ChevronLeftIcon, ChevronRightIcon } from '@heroicons/react/outline'; -import { useLibraryCommand } from '@sd/client'; +import { AppPropsContext, useLibraryCommand } from '@sd/client'; import { useExplorerStore } from '@sd/client'; import { Dropdown } from '@sd/ui'; import clsx from 'clsx'; @@ -15,7 +15,6 @@ import { } from 'phosphor-react'; import React, { DetailedHTMLProps, HTMLAttributes, useContext } from 'react'; import { useNavigate } from 'react-router-dom'; -import { AppPropsContext } from '../../AppPropsContext'; import { Shortcut } from '../primitive/Shortcut'; import { DefaultProps } from '../primitive/types'; @@ -51,22 +50,27 @@ const TopBarButton: React.FC = ({ icon: Icon, ...props }) => ); }; -const SearchBar: React.FC = (props) => { //TODO: maybe pass the appProps, so we can have the context in the TopBar if needed again +const SearchBar: React.FC = (props) => { + //TODO: maybe pass the appProps, so we can have the context in the TopBar if needed again const appProps = useContext(AppPropsContext); - return ( + return (
- + {/* */}
); -} +}; export const TopBar: React.FC = (props) => { const { locationId } = useExplorerStore();
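
A note on patch 1's dotfile write in core/src/sys/locations.rs: the new code serializes the DotSpacedrive struct straight to bytes with serde_json::to_vec and writes them through tokio's async File instead of std::fs. A minimal sketch of that flow, with the uuid fields simplified to String and the helper name write_dotfile invented for illustration:

    use serde::Serialize;
    use std::path::Path;
    use tokio::{fs::File, io::AsyncWriteExt};

    #[derive(Serialize)]
    struct DotSpacedrive {
        location_uuid: String, // the real struct serializes Uuids
        library_uuid: String,
    }

    // Hypothetical helper, not part of the patch: serialize to bytes,
    // then write them with tokio's async File, mirroring the new code path.
    async fn write_dotfile(dir: &Path, data: &DotSpacedrive) -> Result<(), Box<dyn std::error::Error>> {
        let bytes = serde_json::to_vec(data)?;
        // `join` places the dotfile *inside* the location directory
        let mut dotfile = File::create(dir.join(".spacedrive")).await?;
        dotfile.write_all(&bytes).await?;
        Ok(())
    }

One thing worth double-checking in the patch itself: it calls path.with_file_name(DOTFILE_NAME), and Path::with_file_name replaces the last path component, so for a location at /home/user/Pictures the dotfile would land at /home/user/.spacedrive rather than inside the location. path.join(DOTFILE_NAME), as in the sketch, keeps the old format!("{}/{}", path, DOTFILE_NAME) behavior.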
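On patch 1's core/src/util/db.rs: the ring and data_encoding imports feed the migration checksum step, which streams each migration.sql through a SHA256 context and hex-encodes the digest before comparing it against the checksum recorded in the migrations table. The helper itself sits outside the visible hunks, so this is a sketch of the usual shape of that code, not the file's exact contents:

    use data_encoding::HEXLOWER;
    use ring::digest::{Context, Digest, SHA256};
    use std::io::{BufReader, Read};

    // Stream the reader through a SHA256 context in 1 KiB chunks.
    fn sha256_digest<R: Read>(mut reader: R) -> std::io::Result<Digest> {
        let mut context = Context::new(&SHA256);
        let mut buffer = [0; 1024];
        loop {
            let count = reader.read(&mut buffer)?;
            if count == 0 {
                break;
            }
            context.update(&buffer[..count]);
        }
        Ok(context.finish())
    }

    // Hex-encode a migration's digest, as the HEXLOWER import suggests.
    fn checksum(migration_sql: &str) -> std::io::Result<String> {
        let digest = sha256_digest(BufReader::new(migration_sql.as_bytes()))?;
        Ok(HEXLOWER.encode(digest.as_ref()))
    }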
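On the WebPMemory comment in patch 2 (flagged above): webp::WebPMemory hands back memory owned by libwebp behind a raw pointer, so the type is !Send, and a future that keeps such a value alive across an .await inherits !Send, which tokio::spawn refuses. Copying into an owned Vec<u8> before the await, and letting the buffer drop, is what restores Send. A compile-time sketch with an invented stand-in type, since the exact layout of WebPMemory is beside the point:

    use std::ops::Deref;

    // Stand-in for a libwebp-style buffer: the raw pointer field makes
    // this type !Send automatically.
    struct NativeBuffer {
        ptr: *const u8,
        len: usize,
    }

    impl Deref for NativeBuffer {
        type Target = [u8];
        fn deref(&self) -> &[u8] {
            unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
        }
    }

    // Stand-in for encoder.encode(): produces the !Send buffer.
    fn encode() -> NativeBuffer {
        static DATA: [u8; 3] = [1, 2, 3];
        NativeBuffer { ptr: DATA.as_ptr(), len: DATA.len() }
    }

    async fn write_encoded(output: &std::path::Path) -> std::io::Result<()> {
        // The !Send buffer lives only inside this statement: deref to &[u8]
        // and copy to an owned Vec before the .await, so it is never held
        // across a suspension point and the future stays Send.
        let owned = encode().deref().to_owned();
        tokio::fs::write(output, &owned).await
    }

    fn assert_send<F: Send>(_: F) {}

    fn main() {
        // Binding the buffer with `let buf = encode();` and keeping it
        // alive across the .await would fail this bound instead.
        assert_send(write_encoded(std::path::Path::new("out.webp")));
    }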
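On patch 3's linking loops (the duplicated block flagged above): instead of awaiting each file_path update in sequence, both loops now map rows to query futures and await the whole batch with futures::future::join_all, logging per-item failures rather than unwrapping. This is the "await many queries concurrently" from the commit message. A sketch of the shape, with update_link standing in for the prisma find_unique(...).update(...).exec() chain:

    use futures::future::join_all;
    use log::error;

    // Stand-in for one database update; only the shape matters here.
    async fn update_link(file_path_id: i32, file_id: i32) -> Result<(), String> {
        // the real code runs file_path().find_unique(..).update(..).exec()
        let _ = (file_path_id, file_id);
        Ok(())
    }

    async fn link_all(pairs: Vec<(i32, i32)>) {
        // All futures are polled concurrently on this task; results come
        // back in input order, so failures can be reported per item.
        for result in join_all(pairs.into_iter().map(|(fp, f)| update_link(fp, f))).await {
            if let Err(e) = result {
                error!("Error linking file: {:#?}", e);
            }
        }
    }

And to answer the comment's question: the two loops really are near-identical, so once the insert-then-link flow settles down they could plausibly share a helper like this one.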
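Last, on the raw INSERT in patch 3: the VALUES clause is built by repeating one "({}, {}, {})" placeholder group per new file and joining them with commas, and the flat values vec must then hold exactly three PrismaValues per row, in row order, for Raw::new to bind them, as the patch's usage suggests. A tiny sketch of just the string assembly, assuming a chunk of three new files:

    fn main() {
        let new_file_count = 3; // e.g. three new files in the current chunk
        let sql = format!(
            "INSERT INTO files (cas_id, size_in_bytes, date_created) VALUES {} ON CONFLICT (cas_id) DO NOTHING RETURNING id, cas_id",
            vec!["({}, {}, {})"; new_file_count].join(",")
        );
        // -> ... VALUES ({}, {}, {}),({}, {}, {}),({}, {}, {}) ...
        println!("{}", sql);
    }

The ON CONFLICT (cas_id) DO NOTHING plus RETURNING pair is doing real work here: rows whose cas_id already exists are skipped, and only the rows actually inserted come back for linking.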