[ENG-1809] Make core more resilient to crashes (#2574)

* Catching panics

* Re-enable unwind on release profile

* Reverting deps due to bad assertion

* Comment why the deps where reverted

---------

Co-authored-by: Vítor Vasconcellos <vasconcellos.dev@gmail.com>
This commit is contained in:
Ericson "Fogo" Soares
2024-06-30 00:59:01 -03:00
committed by GitHub
parent c419c7969a
commit 4eadb0de25
8 changed files with 114 additions and 46 deletions

BIN
Cargo.lock generated
View File

Binary file not shown.

View File

@@ -36,7 +36,7 @@ gix-ignore = "0.11.2"
globset = "0.4.14"
http = "0.2" # Update blocked by axum
hyper = "0.14" # Update blocked due to API breaking changes
image = "0.25.1"
image = "0.24.9" # Update blocked due to https://github.com/image-rs/image/issues/2230
itertools = "0.13.0"
lending-stream = "1.0"
libc = "0.2"
@@ -66,7 +66,7 @@ tracing-subscriber = "0.3.18"
tracing-test = "0.2.5"
uhlc = "0.6.0" # Must follow version used by specta
uuid = "1.8"
webp = "0.3.0"
webp = "0.2.6" # Update blocked by image
[workspace.dependencies.prisma-client-rust]
git = "https://github.com/brendonovich/prisma-client-rust"
@@ -101,8 +101,6 @@ libp2p-stream = { git = "https://github.com/spacedriveapp/rust-libp2p.git", rev
blake3 = { git = "https://github.com/spacedriveapp/blake3.git", rev = "d3aab416c12a75c2bfabce33bcd594e428a79069" }
# Due to image crate version bump
pdfium-render = { git = "https://github.com/fogodev/pdfium-render.git", rev = "e7aa1111f441c49e857cebda15b4e51b24356aaa" }
[profile.dev]
# Make compilation faster on macOS
@@ -143,7 +141,7 @@ incremental = false
# Optimize release builds
[profile.release]
panic = "abort" # Strip expensive panic clean-up logic
panic = "unwind" # Sadly we need unwind to avoid unexpected crashes on third party crates
codegen-units = 1 # Compile crates one after another so the compiler can optimize better
lto = true # Enables link to optimizations
opt-level = "s" # Optimize for binary size

View File

@@ -30,6 +30,9 @@ pub enum JobSystemError {
#[error(transparent)]
Report(#[from] ReportError),
#[error("internal job panic! <id='{0}'>")]
Panic(JobId),
}
impl From<JobSystemError> for rspc::Error {

View File

@@ -13,6 +13,7 @@ use std::{
hash::{Hash, Hasher},
marker::PhantomData,
ops::{Deref, DerefMut},
panic::AssertUnwindSafe,
path::Path,
pin::pin,
sync::Arc,
@@ -21,7 +22,7 @@ use std::{
use async_channel as chan;
use chrono::{DateTime, Utc};
use futures::{stream, Future, StreamExt};
use futures::{stream, Future, FutureExt, StreamExt};
use futures_concurrency::{
future::{Join, TryJoin},
stream::Merge,
@@ -750,15 +751,29 @@ where
trace!("Dispatching job");
spawn(to_spawn_job::<OuterCtx, _, _>(
self.id,
self.job,
ctx.clone(),
None,
base_dispatcher,
commands_rx,
done_tx,
));
spawn({
let id = self.id;
let job = self.job;
let ctx = ctx.clone();
async move {
if AssertUnwindSafe(to_spawn_job::<OuterCtx, _, _>(
id,
job,
ctx,
None,
base_dispatcher,
commands_rx,
done_tx,
))
.catch_unwind()
.await
.is_err()
{
error!("job panicked");
}
}
});
JobHandle {
id: self.id,
@@ -791,15 +806,29 @@ where
trace!("Resuming job");
spawn(to_spawn_job::<OuterCtx, _, _>(
self.id,
self.job,
ctx.clone(),
serialized_tasks,
base_dispatcher,
commands_rx,
done_tx,
));
spawn({
let id = self.id;
let job = self.job;
let ctx = ctx.clone();
async move {
if AssertUnwindSafe(to_spawn_job::<OuterCtx, _, _>(
id,
job,
ctx,
serialized_tasks,
base_dispatcher,
commands_rx,
done_tx,
))
.catch_unwind()
.await
.is_err()
{
error!("job panicked");
}
}
});
JobHandle {
id: self.id,
@@ -855,9 +884,14 @@ async fn to_spawn_job<OuterCtx, JobCtx, J>(
spawn(
async move {
tx.send(job.run::<OuterCtx>(dispatcher, ctx).await)
.await
.expect("job run channel closed");
tx.send(
AssertUnwindSafe(job.run::<OuterCtx>(dispatcher, ctx))
.catch_unwind()
.await
.unwrap_or(Err(Error::JobSystem(JobSystemError::Panic(job_id)))),
)
.await
.expect("job run channel closed");
}
.in_current_span(),
);

View File

@@ -14,6 +14,7 @@ use sd_file_ext::extensions::{VideoExtension, ALL_VIDEO_EXTENSIONS};
use std::{
ops::Deref,
panic,
path::{Path, PathBuf},
str::FromStr,
time::Duration,
@@ -313,9 +314,9 @@ pub async fn generate_thumbnail(
}
fn inner_generate_image_thumbnail(
file_path: PathBuf,
file_path: &PathBuf,
) -> Result<Vec<u8>, thumbnailer::NonCriticalThumbnailerError> {
let mut img = format_image(&file_path).map_err(|e| {
let mut img = format_image(file_path).map_err(|e| {
thumbnailer::NonCriticalThumbnailerError::FormatImage(file_path.clone(), e.to_string())
})?;
@@ -336,7 +337,7 @@ fn inner_generate_image_thumbnail(
// this corrects the rotation/flip of the image based on the *available* exif data
// not all images have exif data, so we don't error. we also don't rotate HEIF as that's against the spec
if let Some(orientation) = Orientation::from_path(&file_path) {
if let Some(orientation) = Orientation::from_path(file_path) {
if ConvertibleExtension::try_from(file_path.as_ref())
.expect("we already checked if the image was convertible")
.should_rotate()
@@ -347,7 +348,10 @@ fn inner_generate_image_thumbnail(
// Create the WebP encoder for the above image
let encoder = Encoder::from_image(&img).map_err(|reason| {
thumbnailer::NonCriticalThumbnailerError::WebPEncoding(file_path, reason.to_string())
thumbnailer::NonCriticalThumbnailerError::WebPEncoding(
file_path.clone(),
reason.to_string(),
)
})?;
// Type `WebPMemory` is !Send, which makes the `Future` in this function `!Send`,
@@ -378,7 +382,19 @@ async fn generate_image_thumbnail(
move || {
// Handling error on receiver side
let _ = tx.send(inner_generate_image_thumbnail(file_path));
let _ = tx.send(
panic::catch_unwind(|| inner_generate_image_thumbnail(&file_path)).unwrap_or_else(
move |_| {
Err(
thumbnailer::NonCriticalThumbnailerError::PanicWhileGeneratingThumbnail(
file_path,
"Internal panic on third party crate".to_string(),
),
)
},
),
);
}
});

View File

@@ -44,7 +44,7 @@ impl<E: RunError> System<E> {
let workers_count = usize::max(
std::thread::available_parallelism().map_or_else(
|e| {
error!("Failed to get available parallelism in the job system: {e:#?}");
error!(?e, "Failed to get available parallelism in the job system");
1
},
NonZeroUsize::get,

View File

@@ -1,6 +1,8 @@
use std::{
any::Any,
collections::{HashMap, VecDeque},
future::pending,
panic::AssertUnwindSafe,
pin::pin,
sync::{
atomic::{AtomicBool, Ordering},
@@ -53,7 +55,7 @@ struct AbortAndSuspendSignalers {
struct RunningTask {
id: TaskId,
kind: PendingTaskKind,
handle: JoinHandle<()>,
handle: JoinHandle<Result<(), Box<dyn Any + Send>>>,
}
enum WaitingSuspendedTask {
@@ -150,7 +152,7 @@ impl<E: RunError> Runner<E> {
&mut self,
task_id: TaskId,
task_work_state: TaskWorkState<E>,
) -> JoinHandle<()> {
) -> JoinHandle<Result<(), Box<dyn Any + Send>>> {
let (abort_tx, abort_rx) = oneshot::channel();
let (suspend_tx, suspend_rx) = oneshot::channel();
@@ -163,13 +165,16 @@ impl<E: RunError> Runner<E> {
);
let handle = spawn(
run_single_task(
task_work_state,
self.task_output_tx.clone(),
suspend_rx,
abort_rx,
AssertUnwindSafe(
run_single_task(
task_work_state,
self.task_output_tx.clone(),
suspend_rx,
abort_rx,
)
.in_current_span(),
)
.in_current_span(),
.catch_unwind(),
);
trace!("Task runner spawned");
@@ -624,8 +629,14 @@ impl<E: RunError> Runner<E> {
}
}
if let Err(e) = handle.await {
error!(%task_id, ?e, "Task failed to join");
match handle.await {
Ok(Ok(())) => { /* Everything is Awesome! */ }
Ok(Err(_)) => {
error!(%task_id, "Task panicked");
}
Err(e) => {
error!(%task_id, ?e, "Task failed to join");
}
}
stolen_task_tx.close();
@@ -806,8 +817,14 @@ impl<E: RunError> Runner<E> {
assert_eq!(*finished_task_id, old_task_id, "Task output id mismatch"); // Sanity check
if let Err(e) = handle.await {
error!(?e, "Task failed to join");
match handle.await {
Ok(Ok(())) => { /* Everything is Awesome! */ }
Ok(Err(_)) => {
error!("Task panicked");
}
Err(e) => {
error!(?e, "Task failed to join");
}
}
if let Some((next_task_kind, task_work_state)) = self.get_next_task() {

View File

@@ -1,2 +1,2 @@
[toolchain]
channel = "1.78"
channel = "1.79"