Merge pull request #319 from fogodev/main

Fixing Clippy warnings
This commit is contained in:
Oscar Beaumont
2022-07-19 00:36:16 +08:00
committed by GitHub
25 changed files with 616 additions and 624 deletions

BIN
Cargo.lock generated
View File

Binary file not shown.

View File

@@ -24,10 +24,10 @@ ring = "0.17.0-alpha.10"
int-enum = "0.4.0"
# Project dependencies
ts-rs = { version = "6.1", features = ["chrono-impl", "uuid-impl"] }
ts-rs = { version = "6.2", features = ["chrono-impl", "uuid-impl", "serde-compat"] }
prisma-client-rust = { git = "https://github.com/Brendonovich/prisma-client-rust.git", tag = "0.5.0" }
walkdir = "^2.3.2"
uuid = "0.8"
uuid = { version = "^0.8.2", features = ["v4", "serde"]}
sysinfo = "0.23.9"
thiserror = "1.0.30"
core-derive = { path = "./derive" }

View File

@@ -1,5 +1,4 @@
extern crate ffmpeg_next as ffmpeg;
use ffmpeg::format;
use ffmpeg_next::format;
#[derive(Default, Debug)]
pub struct MediaItem {
@@ -125,10 +124,10 @@ pub struct AudioStream {
// }
// media_item.steams.push(stream_item);
// }
// println!("{:#?}", media_item);
// info!("{:#?}", media_item);
// }
// Err(error) => println!("error: {}", error),
// Err(error) => error!("error: {}", error),
// }
// Ok(())
// }

View File

@@ -1,21 +1,21 @@
use crate::job::{JobReportUpdate, JobResult};
use crate::library::LibraryContext;
use crate::{
job::{Job, WorkerContext},
job::{Job, JobReportUpdate, JobResult, WorkerContext},
library::LibraryContext,
prisma::file_path,
sys, CoreEvent,
};
use crate::{sys, CoreEvent};
use futures::executor::block_on;
use image::*;
use log::{error, info};
use std::fs;
use image::{self, imageops, DynamicImage, GenericImageView};
use log::{debug, error, info};
use std::error::Error;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use webp::*;
use tokio::fs;
use webp::Encoder;
#[derive(Debug, Clone)]
pub struct ThumbnailJob {
pub location_id: i32,
pub path: String,
pub path: PathBuf,
pub background: bool,
}
@@ -30,116 +30,113 @@ impl Job for ThumbnailJob {
}
async fn run(&self, ctx: WorkerContext) -> JobResult {
let library_ctx = ctx.library_ctx();
let thumbnail_dir = Path::new(&library_ctx.config().data_directory())
let thumbnail_dir = library_ctx
.config()
.data_directory()
.join(THUMBNAIL_CACHE_DIR_NAME)
.join(format!("{}", self.location_id));
.join(self.location_id.to_string());
let location = sys::get_location(&library_ctx, self.location_id).await?;
info!(
"Searching for images in location {} at path {}",
location.id, self.path
);
info!(
"Searching for images in location {} at path {}",
"Searching for images in location {} at path {:#?}",
location.id, self.path
);
// create all necessary directories if they don't exist
fs::create_dir_all(&thumbnail_dir)?;
fs::create_dir_all(&thumbnail_dir).await?;
let root_path = location.path.unwrap();
// query database for all files in this location that need thumbnails
let image_files = get_images(&library_ctx, self.location_id, &self.path).await?;
info!("Found {:?} files", image_files.len());
let is_background = self.background.clone();
tokio::task::spawn_blocking(move || {
ctx.progress(vec![
JobReportUpdate::TaskCount(image_files.len()),
JobReportUpdate::Message(format!(
"Preparing to process {} files",
image_files.len()
)),
]);
ctx.progress(vec![
JobReportUpdate::TaskCount(image_files.len()),
JobReportUpdate::Message(format!("Preparing to process {} files", image_files.len())),
]);
for (i, image_file) in image_files.iter().enumerate() {
ctx.progress(vec![JobReportUpdate::Message(format!(
"Processing {}",
image_file.materialized_path.clone()
))]);
for (i, image_file) in image_files.iter().enumerate() {
ctx.progress(vec![JobReportUpdate::Message(format!(
"Processing {}",
image_file.materialized_path
))]);
// assemble the file path
let path = Path::new(&root_path).join(&image_file.materialized_path);
error!("image_file {:?}", image_file);
// assemble the file path
let path = Path::new(&root_path).join(&image_file.materialized_path);
debug!("image_file {:?}", image_file);
// get cas_id, if none found skip
let cas_id = match image_file.file() {
Ok(file) => {
if let Some(f) = file {
f.cas_id.clone()
} else {
continue;
}
}
Err(_) => {
error!("Error getting cas_id {:?}", image_file.materialized_path);
// get cas_id, if none found skip
let cas_id = match image_file.file() {
Ok(file) => {
if let Some(f) = file {
f.cas_id.clone()
} else {
info!(
"skipping thumbnail generation for {}",
image_file.materialized_path
);
continue;
}
};
// Define and write the WebP-encoded file to a given path
let output_path = Path::new(&thumbnail_dir)
.join(&cas_id)
.with_extension("webp");
// check if file exists at output path
if !output_path.exists() {
info!("Writing {:?} to {:?}", path, output_path);
generate_thumbnail(&path, &output_path)
.map_err(|e| {
info!("Error generating thumb {:?}", e);
})
.unwrap_or(());
ctx.progress(vec![JobReportUpdate::CompletedTaskCount(i + 1)]);
if !is_background {
block_on(ctx.library_ctx().emit(CoreEvent::NewThumbnail { cas_id }));
};
} else {
info!("Thumb exists, skipping... {}", output_path.display());
}
Err(_) => {
error!("Error getting cas_id {:?}", image_file.materialized_path);
continue;
}
};
// Define and write the WebP-encoded file to a given path
let output_path = thumbnail_dir.join(&cas_id).with_extension("webp");
// check if file exists at output path
if !output_path.exists() {
info!("Writing {:?} to {:?}", path, output_path);
tokio::spawn(async move {
if let Err(e) = generate_thumbnail(&path, &output_path).await {
error!("Error generating thumb {:?}", e);
}
});
ctx.progress(vec![JobReportUpdate::CompletedTaskCount(i + 1)]);
if !self.background {
ctx.library_ctx()
.emit(CoreEvent::NewThumbnail { cas_id })
.await;
};
} else {
info!("Thumb exists, skipping... {}", output_path.display());
}
})
.await?;
}
Ok(())
}
}
pub fn generate_thumbnail(
file_path: &PathBuf,
output_path: &PathBuf,
) -> Result<(), Box<dyn std::error::Error>> {
pub async fn generate_thumbnail<P: AsRef<Path>>(
file_path: P,
output_path: P,
) -> Result<(), Box<dyn Error>> {
// Using `image` crate, open the included .jpg file
let img = image::open(file_path)?;
let (w, h) = img.dimensions();
// Optionally, resize the existing photo and convert back into DynamicImage
let img: DynamicImage = image::DynamicImage::ImageRgba8(imageops::resize(
let img = DynamicImage::ImageRgba8(imageops::resize(
&img,
(w as f32 * THUMBNAIL_SIZE_FACTOR) as u32,
(h as f32 * THUMBNAIL_SIZE_FACTOR) as u32,
imageops::FilterType::Triangle,
));
// Create the WebP encoder for the above image
let encoder: Encoder = Encoder::from_image(&img)?;
let encoder = Encoder::from_image(&img)?;
// Encode the image at a specified quality 0-100
let webp: WebPMemory = encoder.encode(THUMBNAIL_QUALITY);
std::fs::write(&output_path, &*webp)?;
// Type WebPMemory is !Send, which makes the Future in this function !Send,
// this make us `deref` to have a `&[u8]` and then `to_owned` to make a Vec<u8>
// which implies on a unwanted clone...
let webp = encoder.encode(THUMBNAIL_QUALITY).deref().to_owned();
fs::write(output_path, &webp).await?;
Ok(())
}
@@ -147,7 +144,7 @@ pub fn generate_thumbnail(
pub async fn get_images(
ctx: &LibraryContext,
location_id: i32,
path: &str,
path: impl AsRef<Path>,
) -> Result<Vec<file_path::Data>, std::io::Error> {
let mut params = vec![
file_path::location_id::equals(Some(location_id)),
@@ -160,8 +157,10 @@ pub async fn get_images(
]),
];
if !path.is_empty() {
params.push(file_path::materialized_path::starts_with(path.to_string()))
let path_str = path.as_ref().to_string_lossy().to_string();
if !path_str.is_empty() {
params.push(file_path::materialized_path::starts_with(path_str))
}
let image_files = ctx

View File

@@ -1,36 +1,27 @@
use data_encoding::HEXLOWER;
use ring::digest::{Context, SHA256};
use std::convert::TryInto;
use std::fs::File;
use std::io;
use std::path::PathBuf;
use tokio::{
fs::File,
io::{self, AsyncReadExt, AsyncSeekExt, SeekFrom},
};
static SAMPLE_COUNT: u64 = 4;
static SAMPLE_SIZE: u64 = 10000;
fn read_at(file: &File, offset: u64, size: u64) -> Result<Vec<u8>, io::Error> {
async fn read_at(file: &mut File, offset: u64, size: u64) -> Result<Vec<u8>, io::Error> {
let mut buf = vec![0u8; size as usize];
#[cfg(target_family = "unix")]
{
use std::os::unix::prelude::*;
file.read_exact_at(&mut buf, offset)?;
}
#[cfg(target_family = "windows")]
{
use std::os::windows::prelude::*;
file.seek_read(&mut buf, offset)?;
}
file.seek(SeekFrom::Start(offset)).await?;
file.read_exact(&mut buf).await?;
Ok(buf)
}
pub fn generate_cas_id(path: PathBuf, size: u64) -> Result<String, io::Error> {
pub async fn generate_cas_id(path: PathBuf, size: u64) -> Result<String, io::Error> {
// open file reference
let file = File::open(path)?;
let mut file = File::open(path).await?;
let mut context = Context::new(&SHA256);
@@ -39,20 +30,16 @@ pub fn generate_cas_id(path: PathBuf, size: u64) -> Result<String, io::Error> {
// if size is small enough, just read the whole thing
if SAMPLE_COUNT * SAMPLE_SIZE > size {
let buf = read_at(&file, 0, size.try_into().unwrap())?;
let buf = read_at(&mut file, 0, size).await?;
context.update(&buf);
} else {
// loop over samples
for i in 0..SAMPLE_COUNT {
let buf = read_at(
&file,
(size / SAMPLE_COUNT) * i,
SAMPLE_SIZE.try_into().unwrap(),
)?;
let buf = read_at(&mut file, (size / SAMPLE_COUNT) * i, SAMPLE_SIZE).await?;
context.update(&buf);
}
// sample end of file
let buf = read_at(&file, size - SAMPLE_SIZE, SAMPLE_SIZE.try_into().unwrap())?;
let buf = read_at(&mut file, size - SAMPLE_SIZE, SAMPLE_SIZE).await?;
context.update(&buf);
}

View File

@@ -8,13 +8,13 @@ use crate::{
sys::get_location,
};
use chrono::{DateTime, FixedOffset};
use futures::executor::block_on;
use log::info;
use futures::future::join_all;
use log::{error, info};
use prisma_client_rust::{prisma_models::PrismaValue, raw, raw::Raw, Direction};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::Path;
use std::{fs, io};
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use tokio::{fs, io};
// FileIdentifierJob takes file_paths without a file_id and uniquely identifies them
// first: generating the cas_id and extracting metadata
@@ -22,7 +22,7 @@ use std::{fs, io};
#[derive(Debug)]
pub struct FileIdentifierJob {
pub location_id: i32,
pub path: String,
pub path: PathBuf,
}
// we break this job into chunks of 100 to improve performance
@@ -38,7 +38,7 @@ impl Job for FileIdentifierJob {
info!("Identifying orphan file paths...");
let location = get_location(&ctx.library_ctx(), self.location_id).await?;
let location_path = location.path.unwrap_or("".to_string());
let location_path = location.path.unwrap_or_else(|| "".to_string());
let total_count = count_orphan_file_paths(&ctx.library_ctx(), location.id.into()).await?;
info!("Found {} orphan file paths", total_count);
@@ -49,138 +49,155 @@ impl Job for FileIdentifierJob {
// update job with total task count based on orphan file_paths count
ctx.progress(vec![JobReportUpdate::TaskCount(task_count)]);
// dedicated tokio thread for task
let _ctx = tokio::task::spawn_blocking(move || {
let db = ctx.library_ctx().db;
let mut completed: usize = 0;
let mut cursor: i32 = 1;
// loop until task count is complete
while completed < task_count {
// link file_path ids to a CreateFile struct containing unique file data
let mut chunk: HashMap<i32, CreateFile> = HashMap::new();
let mut cas_lookup: HashMap<String, i32> = HashMap::new();
let mut completed: usize = 0;
let mut cursor: i32 = 1;
// loop until task count is complete
while completed < task_count {
// link file_path ids to a CreateFile struct containing unique file data
let mut chunk: HashMap<i32, CreateFile> = HashMap::new();
let mut cas_lookup: HashMap<String, i32> = HashMap::new();
// get chunk of orphans to process
let file_paths = match block_on(get_orphan_file_paths(&ctx.library_ctx(), cursor)) {
Ok(file_paths) => file_paths,
// get chunk of orphans to process
let file_paths = match get_orphan_file_paths(&ctx.library_ctx(), cursor).await {
Ok(file_paths) => file_paths,
Err(e) => {
info!("Error getting orphan file paths: {:#?}", e);
continue;
}
};
info!(
"Processing {:?} orphan files. ({} completed of {})",
file_paths.len(),
completed,
task_count
);
// analyze each file_path
for file_path in &file_paths {
// get the cas_id and extract metadata
match prepare_file(&location_path, file_path).await {
Ok(file) => {
let cas_id = file.cas_id.clone();
// create entry into chunks for created file data
chunk.insert(file_path.id, file);
cas_lookup.insert(cas_id, file_path.id);
}
Err(e) => {
info!("Error getting orphan file paths: {}", e);
info!("Error processing file: {:#?}", e);
continue;
}
};
info!(
"Processing {:?} orphan files. ({} completed of {})",
file_paths.len(),
completed,
task_count
);
}
// analyze each file_path
for file_path in file_paths.iter() {
// get the cas_id and extract metadata
match prepare_file(&location_path, file_path) {
Ok(file) => {
let cas_id = file.cas_id.clone();
// create entry into chunks for created file data
chunk.insert(file_path.id, file);
cas_lookup.insert(cas_id, file_path.id);
}
Err(e) => {
info!("Error processing file: {}", e);
continue;
}
};
// find all existing files by cas id
let generated_cas_ids = chunk.values().map(|c| c.cas_id.clone()).collect();
let existing_files = ctx
.library_ctx()
.db
.file()
.find_many(vec![file::cas_id::in_vec(generated_cas_ids)])
.exec()
.await?;
info!("Found {} existing files", existing_files.len());
// link those existing files to their file paths
// Had to put the file_path in a variable outside of the closure, to satisfy the borrow checker
let library_ctx = ctx.library_ctx();
let prisma_file_path = library_ctx.db.file_path();
for result in join_all(existing_files.iter().map(|file| {
prisma_file_path
.find_unique(file_path::id::equals(
*cas_lookup.get(&file.cas_id).unwrap(),
))
.update(vec![file_path::file_id::set(Some(file.id))])
.exec()
}))
.await
{
if let Err(e) = result {
error!("Error linking file: {:#?}", e);
}
}
// find all existing files by cas id
let generated_cas_ids = chunk.values().map(|c| c.cas_id.clone()).collect();
let existing_files: Vec<file::Data> = block_on(
db.file()
.find_many(vec![file::cas_id::in_vec(generated_cas_ids)])
.exec(),
)
.unwrap();
info!("Found {} existing files", existing_files.len());
let existing_files_cas_ids = existing_files
.iter()
.map(|file| file.cas_id.clone())
.collect::<HashSet<_>>();
// link those existing files to their file paths
for file in existing_files.iter() {
let file_path_id = cas_lookup.get(&file.cas_id).unwrap();
block_on(
db.file_path()
.find_unique(file_path::id::equals(file_path_id.clone()))
.update(vec![file_path::file_id::set(Some(file.id.clone()))])
.exec(),
)
.unwrap();
}
// extract files that don't already exist in the database
let new_files = chunk
.iter()
.map(|(_id, create_file)| create_file)
.filter(|create_file| !existing_files_cas_ids.contains(&create_file.cas_id))
.collect::<Vec<_>>();
// extract files that don't already exist in the database
let new_files: Vec<&CreateFile> = chunk
.iter()
.map(|(_, c)| c)
.filter(|c| !existing_files.iter().any(|d| d.cas_id == c.cas_id))
.collect();
// assemble prisma values for new unique files
let mut values = Vec::with_capacity(new_files.len() * 3);
for file in &new_files {
values.extend([
PrismaValue::String(file.cas_id.clone()),
PrismaValue::Int(file.size_in_bytes),
PrismaValue::DateTime(file.date_created),
]);
}
// assemble prisma values for new unique files
let mut values: Vec<PrismaValue> = Vec::new();
for file in new_files.iter() {
values.extend([
PrismaValue::String(file.cas_id.clone()),
PrismaValue::Int(file.size_in_bytes.clone()),
PrismaValue::DateTime(file.date_created.clone()),
]);
}
// create new file records with assembled values
let created_files: Vec<FileCreated> = block_on(db._query_raw(Raw::new(
// create new file records with assembled values
let created_files: Vec<FileCreated> = ctx
.library_ctx()
.db
._query_raw(Raw::new(
&format!(
"INSERT INTO files (cas_id, size_in_bytes, date_created) VALUES {}
ON CONFLICT (cas_id) DO NOTHING RETURNING id, cas_id",
vec!["({}, {}, {})"; new_files.len()].join(",")
),
values,
)))
))
.await
.unwrap_or_else(|e| {
info!("Error inserting files: {}", e);
error!("Error inserting files: {:#?}", e);
Vec::new()
});
// This code is duplicates, is this right?
for result in join_all(created_files.iter().map(|file| {
// associate newly created files with their respective file_paths
for file in created_files.iter() {
// TODO: this is potentially bottle necking the chunk system, individually linking file_path to file, 100 queries per chunk
// - insert many could work, but I couldn't find a good way to do this in a single SQL query
let file_path_id = cas_lookup.get(&file.cas_id).unwrap();
block_on(
db.file_path()
.find_unique(file_path::id::equals(file_path_id.clone()))
.update(vec![file_path::file_id::set(Some(file.id.clone()))])
.exec(),
)
.unwrap();
// TODO: this is potentially bottle necking the chunk system, individually linking file_path to file, 100 queries per chunk
// - insert many could work, but I couldn't find a good way to do this in a single SQL query
prisma_file_path
.find_unique(file_path::id::equals(
*cas_lookup.get(&file.cas_id).unwrap(),
))
.update(vec![file_path::file_id::set(Some(file.id))])
.exec()
}))
.await
{
if let Err(e) = result {
error!("Error linking file: {:#?}", e);
}
// handle loop end
let last_row = match file_paths.last() {
Some(l) => l,
None => {
break;
}
};
cursor = last_row.id;
completed += 1;
ctx.progress(vec![
JobReportUpdate::CompletedTaskCount(completed),
JobReportUpdate::Message(format!(
"Processed {} of {} orphan files",
completed * CHUNK_SIZE,
total_count
)),
]);
}
ctx
})
.await?;
// handle loop end
let last_row = match file_paths.last() {
Some(l) => l,
None => {
break;
}
};
cursor = last_row.id;
completed += 1;
ctx.progress(vec![
JobReportUpdate::CompletedTaskCount(completed),
JobReportUpdate::Message(format!(
"Processed {} of {} orphan files",
completed * CHUNK_SIZE,
total_count
)),
]);
}
// let _remaining = count_orphan_file_paths(&ctx.core_ctx, location.id.into()).await?;
Ok(())
@@ -209,12 +226,11 @@ pub async fn get_orphan_file_paths(
ctx: &LibraryContext,
cursor: i32,
) -> Result<Vec<file_path::Data>, FileError> {
let db = &ctx.db;
info!(
"discovering {} orphan file paths at cursor: {:?}",
CHUNK_SIZE, cursor
);
let files = db
ctx.db
.file_path()
.find_many(vec![
file_path::file_id::equals(None),
@@ -224,9 +240,8 @@ pub async fn get_orphan_file_paths(
.cursor(file_path::id::cursor(cursor))
.take(CHUNK_SIZE as i64)
.exec()
.await?;
Ok(files)
.await
.map_err(|e| e.into())
}
#[derive(Deserialize, Serialize, Debug)]
@@ -242,13 +257,15 @@ pub struct FileCreated {
pub cas_id: String,
}
pub fn prepare_file(
location_path: &str,
pub async fn prepare_file(
location_path: impl AsRef<Path>,
file_path: &file_path::Data,
) -> Result<CreateFile, io::Error> {
let path = Path::new(&location_path).join(Path::new(file_path.materialized_path.as_str()));
let path = location_path
.as_ref()
.join(file_path.materialized_path.as_str());
let metadata = fs::metadata(&path)?;
let metadata = fs::metadata(&path).await?;
// let date_created: DateTime<Utc> = metadata.created().unwrap().into();
@@ -256,7 +273,7 @@ pub fn prepare_file(
let cas_id = {
if !file_path.is_dir {
let mut ret = generate_cas_id(path.clone(), size.clone()).unwrap();
let mut ret = generate_cas_id(path, size).await?;
ret.truncate(16);
ret
} else {

View File

@@ -6,29 +6,32 @@ use crate::{
sys::get_location,
tag::{Tag, TagError, TagOnFile, TagWithFiles},
};
use log::info;
use std::path::Path;
pub async fn open_dir(
ctx: &LibraryContext,
location_id: &i32,
path: &str,
location_id: i32,
path: impl AsRef<Path>,
) -> Result<DirectoryWithContents, FileError> {
// get location
let location = get_location(ctx, location_id.clone()).await?;
let location = get_location(ctx, location_id).await?;
let path_str = path.as_ref().to_string_lossy().to_string();
let directory = ctx
.db
.file_path()
.find_first(vec![
file_path::location_id::equals(Some(location.id)),
file_path::materialized_path::equals(path.into()),
file_path::materialized_path::equals(path_str),
file_path::is_dir::equals(true),
])
.exec()
.await?
.ok_or(FileError::DirectoryNotFound(path.to_string()))?;
.ok_or_else(|| FileError::DirectoryNotFound(path.as_ref().to_path_buf()))?;
println!("DIRECTORY: {:?}", directory);
info!("DIRECTORY: {:?}", directory);
let mut file_paths: Vec<FilePath> = ctx
.db
@@ -46,10 +49,12 @@ pub async fn open_dir(
for file_path in &mut file_paths {
if let Some(file) = &mut file_path.file {
let thumb_path = Path::new(&ctx.config().data_directory())
let thumb_path = ctx
.config()
.data_directory()
.join(THUMBNAIL_CACHE_DIR_NAME)
.join(format!("{}", location.id))
.join(file.cas_id.clone())
.join(location.id.to_string())
.join(&file.cas_id)
.with_extension("webp");
file.has_thumbnail = thumb_path.exists();
@@ -69,7 +74,7 @@ pub async fn open_tag(ctx: &LibraryContext, tag_id: i32) -> Result<TagWithFiles,
.find_unique(tag::id::equals(tag_id))
.exec()
.await?
.ok_or_else(|| TagError::TagNotFound(tag_id))?
.ok_or(TagError::TagNotFound(tag_id))?
.into();
let files_with_tag: Vec<TagOnFile> = ctx

View File

@@ -1,15 +1,17 @@
use crate::job::{Job, JobReportUpdate, JobResult, WorkerContext};
use std::path::PathBuf;
use self::scan::ScanProgress;
mod scan;
// Re-exporting
pub use scan::*;
pub use scan::scan_path;
use scan::scan_path;
#[derive(Debug)]
pub struct IndexerJob {
pub path: String,
pub path: PathBuf,
}
#[async_trait::async_trait]
@@ -18,7 +20,7 @@ impl Job for IndexerJob {
"indexer"
}
async fn run(&self, ctx: WorkerContext) -> JobResult {
scan_path(&ctx.library_ctx(), self.path.as_str(), move |p| {
scan_path(&ctx.library_ctx(), &self.path, move |p| {
ctx.progress(
p.iter()
.map(|p| match p.clone() {

View File

@@ -8,7 +8,13 @@ use prisma_client_rust::raw;
use prisma_client_rust::raw::Raw;
use serde::{Deserialize, Serialize};
use std::ffi::OsStr;
use std::{collections::HashMap, fs, path::Path, path::PathBuf, time::Instant};
use std::fmt::Debug;
use std::{
collections::HashMap,
path::{Path, PathBuf},
time::Instant,
};
use tokio::fs;
use walkdir::{DirEntry, WalkDir};
#[derive(Clone)]
@@ -23,12 +29,10 @@ static BATCH_SIZE: usize = 100;
// creates a vector of valid path buffers from a directory
pub async fn scan_path(
ctx: &LibraryContext,
path: &str,
path: impl AsRef<Path> + Debug,
on_progress: impl Fn(Vec<ScanProgress>) + Send + Sync + 'static,
) -> JobResult {
let path = path.to_string();
let location = create_location(&ctx, &path).await?;
let location = create_location(ctx, &path).await?;
// query db to highers id, so we can increment it for the new files indexed
#[derive(Deserialize, Serialize, Debug)]
@@ -42,15 +46,16 @@ pub async fn scan_path(
.await
{
Ok(rows) => rows[0].id.unwrap_or(0),
Err(e) => panic!("Error querying for next file id: {}", e),
Err(e) => panic!("Error querying for next file id: {:#?}", e),
};
//check is path is a directory
if !PathBuf::from(&path).is_dir() {
if !path.as_ref().is_dir() {
// return Err(anyhow::anyhow!("{} is not a directory", &path));
panic!("{} is not a directory", &path);
panic!("{:#?} is not a directory", path);
}
let dir_path = path.clone();
let path_buf = path.as_ref().to_path_buf();
// spawn a dedicated thread to scan the directory for performance
let (paths, scan_start, on_progress) = tokio::task::spawn_blocking(move || {
@@ -67,10 +72,9 @@ pub async fn scan_path(
next_file_id
};
// walk through directory recursively
for entry in WalkDir::new(&dir_path).into_iter().filter_entry(|dir| {
let approved =
!is_hidden(dir) && !is_app_bundle(dir) && !is_node_modules(dir) && !is_library(dir);
approved
for entry in WalkDir::new(path_buf).into_iter().filter_entry(|dir| {
// check if entry is approved
!is_hidden(dir) && !is_app_bundle(dir) && !is_node_modules(dir) && !is_library(dir)
}) {
// extract directory entry or log and continue if failed
let entry = match entry {
@@ -86,7 +90,7 @@ pub async fn scan_path(
let parent_path = path
.parent()
.unwrap_or(Path::new(""))
.unwrap_or_else(|| Path::new(""))
.to_str()
.unwrap_or("");
let parent_dir_id = dirs.get(&*parent_path);
@@ -100,7 +104,7 @@ pub async fn scan_path(
};
on_progress(vec![
ScanProgress::Message(format!("{}", path_str)),
ScanProgress::Message(format!("Scanning {}", path_str)),
ScanProgress::ChunkCount(paths.len() / BATCH_SIZE),
]);
@@ -122,8 +126,7 @@ pub async fn scan_path(
}
(paths, scan_start, on_progress)
})
.await
.unwrap();
.await?;
let db_write_start = Instant::now();
let scan_read_time = scan_start.elapsed();
@@ -143,7 +146,7 @@ pub async fn scan_path(
for (file_path, file_id, parent_dir_id, is_dir) in chunk {
files.extend(
match prepare_values(&file_path, *file_id, &location, parent_dir_id, *is_dir) {
match prepare_values(file_path, *file_id, &location, parent_dir_id, *is_dir).await {
Ok(values) => values.to_vec(),
Err(e) => {
error!("Error creating file model from path {:?}: {}", file_path, e);
@@ -178,14 +181,14 @@ pub async fn scan_path(
}
// reads a file at a path and creates an ActiveModel with metadata
fn prepare_values(
async fn prepare_values(
file_path: &PathBuf,
id: i32,
location: &LocationResource,
parent_id: &Option<i32>,
is_dir: bool,
) -> Result<[PrismaValue; 8], std::io::Error> {
let metadata = fs::metadata(&file_path)?;
let metadata = fs::metadata(&file_path).await?;
let location_path = Path::new(location.path.as_ref().unwrap().as_str());
// let size = metadata.len();
let name;
@@ -215,7 +218,6 @@ fn prepare_values(
PrismaValue::String(name),
PrismaValue::String(extension.to_lowercase()),
parent_id
.clone()
.map(|id| PrismaValue::Int(id as i64))
.unwrap_or(PrismaValue::Null),
PrismaValue::DateTime(date_created.into()),
@@ -237,7 +239,7 @@ fn is_hidden(entry: &DirEntry) -> bool {
entry
.file_name()
.to_str()
.map(|s| s.starts_with("."))
.map(|s| s.starts_with('.'))
.unwrap_or(false)
}
@@ -266,7 +268,7 @@ fn is_app_bundle(entry: &DirEntry) -> bool {
.map(|s| s.contains(".app") | s.contains(".bundle"))
.unwrap_or(false);
let is_app_bundle = is_dir && contains_dot;
// let is_app_bundle = is_dir && contains_dot;
// if is_app_bundle {
// let path_buff = entry.path();
// let path = path_buff.to_str().unwrap();
@@ -274,5 +276,5 @@ fn is_app_bundle(entry: &DirEntry) -> bool {
// self::path(&path, );
// }
is_app_bundle
is_dir && contains_dot
}

View File

@@ -1,6 +1,7 @@
use chrono::{DateTime, Utc};
use int_enum::IntEnum;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use thiserror::Error;
use ts_rs::TS;
@@ -79,46 +80,52 @@ pub enum FileKind {
Alias = 8,
}
impl Into<File> for file::Data {
fn into(self) -> File {
File {
id: self.id,
cas_id: self.cas_id,
integrity_checksum: self.integrity_checksum,
kind: IntEnum::from_int(self.kind).unwrap(),
size_in_bytes: self.size_in_bytes.to_string(),
// encryption: EncryptionAlgorithm::from_int(self.encryption).unwrap(),
ipfs_id: self.ipfs_id,
hidden: self.hidden,
favorite: self.favorite,
important: self.important,
has_thumbnail: self.has_thumbnail,
has_thumbstrip: self.has_thumbstrip,
has_video_preview: self.has_video_preview,
note: self.note,
date_created: self.date_created.into(),
date_modified: self.date_modified.into(),
date_indexed: self.date_indexed.into(),
impl From<file::Data> for File {
fn from(data: file::Data) -> Self {
Self {
id: data.id,
cas_id: data.cas_id,
integrity_checksum: data.integrity_checksum,
kind: IntEnum::from_int(data.kind).unwrap(),
size_in_bytes: data.size_in_bytes.to_string(),
// encryption: EncryptionAlgorithm::from_int(data.encryption).unwrap(),
ipfs_id: data.ipfs_id,
hidden: data.hidden,
favorite: data.favorite,
important: data.important,
has_thumbnail: data.has_thumbnail,
has_thumbstrip: data.has_thumbstrip,
has_video_preview: data.has_video_preview,
note: data.note,
date_created: data.date_created.into(),
date_modified: data.date_modified.into(),
date_indexed: data.date_indexed.into(),
paths: vec![],
}
}
}
impl Into<FilePath> for file_path::Data {
fn into(mut self) -> FilePath {
FilePath {
id: self.id,
is_dir: self.is_dir,
materialized_path: self.materialized_path,
file_id: self.file_id,
parent_id: self.parent_id,
location_id: self.location_id.unwrap_or(0),
date_indexed: self.date_indexed.into(),
name: self.name,
extension: self.extension,
date_created: self.date_created.into(),
date_modified: self.date_modified.into(),
file: self.file.take().unwrap_or(None).map(|file| (*file).into()),
impl From<Box<file::Data>> for File {
fn from(data: Box<file::Data>) -> Self {
Self::from(*data)
}
}
impl From<file_path::Data> for FilePath {
fn from(data: file_path::Data) -> Self {
Self {
id: data.id,
is_dir: data.is_dir,
materialized_path: data.materialized_path,
file_id: data.file_id,
parent_id: data.parent_id,
location_id: data.location_id.unwrap_or(0),
date_indexed: data.date_indexed.into(),
name: data.name,
extension: data.extension,
date_created: data.date_created.into(),
date_modified: data.date_modified.into(),
file: data.file.unwrap_or(None).map(Into::into),
}
}
}
@@ -133,9 +140,9 @@ pub struct DirectoryWithContents {
#[derive(Error, Debug)]
pub enum FileError {
#[error("Directory not found (path: {0:?})")]
DirectoryNotFound(String),
DirectoryNotFound(PathBuf),
#[error("File not found (path: {0:?})")]
FileNotFound(String),
FileNotFound(PathBuf),
#[error("Database error")]
DatabaseError(#[from] prisma::QueryError),
#[error("System error")]
@@ -160,7 +167,7 @@ pub async fn set_note(
library_id: ctx.id.to_string(),
query: LibraryQuery::GetExplorerDir {
limit: 0,
path: "".to_string(),
path: PathBuf::new(),
location_id: 0,
},
}))
@@ -187,7 +194,7 @@ pub async fn favorite(
library_id: ctx.id.to_string(),
query: LibraryQuery::GetExplorerDir {
limit: 0,
path: "".to_string(),
path: PathBuf::new(),
location_id: 0,
},
}))

View File

@@ -7,7 +7,7 @@ use crate::{
prisma::{job, node},
};
use int_enum::IntEnum;
use log::info;
use log::{error, info};
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, VecDeque},
@@ -25,8 +25,8 @@ pub type JobResult = Result<(), Box<dyn Error + Send + Sync>>;
#[async_trait::async_trait]
pub trait Job: Send + Sync + Debug {
async fn run(&self, ctx: WorkerContext) -> JobResult;
fn name(&self) -> &'static str;
async fn run(&self, ctx: WorkerContext) -> JobResult;
}
pub enum JobManagerEvent {
@@ -43,7 +43,7 @@ pub struct JobManager {
impl JobManager {
pub fn new() -> Arc<Self> {
let (internal_sender, mut internal_reciever) = mpsc::unbounded_channel();
let (internal_sender, mut internal_receiver) = mpsc::unbounded_channel();
let this = Arc::new(Self {
job_queue: RwLock::new(VecDeque::new()),
running_workers: RwLock::new(HashMap::new()),
@@ -52,7 +52,7 @@ impl JobManager {
let this2 = this.clone();
tokio::spawn(async move {
while let Some(event) = internal_reciever.recv().await {
while let Some(event) = internal_receiver.recv().await {
match event {
JobManagerEvent::IngestJob(ctx, job) => this2.clone().ingest(&ctx, job).await,
}
@@ -73,7 +73,7 @@ impl JobManager {
let wrapped_worker = Arc::new(Mutex::new(worker));
Worker::spawn(self.clone(), wrapped_worker.clone(), ctx).await;
Worker::spawn(Arc::clone(&self), Arc::clone(&wrapped_worker), ctx.clone()).await;
running_workers.insert(id, wrapped_worker);
} else {
@@ -95,7 +95,7 @@ impl JobManager {
self.internal_sender
.send(JobManagerEvent::IngestJob(ctx.clone(), job))
.unwrap_or_else(|_| {
println!("Failed to ingest job!");
error!("Failed to ingest job!");
});
}
}
@@ -111,9 +111,8 @@ impl JobManager {
}
// pub async fn queue_pending_job(ctx: &LibraryContext) -> Result<(), JobError> {
// let db = &ctx.db;
// let _next_job = db
// let _next_job = ctx
// .db
// .job()
// .find_first(vec![job::status::equals(JobStatus::Queued.int_value())])
// .exec()
@@ -123,14 +122,14 @@ impl JobManager {
// }
pub async fn get_history(ctx: &LibraryContext) -> Result<Vec<JobReport>, JobError> {
let db = &ctx.db;
let jobs = db
let jobs = ctx
.db
.job()
.find_many(vec![job::status::not(JobStatus::Running.int_value())])
.exec()
.await?;
Ok(jobs.into_iter().map(|j| j.into()).collect())
Ok(jobs.into_iter().map(Into::into).collect())
}
}
@@ -165,20 +164,20 @@ pub struct JobReport {
}
// convert database struct into a resource struct
impl Into<JobReport> for job::Data {
fn into(self) -> JobReport {
impl From<job::Data> for JobReport {
fn from(data: job::Data) -> JobReport {
JobReport {
id: self.id,
name: self.name,
// client_id: self.client_id,
status: JobStatus::from_int(self.status).unwrap(),
task_count: self.task_count,
completed_task_count: self.completed_task_count,
date_created: self.date_created.into(),
date_modified: self.date_modified.into(),
data: self.data,
id: data.id,
name: data.name,
// client_id: data.client_id,
status: JobStatus::from_int(data.status).unwrap(),
task_count: data.task_count,
completed_task_count: data.completed_task_count,
date_created: data.date_created.into(),
date_modified: data.date_modified.into(),
data: data.data,
message: String::new(),
seconds_elapsed: self.seconds_elapsed,
seconds_elapsed: data.seconds_elapsed,
}
}
}
@@ -203,7 +202,7 @@ impl JobReport {
pub async fn create(&self, ctx: &LibraryContext) -> Result<(), JobError> {
let mut params = Vec::new();
if let Some(_) = &self.data {
if self.data.is_some() {
params.push(job::data::set(self.data.clone()))
}

View File

@@ -3,7 +3,7 @@ use super::{
Job, JobManager,
};
use crate::{library::LibraryContext, ClientQuery, CoreEvent, LibraryQuery};
use log::error;
use std::{sync::Arc, time::Duration};
use tokio::{
sync::{
@@ -12,6 +12,8 @@ use tokio::{
},
time::{sleep, Instant},
};
use uuid::Uuid;
// used to update the worker state from inside the worker thread
pub enum WorkerEvent {
Progressed(Vec<JobReportUpdate>),
@@ -58,7 +60,7 @@ pub struct Worker {
impl Worker {
pub fn new(job: Box<dyn Job>) -> Self {
let (worker_sender, worker_receiver) = unbounded_channel();
let uuid = uuid::Uuid::new_v4().to_string();
let uuid = Uuid::new_v4().to_string();
let name = job.name();
Self {
@@ -71,7 +73,7 @@ impl Worker {
pub async fn spawn(
job_manager: Arc<JobManager>,
worker: Arc<Mutex<Self>>,
ctx: &LibraryContext,
ctx: LibraryContext,
) {
// we capture the worker receiver channel so state can be updated from inside the worker
let mut worker_mut = worker.lock().await;
@@ -88,23 +90,23 @@ impl Worker {
worker_mut.job_report.status = JobStatus::Running;
let ctx = ctx.clone();
worker_mut.job_report.create(&ctx).await.unwrap_or(());
// spawn task to handle receiving events from the worker
let library_ctx = ctx.clone();
tokio::spawn(Worker::track_progress(
worker.clone(),
worker_receiver,
ctx.clone(),
library_ctx.clone(),
));
let uuid = worker_mut.job_report.id.clone();
// spawn task to handle running the job
tokio::spawn(async move {
let worker_ctx = WorkerContext {
uuid,
library_ctx: ctx.clone(),
library_ctx,
sender: worker_sender,
};
let job_start = Instant::now();
@@ -123,14 +125,12 @@ impl Worker {
}
});
match job.run(worker_ctx.clone()).await {
Ok(_) => {
worker_ctx.sender.send(WorkerEvent::Completed).unwrap_or(());
}
Err(err) => {
println!("job '{}' failed with error: {}", worker_ctx.uuid, err);
worker_ctx.sender.send(WorkerEvent::Failed).unwrap_or(());
}
if let Err(e) = job.run(worker_ctx.clone()).await {
error!("job '{}' failed with error: {}", worker_ctx.uuid, e);
worker_ctx.sender.send(WorkerEvent::Failed).unwrap_or(());
} else {
// handle completion
worker_ctx.sender.send(WorkerEvent::Completed).unwrap_or(());
}
job_manager.complete(&ctx, worker_ctx.uuid).await;

View File

@@ -1,19 +1,22 @@
use crate::{file::cas::FileIdentifierJob, prisma::file as prisma_file, prisma::location};
use job::{JobManager, JobReport};
use library::{LibraryConfig, LibraryConfigWrapped, LibraryManager};
use log::error;
use node::{NodeConfig, NodeConfigManager};
use serde::{Deserialize, Serialize};
use std::{
fs,
path::{Path, PathBuf},
sync::Arc,
};
use tag::{Tag, TagWithFiles};
use thiserror::Error;
use tokio::sync::{
mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
use tokio::{
fs,
sync::{
mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
},
};
use ts_rs::TS;
@@ -79,7 +82,7 @@ pub struct NodeContext {
impl NodeContext {
pub async fn emit(&self, event: CoreEvent) {
self.event_sender.send(event).await.unwrap_or_else(|e| {
println!("Failed to emit event. {:?}", e);
error!("Failed to emit event. {:#?}", e);
});
}
}
@@ -103,11 +106,15 @@ pub struct Node {
impl Node {
// create new instance of node, run startup tasks
pub async fn new(data_dir: PathBuf) -> (NodeController, mpsc::Receiver<CoreEvent>, Node) {
fs::create_dir_all(&data_dir).unwrap();
pub async fn new(
data_dir: impl AsRef<Path>,
) -> (NodeController, mpsc::Receiver<CoreEvent>, Node) {
fs::create_dir_all(&data_dir).await.unwrap();
let (event_sender, event_recv) = mpsc::channel(100);
let config = NodeConfigManager::new(data_dir.clone()).await.unwrap();
let config = NodeConfigManager::new(data_dir.as_ref().to_owned())
.await
.unwrap();
let jobs = JobManager::new();
let node_ctx = NodeContext {
event_sender: event_sender.clone(),
@@ -117,7 +124,7 @@ impl Node {
let node = Node {
config,
library_manager: LibraryManager::new(Path::new(&data_dir).join("libraries"), node_ctx)
library_manager: LibraryManager::new(data_dir.as_ref().join("libraries"), node_ctx)
.await
.unwrap(),
query_channel: unbounded_channel(),
@@ -139,8 +146,8 @@ impl Node {
pub fn get_context(&self) -> NodeContext {
NodeContext {
event_sender: self.event_sender.clone(),
config: self.config.clone(),
jobs: self.jobs.clone(),
config: Arc::clone(&self.config),
jobs: Arc::clone(&self.jobs),
}
}
@@ -305,12 +312,12 @@ impl Node {
}
// return contents of a directory for the explorer
LibraryQuery::GetExplorerDir {
path,
location_id,
path,
limit: _,
} => CoreResponse::GetExplorerDir(
file::explorer::open_dir(&ctx, &location_id, &path).await?,
),
} => CoreResponse::GetExplorerDir(Box::new(
file::explorer::open_dir(&ctx, location_id, path).await?,
)),
LibraryQuery::GetJobHistory => {
CoreResponse::GetJobHistory(JobManager::get_history(&ctx).await?)
}
@@ -390,7 +397,7 @@ pub enum LibraryCommand {
},
// Locations
LocCreate {
path: String,
path: PathBuf,
},
LocUpdate {
id: i32,
@@ -411,12 +418,12 @@ pub enum LibraryCommand {
},
GenerateThumbsForLocation {
id: i32,
path: String,
path: PathBuf,
},
// PurgeDatabase,
IdentifyUniqueFiles {
id: i32,
path: String,
path: PathBuf,
},
}
@@ -448,7 +455,7 @@ pub enum LibraryQuery {
GetRunningJobs,
GetExplorerDir {
location_id: i32,
path: String,
path: PathBuf,
limit: i32,
},
GetLibraryStatistics,
@@ -488,15 +495,15 @@ pub enum CoreResponse {
Error(String),
GetLibraries(Vec<LibraryConfigWrapped>),
GetVolumes(Vec<sys::Volume>),
TagCreateResponse(tag::Tag),
GetTag(Option<tag::Tag>),
GetTags(Vec<tag::Tag>),
TagCreateResponse(Tag),
GetTag(Option<Tag>),
GetTags(Vec<Tag>),
GetLocation(sys::LocationResource),
GetLocations(Vec<sys::LocationResource>),
GetExplorerDir(file::DirectoryWithContents),
GetExplorerDir(Box<file::DirectoryWithContents>),
GetNode(NodeState),
LocCreate(sys::LocationResource),
OpenTag(Vec<tag::TagWithFiles>),
OpenTag(Vec<TagWithFiles>),
GetRunningJobs(Vec<JobReport>),
GetJobHistory(Vec<JobReport>),
GetLibraryStatistics(library::Statistics),
@@ -527,5 +534,5 @@ pub enum CoreResource {
Location(sys::LocationResource),
File(file::File),
Job(JobReport),
Tag(tag::Tag),
Tag(Tag),
}

View File

@@ -41,10 +41,7 @@ impl LibraryConfig {
file_dir: PathBuf,
config: &LibraryConfig,
) -> Result<(), LibraryManagerError> {
File::create(file_dir)
.map_err(LibraryManagerError::IOError)?
.write_all(serde_json::to_string(config)?.as_bytes())
.map_err(LibraryManagerError::IOError)?;
File::create(file_dir)?.write_all(serde_json::to_string(config)?.as_bytes())?;
Ok(())
}
@@ -54,7 +51,7 @@ impl LibraryConfig {
config_path: PathBuf,
) -> Result<(), LibraryManagerError> {
match current_version {
None => Err(LibraryManagerError::MigrationError(format!(
None => Err(LibraryManagerError::Migration(format!(
"Your Spacedrive library at '{}' is missing the `version` field",
config_path.display()
))),

View File

@@ -31,17 +31,17 @@ pub struct LibraryManager {
#[derive(Error, Debug)]
pub enum LibraryManagerError {
#[error("error saving or loading the config from the filesystem")]
IOError(#[from] io::Error),
IO(#[from] io::Error),
#[error("error serializing or deserializing the JSON in the config file")]
JsonError(#[from] serde_json::Error),
Json(#[from] serde_json::Error),
#[error("Database error")]
DatabaseError(#[from] prisma::QueryError),
Database(#[from] prisma::QueryError),
#[error("Library not found error")]
LibraryNotFoundError,
LibraryNotFound,
#[error("error migrating the config file")]
MigrationError(String),
Migration(String),
#[error("failed to parse uuid")]
UuidError(#[from] uuid::Error),
Uuid(#[from] uuid::Error),
}
impl LibraryManager {
@@ -66,7 +66,7 @@ impl LibraryManager {
let config_path = entry.path();
let library_id = match Path::new(&config_path)
.file_stem()
.map(|v| v.to_str().map(|v| Uuid::from_str(v)))
.map(|v| v.to_str().map(Uuid::from_str))
{
Some(Some(Ok(id))) => id,
_ => {
@@ -119,17 +119,14 @@ impl LibraryManager {
pub(crate) async fn create(&self, config: LibraryConfig) -> Result<(), LibraryManagerError> {
let id = Uuid::new_v4();
LibraryConfig::save(
Path::new(&self.libraries_dir).join(format!("{}.sdlibrary", id.to_string())),
Path::new(&self.libraries_dir).join(format!("{id}.sdlibrary")),
&config,
)
.await?;
let library = Self::load(
id,
&Path::new(&self.libraries_dir)
.join(format!("{}.db", id.to_string()))
.to_str()
.unwrap(),
self.libraries_dir.join(format!("{id}.db")),
config,
self.node_context.clone(),
)
@@ -167,7 +164,7 @@ impl LibraryManager {
let library = libraries
.iter_mut()
.find(|lib| lib.id == Uuid::from_str(&id).unwrap())
.ok_or(LibraryManagerError::LibraryNotFoundError)?;
.ok_or(LibraryManagerError::LibraryNotFound)?;
// update the library
if let Some(name) = name {
@@ -178,7 +175,7 @@ impl LibraryManager {
}
LibraryConfig::save(
Path::new(&self.libraries_dir).join(format!("{}.sdlibrary", id.to_string())),
Path::new(&self.libraries_dir).join(format!("{id}.sdlibrary")),
&library.config,
)
.await?;
@@ -197,14 +194,10 @@ impl LibraryManager {
let library = libraries
.iter()
.find(|l| l.id == id)
.ok_or(LibraryManagerError::LibraryNotFoundError)?;
.ok_or(LibraryManagerError::LibraryNotFound)?;
fs::remove_file(
Path::new(&self.libraries_dir).join(format!("{}.db", library.id.to_string())),
)?;
fs::remove_file(
Path::new(&self.libraries_dir).join(format!("{}.sdlibrary", library.id.to_string())),
)?;
fs::remove_file(Path::new(&self.libraries_dir).join(format!("{}.db", library.id)))?;
fs::remove_file(Path::new(&self.libraries_dir).join(format!("{}.sdlibrary", library.id)))?;
libraries.retain(|l| l.id != id);
@@ -221,18 +214,18 @@ impl LibraryManager {
.await
.iter()
.find(|lib| lib.id.to_string() == library_id)
.map(|v| v.clone())
.map(Clone::clone)
}
/// load the library from a given path
pub(crate) async fn load(
id: Uuid,
db_path: &str,
db_path: impl AsRef<Path>,
config: LibraryConfig,
node_context: NodeContext,
) -> Result<LibraryContext, LibraryManagerError> {
let db = Arc::new(
load_and_migrate(&format!("file:{}", db_path))
load_and_migrate(&format!("file:{}", db_path.as_ref().to_string_lossy()))
.await
.unwrap(),
);

View File

@@ -0,0 +1 @@

View File

@@ -1,12 +1,12 @@
use crate::{prisma::statistics::*, sys::Volume};
use fs_extra::dir::get_size;
use serde::{Deserialize, Serialize};
use std::fs;
use tokio::fs;
use ts_rs::TS;
use super::{LibraryContext, LibraryError};
#[derive(Debug, Serialize, Deserialize, TS, Clone)]
#[derive(Debug, Serialize, Deserialize, TS, Clone, Default)]
#[ts(export)]
pub struct Statistics {
pub total_file_count: i32,
@@ -18,29 +18,15 @@ pub struct Statistics {
pub library_db_size: String,
}
impl Into<Statistics> for Data {
fn into(self) -> Statistics {
Statistics {
total_file_count: self.total_file_count,
total_bytes_used: self.total_bytes_used,
total_bytes_capacity: self.total_bytes_capacity,
total_bytes_free: self.total_bytes_free,
total_unique_bytes: self.total_unique_bytes,
preview_media_bytes: self.preview_media_bytes,
library_db_size: String::new(),
}
}
}
impl Default for Statistics {
fn default() -> Self {
impl From<Data> for Statistics {
fn from(data: Data) -> Self {
Self {
total_file_count: 0,
total_bytes_used: String::new(),
total_bytes_capacity: String::new(),
total_bytes_free: String::new(),
total_unique_bytes: String::new(),
preview_media_bytes: String::new(),
total_file_count: data.total_file_count,
total_bytes_used: data.total_bytes_used,
total_bytes_capacity: data.total_bytes_capacity,
total_bytes_free: data.total_bytes_free,
total_unique_bytes: data.total_unique_bytes,
preview_media_bytes: data.preview_media_bytes,
library_db_size: String::new(),
}
}
@@ -48,18 +34,14 @@ impl Default for Statistics {
impl Statistics {
pub async fn retrieve(ctx: &LibraryContext) -> Result<Statistics, LibraryError> {
let library_statistics_db = match ctx
let library_statistics_db = ctx
.db
.statistics()
.find_unique(id::equals(ctx.node_local_id))
.exec()
.await?
{
Some(library_statistics_db) => library_statistics_db.into(),
// create the default values if database has no entry
None => Statistics::default(),
};
Ok(library_statistics_db.into())
.map_or_else(Default::default, Into::into);
Ok(library_statistics_db)
}
pub async fn calculate(ctx: &LibraryContext) -> Result<Statistics, LibraryError> {
@@ -72,9 +54,9 @@ impl Statistics {
// TODO: get from database, not sys
let volumes = Volume::get_volumes();
Volume::save(&ctx).await?;
Volume::save(ctx).await?;
// println!("{:?}", volumes);
// info!("{:?}", volumes);
let mut available_capacity: u64 = 0;
let mut total_capacity: u64 = 0;
@@ -85,15 +67,12 @@ impl Statistics {
}
}
let library_db_size = match fs::metadata(ctx.config().data_directory()) {
let library_db_size = match fs::metadata(ctx.config().data_directory()).await {
Ok(metadata) => metadata.len(),
Err(_) => 0,
};
let mut thumbsnails_dir = ctx.config().data_directory();
thumbsnails_dir.push("thumbnails");
let thumbnail_folder_size = get_size(&thumbsnails_dir);
let thumbnail_folder_size = get_size(ctx.config().data_directory().join("thumbnails"));
let statistics = Statistics {
library_db_size: library_db_size.to_string(),
@@ -109,7 +88,7 @@ impl Statistics {
id::equals(1),
vec![library_db_size::set(statistics.library_db_size.clone())],
vec![
total_file_count::set(statistics.total_file_count.clone()),
total_file_count::set(statistics.total_file_count),
total_bytes_used::set(statistics.total_bytes_used.clone()),
total_bytes_capacity::set(statistics.total_bytes_capacity.clone()),
total_bytes_free::set(statistics.total_bytes_free.clone()),

View File

@@ -45,11 +45,11 @@ pub struct NodeConfig {
#[derive(Error, Debug)]
pub enum NodeConfigError {
#[error("error saving or loading the config from the filesystem")]
IOError(#[from] io::Error),
IO(#[from] io::Error),
#[error("error serializing or deserializing the JSON in the config file")]
JsonError(#[from] serde_json::Error),
Json(#[from] serde_json::Error),
#[error("error migrating the config file")]
MigrationError(String),
Migration(String),
}
impl NodeConfig {
@@ -141,7 +141,7 @@ impl NodeConfigManager {
) -> Result<(), NodeConfigError> {
match current_version {
None => {
Err(NodeConfigError::MigrationError(format!("Your Spacedrive config file stored at '{}' is missing the `version` field. If you just upgraded please delete the file and restart Spacedrive! Please note this upgrade will stop using your old 'library.db' as the folder structure has changed.", config_path.display())))
Err(NodeConfigError::Migration(format!("Your Spacedrive config file stored at '{}' is missing the `version` field. If you just upgraded please delete the file and restart Spacedrive! Please note this upgrade will stop using your old 'library.db' as the folder structure has changed.", config_path.display())))
}
_ => Ok(()),
}

View File

@@ -15,17 +15,24 @@ pub struct LibraryNode {
pub last_seen: DateTime<Utc>,
}
impl Into<LibraryNode> for node::Data {
fn into(self) -> LibraryNode {
LibraryNode {
uuid: self.pub_id,
name: self.name,
platform: IntEnum::from_int(self.platform).unwrap(),
last_seen: self.last_seen.into(),
impl From<node::Data> for LibraryNode {
fn from(data: node::Data) -> Self {
Self {
uuid: data.pub_id,
name: data.name,
platform: IntEnum::from_int(data.platform).unwrap(),
last_seen: data.last_seen.into(),
}
}
}
impl From<Box<node::Data>> for LibraryNode {
fn from(data: Box<node::Data>) -> Self {
Self::from(*data)
}
}
#[allow(clippy::upper_case_acronyms)]
#[repr(i32)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, TS, Eq, PartialEq, IntEnum)]
#[ts(export)]

1
core/src/node/state.rs Normal file
View File

@@ -0,0 +1 @@

View File

@@ -5,10 +5,18 @@ use crate::{
prisma::{file_path, location},
ClientQuery, CoreEvent, LibraryQuery,
};
use log::info;
use serde::{Deserialize, Serialize};
use std::{fs, io, io::Write, path::Path};
use std::fmt::Debug;
use std::path::{Path, PathBuf};
use thiserror::Error;
use tokio::{
fs::{metadata, File},
io::{self, AsyncWriteExt},
};
use ts_rs::TS;
use uuid::Uuid;
use super::SysError;
@@ -27,26 +35,26 @@ pub struct LocationResource {
pub date_created: chrono::DateTime<chrono::Utc>,
}
impl Into<LocationResource> for location::Data {
fn into(mut self) -> LocationResource {
impl From<location::Data> for LocationResource {
fn from(data: location::Data) -> Self {
LocationResource {
id: self.id,
name: self.name,
path: self.local_path,
total_capacity: self.total_capacity,
available_capacity: self.available_capacity,
is_removable: self.is_removable,
node: self.node.take().unwrap_or(None).map(|node| (*node).into()),
is_online: self.is_online,
date_created: self.date_created.into(),
id: data.id,
name: data.name,
path: data.local_path,
total_capacity: data.total_capacity,
available_capacity: data.available_capacity,
is_removable: data.is_removable,
node: data.node.unwrap_or(None).map(Into::into),
is_online: data.is_online,
date_created: data.date_created.into(),
}
}
}
#[derive(Serialize, Deserialize, Default)]
pub struct DotSpacedrive {
pub location_uuid: String,
pub library_uuid: String,
pub location_uuid: Uuid,
pub library_uuid: Uuid,
}
static DOTFILE_NAME: &str = ".spacedrive";
@@ -69,24 +77,26 @@ pub async fn get_location(
location_id: i32,
) -> Result<LocationResource, SysError> {
// get location by location_id from db and include location_paths
let location = match ctx
.db
ctx.db
.location()
.find_unique(location::id::equals(location_id))
.exec()
.await?
{
Some(location) => location,
None => Err(LocationError::NotFound(location_id.to_string()))?,
};
Ok(location.into())
.map(Into::into)
.ok_or_else(|| LocationError::IdNotFound(location_id).into())
}
pub async fn scan_location(ctx: &LibraryContext, location_id: i32, path: String) {
ctx.spawn_job(Box::new(IndexerJob { path: path.clone() }))
.await;
ctx.queue_job(Box::new(FileIdentifierJob { location_id, path }))
.await;
pub async fn scan_location(ctx: &LibraryContext, location_id: i32, path: impl AsRef<Path>) {
let path_buf = path.as_ref().to_path_buf();
ctx.spawn_job(Box::new(IndexerJob {
path: path_buf.clone(),
}))
.await;
ctx.queue_job(Box::new(FileIdentifierJob {
location_id,
path: path_buf,
}))
.await;
// TODO: make a way to stop jobs so this can be canceled without rebooting app
// ctx.queue_job(Box::new(ThumbnailJob {
// location_id,
@@ -97,19 +107,18 @@ pub async fn scan_location(ctx: &LibraryContext, location_id: i32, path: String)
pub async fn new_location_and_scan(
ctx: &LibraryContext,
path: &str,
path: impl AsRef<Path> + Debug,
) -> Result<LocationResource, SysError> {
let location = create_location(&ctx, path).await?;
let location = create_location(ctx, &path).await?;
scan_location(&ctx, location.id, path.to_string()).await;
scan_location(ctx, location.id, path).await;
Ok(location)
}
pub async fn get_locations(ctx: &LibraryContext) -> Result<Vec<LocationResource>, SysError> {
let db = &ctx.db;
let locations = db
let locations = ctx
.db
.location()
.find_many(vec![])
.with(location::node::fetch())
@@ -117,121 +126,105 @@ pub async fn get_locations(ctx: &LibraryContext) -> Result<Vec<LocationResource>
.await?;
// turn locations into LocationResource
let locations: Vec<LocationResource> = locations
.into_iter()
.map(|location| location.into())
.collect();
Ok(locations)
Ok(locations.into_iter().map(LocationResource::from).collect())
}
pub async fn create_location(
ctx: &LibraryContext,
path: &str,
path: impl AsRef<Path> + Debug,
) -> Result<LocationResource, SysError> {
let path = path.as_ref();
// check if we have access to this location
if !Path::new(path).exists() {
Err(LocationError::NotFound(path.to_string()))?;
if !path.exists() {
return Err(LocationError::PathNotFound(path.to_owned()).into());
}
// if on windows
if cfg!(target_family = "windows") {
// try and create a dummy file to see if we can write to this location
match fs::File::create(format!("{}/{}", path.clone(), ".spacewrite")) {
Ok(file) => file,
Err(e) => Err(LocationError::DotfileWriteFailure(e, path.to_string()))?,
};
match fs::remove_file(format!("{}/{}", path.clone(), ".spacewrite")) {
Ok(_) => (),
Err(e) => Err(LocationError::DotfileWriteFailure(e, path.to_string()))?,
}
} else {
// unix allows us to test this more directly
match fs::File::open(&path) {
Ok(_) => println!("Path is valid, creating location for '{}'", &path),
Err(e) => Err(LocationError::FileReadError(e))?,
}
if metadata(path)
.await
.map_err(|e| LocationError::DotfileReadFailure(e, path.to_owned()))?
.permissions()
.readonly()
{
return Err(LocationError::ReadonlyDotFileLocationFailure(path.to_owned()).into());
}
let path_string = path.to_string_lossy().to_string();
// check if location already exists
let location = match ctx
let location_resource = if let Some(location) = ctx
.db
.location()
.find_first(vec![location::local_path::equals(Some(path.to_string()))])
.find_first(vec![location::local_path::equals(Some(
path_string.clone(),
))])
.exec()
.await?
{
Some(location) => location,
None => {
println!(
"Location does not exist, creating new location for '{}'",
&path
);
let uuid = uuid::Uuid::new_v4();
location.into()
} else {
info!(
"Location does not exist, creating new location for '{}'",
path_string
);
let uuid = Uuid::new_v4();
let p = Path::new(&path);
let location = ctx
.db
.location()
.create(
location::pub_id::set(uuid.to_string()),
vec![
location::name::set(Some(
path.file_name().unwrap().to_string_lossy().to_string(),
)),
location::is_online::set(true),
location::local_path::set(Some(path_string)),
location::node_id::set(Some(ctx.node_local_id)),
],
)
.exec()
.await?;
let location = ctx
.db
.location()
.create(
location::pub_id::set(uuid.to_string()),
vec![
location::name::set(Some(
p.file_name().unwrap().to_string_lossy().to_string(),
)),
location::is_online::set(true),
location::local_path::set(Some(path.to_string())),
location::node_id::set(Some(ctx.node_local_id)),
],
)
.exec()
.await?;
info!("Created location: {:?}", location);
println!("Created location: {:?}", location);
// write a file called .spacedrive to path containing the location id in JSON format
let mut dotfile = File::create(path.with_file_name(DOTFILE_NAME))
.await
.map_err(|e| LocationError::DotfileWriteFailure(e, path.to_owned()))?;
// write a file called .spacedrive to path containing the location id in JSON format
let mut dotfile = match fs::File::create(format!("{}/{}", path.clone(), DOTFILE_NAME)) {
Ok(file) => file,
Err(e) => Err(LocationError::DotfileWriteFailure(e, path.to_string()))?,
};
let data = DotSpacedrive {
location_uuid: uuid,
library_uuid: ctx.id,
};
let data = DotSpacedrive {
location_uuid: uuid.to_string(),
library_uuid: ctx.id.to_string(),
};
let json_bytes = serde_json::to_vec(&data)
.map_err(|e| LocationError::DotfileSerializeFailure(e, path.to_owned()))?;
let json = match serde_json::to_string(&data) {
Ok(json) => json,
Err(e) => Err(LocationError::DotfileSerializeFailure(e, path.to_string()))?,
};
dotfile
.write_all(&json_bytes)
.await
.map_err(|e| LocationError::DotfileWriteFailure(e, path.to_owned()))?;
match dotfile.write_all(json.as_bytes()) {
Ok(_) => (),
Err(e) => Err(LocationError::DotfileWriteFailure(e, path.to_string()))?,
}
// ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::GetLocations))
// .await;
// ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::GetLocations))
// .await;
location
}
location.into()
};
Ok(location.into())
Ok(location_resource)
}
pub async fn delete_location(ctx: &LibraryContext, location_id: i32) -> Result<(), SysError> {
let db = &ctx.db;
db.file_path()
ctx.db
.file_path()
.find_many(vec![file_path::location_id::equals(Some(location_id))])
.delete()
.exec()
.await?;
db.location()
ctx.db
.location()
.find_unique(location::id::equals(location_id))
.delete()
.exec()
@@ -243,7 +236,7 @@ pub async fn delete_location(ctx: &LibraryContext, location_id: i32) -> Result<(
}))
.await;
println!("Location {} deleted", location_id);
info!("Location {} deleted", location_id);
Ok(())
}
@@ -251,15 +244,21 @@ pub async fn delete_location(ctx: &LibraryContext, location_id: i32) -> Result<(
#[derive(Error, Debug)]
pub enum LocationError {
#[error("Failed to create location (uuid {uuid:?})")]
CreateFailure { uuid: String },
#[error("Failed to read location dotfile")]
DotfileReadFailure(io::Error),
CreateFailure { uuid: Uuid },
#[error("Failed to read location dotfile (path: {1:?})")]
DotfileReadFailure(io::Error, PathBuf),
#[error("Failed to serialize dotfile for location (at path: {1:?})")]
DotfileSerializeFailure(serde_json::Error, String),
#[error("Location not found (uuid: {1:?})")]
DotfileWriteFailure(io::Error, String),
#[error("Location not found (uuid: {0:?})")]
NotFound(String),
DotfileSerializeFailure(serde_json::Error, PathBuf),
#[error("Dotfile location is read only (at path: {0:?})")]
ReadonlyDotFileLocationFailure(PathBuf),
#[error("Failed to write dotfile (path: {1:?})")]
DotfileWriteFailure(io::Error, PathBuf),
#[error("Location not found (path: {0:?})")]
PathNotFound(PathBuf),
#[error("Location not found (uuid: {0})")]
UuidNotFound(Uuid),
#[error("Location not found (id: {0})")]
IdNotFound(i32),
#[error("Failed to open file from local os")]
FileReadError(io::Error),
#[error("Failed to read mounted volumes from local os")]

View File

@@ -11,11 +11,11 @@ use crate::{job, prisma};
#[derive(Error, Debug)]
pub enum SysError {
#[error("Location error")]
LocationError(#[from] LocationError),
Location(#[from] LocationError),
#[error("Error with system volumes")]
VolumeError(String),
Volume(String),
#[error("Error from job runner")]
JobError(#[from] job::JobError),
Job(#[from] job::JobError),
#[error("Database error")]
DatabaseError(#[from] prisma::QueryError),
Database(#[from] prisma::QueryError),
}

View File

@@ -33,7 +33,7 @@ impl Volume {
.volume()
.upsert(
node_id_mount_point_name(
ctx.node_local_id.clone(),
ctx.node_local_id,
volume.mount_point.to_string(),
volume.name.to_string(),
),
@@ -63,7 +63,7 @@ impl Volume {
Ok(())
}
pub fn get_volumes() -> Result<Vec<Volume>, SysError> {
let all_volumes: Vec<Volume> = System::new_all()
Ok(System::new_all()
.disks()
.iter()
.map(|disk| {
@@ -119,15 +119,8 @@ impl Volume {
is_root_filesystem: mount_point == "/",
}
})
.collect();
let volumes = all_volumes
.clone()
.into_iter()
.filter(|volume| !volume.mount_point.starts_with("/System"))
.collect();
Ok(volumes)
.collect())
}
}

View File

@@ -39,29 +39,29 @@ pub struct TagOnFile {
pub date_created: chrono::DateTime<chrono::Utc>,
}
impl Into<Tag> for tag::Data {
fn into(self) -> Tag {
Tag {
id: self.id,
pub_id: self.pub_id,
name: self.name,
color: self.color,
total_files: self.total_files,
redundancy_goal: self.redundancy_goal,
date_created: self.date_created.into(),
date_modified: self.date_modified.into(),
impl From<tag::Data> for Tag {
fn from(data: tag::Data) -> Self {
Self {
id: data.id,
pub_id: data.pub_id,
name: data.name,
color: data.color,
total_files: data.total_files,
redundancy_goal: data.redundancy_goal,
date_created: data.date_created.into(),
date_modified: data.date_modified.into(),
}
}
}
impl Into<TagOnFile> for tag_on_file::Data {
fn into(self) -> TagOnFile {
TagOnFile {
tag_id: self.tag_id,
tag: self.tag.map(|t| (*t).into()),
file_id: self.file_id,
file: self.file.map(|f| (*f).into()),
date_created: self.date_created.into(),
impl From<tag_on_file::Data> for TagOnFile {
fn from(data: tag_on_file::Data) -> Self {
Self {
tag_id: data.tag_id,
tag: data.tag.map(|t| (*t).into()),
file_id: data.file_id,
file: data.file.map(|f| (*f).into()),
date_created: data.date_created.into(),
}
}
}

View File

@@ -12,10 +12,10 @@ static MIGRATIONS_DIR: Dir = include_dir!("$CARGO_MANIFEST_DIR/prisma/migrations
#[derive(Error, Debug)]
pub enum MigrationError {
#[error("An error occurred while initialising a new database connection")]
DatabaseIntialisation(#[from] NewClientError),
DatabaseInitialization(#[from] NewClientError),
#[error("An error occurred with the database while applying migrations")]
DatabaseError(#[from] prisma_client_rust::queries::Error),
#[error("An error occured reading the embedded migration files. {0}. Please report to Spacedrive developers!")]
#[error("An error occurred reading the embedded migration files. {0}. Please report to Spacedrive developers!")]
InvalidEmbeddedMigration(&'static str),
}
@@ -28,7 +28,7 @@ pub async fn load_and_migrate(db_url: &str) -> Result<PrismaClient, MigrationErr
"SELECT name FROM sqlite_master WHERE type='table' AND name='_migrations'"
))
.await?
.len() == 0;
.is_empty();
if migrations_table_missing {
client._execute_raw(raw!(INIT_MIGRATION)).await?;
@@ -44,11 +44,9 @@ pub async fn load_and_migrate(db_url: &str) -> Result<PrismaClient, MigrationErr
))
.and_then(|name| {
name.to_str()
.ok_or_else(|| {
MigrationError::InvalidEmbeddedMigration(
"File name contains malformed characters",
)
})
.ok_or(MigrationError::InvalidEmbeddedMigration(
"File name contains malformed characters",
))
.map(|name| (name, dir))
})
})
@@ -65,7 +63,7 @@ pub async fn load_and_migrate(db_url: &str) -> Result<PrismaClient, MigrationErr
.collect::<Result<Vec<_>, _>>()?;
// We sort the migrations so they are always applied in the correct order
migration_directories.sort_by(|(_, a_time, _), (_, b_time, _)| a_time.cmp(&b_time));
migration_directories.sort_by(|(_, a_time, _), (_, b_time, _)| a_time.cmp(b_time));
for (name, _, dir) in migration_directories {
let migration_file_raw = dir
@@ -105,7 +103,7 @@ pub async fn load_and_migrate(db_url: &str) -> Result<PrismaClient, MigrationErr
.await?;
// Split the migrations file up into each individual step and apply them all
let steps = migration_file_raw.split(";").collect::<Vec<&str>>();
let steps = migration_file_raw.split(';').collect::<Vec<&str>>();
let steps = &steps[0..steps.len() - 1];
for (i, step) in steps.iter().enumerate() {
client._execute_raw(raw!(*step)).await?;