From d164db7fc37a2bcac56eeeef0055a1067e506d5d Mon Sep 17 00:00:00 2001 From: Brendan Allan Date: Thu, 26 May 2022 16:17:46 +0800 Subject: [PATCH] cleanup + update to prisma-client-rust 0.5.0 still waiting to rebase for changes to identifier --- .vscode/settings.json | 2 +- Cargo.lock | Bin 218854 -> 218926 bytes core/Cargo.toml | 2 +- core/prisma/Cargo.toml | 2 +- .../migration.sql | 0 .../migration.sql | 0 core/src/db/migrate.rs | 35 ++--- core/src/file/cas/identifier.rs | 141 ++++++++++-------- core/src/file/indexer/scan.rs | 85 +++++------ core/src/file/mod.rs | 19 ++- core/src/job/jobs.rs | 8 +- core/src/library/statistics.rs | 31 ++-- core/src/sys/locations.rs | 2 +- core/src/sys/volumes.rs | 36 ++--- 14 files changed, 184 insertions(+), 179 deletions(-) rename core/prisma/migrations/{20220523044216_init => 20220526035100_init}/migration.sql (100%) rename core/prisma/migrations/{0_migration_table => migration_table}/migration.sql (100%) diff --git a/.vscode/settings.json b/.vscode/settings.json index fba57d79b..3d1c13bb5 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -19,7 +19,7 @@ "upsert" ], "[rust]": { - "editor.defaultFormatter": "matklad.rust-analyzer" + "editor.defaultFormatter": "rust-lang.rust-analyzer" }, "rust-analyzer.procMacro.enable": true, "rust-analyzer.diagnostics.experimental.enable": false diff --git a/Cargo.lock b/Cargo.lock index 626db307097957d6d50d188b74360746c626e47f..5f2869ac6385911521b9ee353485af4a6f66bd82 100644 GIT binary patch delta 2212 zcmcJRF-%iY6o&IoQEQ>n$ZJbk+N8vWqU1f^ea}}16DTnbm^c|16EBbZ7+8?a*%2&h zZqm~b1wtICMnco1ccTe|F_9Q1Mn@NQFz6x<-kQKn2xq_fzH`q#=gOb+2Q6HNS&gbqoM@LLMDXvp1IQ##hN zM3Htd1u?ajfuH>(^>=rmU4$G>^|k59W$b*>**5>mh^dWH$fQZGq4YES&5)o`?^i-q zpTMq7!`JzC506)%MuI;HIn*IHr5;tG&M(}9wxxIa>1!Clr6ZWTaDMmypdSB%%FI7v C7o5@n delta 2236 zcmZ3tj`!JG-VGiz0xXg&O%g2(5{*(U(hQ9fjVuk#(o&5qQjLuaEey?!5);i0Q!Px5 zEtQE>IQij!vFZFGOq!do&xm5Ajq!GG809x_n{|zaCdP06G@pfoCMHjgS#h5x27>}z zbn~{AMPF%SeBXS=L$opZZ$FdU^mu1x*2$Mw%1>8V$e2SLd+aAN#ZNELVv^dPy@=75 zk0zE(3t;A+e14_!cAt%mJytZaVmmPDJE>1k*vQB`dH-_l={tm(#B@yb43tfh&5})$ z(kzXWEmD(A(~J`h%~I2nfuy0Og>j;#sj+#gX{xDl(&UZv#3skC;F_*4!o)Xyt22}E z^dq87Qqz5OnRupO6J_ETC`v6XO)W0b%}C5k$w@6Lp02-@Np5<#2v9!2mq~njtT2<% zbbDbY*6DXdm;@+uiVCx-p276lqD-38pNay_78GOJLz8fuZY<8Uf;J`tv!23q-9?Pz z(<>yH{?W#cxHXJi(_d&aX>8|{VmirB8%uy$M0$IK7Lx#tO6`6nMz-k_)R_#Y=TBtf z*)FKV^g)a^J^@CF*!Egyre~70F&~%-1-9ReW!h&*8}osM;vJfpyd4(mut reader: R) -> Result { @@ -28,9 +29,9 @@ pub async fn run_migrations(ctx: &CoreContext) -> Result<()> { let client = &ctx.database; match client - ._query_raw::( - "SELECT name FROM sqlite_master WHERE type='table' AND name='_migrations'", - ) + ._query_raw::(raw!( + "SELECT name FROM sqlite_master WHERE type='table' AND name='_migrations'" + )) .await { Ok(data) => { @@ -38,7 +39,7 @@ pub async fn run_migrations(ctx: &CoreContext) -> Result<()> { #[cfg(debug_assertions)] println!("Migration table does not exist"); // execute migration - match client._execute_raw(INIT_MIGRATION).await { + match client._execute_raw(raw!(INIT_MIGRATION)).await { Ok(_) => {} Err(e) => { println!("Failed to create migration table: {}", e); @@ -46,9 +47,9 @@ pub async fn run_migrations(ctx: &CoreContext) -> Result<()> { }; let value: Vec = client - ._query_raw( - "SELECT name FROM sqlite_master WHERE type='table' AND name='_migrations'", - ) + ._query_raw(raw!( + "SELECT name FROM sqlite_master WHERE type='table' AND name='_migrations'" + )) .await .unwrap(); @@ -70,15 +71,15 @@ pub async fn run_migrations(ctx: &CoreContext) -> Result<()> { }) .collect::>(); - // migration_subdirs.sort_by(|a, b| { - // let a_name = a.path().file_name().unwrap().to_str().unwrap(); - // let b_name = b.path().file_name().unwrap().to_str().unwrap(); + migration_subdirs.sort_by(|a, b| { + let a_name = a.path().file_name().unwrap().to_str().unwrap(); + let b_name = b.path().file_name().unwrap().to_str().unwrap(); - // let a_time = a_name[..14].parse::().unwrap(); - // let b_time = b_name[..14].parse::().unwrap(); + let a_time = a_name[..14].parse::().unwrap(); + let b_time = b_name[..14].parse::().unwrap(); - // a_time.cmp(&b_time) - // }); + a_time.cmp(&b_time) + }); for subdir in migration_subdirs { println!("{:?}", subdir.path()); @@ -117,7 +118,7 @@ pub async fn run_migrations(ctx: &CoreContext) -> Result<()> { .await?; for (i, step) in steps.iter().enumerate() { - match client._execute_raw(&format!("{};", step)).await { + match client._execute_raw(raw!(*step)).await { Ok(_) => { #[cfg(debug_assertions)] println!("Step {} ran successfully", i); diff --git a/core/src/file/cas/identifier.rs b/core/src/file/cas/identifier.rs index d996cc833..b9635d80e 100644 --- a/core/src/file/cas/identifier.rs +++ b/core/src/file/cas/identifier.rs @@ -10,11 +10,11 @@ use crate::{ }; use anyhow::Result; use futures::executor::block_on; -use prisma_client_rust::Direction; +use prisma_client_rust::prisma_models::PrismaValue; +use prisma_client_rust::{raw, Direction}; use serde::{Deserialize, Serialize}; use super::checksum::generate_cas_id; - #[derive(Deserialize, Serialize, Debug)] pub struct FileCreated { pub id: i32, @@ -51,62 +51,74 @@ impl Job for FileIdentifierJob { let location_path = location.path.unwrap_or("".to_string()); let ctx = tokio::task::spawn_blocking(move || { - let mut completed: usize = 0; - let mut cursor: i32 = 1; + let completed: usize = 0; + let cursor: i32 = 1; - while completed < task_count { - let file_paths = block_on(get_orphan_file_paths(&ctx.core_ctx, cursor)).unwrap(); - println!("Processing {:?} orphan files. ({} completed of {})", file_paths.len(), completed, task_count); + while completed < task_count { + let file_paths = block_on(get_orphan_file_paths(&ctx.core_ctx, cursor)).unwrap(); + println!( + "Processing {:?} orphan files. ({} completed of {})", + file_paths.len(), + completed, + task_count + ); - let mut rows: Vec = Vec::new(); - // only rows that have a valid cas_id to be inserted - for file_path in file_paths.iter() { - match prepare_file_values(&location_path, file_path) { - Ok(data) => { - rows.push(data); - } - Err(e) => { - - println!("Error processing file: {}", e); - continue; - } - }; - } - if rows.len() == 0 { - println!("No orphan files to process, finishing..."); - break; - } - let insert_files = format!( - r#"INSERT INTO files (cas_id, size_in_bytes) VALUES {} ON CONFLICT (cas_id) DO NOTHING RETURNING id, cas_id"#, - rows.join(", ") - ); - - let files: Vec = block_on(db._query_raw(&insert_files)).unwrap(); + let mut rows = Vec::new(); + // only rows that have a valid cas_id to be inserted + for file_path in file_paths.iter() { + match prepare_file_values(&location_path, file_path) { + Ok(data) => { + rows.push(PrismaValue::List(data)); + } + Err(e) => { + println!("Error processing file: {}", e); + continue; + } + }; + } + if rows.len() == 0 { + println!("No orphan files to process, finishing..."); + break; + } - for file in files.iter() { - let update_file_path = format!( - r#"UPDATE file_paths SET file_id = "{}" WHERE temp_cas_id = "{}""#, - file.id, file.cas_id - ); - block_on(db._execute_raw(&update_file_path)).unwrap(); - } + panic!("temp_cas_id no longer exists. please fix this code!"); - let last_row = file_paths.last().unwrap(); + // let files: Vec = block_on(db._query_raw(raw!( + // &format!( + // "INSERT INTO files (cas_id, size_in_bytes) VALUES {} ON CONFLICT (cas_id) DO NOTHING RETURNING id, cas_id", + // vec!["({}, {}, {})"; rows.len()].join(",") + // ), + // PrismaValue::List(rows) + // ))).unwrap(); - cursor = last_row.id; - - completed += 1; - ctx.progress(vec![ - JobReportUpdate::CompletedTaskCount(completed), - JobReportUpdate::Message(format!( - "Processed {} of {} orphan files", - completed, - task_count - )), - ]); - } - ctx - }).await?; + // for file in files.iter() { + // block_on( + // db.file_path() + // .find_many(vec![file_path::temp_cas_id::equals(Some(file.cas_id.clone()))]) + // .update(vec![ + // file_path::id::set(file.id) + // ]) + // .exec() + // ).unwrap(); + // } + + // let last_row = file_paths.last().unwrap(); + + // cursor = last_row.id; + + // completed += 1; + // ctx.progress(vec![ + // JobReportUpdate::CompletedTaskCount(completed), + // JobReportUpdate::Message(format!( + // "Processed {} of {} orphan files", + // completed, + // task_count + // )), + // ]); + } + ctx + }) + .await?; let remaining = count_orphan_file_paths(&ctx.core_ctx).await?; @@ -131,11 +143,10 @@ struct CountRes { pub async fn count_orphan_file_paths(ctx: &CoreContext) -> Result { let db = &ctx.database; let files_count = db - ._query_raw::( - r#"SELECT COUNT(*) AS count FROM file_paths WHERE file_id IS NULL AND is_dir IS FALSE"#, - ) + ._query_raw::(raw!( + "SELECT COUNT(*) AS count FROM file_paths WHERE file_id IS NULL AND is_dir IS FALSE" + )) .await?; - println!("files: {:?}", files_count); Ok(files_count[0].count.unwrap_or(0)) } @@ -159,19 +170,21 @@ pub async fn get_orphan_file_paths( Ok(files) } -pub fn prepare_file_values(location_path: &str, file_path: &file_path::Data) -> Result { +pub fn prepare_file_values( + location_path: &str, + file_path: &file_path::Data, +) -> Result> { let path = format!("{}/{}", location_path, file_path.materialized_path); let metadata = fs::metadata(&path)?; let cas_id = { if !file_path.is_dir { - // TODO: remove unwrap - let mut x = generate_cas_id(&path, metadata.len()).unwrap(); - x.truncate(16); - x + let mut ret = generate_cas_id(&path, metadata.len()).unwrap(); + ret.truncate(16); + ret } else { "".to_string() } }; - // TODO: add all metadata - Ok(format!("(\"{}\",\"{}\")", cas_id, "0")) + + Ok(vec![PrismaValue::String(cas_id), PrismaValue::Int(0)]) } diff --git a/core/src/file/indexer/scan.rs b/core/src/file/indexer/scan.rs index 68cf2e135..97a51e28b 100644 --- a/core/src/file/indexer/scan.rs +++ b/core/src/file/indexer/scan.rs @@ -1,8 +1,9 @@ -use crate::file::cas::checksum::generate_cas_id; use crate::sys::locations::{create_location, LocationResource}; use crate::CoreContext; use anyhow::{anyhow, Result}; -use chrono::{DateTime, SecondsFormat, Utc}; +use prisma_client_rust::prisma_models::PrismaValue; +use prisma_client_rust::raw; +use prisma_client_rust::raw::Raw; use serde::{Deserialize, Serialize}; use std::ffi::OsStr; use std::{collections::HashMap, fs, path::Path, path::PathBuf, time::Instant}; @@ -35,7 +36,7 @@ pub async fn scan_path( } // grab the next id so we can increment in memory for batch inserting let first_file_id = match db - ._query_raw::(r#"SELECT MAX(id) id FROM file_paths"#) + ._query_raw::(raw!("SELECT MAX(id) id FROM file_paths")) .await { Ok(rows) => rows[0].id.unwrap_or(0), @@ -87,8 +88,8 @@ pub async fn scan_path( .unwrap_or(""); let parent_dir_id = dirs.get(&*parent_path); - let str = match path.as_os_str().to_str() { - Some(str) => str, + let path_str = match path.as_os_str().to_str() { + Some(path_str) => path_str, None => { println!("Error reading file {}", &path.display()); continue; @@ -96,7 +97,7 @@ pub async fn scan_path( }; on_progress(vec![ - ScanProgress::Message(format!("{}", str)), + ScanProgress::Message(format!("{}", path_str)), ScanProgress::ChunkCount(paths.len() / BATCH_SIZE), ]); @@ -128,18 +129,19 @@ pub async fn scan_path( on_progress(vec![ ScanProgress::SavedChunks(i as usize), ScanProgress::Message(format!( - "Writing {} of {} to library", + "Writing {} of {} to db", i * chunk.len(), paths.len(), )), ]); // vector to store active models - let mut files: Vec = Vec::new(); + let mut files: Vec = Vec::new(); + for (file_path, file_id, parent_dir_id, is_dir) in chunk { - files.push( + files.extend( match prepare_values(&file_path, *file_id, &location, parent_dir_id, *is_dir) { - Ok(file) => file, + Ok(values) => values.to_vec(), Err(e) => { println!("Error creating file model from path {:?}: {}", file_path, e); continue; @@ -147,15 +149,19 @@ pub async fn scan_path( }, ); } - let raw_sql = format!( - r#" - INSERT INTO file_paths (id, is_dir, location_id, materialized_path, name, extension, parent_id) - VALUES {} - "#, - files.join(", ") + + let raw = Raw::new( + &format!(" + INSERT INTO file_paths (id, is_dir, location_id, materialized_path, name, extension, parent_id) + VALUES {} + ", + vec!["({}, {}, {}, {}, {}, {}, {})"; chunk.len()].join(", ") + ), + files ); - // println!("{}", raw_sql); - let count = db._execute_raw(&raw_sql).await; + + let count = db._execute_raw(raw).await; + println!("Inserted {:?} records", count); } println!( @@ -175,8 +181,8 @@ fn prepare_values( location: &LocationResource, parent_id: &Option, is_dir: bool, -) -> Result { - // let metadata = fs::metadata(&file_path)?; +) -> Result<[PrismaValue; 7]> { + let metadata = fs::metadata(&file_path)?; let location_path = location.path.as_ref().unwrap().as_str(); // let size = metadata.len(); let name; @@ -203,37 +209,18 @@ fn prepare_values( None => return Err(anyhow!("{}", file_path.to_str().unwrap_or_default())), }; - // let cas_id = { - // if !metadata.is_dir() { - // // TODO: remove unwrap, skip and make sure to continue loop - // let mut x = generate_cas_id(&file_path.to_str().unwrap(), metadata.len()).unwrap(); - // x.truncate(16); - // x - // } else { - // "".to_string() - // } - // }; - - // let date_created: DateTime = metadata.created().unwrap().into(); - // let parsed_date_created = date_created.to_rfc3339_opts(SecondsFormat::Millis, true); - - let values = format!( - "({}, {}, {}, \"{}\", \"{}\", \"{}\", {})", - id, - is_dir, - location.id, - materialized_path, - name, - extension.to_lowercase(), + let values = [ + PrismaValue::Int(id as i64), + PrismaValue::Boolean(metadata.is_dir()), + PrismaValue::Int(location.id as i64), + PrismaValue::String(materialized_path.to_string()), + PrismaValue::String(name), + PrismaValue::String(extension.to_lowercase()), parent_id .clone() - .map(|id| format!("\"{}\"", &id)) - .unwrap_or("NULL".to_string()), - // parsed_date_created, - // cas_id - ); - - println!("{}", values); + .map(|id| PrismaValue::Int(id as i64)) + .unwrap_or(PrismaValue::Null), + ]; Ok(values) } diff --git a/core/src/file/mod.rs b/core/src/file/mod.rs index 43f612eda..a0dff8355 100644 --- a/core/src/file/mod.rs +++ b/core/src/file/mod.rs @@ -4,7 +4,6 @@ use thiserror::Error; use ts_rs::TS; use crate::{ - crypto::encryption::EncryptionAlgorithm, prisma::{self, file, file_path}, sys::SysError, }; @@ -91,7 +90,7 @@ impl Into for file::Data { integrity_checksum: self.integrity_checksum, kind: IntEnum::from_int(self.kind).unwrap(), size_in_bytes: self.size_in_bytes.to_string(), - // encryption: EncryptionAlgorithm::from_int(self.encryption).unwrap(), + // encryption: EncryptionAlgorithm::from_int(self.encryption).unwrap(), ipfs_id: self.ipfs_id, hidden: self.hidden, favorite: self.favorite, @@ -100,9 +99,9 @@ impl Into for file::Data { has_thumbstrip: self.has_thumbstrip, has_video_preview: self.has_video_preview, comment: self.comment, - date_created: self.date_created, - date_modified: self.date_modified, - date_indexed: self.date_indexed, + date_created: self.date_created.into(), + date_modified: self.date_modified.into(), + date_indexed: self.date_indexed.into(), paths: vec![], } } @@ -117,14 +116,14 @@ impl Into for file_path::Data { file_id: self.file_id, parent_id: self.parent_id, location_id: self.location_id, - date_indexed: self.date_indexed, - // permissions: self.permissions, + date_indexed: self.date_indexed.into(), + // permissions: self.permissions, has_local_thumbnail: false, name: self.name, extension: self.extension, - // temp_cas_id: self.temp_cas_id, - date_created: self.date_created, - date_modified: self.date_modified, + // temp_cas_id: self.temp_cas_id, + date_created: self.date_created.into(), + date_modified: self.date_modified.into(), } } } diff --git a/core/src/job/jobs.rs b/core/src/job/jobs.rs index 85d0d8969..fe6df38ab 100644 --- a/core/src/job/jobs.rs +++ b/core/src/job/jobs.rs @@ -52,7 +52,7 @@ impl Jobs { self.job_queue.push(job); } } - pub fn ingest_queue(&mut self, ctx: &CoreContext, job: Box) { + pub fn ingest_queue(&mut self, _ctx: &CoreContext, job: Box) { self.job_queue.push(job); } pub async fn complete(&mut self, ctx: &CoreContext, job_id: String) { @@ -124,8 +124,8 @@ impl Into for job::Data { status: JobStatus::from_int(self.status).unwrap(), task_count: self.task_count, completed_task_count: self.completed_task_count, - date_created: self.date_created, - date_modified: self.date_modified, + date_created: self.date_created.into(), + date_modified: self.date_modified.into(), message: String::new(), seconds_elapsed: self.seconds_elapsed, } @@ -170,7 +170,7 @@ impl JobReport { job::status::set(self.status.int_value()), job::task_count::set(self.task_count), job::completed_task_count::set(self.completed_task_count), - job::date_modified::set(chrono::Utc::now()), + job::date_modified::set(chrono::Utc::now().into()), job::seconds_elapsed::set(self.seconds_elapsed), ]) .exec() diff --git a/core/src/library/statistics.rs b/core/src/library/statistics.rs index 3b32d6ae9..5e02ec991 100644 --- a/core/src/library/statistics.rs +++ b/core/src/library/statistics.rs @@ -1,7 +1,7 @@ use crate::{ node::state, prisma::{library, library_statistics::*}, - sys::{self, volumes::Volume}, + sys::volumes::Volume, CoreContext, }; use fs_extra::dir::get_size; @@ -69,6 +69,7 @@ impl Statistics { }; Ok(library_statistics_db.into()) } + pub async fn calculate(ctx: &CoreContext) -> Result { let config = state::get(); let db = &ctx.database; @@ -135,20 +136,22 @@ impl Statistics { }; db.library_statistics() - .upsert(library_id::equals(library_local_id)) - .create( - library_id::set(library_local_id), - vec![library_db_size::set(statistics.library_db_size.clone())], + .upsert( + library_id::equals(library_local_id), + ( + library_id::set(library_local_id), + vec![library_db_size::set(statistics.library_db_size.clone())], + ), + vec![ + total_file_count::set(statistics.total_file_count.clone()), + total_bytes_used::set(statistics.total_bytes_used.clone()), + total_bytes_capacity::set(statistics.total_bytes_capacity.clone()), + total_bytes_free::set(statistics.total_bytes_free.clone()), + total_unique_bytes::set(statistics.total_unique_bytes.clone()), + preview_media_bytes::set(statistics.preview_media_bytes.clone()), + library_db_size::set(statistics.library_db_size.clone()), + ], ) - .update(vec![ - total_file_count::set(statistics.total_file_count.clone()), - total_bytes_used::set(statistics.total_bytes_used.clone()), - total_bytes_capacity::set(statistics.total_bytes_capacity.clone()), - total_bytes_free::set(statistics.total_bytes_free.clone()), - total_unique_bytes::set(statistics.total_unique_bytes.clone()), - preview_media_bytes::set(statistics.preview_media_bytes.clone()), - library_db_size::set(statistics.library_db_size.clone()), - ]) .exec() .await?; diff --git a/core/src/sys/locations.rs b/core/src/sys/locations.rs index 1179a8c27..f83c11b30 100644 --- a/core/src/sys/locations.rs +++ b/core/src/sys/locations.rs @@ -33,7 +33,7 @@ impl Into for location::Data { available_capacity: self.available_capacity, is_removable: self.is_removable, is_online: self.is_online, - date_created: self.date_created, + date_created: self.date_created.into(), } } } diff --git a/core/src/sys/volumes.rs b/core/src/sys/volumes.rs index bd0061574..2b7ca2cd4 100644 --- a/core/src/sys/volumes.rs +++ b/core/src/sys/volumes.rs @@ -35,28 +35,30 @@ impl Volume { // enter all volumes associate with this client add to db for volume in volumes { db.volume() - .upsert(node_id_mount_point_name( - config.node_id.clone(), - volume.mount_point.to_string(), - volume.name.to_string(), - )) - .create( - node_id::set(config.node_id), - name::set(volume.name), - mount_point::set(volume.mount_point), + .upsert( + node_id_mount_point_name( + config.node_id.clone(), + volume.mount_point.to_string(), + volume.name.to_string(), + ), + ( + node_id::set(config.node_id), + name::set(volume.name), + mount_point::set(volume.mount_point), + vec![ + disk_type::set(volume.disk_type.clone()), + filesystem::set(volume.file_system.clone()), + total_bytes_capacity::set(volume.total_capacity.to_string()), + total_bytes_available::set(volume.available_capacity.to_string()), + ], + ), vec![ - disk_type::set(volume.disk_type.clone()), - filesystem::set(volume.file_system.clone()), + disk_type::set(volume.disk_type), + filesystem::set(volume.file_system), total_bytes_capacity::set(volume.total_capacity.to_string()), total_bytes_available::set(volume.available_capacity.to_string()), ], ) - .update(vec![ - disk_type::set(volume.disk_type), - filesystem::set(volume.file_system), - total_bytes_capacity::set(volume.total_capacity.to_string()), - total_bytes_available::set(volume.available_capacity.to_string()), - ]) .exec() .await?; }