[ENG-1055] Fix previewing large files (#1304)

* Proper streaming from disk

* Handle `If-Range`

* fix bad merge

---------

Co-authored-by: Brendan Allan <brendonovich@outlook.com>
Oscar Beaumont authored on 2023-09-06 16:06:29 +08:00 (committed via GitHub)
Parent: 5a0d0396d0
Commit: 65ea570d0b
4 changed files with 112 additions and 23 deletions
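
For context on the "Handle `If-Range`" bullet above: a client resuming a download sends a `Range` request and can include `If-Range` with the ETag it saw earlier; if the resource has changed, the server is expected to ignore the range and answer `200 OK` with the full body instead of `206 Partial Content`. A minimal client-side sketch of that exchange (not part of this commit; it assumes the `reqwest` crate and hypothetical `url`/`etag` values):

```rust
use reqwest::{header, StatusCode};

// `url` and `etag` are hypothetical values, purely for illustration.
async fn resume_download(url: &str, etag: &str) -> reqwest::Result<()> {
    let resp = reqwest::Client::new()
        .get(url)
        .header(header::RANGE, "bytes=1048576-") // resume from the 1 MiB mark
        .header(header::IF_RANGE, etag) // only honour the range if the file is unchanged
        .send()
        .await?;

    match resp.status() {
        // Unchanged resource: the server honoured the range.
        StatusCode::PARTIAL_CONTENT => { /* append resp.bytes().await? to the partial file */ }
        // Changed resource: full body returned, restart the download.
        StatusCode::OK => { /* start over with resp.bytes().await? */ }
        other => eprintln!("unexpected status: {other}"),
    }
    Ok(())
}
```

The `If-Range` branch added in this commit implements the server side of that check: a matching ETag keeps the 206 path, while a mismatch downgrades the response to a plain 200.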

Cargo.lock (generated): diff not shown.

@@ -1,4 +1,8 @@
-use std::{io, net::TcpListener, sync::Arc};
+use std::{
+io,
+net::{Ipv4Addr, TcpListener},
+sync::Arc,
+};
 use axum::{
 extract::{Query, State, TypedHeader},
@@ -45,8 +49,13 @@ pub fn sd_server_plugin<R: Runtime>(node: Arc<Node>) -> io::Result<TauriPlugin<R
 ))
 .fallback(|| async { "404 Not Found: We're past the event horizon..." });
-// Only allow current device to access it and randomise port
-let listener = TcpListener::bind("127.0.0.1:0")?;
+let port = std::env::var("SD_PORT")
+.ok()
+.and_then(|port| port.parse().ok())
+.unwrap_or(0); // randomise port
+// Only allow current device to access it
+let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, port))?;
 let listen_addr = listener.local_addr()?;
 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
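
As a side note on the binding change above: port `0` asks the OS for a free ephemeral port, which is then read back via `local_addr()`, while setting `SD_PORT` (e.g. `SD_PORT=8080`) pins the port. A minimal standalone sketch of the same pattern (not taken from the commit):

```rust
use std::net::{Ipv4Addr, TcpListener};

fn main() -> std::io::Result<()> {
    // Port 0 means "let the OS pick"; SD_PORT (as in the diff above) overrides it.
    let port: u16 = std::env::var("SD_PORT")
        .ok()
        .and_then(|p| p.parse().ok())
        .unwrap_or(0);

    // Bind to loopback only, so nothing outside this machine can connect.
    let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, port))?;
    println!("listening on {}", listener.local_addr()?);
    Ok(())
}
```

With `SD_PORT` unset the port changes on every launch, which is the randomisation the old comment referred to.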

@@ -105,6 +105,7 @@ tar = "0.4.40"
 tempfile = "^3.5.0"
 axum = "0.6.20"
 http-body = "0.4.5"
+pin-project-lite = "0.2.13"
 bytes = "1.4.0"
 [target.'cfg(target_os = "macos")'.dependencies]

@@ -23,21 +23,23 @@ use std::{
 use async_stream::stream;
 use axum::{
-body::{self, Body, BoxBody, Full, StreamBody},
+body::{self, Body, BoxBody, Full, HttpBody, StreamBody},
 extract::{self, State},
-http::{self, request, HeaderValue, Method, Request, Response, StatusCode},
+http::{self, header, request, HeaderMap, HeaderValue, Method, Request, Response, StatusCode},
 middleware::{self, Next},
 routing::get,
 Router,
 };
 use bytes::Bytes;
+use futures::Stream;
 use http_range::HttpRange;
 use mini_moka::sync::Cache;
+use pin_project_lite::pin_project;
 use sd_file_ext::text::is_text;
 use sd_p2p::{spaceblock::Range, spacetunnel::RemoteIdentity};
 use tokio::{
 fs::File,
-io::{AsyncReadExt, AsyncSeekExt, AsyncWrite},
+io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWrite, Take},
 };
 use tokio_util::{io::ReaderStream, sync::PollSender};
 use tracing::{debug, error};
@@ -55,6 +57,9 @@ struct CacheValue {
 const MAX_TEXT_READ_LENGTH: usize = 10 * 1024; // 10KB
+// default capacity 64KiB
+const DEFAULT_CAPACITY: usize = 65536;
 #[derive(Debug, Clone)]
 pub enum ServeFrom {
 /// Serve from the local filesystem
@@ -315,7 +320,13 @@ async fn serve_file(
 if let Ok(metadata) = metadata {
 // We only accept range queries if `files.metadata() == Ok(_)`
 // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Accept-Ranges
-resp = resp.header("Accept-Ranges", HeaderValue::from_static("bytes"));
+resp = resp
+.header("Accept-Ranges", HeaderValue::from_static("bytes"))
+.header(
+"Content-Length",
+HeaderValue::from_str(&metadata.len().to_string())
+.expect("number won't fail conversion"),
+);
 // Empty files
 if metadata.len() == 0 {
@@ -326,6 +337,7 @@ async fn serve_file(
 }
 // ETag
+let mut status_code = StatusCode::PARTIAL_CONTENT;
 if let Ok(time) = metadata.modified() {
 let etag_header = format!(
 r#""{}""#,
@@ -342,6 +354,7 @@ async fn serve_file(
 error!("Failed to convert ETag into header value!");
 }
+// Used for normal requests
 if let Some(etag) = req.headers.get("If-None-Match") {
 if etag.as_bytes() == etag_header.as_bytes() {
 return Ok(resp
@@ -349,7 +362,15 @@ async fn serve_file(
 .body(body::boxed(Full::from(""))));
 }
 }
 }
+// Used checking if the resource has been modified since starting the download
+if let Some(if_range) = req.headers.get("If-Range") {
+// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-Range
+if if_range.as_bytes() != etag_header.as_bytes() {
+status_code = StatusCode::OK
+}
+}
 };
 // https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests
 if req.method == Method::GET {
@@ -360,23 +381,34 @@ async fn serve_file(
 // TODO: Multipart requests are not support, yet
 if ranges.len() != 1 {
-todo!(); // TODO: Error handling
+return Ok(resp
+.header(
+header::CONTENT_RANGE,
+HeaderValue::from_str(&format!("bytes */{}", metadata.len()))
+.map_err(internal_server_error)?,
+)
+.status(StatusCode::RANGE_NOT_SATISFIABLE)
+.body(body::boxed(Full::from(""))));
 }
 let range = ranges.first().expect("checked above");
+if (range.start + range.length) > metadata.len() {
+return Ok(resp
+.header(
+header::CONTENT_RANGE,
+HeaderValue::from_str(&format!("bytes */{}", metadata.len()))
+.map_err(internal_server_error)?,
+)
+.status(StatusCode::RANGE_NOT_SATISFIABLE)
+.body(body::boxed(Full::from(""))));
+}
 file.seek(SeekFrom::Start(range.start))
 .await
 .map_err(internal_server_error)?;
-// TODO: Serve using streaming body instead of loading the entire chunk. - Right now my impl is not working correctly
-let mut buf = Vec::with_capacity(range.length as usize);
-file.take(range.length)
-.read_to_end(&mut buf)
-.await
-.map_err(internal_server_error)?;
 return Ok(resp
-.status(StatusCode::PARTIAL_CONTENT)
+.status(status_code)
 .header(
 "Content-Range",
 HeaderValue::from_str(&format!(
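
A rough sketch (not from the commit) of the 206/416 decision the handler above now makes, assuming the `http_range` crate's `HttpRange::parse`, which resolves a `Range` header against the file size:

```rust
use http_range::HttpRange;

// Resolve a `Range` header against the file size and pick between
// 206 Partial Content and 416 Range Not Satisfiable (multipart ranges are
// rejected, matching the single-range handling above).
fn plan_response(range_header: &str, file_len: u64) -> String {
    match HttpRange::parse(range_header, file_len) {
        Ok(ranges) if ranges.len() == 1 && ranges[0].start + ranges[0].length <= file_len => {
            let r = &ranges[0];
            format!(
                "206, Content-Range: bytes {}-{}/{}",
                r.start,
                r.start + r.length - 1,
                file_len
            )
        }
        _ => format!("416, Content-Range: bytes */{file_len}"),
    }
}

fn main() {
    println!("{}", plan_response("bytes=0-1023", 4096)); // 206, bytes 0-1023/4096
    println!("{}", plan_response("bytes=0-", 4096)); // open-ended range -> 206, bytes 0-4095/4096
}
```

The bounds check mirrors the `(range.start + range.length) > metadata.len()` guard added above.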
@@ -534,6 +565,54 @@ async fn plz_for_the_love_of_all_that_is_good_replace_this_with_the_db_instead_o
 })
 }
+// This code was taken from: https://github.com/tower-rs/tower-http/blob/e8eb54966604ea7fa574a2a25e55232f5cfe675b/tower-http/src/services/fs/mod.rs#L30
+pin_project! {
+// NOTE: This could potentially be upstreamed to `http-body`.
+/// Adapter that turns an [`impl AsyncRead`][tokio::io::AsyncRead] to an [`impl Body`][http_body::Body].
+#[derive(Debug)]
+pub struct AsyncReadBody<T> {
+#[pin]
+reader: ReaderStream<T>,
+}
+}
+impl<T> AsyncReadBody<T>
+where
+T: AsyncRead,
+{
+fn with_capacity_limited(
+read: T,
+capacity: usize,
+max_read_bytes: u64,
+) -> AsyncReadBody<Take<T>> {
+AsyncReadBody {
+reader: ReaderStream::with_capacity(read.take(max_read_bytes), capacity),
+}
+}
+}
+impl<T> HttpBody for AsyncReadBody<T>
+where
+T: AsyncRead,
+{
+type Data = Bytes;
+type Error = io::Error;
+fn poll_data(
+self: Pin<&mut Self>,
+cx: &mut Context<'_>,
+) -> Poll<Option<Result<Self::Data, Self::Error>>> {
+self.project().reader.poll_next(cx)
+}
+fn poll_trailers(
+self: Pin<&mut Self>,
+_cx: &mut Context<'_>,
+) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
+Poll::Ready(Ok(None))
+}
+}
 /// Allowing wrapping an `mpsc::Sender` into an `AsyncWrite`
 pub struct MpscToAsyncWrite(PollSender<io::Result<Bytes>>);
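
For comparison, roughly the same streaming behaviour as the `AsyncReadBody` adapter above can be sketched with axum 0.6's `StreamBody` plus a length-limited reader; this is an illustrative sketch under that assumption, not what the commit ships:

```rust
use std::io::SeekFrom;

use axum::body::{self, BoxBody, StreamBody};
use tokio::{
    fs::File,
    io::{AsyncReadExt, AsyncSeekExt},
};
use tokio_util::io::ReaderStream;

// Stream `length` bytes starting at `start` without loading the whole range:
// `take` caps the reader and `ReaderStream` yields fixed-size chunks.
async fn ranged_body(mut file: File, start: u64, length: u64) -> std::io::Result<BoxBody> {
    file.seek(SeekFrom::Start(start)).await?;
    let limited = file.take(length);
    Ok(body::boxed(StreamBody::new(ReaderStream::with_capacity(
        limited,
        64 * 1024, // 64 KiB per chunk, mirroring DEFAULT_CAPACITY above
    ))))
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Hypothetical path, for illustration only.
    let file = File::open("/tmp/example.bin").await?;
    let _body = ranged_body(file, 1024, 4096).await?;
    Ok(())
}
```

Either way, only one read buffer is resident at a time instead of the entire requested range, which is what makes previewing large files practical.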