Compare commits

...

13 Commits

Author SHA1 Message Date
Gregory Schier
3e3ca675e6 PR fixes 2025-12-22 09:18:02 -08:00
Gregory Schier
6e9d3cabf9 Fix empty content length 2025-12-22 09:00:45 -08:00
Gregory Schier
fe40d6ef56 Better response error tracking 2025-12-22 08:58:51 -08:00
Gregory Schier
6b2443fc60 Mark bindings as generated 2025-12-22 08:17:42 -08:00
Gregory Schier
ff80ca34d0 Gen models again 2025-12-22 08:09:40 -08:00
Gregory Schier
303cddbd86 Gen models 2025-12-22 08:09:26 -08:00
Gregory Schier
9824fb9d23 Imports 2025-12-22 08:08:01 -08:00
Gregory Schier
e974db0293 Block large request bodies from showing 2025-12-22 08:05:17 -08:00
Gregory Schier
07a9d03ffb Remove log and tweak 2025-12-22 07:46:33 -08:00
Gregory Schier
11f4604758 Some minor fixes 2025-12-22 07:42:29 -08:00
Gregory Schier
b1e0cfec91 Show sent request body 2025-12-22 07:15:58 -08:00
Gregory Schier
7828af7522 Capture and store request bodies in blob DB
- Add TeeReader to fork body streams while sending
- Wrap request body with TeeReader in execute_transaction
- Buffer and write 1MB chunks to blob DB with body_id format {response_id}.request
- Support both Bytes and Stream body types
2025-12-21 15:06:56 -08:00
Gregory Schier
b03ea11509 Add blob database for storing body chunks
- Create separate blobs.sqlite database for large body storage
- Add body_chunks table with body_id, chunk_index, and data columns
- Implement BlobManager and BlobContext with core CRUD methods
- Add BlobManagerExt trait for app_handle.blobs() access
- Support streaming writes with configurable chunk sizes
2025-12-21 15:01:10 -08:00
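The two commit messages above outline the approach: request bodies are split into ~1MB chunks keyed by a body_id of the form {response_id}.request and stored in a separate blobs.sqlite database, then reassembled in chunk_index order. A minimal standalone sketch of that chunked write/read pattern, using rusqlite directly against a simplified version of the body_chunks table from the migration in this diff (the real code goes through BlobManager/BlobContext; ids and the in-memory DB here are illustrative only):

// Standalone sketch, not the app's BlobManager: store a body as ordered chunks
// and reassemble it by chunk_index.
use rusqlite::{params, Connection};

const CHUNK_SIZE: usize = 1024 * 1024; // same 1MB chunk size the PR uses

fn store_body(conn: &Connection, body_id: &str, data: &[u8]) -> rusqlite::Result<()> {
    for (i, chunk) in data.chunks(CHUNK_SIZE).enumerate() {
        conn.execute(
            "INSERT INTO body_chunks (id, body_id, chunk_index, data) VALUES (?1, ?2, ?3, ?4)",
            params![format!("bc_{body_id}_{i}"), body_id, i as i64, chunk.to_vec()],
        )?;
    }
    Ok(())
}

fn load_body(conn: &Connection, body_id: &str) -> rusqlite::Result<Vec<u8>> {
    let mut stmt = conn
        .prepare("SELECT data FROM body_chunks WHERE body_id = ?1 ORDER BY chunk_index ASC")?;
    let mut body = Vec::new();
    for row in stmt.query_map(params![body_id], |r| r.get::<_, Vec<u8>>(0))? {
        body.extend(row?);
    }
    Ok(body)
}

fn main() -> rusqlite::Result<()> {
    let conn = Connection::open_in_memory()?;
    conn.execute_batch(
        "CREATE TABLE body_chunks (
            id          TEXT PRIMARY KEY,
            body_id     TEXT NOT NULL,
            chunk_index INTEGER NOT NULL,
            data        BLOB NOT NULL,
            UNIQUE (body_id, chunk_index)
        );",
    )?;
    // "{response_id}.request" naming, as described in the commit message above
    let body_id = "rs_example.request";
    store_body(&conn, body_id, &vec![7u8; 3 * 1024 * 1024 + 5])?;
    assert_eq!(load_body(&conn, body_id)?.len(), 3 * 1024 * 1024 + 5);
    Ok(())
}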
27 changed files with 1116 additions and 253 deletions

.gitattributes vendored
View File

@@ -1,5 +1,6 @@
src-tauri/vendored/**/* linguist-generated=true
src-tauri/gen/schemas/**/* linguist-generated=true
**/bindings/* linguist-generated=true
# Ensure consistent line endings for test files that check exact content
src-tauri/yaak-http/tests/test.txt text eol=lf

View File

@@ -12,7 +12,7 @@ export type HttpRequest = { model: "http_request", id: string, createdAt: string
export type HttpRequestHeader = { enabled?: boolean, name: string, value: string, id?: string, };
export type HttpResponse = { model: "http_response", id: string, createdAt: string, updatedAt: string, workspaceId: string, requestId: string, bodyPath: string | null, contentLength: number | null, elapsed: number, elapsedHeaders: number, error: string | null, headers: Array<HttpResponseHeader>, remoteAddr: string | null, status: number, statusReason: string | null, state: HttpResponseState, url: string, version: string | null, };
export type HttpResponse = { model: "http_response", id: string, createdAt: string, updatedAt: string, workspaceId: string, requestId: string, bodyPath: string | null, contentLength: number | null, contentLengthCompressed: number | null, elapsed: number, elapsedHeaders: number, error: string | null, headers: Array<HttpResponseHeader>, remoteAddr: string | null, requestContentLength: number | null, requestHeaders: Array<HttpResponseHeader>, status: number, statusReason: string | null, state: HttpResponseState, url: string, version: string | null, };
export type HttpResponseHeader = { name: string, value: string, };

View File

@@ -1,23 +1,27 @@
use crate::error::Error::GenericError;
use crate::error::Result;
use crate::render::render_http_request;
use crate::response_err;
use log::debug;
use log::{debug, error, warn};
use reqwest_cookie_store::{CookieStore, CookieStoreMutex};
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tauri::{AppHandle, Manager, Runtime, WebviewWindow};
use tokio::fs::{File, create_dir_all};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::Mutex;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use tokio::sync::watch::Receiver;
use tokio_util::bytes::Bytes;
use yaak_http::client::{
HttpConnectionOptions, HttpConnectionProxySetting, HttpConnectionProxySettingAuth,
};
use yaak_http::manager::HttpConnectionManager;
use yaak_http::sender::ReqwestSender;
use yaak_http::tee_reader::TeeReader;
use yaak_http::transaction::HttpTransaction;
use yaak_http::types::{SendableHttpRequest, SendableHttpRequestOptions, append_query_params};
use yaak_http::types::{
SendableBody, SendableHttpRequest, SendableHttpRequestOptions, append_query_params,
};
use yaak_models::blob_manager::{BlobManagerExt, BodyChunk};
use yaak_models::models::{
CookieJar, Environment, HttpRequest, HttpResponse, HttpResponseEvent, HttpResponseHeader,
HttpResponseState, ProxySetting, ProxySettingAuth,
@@ -32,6 +36,55 @@ use yaak_plugins::template_callback::PluginTemplateCallback;
use yaak_templates::RenderOptions;
use yaak_tls::find_client_certificate;
/// Chunk size for storing request bodies (1MB)
const REQUEST_BODY_CHUNK_SIZE: usize = 1024 * 1024;
/// Context for managing response state during HTTP transactions.
/// Handles both persisted responses (stored in DB) and ephemeral responses (in-memory only).
struct ResponseContext<R: Runtime> {
app_handle: AppHandle<R>,
response: HttpResponse,
update_source: UpdateSource,
}
impl<R: Runtime> ResponseContext<R> {
fn new(app_handle: AppHandle<R>, response: HttpResponse, update_source: UpdateSource) -> Self {
Self { app_handle, response, update_source }
}
/// Whether this response is persisted (has a non-empty ID)
fn is_persisted(&self) -> bool {
!self.response.id.is_empty()
}
/// Update the response state. For persisted responses, fetches from DB, applies the
/// closure, and updates the DB. For ephemeral responses, just applies the closure
/// to the in-memory response.
fn update<F>(&mut self, func: F) -> Result<()>
where
F: FnOnce(&mut HttpResponse),
{
if self.is_persisted() {
let r = self.app_handle.with_tx(|tx| {
let mut r = tx.get_http_response(&self.response.id)?;
func(&mut r);
tx.update_http_response_if_id(&r, &self.update_source)?;
Ok(r)
})?;
self.response = r;
Ok(())
} else {
func(&mut self.response);
Ok(())
}
}
/// Get the current response state
fn response(&self) -> &HttpResponse {
&self.response
}
}
pub async fn send_http_request<R: Runtime>(
window: &WebviewWindow<R>,
unrendered_request: &HttpRequest,
@@ -62,25 +115,38 @@ pub async fn send_http_request_with_context<R: Runtime>(
plugin_context: &PluginContext,
) -> Result<HttpResponse> {
let app_handle = window.app_handle().clone();
let response = Arc::new(Mutex::new(og_response.clone()));
let update_source = UpdateSource::from_window(window);
let mut response_ctx =
ResponseContext::new(app_handle.clone(), og_response.clone(), update_source);
// Execute the inner send logic and handle errors consistently
let start = Instant::now();
let result = send_http_request_inner(
window,
unrendered_request,
og_response,
environment,
cookie_jar,
cancelled_rx,
plugin_context,
&mut response_ctx,
)
.await;
match result {
Ok(response) => Ok(response),
Err(e) => {
Ok(response_err(&app_handle, &*response.lock().await, e.to_string(), &update_source))
let error = e.to_string();
let elapsed = start.elapsed().as_millis() as i32;
warn!("Failed to send request: {error:?}");
let _ = response_ctx.update(|r| {
r.state = HttpResponseState::Closed;
r.elapsed = elapsed;
if r.elapsed_headers == 0 {
r.elapsed_headers = elapsed;
}
r.error = Some(error);
});
Ok(response_ctx.response().clone())
}
}
}
@@ -88,26 +154,24 @@ pub async fn send_http_request_with_context<R: Runtime>(
async fn send_http_request_inner<R: Runtime>(
window: &WebviewWindow<R>,
unrendered_request: &HttpRequest,
og_response: &HttpResponse,
environment: Option<Environment>,
cookie_jar: Option<CookieJar>,
cancelled_rx: &Receiver<bool>,
plugin_context: &PluginContext,
response_ctx: &mut ResponseContext<R>,
) -> Result<HttpResponse> {
let app_handle = window.app_handle().clone();
let plugin_manager = app_handle.state::<PluginManager>();
let connection_manager = app_handle.state::<HttpConnectionManager>();
let settings = window.db().get_settings();
let wrk_id = &unrendered_request.workspace_id;
let fld_id = unrendered_request.folder_id.as_deref();
let env_id = environment.map(|e| e.id);
let resp_id = og_response.id.clone();
let workspace = window.db().get_workspace(wrk_id)?;
let response = Arc::new(Mutex::new(og_response.clone()));
let update_source = UpdateSource::from_window(window);
let workspace_id = &unrendered_request.workspace_id;
let folder_id = unrendered_request.folder_id.as_deref();
let environment_id = environment.map(|e| e.id);
let workspace = window.db().get_workspace(workspace_id)?;
let (resolved, auth_context_id) = resolve_http_request(window, unrendered_request)?;
let cb = PluginTemplateCallback::new(window.app_handle(), &plugin_context, RenderPurpose::Send);
let env_chain = window.db().resolve_environments(&workspace.id, fld_id, env_id.as_deref())?;
let env_chain =
window.db().resolve_environments(&workspace.id, folder_id, environment_id.as_deref())?;
let request = render_http_request(&resolved, env_chain, &cb, &RenderOptions::throw()).await?;
// Build the sendable request using the new SendableHttpRequest type
@@ -195,35 +259,7 @@ async fn send_http_request_inner<R: Runtime>(
)
.await?;
let start_for_cancellation = Instant::now();
let final_resp = execute_transaction(
client,
sendable_request,
response.clone(),
&resp_id,
&app_handle,
&update_source,
cancelled_rx.clone(),
)
.await;
match final_resp {
Ok(r) => Ok(r),
Err(e) => match app_handle.db().get_http_response(&resp_id) {
Ok(mut r) => {
r.state = HttpResponseState::Closed;
r.elapsed = start_for_cancellation.elapsed().as_millis() as i32;
r.elapsed_headers = start_for_cancellation.elapsed().as_millis() as i32;
r.error = Some(e.to_string());
app_handle
.db()
.update_http_response_if_id(&r, &UpdateSource::from_window(window))
.expect("Failed to update response");
Ok(r)
}
_ => Err(GenericError("Ephemeral request was cancelled".to_string())),
},
}
execute_transaction(client, sendable_request, response_ctx, cancelled_rx.clone()).await
}
pub fn resolve_http_request<R: Runtime>(
@@ -245,13 +281,15 @@ pub fn resolve_http_request<R: Runtime>(
async fn execute_transaction<R: Runtime>(
client: reqwest::Client,
sendable_request: SendableHttpRequest,
response: Arc<Mutex<HttpResponse>>,
response_id: &String,
app_handle: &AppHandle<R>,
update_source: &UpdateSource,
mut sendable_request: SendableHttpRequest,
response_ctx: &mut ResponseContext<R>,
mut cancelled_rx: Receiver<bool>,
) -> Result<HttpResponse> {
let app_handle = &response_ctx.app_handle.clone();
let response_id = response_ctx.response().id.clone();
let workspace_id = response_ctx.response().workspace_id.clone();
let is_persisted = response_ctx.is_persisted();
let sender = ReqwestSender::with_client(client);
let transaction = HttpTransaction::new(sender);
let start = Instant::now();
@@ -263,30 +301,81 @@ async fn execute_transaction<R: Runtime>(
.map(|(name, value)| HttpResponseHeader { name: name.clone(), value: value.clone() })
.collect();
{
// Update response with headers info and mark as connected
let mut r = response.lock().await;
// Update response with headers info
response_ctx.update(|r| {
r.url = sendable_request.url.clone();
r.request_headers = request_headers.clone();
app_handle.db().update_http_response_if_id(&r, &update_source)?;
}
r.request_headers = request_headers;
})?;
// Create channel for receiving events and spawn a task to store them in DB
let (event_tx, mut event_rx) =
tokio::sync::mpsc::unbounded_channel::<yaak_http::sender::HttpResponseEvent>();
// Write events to DB in a task
{
// Write events to DB in a task (only for persisted responses)
if is_persisted {
let response_id = response_id.clone();
let workspace_id = response.lock().await.workspace_id.clone();
let app_handle = app_handle.clone();
let update_source = update_source.clone();
let update_source = response_ctx.update_source.clone();
let workspace_id = workspace_id.clone();
tokio::spawn(async move {
while let Some(event) = event_rx.recv().await {
let db_event = HttpResponseEvent::new(&response_id, &workspace_id, event.into());
let _ = app_handle.db().upsert(&db_event, &update_source);
let _ = app_handle.db().upsert_http_response_event(&db_event, &update_source);
}
});
} else {
// For ephemeral responses, just drain the events
tokio::spawn(async move { while event_rx.recv().await.is_some() {} });
};
// Capture request body as it's sent (only for persisted responses)
let body_id = format!("{}.request", response_id);
sendable_request.body = match sendable_request.body {
Some(SendableBody::Bytes(bytes)) => {
if is_persisted {
write_bytes_to_db_sync(response_ctx, &body_id, bytes.clone())?;
}
Some(SendableBody::Bytes(bytes))
}
Some(SendableBody::Stream(stream)) => {
// Wrap stream with TeeReader to capture data as it's read
let (body_chunk_tx, body_chunk_rx) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
let tee_reader = TeeReader::new(stream, body_chunk_tx);
let pinned: Pin<Box<dyn AsyncRead + Send + 'static>> = Box::pin(tee_reader);
if is_persisted {
// Spawn task to write request body chunks to blob DB
let app_handle = app_handle.clone();
let response_id = response_id.clone();
let workspace_id = workspace_id.clone();
let body_id = body_id.clone();
let update_source = response_ctx.update_source.clone();
tauri::async_runtime::spawn(async move {
if let Err(e) = write_stream_chunks_to_db(
app_handle,
&body_id,
&workspace_id,
&response_id,
&update_source,
body_chunk_rx,
)
.await
{
error!("Error writing stream chunks to DB: {}", e);
};
});
} else {
// For ephemeral responses, just drain the body chunks
tauri::async_runtime::spawn(async move {
let mut rx = body_chunk_rx;
while rx.recv().await.is_some() {}
});
}
// For streams, size is determined after streaming completes
Some(SendableBody::Stream(pinned))
}
None => None,
};
// Execute the transaction with cancellation support
@@ -309,32 +398,27 @@ async fn execute_transaction<R: Runtime>(
// Extract metadata before consuming the body (headers are available immediately)
// Url might change, so update again
let headers: Vec<HttpResponseHeader> = http_response
.headers
.iter()
.map(|(name, value)| HttpResponseHeader { name: name.clone(), value: value.clone() })
.collect();
{
// Update response with headers info and mark as connected
let mut r = response.lock().await;
response_ctx.update(|r| {
r.body_path = Some(body_path.to_string_lossy().to_string());
r.elapsed_headers = start.elapsed().as_millis() as i32;
r.status = http_response.status as i32;
r.status_reason = http_response.status_reason.clone().clone();
r.url = http_response.url.clone().clone();
r.status_reason = http_response.status_reason.clone();
r.url = http_response.url.clone();
r.remote_addr = http_response.remote_addr.clone();
r.version = http_response.version.clone().clone();
r.headers = headers.clone();
r.version = http_response.version.clone();
r.headers = http_response
.headers
.iter()
.map(|(name, value)| HttpResponseHeader { name: name.clone(), value: value.clone() })
.collect();
r.content_length = http_response.content_length.map(|l| l as i32);
r.state = HttpResponseState::Connected;
r.request_headers = http_response
.request_headers
.iter()
.map(|(n, v)| HttpResponseHeader { name: n.clone(), value: v.clone() })
.collect();
r.state = HttpResponseState::Connected;
app_handle.db().update_http_response_if_id(&r, &update_source)?;
}
})?;
// Get the body stream for manual consumption
let mut body_stream = http_response.into_body_stream()?;
@@ -378,11 +462,11 @@ async fn execute_transaction<R: Runtime>(
.map_err(|e| GenericError(format!("Failed to flush file: {}", e)))?;
written_bytes += n;
// Update response in DB with progress
let mut r = response.lock().await;
r.elapsed = start.elapsed().as_millis() as i32; // Approx until the end
r.content_length = Some(written_bytes as i32);
app_handle.db().update_http_response_if_id(&r, &update_source)?;
// Update response with progress
response_ctx.update(|r| {
r.elapsed = start.elapsed().as_millis() as i32;
r.content_length = Some(written_bytes as i32);
})?;
}
Err(e) => {
return Err(GenericError(format!("Failed to read response body: {}", e)));
@@ -391,16 +475,106 @@ async fn execute_transaction<R: Runtime>(
}
// Final update with closed state
let mut resp = response.lock().await.clone();
resp.elapsed = start.elapsed().as_millis() as i32;
resp.state = HttpResponseState::Closed;
resp.body_path = Some(
body_path.to_str().ok_or(GenericError(format!("Invalid path {body_path:?}",)))?.to_string(),
);
response_ctx.update(|r| {
r.elapsed = start.elapsed().as_millis() as i32;
r.state = HttpResponseState::Closed;
})?;
app_handle.db().update_http_response_if_id(&resp, &update_source)?;
Ok(response_ctx.response().clone())
}
Ok(resp)
fn write_bytes_to_db_sync<R: Runtime>(
response_ctx: &mut ResponseContext<R>,
body_id: &str,
data: Bytes,
) -> Result<()> {
if data.is_empty() {
return Ok(());
}
// Write in chunks if data is large
let mut offset = 0;
let mut chunk_index = 0;
while offset < data.len() {
let end = std::cmp::min(offset + REQUEST_BODY_CHUNK_SIZE, data.len());
let chunk_data = data.slice(offset..end).to_vec();
let chunk = BodyChunk::new(body_id, chunk_index, chunk_data);
response_ctx.app_handle.blobs().insert_chunk(&chunk)?;
offset = end;
chunk_index += 1;
}
// Update the response with the total request body size
response_ctx.update(|r| {
r.request_content_length = Some(data.len() as i32);
})?;
Ok(())
}
async fn write_stream_chunks_to_db<R: Runtime>(
app_handle: AppHandle<R>,
body_id: &str,
workspace_id: &str,
response_id: &str,
update_source: &UpdateSource,
mut rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
) -> Result<()> {
let mut buffer = Vec::with_capacity(REQUEST_BODY_CHUNK_SIZE);
let mut chunk_index = 0;
let mut total_bytes: usize = 0;
while let Some(data) = rx.recv().await {
total_bytes += data.len();
buffer.extend_from_slice(&data);
// Flush when buffer reaches chunk size
while buffer.len() >= REQUEST_BODY_CHUNK_SIZE {
debug!("Writing chunk {chunk_index} to DB");
let chunk_data: Vec<u8> = buffer.drain(..REQUEST_BODY_CHUNK_SIZE).collect();
let chunk = BodyChunk::new(body_id, chunk_index, chunk_data);
app_handle.blobs().insert_chunk(&chunk)?;
app_handle.db().upsert_http_response_event(
&HttpResponseEvent::new(
response_id,
workspace_id,
yaak_http::sender::HttpResponseEvent::ChunkSent {
bytes: REQUEST_BODY_CHUNK_SIZE,
}
.into(),
),
update_source,
)?;
chunk_index += 1;
}
}
// Flush remaining data
if !buffer.is_empty() {
let chunk = BodyChunk::new(body_id, chunk_index, buffer);
debug!("Flushing remaining data {chunk_index} {}", chunk.data.len());
app_handle.blobs().insert_chunk(&chunk)?;
app_handle.db().upsert_http_response_event(
&HttpResponseEvent::new(
response_id,
workspace_id,
yaak_http::sender::HttpResponseEvent::ChunkSent { bytes: chunk.data.len() }.into(),
),
update_source,
)?;
}
// Update the response with the total request body size
app_handle.with_tx(|tx| {
debug!("Updating final body length {total_bytes}");
if let Ok(mut response) = tx.get_http_response(&response_id) {
response.request_content_length = Some(total_bytes as i32);
tx.update_http_response_if_id(&response, update_source)?;
}
Ok(())
})?;
Ok(())
}
async fn apply_authentication<R: Runtime>(

View File

@@ -32,6 +32,7 @@ use yaak_common::window::WorkspaceWindowTrait;
use yaak_grpc::manager::GrpcHandle;
use yaak_grpc::{Code, ServiceDefinition, serialize_message};
use yaak_mac_window::AppHandleMacWindowExt;
use yaak_models::blob_manager::BlobManagerExt;
use yaak_models::models::{
AnyModel, CookieJar, Environment, GrpcConnection, GrpcConnectionState, GrpcEvent,
GrpcEventType, GrpcRequest, HttpRequest, HttpResponse, HttpResponseEvent, HttpResponseState,
@@ -784,7 +785,7 @@ async fn cmd_http_response_body<R: Runtime>(
) -> YaakResult<FilterResponse> {
let body_path = match response.body_path {
None => {
return Err(GenericError("Response body path not set".to_string()));
return Ok(FilterResponse { content: String::new(), error: None });
}
Some(p) => p,
};
@@ -809,6 +810,23 @@ async fn cmd_http_response_body<R: Runtime>(
}
}
#[tauri::command]
async fn cmd_http_request_body<R: Runtime>(
app_handle: AppHandle<R>,
response_id: &str,
) -> YaakResult<Option<Vec<u8>>> {
let body_id = format!("{}.request", response_id);
let chunks = app_handle.blobs().get_chunks(&body_id)?;
if chunks.is_empty() {
return Ok(None);
}
// Concatenate all chunks
let body: Vec<u8> = chunks.into_iter().flat_map(|c| c.data).collect();
Ok(Some(body))
}
#[tauri::command]
async fn cmd_get_sse_events(file_path: &str) -> YaakResult<Vec<ServerSentEvent>> {
let body = fs::read(file_path)?;
@@ -835,9 +853,7 @@ async fn cmd_get_http_response_events<R: Runtime>(
app_handle: AppHandle<R>,
response_id: &str,
) -> YaakResult<Vec<HttpResponseEvent>> {
use yaak_models::models::HttpResponseEventIden;
let events: Vec<HttpResponseEvent> =
app_handle.db().find_many(HttpResponseEventIden::ResponseId, response_id, None)?;
let events: Vec<HttpResponseEvent> = app_handle.db().list_http_response_events(response_id)?;
Ok(events)
}
@@ -1174,23 +1190,6 @@ async fn cmd_send_http_request<R: Runtime>(
Ok(r)
}
fn response_err<R: Runtime>(
app_handle: &AppHandle<R>,
response: &HttpResponse,
error: String,
update_source: &UpdateSource,
) -> HttpResponse {
warn!("Failed to send request: {error:?}");
let mut response = response.clone();
response.state = HttpResponseState::Closed;
response.error = Some(error.clone());
response = app_handle
.db()
.update_http_response_if_id(&response, update_source)
.expect("Failed to update response");
response
}
#[tauri::command]
async fn cmd_install_plugin<R: Runtime>(
directory: &str,
@@ -1468,6 +1467,7 @@ pub fn run() {
cmd_delete_send_history,
cmd_dismiss_notification,
cmd_export_data,
cmd_http_request_body,
cmd_http_response_body,
cmd_format_json,
cmd_get_http_authentication_summaries,

View File

@@ -11,6 +11,7 @@ pub mod manager;
pub mod path_placeholders;
mod proto;
pub mod sender;
pub mod tee_reader;
pub mod transaction;
pub mod types;

View File

@@ -0,0 +1,157 @@
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, ReadBuf};
use tokio::sync::mpsc;
/// A reader that forwards all read data to a channel while also returning it to the caller.
/// This allows capturing request body data as it's being sent.
pub struct TeeReader<R> {
inner: R,
tx: mpsc::UnboundedSender<Vec<u8>>,
}
impl<R> TeeReader<R> {
pub fn new(inner: R, tx: mpsc::UnboundedSender<Vec<u8>>) -> Self {
Self { inner, tx }
}
}
impl<R: AsyncRead + Unpin> AsyncRead for TeeReader<R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let before_len = buf.filled().len();
match Pin::new(&mut self.inner).poll_read(cx, buf) {
Poll::Ready(Ok(())) => {
let after_len = buf.filled().len();
if after_len > before_len {
// Data was read, send a copy to the channel
let data = buf.filled()[before_len..after_len].to_vec();
// Ignore send errors - receiver might have been dropped
let _ = self.tx.send(data);
}
Poll::Ready(Ok(()))
}
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
use tokio::io::AsyncReadExt;
#[tokio::test]
async fn test_tee_reader_captures_all_data() {
let data = b"Hello, World!";
let cursor = Cursor::new(data.to_vec());
let (tx, mut rx) = mpsc::unbounded_channel();
let mut tee = TeeReader::new(cursor, tx);
let mut output = Vec::new();
tee.read_to_end(&mut output).await.unwrap();
// Verify the reader returns the correct data
assert_eq!(output, data);
// Verify the channel received the data
let mut captured = Vec::new();
while let Ok(chunk) = rx.try_recv() {
captured.extend(chunk);
}
assert_eq!(captured, data);
}
#[tokio::test]
async fn test_tee_reader_with_chunked_reads() {
let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
let cursor = Cursor::new(data.to_vec());
let (tx, mut rx) = mpsc::unbounded_channel();
let mut tee = TeeReader::new(cursor, tx);
// Read in small chunks
let mut buf = [0u8; 5];
let mut output = Vec::new();
loop {
let n = tee.read(&mut buf).await.unwrap();
if n == 0 {
break;
}
output.extend_from_slice(&buf[..n]);
}
// Verify the reader returns the correct data
assert_eq!(output, data);
// Verify the channel received all chunks
let mut captured = Vec::new();
while let Ok(chunk) = rx.try_recv() {
captured.extend(chunk);
}
assert_eq!(captured, data);
}
#[tokio::test]
async fn test_tee_reader_empty_data() {
let data: Vec<u8> = vec![];
let cursor = Cursor::new(data.clone());
let (tx, mut rx) = mpsc::unbounded_channel();
let mut tee = TeeReader::new(cursor, tx);
let mut output = Vec::new();
tee.read_to_end(&mut output).await.unwrap();
// Verify empty output
assert!(output.is_empty());
// Verify no data was sent to channel
assert!(rx.try_recv().is_err());
}
#[tokio::test]
async fn test_tee_reader_works_when_receiver_dropped() {
let data = b"Hello, World!";
let cursor = Cursor::new(data.to_vec());
let (tx, rx) = mpsc::unbounded_channel();
// Drop the receiver before reading
drop(rx);
let mut tee = TeeReader::new(cursor, tx);
let mut output = Vec::new();
// Should still work even though receiver is dropped
tee.read_to_end(&mut output).await.unwrap();
assert_eq!(output, data);
}
#[tokio::test]
async fn test_tee_reader_large_data() {
// Test with 1MB of data
let data: Vec<u8> = (0..1024 * 1024).map(|i| (i % 256) as u8).collect();
let cursor = Cursor::new(data.clone());
let (tx, mut rx) = mpsc::unbounded_channel();
let mut tee = TeeReader::new(cursor, tx);
let mut output = Vec::new();
tee.read_to_end(&mut output).await.unwrap();
// Verify the reader returns the correct data
assert_eq!(output, data);
// Verify the channel received all data
let mut captured = Vec::new();
while let Ok(chunk) = rx.try_recv() {
captured.extend(chunk);
}
assert_eq!(captured, data);
}
}
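A usage sketch for the TeeReader above, assuming this repo's yaak_http crate: the teed copy is buffered into fixed-size chunks the way write_stream_chunks_to_db does earlier in this diff, with a println stand-in for the blob-DB write. The file path and the consumer wiring here are illustrative, not the app's actual setup.

use tokio::io::AsyncReadExt;
use yaak_http::tee_reader::TeeReader;

const CHUNK_SIZE: usize = 1024 * 1024;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();

    // Any AsyncRead works; a local file stands in for the streamed request body.
    let file = tokio::fs::File::open("request-body.bin").await?;
    let mut body = TeeReader::new(file, tx);

    // Consumer side: buffer captured bytes and flush in CHUNK_SIZE pieces.
    let writer = tokio::spawn(async move {
        let mut buffer: Vec<u8> = Vec::with_capacity(CHUNK_SIZE);
        let mut chunk_index = 0;
        let store_chunk = |index: i32, data: Vec<u8>| {
            // Stand-in for BlobContext::insert_chunk
            println!("chunk {index}: {} bytes", data.len());
        };
        while let Some(data) = rx.recv().await {
            buffer.extend_from_slice(&data);
            while buffer.len() >= CHUNK_SIZE {
                let chunk: Vec<u8> = buffer.drain(..CHUNK_SIZE).collect();
                store_chunk(chunk_index, chunk);
                chunk_index += 1;
            }
        }
        // Flush whatever is left once the sender side is done.
        if !buffer.is_empty() {
            store_chunk(chunk_index, buffer);
        }
    });

    // Sender side: whoever reads `body` (normally the HTTP client) drives the tee.
    let mut sink = Vec::new();
    body.read_to_end(&mut sink).await?;
    drop(body); // dropping the reader closes the channel so the writer task finishes
    writer.await.unwrap();
    Ok(())
}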

View File

@@ -38,7 +38,7 @@ export type HttpRequest = { model: "http_request", id: string, createdAt: string
export type HttpRequestHeader = { enabled?: boolean, name: string, value: string, id?: string, };
export type HttpResponse = { model: "http_response", id: string, createdAt: string, updatedAt: string, workspaceId: string, requestId: string, bodyPath: string | null, contentLength: number | null, contentLengthCompressed: number | null, elapsed: number, elapsedHeaders: number, error: string | null, headers: Array<HttpResponseHeader>, remoteAddr: string | null, requestHeaders: Array<HttpResponseHeader>, status: number, statusReason: string | null, state: HttpResponseState, url: string, version: string | null, };
export type HttpResponse = { model: "http_response", id: string, createdAt: string, updatedAt: string, workspaceId: string, requestId: string, bodyPath: string | null, contentLength: number | null, contentLengthCompressed: number | null, elapsed: number, elapsedHeaders: number, error: string | null, headers: Array<HttpResponseHeader>, remoteAddr: string | null, requestContentLength: number | null, requestHeaders: Array<HttpResponseHeader>, status: number, statusReason: string | null, state: HttpResponseState, url: string, version: string | null, };
export type HttpResponseEvent = { model: "http_response_event", id: string, createdAt: string, updatedAt: string, workspaceId: string, responseId: string, event: HttpResponseEventData, };
@@ -47,7 +47,7 @@ export type HttpResponseEvent = { model: "http_response_event", id: string, crea
* This mirrors `yaak_http::sender::HttpResponseEvent` but with serde support.
* The `From` impl is in yaak-http to avoid circular dependencies.
*/
export type HttpResponseEventData = { "type": "start_request" } | { "type": "end_request" } | { "type": "setting", name: string, value: string, } | { "type": "info", message: string, } | { "type": "redirect", url: string, status: number, behavior: string, } | { "type": "send_url", method: string, path: string, } | { "type": "receive_url", version: string, status: string, } | { "type": "header_up", name: string, value: string, } | { "type": "header_down", name: string, value: string, } | { "type": "chunk_sent", bytes: number, } | { "type": "chunk_received", bytes: number, };
export type HttpResponseEventData = { "type": "setting", name: string, value: string, } | { "type": "info", message: string, } | { "type": "redirect", url: string, status: number, behavior: string, } | { "type": "send_url", method: string, path: string, } | { "type": "receive_url", version: string, status: string, } | { "type": "header_up", name: string, value: string, } | { "type": "header_down", name: string, value: string, } | { "type": "chunk_sent", bytes: number, } | { "type": "chunk_received", bytes: number, };
export type HttpResponseHeader = { name: string, value: string, };

View File

@@ -0,0 +1,12 @@
CREATE TABLE body_chunks
(
id TEXT PRIMARY KEY,
body_id TEXT NOT NULL,
chunk_index INTEGER NOT NULL,
data BLOB NOT NULL,
created_at DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) NOT NULL,
UNIQUE (body_id, chunk_index)
);
CREATE INDEX idx_body_chunks_body_id ON body_chunks (body_id, chunk_index);

View File

@@ -0,0 +1,2 @@
ALTER TABLE http_responses
ADD COLUMN request_content_length INTEGER;

View File

@@ -0,0 +1,372 @@
use crate::error::Result;
use crate::util::generate_prefixed_id;
use include_dir::{Dir, include_dir};
use log::{debug, info};
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::{OptionalExtension, params};
use std::sync::{Arc, Mutex};
use tauri::{Manager, Runtime, State};
static BLOB_MIGRATIONS_DIR: Dir = include_dir!("$CARGO_MANIFEST_DIR/blob_migrations");
/// A chunk of body data stored in the blob database.
#[derive(Debug, Clone)]
pub struct BodyChunk {
pub id: String,
pub body_id: String,
pub chunk_index: i32,
pub data: Vec<u8>,
}
impl BodyChunk {
pub fn new(body_id: impl Into<String>, chunk_index: i32, data: Vec<u8>) -> Self {
Self { id: generate_prefixed_id("bc"), body_id: body_id.into(), chunk_index, data }
}
}
/// Extension trait for accessing the blob manager from app handle.
pub trait BlobManagerExt<'a, R> {
fn blob_manager(&'a self) -> State<'a, BlobManager>;
fn blobs(&'a self) -> BlobContext;
}
impl<'a, R: Runtime, M: Manager<R>> BlobManagerExt<'a, R> for M {
fn blob_manager(&'a self) -> State<'a, BlobManager> {
self.state::<BlobManager>()
}
fn blobs(&'a self) -> BlobContext {
let manager = self.state::<BlobManager>();
manager.inner().connect()
}
}
/// Manages the blob database connection pool.
#[derive(Debug, Clone)]
pub struct BlobManager {
pool: Arc<Mutex<Pool<SqliteConnectionManager>>>,
}
impl BlobManager {
pub fn new(pool: Pool<SqliteConnectionManager>) -> Self {
Self { pool: Arc::new(Mutex::new(pool)) }
}
pub fn connect(&self) -> BlobContext {
let conn = self
.pool
.lock()
.expect("Failed to gain lock on blob DB")
.get()
.expect("Failed to get blob DB connection from pool");
BlobContext { conn }
}
}
/// Context for blob database operations.
pub struct BlobContext {
conn: r2d2::PooledConnection<SqliteConnectionManager>,
}
impl BlobContext {
/// Insert a single chunk.
pub fn insert_chunk(&self, chunk: &BodyChunk) -> Result<()> {
self.conn.execute(
"INSERT INTO body_chunks (id, body_id, chunk_index, data) VALUES (?1, ?2, ?3, ?4)",
params![chunk.id, chunk.body_id, chunk.chunk_index, chunk.data],
)?;
Ok(())
}
/// Get all chunks for a body, ordered by chunk_index.
pub fn get_chunks(&self, body_id: &str) -> Result<Vec<BodyChunk>> {
let mut stmt = self.conn.prepare(
"SELECT id, body_id, chunk_index, data FROM body_chunks
WHERE body_id = ?1 ORDER BY chunk_index ASC",
)?;
let chunks = stmt
.query_map(params![body_id], |row| {
Ok(BodyChunk {
id: row.get(0)?,
body_id: row.get(1)?,
chunk_index: row.get(2)?,
data: row.get(3)?,
})
})?
.collect::<std::result::Result<Vec<_>, _>>()?;
Ok(chunks)
}
/// Delete all chunks for a body.
pub fn delete_chunks(&self, body_id: &str) -> Result<()> {
self.conn.execute("DELETE FROM body_chunks WHERE body_id = ?1", params![body_id])?;
Ok(())
}
/// Delete all chunks matching a body_id prefix (e.g., "rs_abc123.%" to delete all bodies for a response).
pub fn delete_chunks_like(&self, body_id_prefix: &str) -> Result<()> {
self.conn
.execute("DELETE FROM body_chunks WHERE body_id LIKE ?1", params![body_id_prefix])?;
Ok(())
}
}
/// Get total size of a body without loading data.
impl BlobContext {
pub fn get_body_size(&self, body_id: &str) -> Result<usize> {
let size: i64 = self
.conn
.query_row(
"SELECT COALESCE(SUM(LENGTH(data)), 0) FROM body_chunks WHERE body_id = ?1",
params![body_id],
|row| row.get(0),
)
.unwrap_or(0);
Ok(size as usize)
}
/// Check if a body exists.
pub fn body_exists(&self, body_id: &str) -> Result<bool> {
let count: i64 = self
.conn
.query_row(
"SELECT COUNT(*) FROM body_chunks WHERE body_id = ?1",
params![body_id],
|row| row.get(0),
)
.unwrap_or(0);
Ok(count > 0)
}
}
/// Run migrations for the blob database.
pub fn migrate_blob_db(pool: &Pool<SqliteConnectionManager>) -> Result<()> {
info!("Running blob database migrations");
// Create migrations tracking table
pool.get()?.execute(
"CREATE TABLE IF NOT EXISTS _blob_migrations (
version TEXT PRIMARY KEY,
description TEXT NOT NULL,
applied_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL
)",
[],
)?;
// Read and sort all .sql files
let mut entries: Vec<_> = BLOB_MIGRATIONS_DIR
.entries()
.iter()
.filter(|e| e.path().extension().map(|ext| ext == "sql").unwrap_or(false))
.collect();
entries.sort_by_key(|e| e.path());
let mut ran_migrations = 0;
for entry in &entries {
let filename = entry.path().file_name().unwrap().to_str().unwrap();
let version = filename.split('_').next().unwrap();
// Check if already applied
let already_applied: Option<i64> = pool
.get()?
.query_row("SELECT 1 FROM _blob_migrations WHERE version = ?", [version], |r| r.get(0))
.optional()?;
if already_applied.is_some() {
debug!("Skipping already applied blob migration: {}", filename);
continue;
}
let sql =
entry.as_file().unwrap().contents_utf8().expect("Failed to read blob migration file");
info!("Applying blob migration: {}", filename);
let conn = pool.get()?;
conn.execute_batch(sql)?;
// Record migration
conn.execute(
"INSERT INTO _blob_migrations (version, description) VALUES (?, ?)",
params![version, filename],
)?;
ran_migrations += 1;
}
if ran_migrations == 0 {
info!("No blob migrations to run");
} else {
info!("Ran {} blob migration(s)", ran_migrations);
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
fn create_test_pool() -> Pool<SqliteConnectionManager> {
let manager = SqliteConnectionManager::memory();
let pool = Pool::builder().max_size(1).build(manager).unwrap();
migrate_blob_db(&pool).unwrap();
pool
}
#[test]
fn test_insert_and_get_chunks() {
let pool = create_test_pool();
let manager = BlobManager::new(pool);
let ctx = manager.connect();
let body_id = "rs_test123.request";
let chunk1 = BodyChunk::new(body_id, 0, b"Hello, ".to_vec());
let chunk2 = BodyChunk::new(body_id, 1, b"World!".to_vec());
ctx.insert_chunk(&chunk1).unwrap();
ctx.insert_chunk(&chunk2).unwrap();
let chunks = ctx.get_chunks(body_id).unwrap();
assert_eq!(chunks.len(), 2);
assert_eq!(chunks[0].chunk_index, 0);
assert_eq!(chunks[0].data, b"Hello, ");
assert_eq!(chunks[1].chunk_index, 1);
assert_eq!(chunks[1].data, b"World!");
}
#[test]
fn test_get_chunks_ordered_by_index() {
let pool = create_test_pool();
let manager = BlobManager::new(pool);
let ctx = manager.connect();
let body_id = "rs_test123.request";
// Insert out of order
ctx.insert_chunk(&BodyChunk::new(body_id, 2, b"C".to_vec())).unwrap();
ctx.insert_chunk(&BodyChunk::new(body_id, 0, b"A".to_vec())).unwrap();
ctx.insert_chunk(&BodyChunk::new(body_id, 1, b"B".to_vec())).unwrap();
let chunks = ctx.get_chunks(body_id).unwrap();
assert_eq!(chunks.len(), 3);
assert_eq!(chunks[0].data, b"A");
assert_eq!(chunks[1].data, b"B");
assert_eq!(chunks[2].data, b"C");
}
#[test]
fn test_delete_chunks() {
let pool = create_test_pool();
let manager = BlobManager::new(pool);
let ctx = manager.connect();
let body_id = "rs_test123.request";
ctx.insert_chunk(&BodyChunk::new(body_id, 0, b"data".to_vec())).unwrap();
assert!(ctx.body_exists(body_id).unwrap());
ctx.delete_chunks(body_id).unwrap();
assert!(!ctx.body_exists(body_id).unwrap());
assert_eq!(ctx.get_chunks(body_id).unwrap().len(), 0);
}
#[test]
fn test_delete_chunks_like() {
let pool = create_test_pool();
let manager = BlobManager::new(pool);
let ctx = manager.connect();
// Insert chunks for same response but different body types
ctx.insert_chunk(&BodyChunk::new("rs_abc.request", 0, b"req".to_vec())).unwrap();
ctx.insert_chunk(&BodyChunk::new("rs_abc.response", 0, b"resp".to_vec())).unwrap();
ctx.insert_chunk(&BodyChunk::new("rs_other.request", 0, b"other".to_vec())).unwrap();
// Delete all bodies for rs_abc
ctx.delete_chunks_like("rs_abc.%").unwrap();
// rs_abc bodies should be gone
assert!(!ctx.body_exists("rs_abc.request").unwrap());
assert!(!ctx.body_exists("rs_abc.response").unwrap());
// rs_other should still exist
assert!(ctx.body_exists("rs_other.request").unwrap());
}
#[test]
fn test_get_body_size() {
let pool = create_test_pool();
let manager = BlobManager::new(pool);
let ctx = manager.connect();
let body_id = "rs_test123.request";
ctx.insert_chunk(&BodyChunk::new(body_id, 0, b"Hello".to_vec())).unwrap();
ctx.insert_chunk(&BodyChunk::new(body_id, 1, b"World".to_vec())).unwrap();
let size = ctx.get_body_size(body_id).unwrap();
assert_eq!(size, 10); // "Hello" + "World" = 10 bytes
}
#[test]
fn test_get_body_size_empty() {
let pool = create_test_pool();
let manager = BlobManager::new(pool);
let ctx = manager.connect();
let size = ctx.get_body_size("nonexistent").unwrap();
assert_eq!(size, 0);
}
#[test]
fn test_body_exists() {
let pool = create_test_pool();
let manager = BlobManager::new(pool);
let ctx = manager.connect();
assert!(!ctx.body_exists("rs_test.request").unwrap());
ctx.insert_chunk(&BodyChunk::new("rs_test.request", 0, b"data".to_vec())).unwrap();
assert!(ctx.body_exists("rs_test.request").unwrap());
}
#[test]
fn test_multiple_bodies_isolated() {
let pool = create_test_pool();
let manager = BlobManager::new(pool);
let ctx = manager.connect();
ctx.insert_chunk(&BodyChunk::new("body1", 0, b"data1".to_vec())).unwrap();
ctx.insert_chunk(&BodyChunk::new("body2", 0, b"data2".to_vec())).unwrap();
let chunks1 = ctx.get_chunks("body1").unwrap();
let chunks2 = ctx.get_chunks("body2").unwrap();
assert_eq!(chunks1.len(), 1);
assert_eq!(chunks1[0].data, b"data1");
assert_eq!(chunks2.len(), 1);
assert_eq!(chunks2[0].data, b"data2");
}
#[test]
fn test_large_chunk() {
let pool = create_test_pool();
let manager = BlobManager::new(pool);
let ctx = manager.connect();
// 1MB chunk
let large_data: Vec<u8> = (0..1024 * 1024).map(|i| (i % 256) as u8).collect();
let body_id = "rs_large.request";
ctx.insert_chunk(&BodyChunk::new(body_id, 0, large_data.clone())).unwrap();
let chunks = ctx.get_chunks(body_id).unwrap();
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].data, large_data);
assert_eq!(ctx.get_body_size(body_id).unwrap(), 1024 * 1024);
}
}

View File

@@ -67,7 +67,7 @@ impl<'a> DbContext<'a> {
.expect("Failed to run find on DB")
}
pub fn find_all<'s, M>(&self) -> Result<Vec<M>>
pub(crate) fn find_all<'s, M>(&self) -> Result<Vec<M>>
where
M: Into<AnyModel> + Clone + UpsertModelInfo,
{
@@ -82,7 +82,7 @@ impl<'a> DbContext<'a> {
Ok(items.map(|v| v.unwrap()).collect())
}
pub fn find_many<'s, M>(
pub(crate) fn find_many<'s, M>(
&self,
col: impl IntoColumnRef,
value: impl Into<SimpleExpr>,
@@ -115,7 +115,7 @@ impl<'a> DbContext<'a> {
Ok(items.map(|v| v.unwrap()).collect())
}
pub fn upsert<M>(&self, model: &M, source: &UpdateSource) -> Result<M>
pub(crate) fn upsert<M>(&self, model: &M, source: &UpdateSource) -> Result<M>
where
M: Into<AnyModel> + From<AnyModel> + UpsertModelInfo + Clone,
{

View File

@@ -1,3 +1,4 @@
use crate::blob_manager::{BlobManager, migrate_blob_db};
use crate::commands::*;
use crate::migrate::migrate_db;
use crate::query_manager::QueryManager;
@@ -14,6 +15,7 @@ use tauri_plugin_dialog::{DialogExt, MessageDialogKind};
mod commands;
pub mod blob_manager;
mod connection_or_tx;
pub mod db_context;
pub mod error;
@@ -50,7 +52,9 @@ pub fn init<R: Runtime>() -> TauriPlugin<R> {
create_dir_all(app_path.clone()).expect("Problem creating App directory!");
let db_file_path = app_path.join("db.sqlite");
let blob_db_file_path = app_path.join("blobs.sqlite");
// Main database pool
let manager = SqliteConnectionManager::file(db_file_path);
let pool = Pool::builder()
.max_size(100) // Up from 10 (just in case)
@@ -68,7 +72,26 @@ pub fn init<R: Runtime>() -> TauriPlugin<R> {
return Err(Box::from(e.to_string()));
};
// Blob database pool
let blob_manager = SqliteConnectionManager::file(blob_db_file_path);
let blob_pool = Pool::builder()
.max_size(50)
.connection_timeout(Duration::from_secs(10))
.build(blob_manager)
.unwrap();
if let Err(e) = migrate_blob_db(&blob_pool) {
error!("Failed to run blob database migration {e:?}");
app_handle
.dialog()
.message(e.to_string())
.kind(MessageDialogKind::Error)
.blocking_show();
return Err(Box::from(e.to_string()));
};
app_handle.manage(SqliteConnection::new(pool.clone()));
app_handle.manage(BlobManager::new(blob_pool));
{
let (tx, rx) = mpsc::channel();

View File

@@ -1329,6 +1329,7 @@ pub struct HttpResponse {
pub error: Option<String>,
pub headers: Vec<HttpResponseHeader>,
pub remote_addr: Option<String>,
pub request_content_length: Option<i32>,
pub request_headers: Vec<HttpResponseHeader>,
pub status: i32,
pub status_reason: Option<String>,
@@ -1382,6 +1383,7 @@ impl UpsertModelInfo for HttpResponse {
(StatusReason, self.status_reason.into()),
(Url, self.url.into()),
(Version, self.version.into()),
(RequestContentLength, self.request_content_length.into()),
])
}
@@ -1396,6 +1398,7 @@ impl UpsertModelInfo for HttpResponse {
HttpResponseIden::Error,
HttpResponseIden::Headers,
HttpResponseIden::RemoteAddr,
HttpResponseIden::RequestContentLength,
HttpResponseIden::RequestHeaders,
HttpResponseIden::State,
HttpResponseIden::Status,
@@ -1431,6 +1434,7 @@ impl UpsertModelInfo for HttpResponse {
state: serde_json::from_str(format!(r#""{state}""#).as_str()).unwrap(),
body_path: r.get("body_path")?,
headers: serde_json::from_str(headers.as_str()).unwrap_or_default(),
request_content_length: r.get("request_content_length").unwrap_or_default(),
request_headers: serde_json::from_str(
r.get::<_, String>("request_headers").unwrap_or_default().as_str(),
)

View File

@@ -0,0 +1,18 @@
use crate::db_context::DbContext;
use crate::error::Result;
use crate::models::{HttpResponseEvent, HttpResponseEventIden};
use crate::util::UpdateSource;
impl<'a> DbContext<'a> {
pub fn list_http_response_events(&self, response_id: &str) -> Result<Vec<HttpResponseEvent>> {
self.find_many(HttpResponseEventIden::ResponseId, response_id, None)
}
pub fn upsert_http_response_event(
&self,
http_response_event: &HttpResponseEvent,
source: &UpdateSource,
) -> Result<HttpResponseEvent> {
self.upsert(http_response_event, source)
}
}

View File

@@ -8,6 +8,7 @@ mod grpc_connections;
mod grpc_events;
mod grpc_requests;
mod http_requests;
mod http_response_events;
mod http_responses;
mod key_values;
mod plugin_key_values;

View File

@@ -12,7 +12,7 @@ export type HttpRequest = { model: "http_request", id: string, createdAt: string
export type HttpRequestHeader = { enabled?: boolean, name: string, value: string, id?: string, };
export type HttpResponse = { model: "http_response", id: string, createdAt: string, updatedAt: string, workspaceId: string, requestId: string, bodyPath: string | null, contentLength: number | null, contentLengthCompressed: number | null, elapsed: number, elapsedHeaders: number, error: string | null, headers: Array<HttpResponseHeader>, remoteAddr: string | null, requestHeaders: Array<HttpResponseHeader>, status: number, statusReason: string | null, state: HttpResponseState, url: string, version: string | null, };
export type HttpResponse = { model: "http_response", id: string, createdAt: string, updatedAt: string, workspaceId: string, requestId: string, bodyPath: string | null, contentLength: number | null, contentLengthCompressed: number | null, elapsed: number, elapsedHeaders: number, error: string | null, headers: Array<HttpResponseHeader>, remoteAddr: string | null, requestContentLength: number | null, requestHeaders: Array<HttpResponseHeader>, status: number, statusReason: string | null, state: HttpResponseState, url: string, version: string | null, };
export type HttpResponseHeader = { name: string, value: string, };

View File

@@ -1,5 +1,5 @@
/* tslint:disable */
/* eslint-disable */
export function unescape_template(template: string): any;
export function parse_template(template: string): any;
export function escape_template(template: string): any;
export function parse_template(template: string): any;

View File

@@ -1,4 +1,5 @@
import * as wasm from "./yaak_templates_bg.wasm";
export * from "./yaak_templates_bg.js";
import { __wbg_set_wasm } from "./yaak_templates_bg.js";
__wbg_set_wasm(wasm);
__wbg_set_wasm(wasm);
wasm.__wbindgen_start();

View File

@@ -4,35 +4,6 @@ export function __wbg_set_wasm(val) {
}
const heap = new Array(128).fill(undefined);
heap.push(undefined, null, true, false);
let heap_next = heap.length;
function addHeapObject(obj) {
if (heap_next === heap.length) heap.push(heap.length + 1);
const idx = heap_next;
heap_next = heap[idx];
heap[idx] = obj;
return idx;
}
function getObject(idx) { return heap[idx]; }
function dropObject(idx) {
if (idx < 132) return;
heap[idx] = heap_next;
heap_next = idx;
}
function takeObject(idx) {
const ret = getObject(idx);
dropObject(idx);
return ret;
}
function debugString(val) {
// primitive types
const type = typeof val;
@@ -184,48 +155,24 @@ function getStringFromWasm0(ptr, len) {
ptr = ptr >>> 0;
return cachedTextDecoder.decode(getUint8ArrayMemory0().subarray(ptr, ptr + len));
}
function takeFromExternrefTable0(idx) {
const value = wasm.__wbindgen_export_2.get(idx);
wasm.__externref_table_dealloc(idx);
return value;
}
/**
* @param {string} template
* @returns {any}
*/
export function unescape_template(template) {
try {
const retptr = wasm.__wbindgen_add_to_stack_pointer(-16);
const ptr0 = passStringToWasm0(template, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
wasm.unescape_template(retptr, ptr0, len0);
var r0 = getDataViewMemory0().getInt32(retptr + 4 * 0, true);
var r1 = getDataViewMemory0().getInt32(retptr + 4 * 1, true);
var r2 = getDataViewMemory0().getInt32(retptr + 4 * 2, true);
if (r2) {
throw takeObject(r1);
}
return takeObject(r0);
} finally {
wasm.__wbindgen_add_to_stack_pointer(16);
}
}
/**
* @param {string} template
* @returns {any}
*/
export function parse_template(template) {
try {
const retptr = wasm.__wbindgen_add_to_stack_pointer(-16);
const ptr0 = passStringToWasm0(template, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
wasm.parse_template(retptr, ptr0, len0);
var r0 = getDataViewMemory0().getInt32(retptr + 4 * 0, true);
var r1 = getDataViewMemory0().getInt32(retptr + 4 * 1, true);
var r2 = getDataViewMemory0().getInt32(retptr + 4 * 2, true);
if (r2) {
throw takeObject(r1);
}
return takeObject(r0);
} finally {
wasm.__wbindgen_add_to_stack_pointer(16);
const ptr0 = passStringToWasm0(template, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const len0 = WASM_VECTOR_LEN;
const ret = wasm.unescape_template(ptr0, len0);
if (ret[2]) {
throw takeFromExternrefTable0(ret[1]);
}
return takeFromExternrefTable0(ret[0]);
}
/**
@@ -233,61 +180,69 @@ export function parse_template(template) {
* @returns {any}
*/
export function escape_template(template) {
try {
const retptr = wasm.__wbindgen_add_to_stack_pointer(-16);
const ptr0 = passStringToWasm0(template, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
wasm.escape_template(retptr, ptr0, len0);
var r0 = getDataViewMemory0().getInt32(retptr + 4 * 0, true);
var r1 = getDataViewMemory0().getInt32(retptr + 4 * 1, true);
var r2 = getDataViewMemory0().getInt32(retptr + 4 * 2, true);
if (r2) {
throw takeObject(r1);
}
return takeObject(r0);
} finally {
wasm.__wbindgen_add_to_stack_pointer(16);
const ptr0 = passStringToWasm0(template, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const len0 = WASM_VECTOR_LEN;
const ret = wasm.escape_template(ptr0, len0);
if (ret[2]) {
throw takeFromExternrefTable0(ret[1]);
}
return takeFromExternrefTable0(ret[0]);
}
/**
* @param {string} template
* @returns {any}
*/
export function parse_template(template) {
const ptr0 = passStringToWasm0(template, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const len0 = WASM_VECTOR_LEN;
const ret = wasm.parse_template(ptr0, len0);
if (ret[2]) {
throw takeFromExternrefTable0(ret[1]);
}
return takeFromExternrefTable0(ret[0]);
}
export function __wbg_new_405e22f390576ce2() {
const ret = new Object();
return addHeapObject(ret);
return ret;
};
export function __wbg_new_78feb108b6472713() {
const ret = new Array();
return addHeapObject(ret);
return ret;
};
export function __wbg_set_37837023f3d740e8(arg0, arg1, arg2) {
getObject(arg0)[arg1 >>> 0] = takeObject(arg2);
arg0[arg1 >>> 0] = arg2;
};
export function __wbg_set_3f1d0b984ed272ed(arg0, arg1, arg2) {
getObject(arg0)[takeObject(arg1)] = takeObject(arg2);
arg0[arg1] = arg2;
};
export function __wbindgen_debug_string(arg0, arg1) {
const ret = debugString(getObject(arg1));
const ptr1 = passStringToWasm0(ret, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const ret = debugString(arg1);
const ptr1 = passStringToWasm0(ret, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const len1 = WASM_VECTOR_LEN;
getDataViewMemory0().setInt32(arg0 + 4 * 1, len1, true);
getDataViewMemory0().setInt32(arg0 + 4 * 0, ptr1, true);
};
export function __wbindgen_object_clone_ref(arg0) {
const ret = getObject(arg0);
return addHeapObject(ret);
};
export function __wbindgen_object_drop_ref(arg0) {
takeObject(arg0);
export function __wbindgen_init_externref_table() {
const table = wasm.__wbindgen_export_2;
const offset = table.grow(4);
table.set(0, undefined);
table.set(offset + 0, undefined);
table.set(offset + 1, null);
table.set(offset + 2, true);
table.set(offset + 3, false);
;
};
export function __wbindgen_string_new(arg0, arg1) {
const ret = getStringFromWasm0(arg0, arg1);
return addHeapObject(ret);
return ret;
};
export function __wbindgen_throw(arg0, arg1) {

View File

Binary file not shown.

View File

@@ -1,9 +1,11 @@
/* tslint:disable */
/* eslint-disable */
export const memory: WebAssembly.Memory;
export const escape_template: (a: number, b: number, c: number) => void;
export const parse_template: (a: number, b: number, c: number) => void;
export const unescape_template: (a: number, b: number, c: number) => void;
export const __wbindgen_export_0: (a: number, b: number) => number;
export const __wbindgen_export_1: (a: number, b: number, c: number, d: number) => number;
export const __wbindgen_add_to_stack_pointer: (a: number) => number;
export const escape_template: (a: number, b: number) => [number, number, number];
export const parse_template: (a: number, b: number) => [number, number, number];
export const unescape_template: (a: number, b: number) => [number, number, number];
export const __wbindgen_malloc: (a: number, b: number) => number;
export const __wbindgen_realloc: (a: number, b: number, c: number, d: number) => number;
export const __wbindgen_export_2: WebAssembly.Table;
export const __externref_table_dealloc: (a: number) => void;
export const __wbindgen_start: () => void;

View File

@@ -0,0 +1,58 @@
import type { HttpResponse } from '@yaakapp-internal/models';
import { type ReactNode, useMemo } from 'react';
import { getRequestBodyText as getHttpResponseRequestBodyText } from '../hooks/useHttpRequestBody';
import { useToggle } from '../hooks/useToggle';
import { isProbablyTextContentType } from '../lib/contentType';
import { getContentTypeFromHeaders } from '../lib/model_util';
import { CopyButton } from './CopyButton';
import { Banner } from './core/Banner';
import { Button } from './core/Button';
import { InlineCode } from './core/InlineCode';
import { SizeTag } from './core/SizeTag';
import { HStack } from './core/Stacks';
interface Props {
children: ReactNode;
response: HttpResponse;
}
const LARGE_BYTES = 2 * 1000 * 1000;
export function ConfirmLargeResponseRequest({ children, response }: Props) {
const [showLargeResponse, toggleShowLargeResponse] = useToggle();
const isProbablyText = useMemo(() => {
const contentType = getContentTypeFromHeaders(response.headers);
return isProbablyTextContentType(contentType);
}, [response.headers]);
const contentLength = response.requestContentLength ?? 0;
const isLarge = contentLength > LARGE_BYTES;
if (!showLargeResponse && isLarge) {
return (
<Banner color="primary" className="flex flex-col gap-3">
<p>
Showing content over{' '}
<InlineCode>
<SizeTag contentLength={LARGE_BYTES} />
</InlineCode>{' '}
may impact performance
</p>
<HStack wrap space={2}>
<Button color="primary" size="xs" onClick={toggleShowLargeResponse}>
Reveal Request Body
</Button>
{isProbablyText && (
<CopyButton
color="secondary"
variant="border"
size="xs"
text={() => getHttpResponseRequestBodyText(response).then((d) => d?.bodyText ?? '')}
/>
)}
</HStack>
</Banner>
);
}
return <>{children}</>;
}

View File

@@ -24,7 +24,8 @@ import { TabContent, Tabs } from './core/Tabs/Tabs';
import { EmptyStateText } from './EmptyStateText';
import { ErrorBoundary } from './ErrorBoundary';
import { RecentHttpResponsesDropdown } from './RecentHttpResponsesDropdown';
import { ResponseEvents } from './ResponseEvents';
import { RequestBodyViewer } from './RequestBodyViewer';
import { HttpResponseTimeline } from './HttpResponseTimeline';
import { ResponseHeaders } from './ResponseHeaders';
import { ResponseInfo } from './ResponseInfo';
import { AudioViewer } from './responseViewers/AudioViewer';
@@ -34,6 +35,7 @@ import { HTMLOrTextViewer } from './responseViewers/HTMLOrTextViewer';
import { ImageViewer } from './responseViewers/ImageViewer';
import { SvgViewer } from './responseViewers/SvgViewer';
import { VideoViewer } from './responseViewers/VideoViewer';
import { ConfirmLargeResponseRequest } from './ConfirmLargeResponseRequest';
const PdfViewer = lazy(() =>
import('./responseViewers/PdfViewer').then((m) => ({ default: m.PdfViewer })),
@@ -46,9 +48,10 @@ interface Props {
}
const TAB_BODY = 'body';
const TAB_REQUEST = 'request';
const TAB_HEADERS = 'headers';
const TAB_INFO = 'info';
const TAB_EVENTS = 'events';
const TAB_TIMELINE = 'timeline';
export function HttpResponsePane({ style, className, activeRequestId }: Props) {
const { activeResponse, setPinnedResponseId, responses } = usePinnedHttpResponse(activeRequestId);
@@ -76,6 +79,12 @@ export function HttpResponsePane({ style, className, activeRequestId }: Props) {
],
},
},
{
value: TAB_REQUEST,
label: 'Request',
rightSlot:
(activeResponse?.requestContentLength ?? 0) > 0 ? <CountBadge count={true} /> : null,
},
{
value: TAB_HEADERS,
label: 'Headers',
@@ -87,7 +96,7 @@ export function HttpResponsePane({ style, className, activeRequestId }: Props) {
),
},
{
value: TAB_EVENTS,
value: TAB_TIMELINE,
label: 'Timeline',
rightSlot: <CountBadge count={responseEvents.data?.length ?? 0} />,
},
@@ -98,11 +107,12 @@ export function HttpResponsePane({ style, className, activeRequestId }: Props) {
],
[
activeResponse?.headers,
activeResponse?.requestContentLength,
activeResponse?.requestHeaders.length,
mimeType,
responseEvents.data?.length,
setViewMode,
viewMode,
activeResponse?.requestHeaders.length,
responseEvents.data?.length,
],
);
const activeTab = activeTabs?.[activeRequestId];
@@ -200,8 +210,8 @@ export function HttpResponsePane({ style, className, activeRequestId }: Props) {
</VStack>
</EmptyStateText>
) : activeResponse.state === 'closed' &&
activeResponse.contentLength === 0 ? (
<EmptyStateText>Empty </EmptyStateText>
(activeResponse.contentLength ?? 0) === 0 ? (
<EmptyStateText>Empty</EmptyStateText>
) : mimeType?.match(/^text\/event-stream/i) && viewMode === 'pretty' ? (
<EventStreamViewer response={activeResponse} />
) : mimeType?.match(/^image\/svg/) ? (
@@ -227,14 +237,19 @@ export function HttpResponsePane({ style, className, activeRequestId }: Props) {
</Suspense>
</ErrorBoundary>
</TabContent>
<TabContent value={TAB_REQUEST}>
<ConfirmLargeResponseRequest response={activeResponse}>
<RequestBodyViewer response={activeResponse} />
</ConfirmLargeResponseRequest>
</TabContent>
<TabContent value={TAB_HEADERS}>
<ResponseHeaders response={activeResponse} />
</TabContent>
<TabContent value={TAB_INFO}>
<ResponseInfo response={activeResponse} />
</TabContent>
<TabContent value={TAB_EVENTS}>
<ResponseEvents response={activeResponse} />
<TabContent value={TAB_TIMELINE}>
<HttpResponseTimeline response={activeResponse} />
</TabContent>
</Tabs>
</div>

View File

@@ -5,7 +5,7 @@ import type {
} from '@yaakapp-internal/models';
import classNames from 'classnames';
import { format } from 'date-fns';
import { Fragment, type ReactNode, useMemo, useState } from 'react';
import { type ReactNode, useMemo, useState } from 'react';
import { useHttpResponseEvents } from '../hooks/useHttpResponseEvents';
import { AutoScroller } from './core/AutoScroller';
import { Banner } from './core/Banner';
@@ -20,15 +20,11 @@ interface Props {
response: HttpResponse;
}
export function ResponseEvents({ response }: Props) {
return (
<Fragment key={response.id}>
<ActualResponseEvents response={response} />
</Fragment>
);
export function HttpResponseTimeline({ response }: Props) {
return <Inner key={response.id} response={response} />;
}
function ActualResponseEvents({ response }: Props) {
function Inner({ response }: Props) {
const [activeEventIndex, setActiveEventIndex] = useState<number | null>(null);
const { data: events, error, isLoading } = useHttpResponseEvents(response);
@@ -252,20 +248,6 @@ type EventDisplay = {
function getEventDisplay(event: HttpResponseEventData): EventDisplay {
switch (event.type) {
case 'start_request':
return {
icon: 'info',
color: 'secondary',
label: 'Start',
summary: 'Request started',
};
case 'end_request':
return {
icon: 'info',
color: 'secondary',
label: 'End',
summary: 'Request complete',
};
case 'setting':
return {
icon: 'settings',
@@ -321,14 +303,14 @@ function getEventDisplay(event: HttpResponseEventData): EventDisplay {
icon: 'info',
color: 'secondary',
label: 'Chunk',
summary: `${event.bytes} bytes sent`,
summary: `${formatBytes(event.bytes)} chunk sent`,
};
case 'chunk_received':
return {
icon: 'info',
color: 'secondary',
label: 'Chunk',
summary: `${event.bytes} bytes received`,
summary: `${formatBytes(event.bytes)} chunk received`,
};
default:
return {

View File

@@ -0,0 +1,52 @@
import type { HttpResponse } from '@yaakapp-internal/models';
import { useHttpRequestBody } from '../hooks/useHttpRequestBody';
import { languageFromContentType } from '../lib/contentType';
import { EmptyStateText } from './EmptyStateText';
import { Editor } from './core/Editor/LazyEditor';
import { LoadingIcon } from './core/LoadingIcon';
interface Props {
response: HttpResponse;
}
export function RequestBodyViewer({ response }: Props) {
return <RequestBodyViewerInner key={response.id} response={response} />;
}
function RequestBodyViewerInner({ response }: Props) {
const { data, isLoading, error } = useHttpRequestBody(response);
if (isLoading) {
return (
<EmptyStateText>
<LoadingIcon />
</EmptyStateText>
);
}
if (error) {
return <EmptyStateText>Error loading request body: {error.message}</EmptyStateText>;
}
if (data?.bodyText == null || data.bodyText.length === 0) {
return <EmptyStateText>No request body</EmptyStateText>;
}
const { bodyText } = data;
// Try to detect language from content-type header that was sent
const contentTypeHeader = response.requestHeaders.find(
(h) => h.name.toLowerCase() === 'content-type',
);
const contentType = contentTypeHeader?.value ?? null;
const language = languageFromContentType(contentType, bodyText);
return (
<Editor
readOnly
defaultValue={bodyText}
language={language}
stateKey={`request.body.${response.id}`}
/>
);
}

View File

@@ -0,0 +1,32 @@
import { useQuery } from '@tanstack/react-query';
import type { HttpResponse } from '@yaakapp-internal/models';
import { invokeCmd } from '../lib/tauri';
export function useHttpRequestBody(response: HttpResponse | null) {
return useQuery({
placeholderData: (prev) => prev, // Keep previous data on refetch
queryKey: ['request_body', response?.id, response?.state, response?.requestContentLength],
enabled: (response?.requestContentLength ?? 0) > 0,
queryFn: async () => {
return getRequestBodyText(response);
},
});
}
export async function getRequestBodyText(response: HttpResponse | null) {
if (response?.id == null) {
return null;
}
const data = await invokeCmd<number[] | null>('cmd_http_request_body', {
responseId: response.id,
});
if (data == null) {
return null;
}
const body = new Uint8Array(data);
const bodyText = new TextDecoder('utf-8', { fatal: false }).decode(body);
return { body, bodyText };
}

View File

@@ -25,6 +25,7 @@ type TauriCmd =
| 'cmd_grpc_reflect'
| 'cmd_grpc_request_actions'
| 'cmd_http_request_actions'
| 'cmd_http_request_body'
| 'cmd_http_response_body'
| 'cmd_import_data'
| 'cmd_install_plugin'