From 9fd809737693d92f73bb97ff9cf65ea525e0d901 Mon Sep 17 00:00:00 2001 From: Jamie Pine <32987599+jamiepine@users.noreply.github.com> Date: Mon, 28 Mar 2022 02:20:43 -0700 Subject: [PATCH] added percentage to job report --- packages/core/src/job/jobs.rs | 4 +- packages/core/src/job/mod.rs | 207 -------------------------------- packages/core/src/job/worker.rs | 4 + packages/core/src/lib.rs | 1 - 4 files changed, 7 insertions(+), 209 deletions(-) diff --git a/packages/core/src/job/jobs.rs b/packages/core/src/job/jobs.rs index be22ba56d..4b5eedc14 100644 --- a/packages/core/src/job/jobs.rs +++ b/packages/core/src/job/jobs.rs @@ -99,7 +99,7 @@ pub struct JobReport { pub completed_task_count: i64, pub message: String, - // pub percentage_complete: f64, + pub percentage_complete: f64, #[ts(type = "string")] pub seconds_elapsed: i64, } @@ -116,6 +116,7 @@ impl Into for JobData { date_created: self.date_created, date_modified: self.date_modified, message: String::new(), + percentage_complete: 0.0, seconds_elapsed: self.seconds_elapsed, } } @@ -131,6 +132,7 @@ impl JobReport { status: JobStatus::Queued, task_count: 0, completed_task_count: 0, + percentage_complete: 0.0, message: String::new(), seconds_elapsed: 0, } diff --git a/packages/core/src/job/mod.rs b/packages/core/src/job/mod.rs index 3ee6a7b42..ee0938755 100644 --- a/packages/core/src/job/mod.rs +++ b/packages/core/src/job/mod.rs @@ -13,210 +13,3 @@ pub enum JobError { #[error("Database error")] DatabaseError(#[from] db::DatabaseError), } - -// pub struct JobContext { -// pub core_ctx: CoreContext, -// pub job_data: JobReport, -// } - -// #[derive(Debug)] -// pub enum JobCommand { -// Create(Box), -// Update { id: i64, data: JobUpdateEvent }, -// Completed { id: i64 }, -// } - -// #[derive(Debug)] -// pub struct JobUpdateEvent { -// pub task_count: Option, -// pub completed_task_count: Option, -// pub message: Option, -// } - -// // a struct to handle the runtime and execution of jobs -// pub struct Jobs { -// pub job_sender_channel: Sender, -// pub running_job: Mutex>, -// } - -// impl Jobs { -// pub fn new() -> (Self, mpsc::Receiver) { -// let (job_sender, job_receiver) = mpsc::channel(100); -// ( -// Self { -// job_sender_channel: job_sender, -// running_job: Mutex::new(None), -// }, -// job_receiver, -// ) -// } - -// pub fn start(&self, ctx: CoreContext, mut job_receiver: mpsc::Receiver) { -// // open a thread to handle job execution -// tokio::spawn(async move { -// // local memory for job queue -// let mut queued_jobs: Vec<(Box, JobReport)> = vec![]; - -// loop { -// tokio::select! { -// // when job is received via message channel -// Some(request) = job_receiver.recv() => { -// match request { -// // create a new job -// JobCommand::Create(job) => { -// // create job report and save to database -// let mut report = JobReport::new(); -// println!("Creating job: {:?} Metadata: {:?}", &job, &report); -// report.create(&ctx).await; -// // queue the job -// queued_jobs.push((job, report)); - -// let current_running_job = self.running_job.lock().await; - -// if current_running_job.is_none() { -// // replace the running job mutex with this job -// let (current_job, current_report) = queued_jobs.pop().unwrap(); -// current_running_job.replace(current_report); -// // push job id into running jobs vector -// let id = report.id; -// let ctx = ctx.clone(); - -// // open a dedicated blocking thread to run job -// tokio::task::spawn_blocking(move || { -// // asynchronously call run method -// let handle = tokio::runtime::Handle::current(); -// let job_sender = ctx.job_sender.clone(); - -// handle.block_on(current_report.update(&ctx, None, Some(JobStatus::Running))).unwrap(); -// handle.block_on(job.run(JobContext { core_ctx: ctx.clone(), job_data: current_report.clone() })).unwrap(); - -// job_sender.send(JobCommand::Completed { id }).unwrap(); - -// }); -// } -// } -// // update a running job -// JobCommand::Update { id, data } => { -// let ctx = ctx.clone(); -// // find running job in memory by id -// let running_job = get_job(&id).unwrap_or_else(|| panic!("Job not found")); -// // update job data -// running_job.update(&ctx, Some(data), None).await.unwrap(); -// // emit event to invalidate client cache -// ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::JobGetRunning)).await; -// }, -// JobCommand::Completed { id } => { -// let ctx = ctx.clone(); -// let running_job = get_job(&id).unwrap_or_else(|| panic!("Job not found")); -// running_job.update(&ctx, None, Some(JobStatus::Completed)).await.unwrap(); -// ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::JobGetRunning)).await; -// ctx.emit(CoreEvent::InvalidateQuery(ClientQuery::JobGetHistory)).await; - -// } -// } -// } -// } -// } -// }); -// } - -// pub async fn handle_job_command(&mut self, job: JobCommand) { -// self.job_sender_channel.send(job).await.unwrap_or(()); -// } -// } - -// impl JobReport { -// pub fn new() -> Self { -// Self { -// id: 0, -// // client_id: 0, -// date_created: chrono::Utc::now(), -// date_modified: chrono::Utc::now(), -// status: JobStatus::Queued, -// task_count: 0, -// completed_task_count: 0, -// message: String::new(), -// } -// } -// pub async fn create(&mut self, ctx: &CoreContext) { -// // let config = client::get(); -// let job = ctx -// .database -// .job() -// .create_one( -// prisma::Job::action().set(1), -// // prisma::Job::clients().link(prisma::Client::id().equals(config.client_uuid)), -// vec![], -// ) -// .exec() -// .await; -// self.id = job.id; -// } -// pub async fn update( -// &mut self, -// ctx: &CoreContext, -// changes: Option, -// status: Option, -// ) -> Result<()> { -// match changes { -// Some(changes) => { -// if changes.task_count.is_some() { -// self.task_count = changes.task_count.unwrap(); -// } -// if changes.completed_task_count.is_some() { -// self.completed_task_count = changes.completed_task_count.unwrap(); -// } -// if changes.message.is_some() { -// self.message = changes.message.unwrap(); -// } -// }, -// None => {}, -// } -// if status.is_some() { -// self.status = status.unwrap(); - -// if self.status == JobStatus::Completed { -// ctx.database -// .job() -// .find_unique(prisma::Job::id().equals(self.id)) -// .update(vec![ -// prisma::Job::status().set(self.status.int_value()), -// prisma::Job::task_count().set(self.task_count), -// prisma::Job::completed_task_count().set(self.completed_task_count), -// prisma::Job::date_modified().set(chrono::Utc::now()), -// ]) -// .exec() -// .await; -// } -// } -// println!("JOB REPORT: {:?}", self); - -// Ok(()) -// } - -// pub async fn get_running(ctx: &CoreContext) -> Result, JobError> { -// let db = &ctx.database; -// let jobs = db -// .job() -// .find_many(vec![prisma::Job::status().equals(JobStatus::Running.int_value())]) -// .exec() -// .await; - -// Ok(jobs.into_iter().map(|j| j.into()).collect()) -// } - -// pub async fn get_history(ctx: &CoreContext) -> Result, JobError> { -// let db = &ctx.database; -// let jobs = db -// .job() -// .find_many(vec![or(vec![ -// prisma::Job::status().equals(JobStatus::Completed.int_value()), -// prisma::Job::status().equals(JobStatus::Canceled.int_value()), -// prisma::Job::status().equals(JobStatus::Queued.int_value()), -// ])]) -// .exec() -// .await; - -// Ok(jobs.into_iter().map(|j| j.into()).collect()) -// } -// } diff --git a/packages/core/src/job/worker.rs b/packages/core/src/job/worker.rs index 966c3c93b..668c69e5a 100644 --- a/packages/core/src/job/worker.rs +++ b/packages/core/src/job/worker.rs @@ -147,6 +147,9 @@ impl Worker { JobReportUpdate::CompletedTaskCount(completed_task_count) => { worker.job_report.completed_task_count = completed_task_count as i64; + worker.job_report.percentage_complete = + (worker.job_report.completed_task_count as f64 + / worker.job_report.task_count as f64) * 100.0; }, JobReportUpdate::Message(message) => { worker.job_report.message = message; @@ -155,6 +158,7 @@ impl Worker { worker.job_report.seconds_elapsed = seconds as i64; }, } + worker.job_report.date_modified = chrono::Utc::now(); } ctx.emit(CoreEvent::InvalidateQueryDebounced( ClientQuery::JobGetRunning, diff --git a/packages/core/src/lib.rs b/packages/core/src/lib.rs index 4d2bb4130..e900e0ad6 100644 --- a/packages/core/src/lib.rs +++ b/packages/core/src/lib.rs @@ -90,7 +90,6 @@ impl CoreContext { }); } pub async fn emit(&self, event: CoreEvent) { - println!("emitting event {:?}", event); self.event_sender.send(event).await.unwrap_or_else(|e| { error!("Failed to emit event. {:?}", e); });