chore(workers): improve logging, set header timeouts (#9171)

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2026-03-30 17:26:55 +02:00
committed by GitHub
parent 520e1ce3cd
commit dd3376e0a9
4 changed files with 242 additions and 21 deletions

View File

@@ -476,9 +476,22 @@ func (s *backendSupervisor) startBackend(backend, backendPath string) (string, e
return clientAddr, nil
}
cancel()
// Check if the process died (e.g. OOM, CUDA error, missing libs)
if !proc.IsAlive() {
stderrTail := readLastLinesFromFile(proc.StderrPath(), 20)
xlog.Warn("Backend process died during startup", "backend", backend, "stderr", stderrTail)
s.mu.Lock()
delete(s.processes, backend)
s.freePorts = append(s.freePorts, port)
s.mu.Unlock()
return "", fmt.Errorf("backend process %s died during startup. Last stderr:\n%s", backend, stderrTail)
}
}
xlog.Warn("Backend gRPC server not ready after waiting, proceeding anyway", "backend", backend, "addr", clientAddr)
// Log stderr to help diagnose why the backend isn't responding
stderrTail := readLastLinesFromFile(proc.StderrPath(), 20)
xlog.Warn("Backend gRPC server not ready after waiting, proceeding anyway", "backend", backend, "addr", clientAddr, "stderr", stderrTail)
return clientAddr, nil
}
@@ -525,6 +538,20 @@ func (s *backendSupervisor) stopAllBackends() {
}
}
// readLastLinesFromFile reads the last n lines from a file.
// Returns an empty string if the file cannot be read.
func readLastLinesFromFile(path string, n int) string {
data, err := os.ReadFile(path)
if err != nil {
return ""
}
lines := strings.Split(strings.TrimRight(string(data), "\n"), "\n")
if len(lines) > n {
lines = lines[len(lines)-n:]
}
return strings.Join(lines, "\n")
}
// isRunning returns whether a specific backend process is currently running.
func (s *backendSupervisor) isRunning(backend string) bool {
s.mu.Lock()

View File

@@ -3,12 +3,17 @@ package nodes
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/mudler/LocalAI/core/services/storage"
@@ -19,25 +24,58 @@ import (
// Files are transferred between the frontend and backend nodes over a small
// HTTP server running alongside the gRPC backend process.
type HTTPFileStager struct {
	httpAddrFor     func(nodeID string) (string, error) // resolves a node ID to its HTTP host:port
	token           string                              // registration token sent as a Bearer credential
	client          *http.Client                        // shared client for uploads/downloads
	responseTimeout time.Duration                       // timeout waiting for server response after upload
	maxRetries      int                                 // number of retry attempts for transient failures
}
// NewHTTPFileStager creates a new HTTP file stager.
// httpAddrFor should return the HTTP address (host:port) for the given node ID.
// token is the registration token used for authentication.
//
// The response timeout (how long to wait for the server's reply once the
// request body has been sent) defaults to 30 minutes and may be overridden
// with LOCALAI_FILE_TRANSFER_TIMEOUT (a Go duration string). The retry count
// defaults to 3 and may be overridden with LOCALAI_FILE_TRANSFER_RETRIES;
// invalid values fall back to the defaults.
func NewHTTPFileStager(httpAddrFor func(nodeID string) (string, error), token string) *HTTPFileStager {
	responseTimeout := 30 * time.Minute
	if v := os.Getenv("LOCALAI_FILE_TRANSFER_TIMEOUT"); v != "" {
		if d, err := time.ParseDuration(v); err == nil {
			responseTimeout = d
		}
	}
	maxRetries := 3
	if v := os.Getenv("LOCALAI_FILE_TRANSFER_RETRIES"); v != "" {
		if n, err := strconv.Atoi(v); err == nil && n >= 0 {
			maxRetries = n
		}
	}
	transport := &http.Transport{
		DialContext: (&net.Dialer{
			Timeout:   30 * time.Second,
			KeepAlive: 15 * time.Second, // aggressive keepalive for LAN transfers
		}).DialContext,
		ForceAttemptHTTP2:     false, // HTTP/2 flow control can stall large uploads
		MaxIdleConns:          10,
		IdleConnTimeout:       90 * time.Second,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
		// Bound only the wait-for-response-headers phase; the body upload
		// itself is deliberately not subject to this timeout.
		ResponseHeaderTimeout: responseTimeout,
		WriteBufferSize:       256 << 10, // 256 KB
		ReadBufferSize:        256 << 10, // 256 KB
	}
	return &HTTPFileStager{
		httpAddrFor: httpAddrFor,
		token:       token,
		client: &http.Client{
			// No Timeout set — for large uploads, http.Client.Timeout covers the
			// entire request lifecycle including the body upload. If it fires
			// mid-write, Go closes the connection causing "connection reset by peer"
			// on the server. Instead we use ResponseHeaderTimeout on the transport
			// to cover only the wait-for-server-response phase.
			Transport: transport,
		},
		responseTimeout: responseTimeout,
		maxRetries:      maxRetries,
	}
}
@@ -49,25 +87,69 @@ func (h *HTTPFileStager) EnsureRemote(ctx context.Context, nodeID, localPath, ke
return "", fmt.Errorf("resolving HTTP address for node %s: %w", nodeID, err)
}
fi, err := os.Stat(localPath)
if err != nil {
return "", fmt.Errorf("stat local file %s: %w", localPath, err)
}
fileSize := fi.Size()
url := fmt.Sprintf("http://%s/v1/files/%s", addr, key)
xlog.Info("Uploading file to remote node", "node", nodeID, "file", filepath.Base(localPath), "size", humanFileSize(fileSize), "url", url)
var lastErr error
attempts := h.maxRetries + 1 // maxRetries=3 means 4 total attempts (1 initial + 3 retries)
for attempt := 1; attempt <= attempts; attempt++ {
if attempt > 1 {
backoff := time.Duration(5<<(attempt-2)) * time.Second // 5s, 10s, 20s
xlog.Warn("Retrying file upload", "node", nodeID, "file", filepath.Base(localPath),
"attempt", attempt, "of", attempts, "backoff", backoff, "lastError", lastErr)
select {
case <-ctx.Done():
return "", fmt.Errorf("upload cancelled during retry backoff: %w", ctx.Err())
case <-time.After(backoff):
}
}
result, err := h.doUpload(ctx, addr, nodeID, localPath, key, url, fileSize)
if err == nil {
if attempt > 1 {
xlog.Info("File upload succeeded after retry", "node", nodeID, "file", filepath.Base(localPath), "attempt", attempt)
}
return result, nil
}
lastErr = err
if !isTransientError(err) {
xlog.Error("File upload failed with non-transient error", "node", nodeID, "file", filepath.Base(localPath), "error", err)
return "", err
}
xlog.Warn("File upload failed with transient error", "node", nodeID, "file", filepath.Base(localPath),
"attempt", attempt, "of", attempts, "error", err)
}
return "", fmt.Errorf("uploading %s to node %s failed after %d attempts: %w", localPath, nodeID, attempts, lastErr)
}
// doUpload performs a single upload attempt.
func (h *HTTPFileStager) doUpload(ctx context.Context, addr, nodeID, localPath, key, url string, fileSize int64) (string, error) {
f, err := os.Open(localPath)
if err != nil {
return "", fmt.Errorf("opening local file %s: %w", localPath, err)
}
defer f.Close()
fi, _ := f.Stat()
var fileSize int64
if fi != nil {
fileSize = fi.Size()
var body io.Reader = f
// For files > 100MB, wrap with progress logging
const progressThreshold = 100 << 20
if fileSize > progressThreshold {
body = newProgressReader(f, fileSize, filepath.Base(localPath), nodeID)
}
url := fmt.Sprintf("http://%s/v1/files/%s", addr, key)
xlog.Debug("HTTP upload starting", "node", nodeID, "url", url, "localPath", localPath, "fileSize", fileSize)
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, f)
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, body)
if err != nil {
return "", fmt.Errorf("creating request: %w", err)
}
req.ContentLength = fileSize // explicit Content-Length for progress tracking
req.Header.Set("Content-Type", "application/octet-stream")
if h.token != "" {
req.Header.Set("Authorization", "Bearer "+h.token)
@@ -75,13 +157,15 @@ func (h *HTTPFileStager) EnsureRemote(ctx context.Context, nodeID, localPath, ke
resp, err := h.client.Do(req)
if err != nil {
xlog.Error("File upload failed", "node", nodeID, "file", filepath.Base(localPath), "size", humanFileSize(fileSize), "error", err)
return "", fmt.Errorf("uploading %s to node %s: %w", localPath, nodeID, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return "", fmt.Errorf("upload to node %s failed with status %d: %s", nodeID, resp.StatusCode, string(body))
respBody, _ := io.ReadAll(resp.Body)
xlog.Error("File upload rejected by remote node", "node", nodeID, "file", filepath.Base(localPath), "status", resp.StatusCode, "response", string(respBody))
return "", fmt.Errorf("upload to node %s failed with status %d: %s", nodeID, resp.StatusCode, string(respBody))
}
var result struct {
@@ -91,10 +175,113 @@ func (h *HTTPFileStager) EnsureRemote(ctx context.Context, nodeID, localPath, ke
return "", fmt.Errorf("decoding upload response: %w", err)
}
xlog.Debug("HTTP upload complete", "node", nodeID, "remotePath", result.LocalPath, "fileSize", fileSize)
xlog.Info("File upload complete", "node", nodeID, "file", filepath.Base(localPath), "size", humanFileSize(fileSize), "remotePath", result.LocalPath)
return result.LocalPath, nil
}
// isTransientError returns true if the error is likely transient and worth retrying.
func isTransientError(err error) bool {
if err == nil {
return false
}
// Connection reset by peer
if errors.Is(err, syscall.ECONNRESET) {
return true
}
// Broken pipe
if errors.Is(err, syscall.EPIPE) {
return true
}
// Connection refused (worker might be restarting)
if errors.Is(err, syscall.ECONNREFUSED) {
return true
}
// Context deadline exceeded (but not cancelled — cancelled means the caller gave up)
if errors.Is(err, context.DeadlineExceeded) {
return true
}
// net.Error timeout
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
return true
}
// Check for "connection reset" in the error string as a fallback
// (some wrapped errors lose the syscall.Errno)
msg := err.Error()
if strings.Contains(msg, "connection reset") ||
strings.Contains(msg, "broken pipe") ||
strings.Contains(msg, "connection refused") ||
strings.Contains(msg, "EOF") {
return true
}
return false
}
// progressReader wraps an io.Reader and logs upload progress periodically.
type progressReader struct {
	reader  io.Reader  // underlying data source
	total   int64      // total bytes expected for the transfer
	read    int64      // bytes consumed so far
	file    string     // file name, used as log context only
	node    string     // destination node, used as log context only
	lastLog time.Time  // when progress was last logged
	lastPct int        // percentage at the last logged report
	start   time.Time  // transfer start, used for the speed estimate
	mu      sync.Mutex // guards the mutable progress fields above
}

// newProgressReader returns a progressReader that reports progress for a
// transfer of total bytes of file to node.
func newProgressReader(r io.Reader, total int64, file, node string) *progressReader {
	pr := &progressReader{
		reader: r,
		total:  total,
		file:   file,
		node:   node,
	}
	now := time.Now()
	pr.start = now
	pr.lastLog = now
	return pr
}

// Read forwards to the wrapped reader, accumulates the byte count, and
// emits a progress log line whenever the transfer crosses a 10% boundary
// or 30 seconds have elapsed since the previous log.
func (pr *progressReader) Read(p []byte) (int, error) {
	n, err := pr.reader.Read(p)
	if n <= 0 {
		return n, err
	}
	pr.mu.Lock()
	defer pr.mu.Unlock()
	pr.read += int64(n)
	pct := int(pr.read * 100 / pr.total)
	now := time.Now()
	crossedDecile := pct/10 > pr.lastPct/10
	stale := now.Sub(pr.lastLog) >= 30*time.Second
	if !crossedDecile && !stale {
		return n, err
	}
	var speed string
	if elapsed := now.Sub(pr.start); elapsed > 0 {
		speed = humanFileSize(int64(float64(pr.read)/elapsed.Seconds())) + "/s"
	}
	xlog.Info("Upload progress", "node", pr.node, "file", pr.file,
		"progress", fmt.Sprintf("%d%%", pct),
		"sent", humanFileSize(pr.read), "total", humanFileSize(pr.total),
		"speed", speed)
	pr.lastLog = now
	pr.lastPct = pct
	return n, err
}
// humanFileSize returns a human-readable file size string.
// Values below 1024 are reported in bytes; larger values use binary
// multiples (KB, MB, ...) with one decimal place, e.g. "1.5 MB".
func humanFileSize(b int64) string {
	const kib = 1024
	if b < kib {
		return fmt.Sprintf("%d B", b)
	}
	const suffixes = "KMGTPE"
	div := int64(kib)
	exp := 0
	for rem := b / kib; rem >= kib; rem /= kib {
		div *= kib
		exp++
	}
	return fmt.Sprintf("%.1f %cB", float64(b)/float64(div), suffixes[exp])
}
func (h *HTTPFileStager) FetchRemote(ctx context.Context, nodeID, remotePath, localDst string) error {
// For staging files (not under models/ or data/), the worker's file transfer
// server resolves the key as a relative path under its staging directory.

View File

@@ -97,7 +97,10 @@ func StartFileTransferServerWithListener(lis net.Listener, stagingDir, modelsDir
}
addr := lis.Addr().String()
server := &http.Server{Handler: mux}
server := &http.Server{
Handler: mux,
ReadHeaderTimeout: 30 * time.Second, // prevent slowloris; does not affect body reads
}
go func() {
xlog.Info("HTTP file transfer server started", "addr", addr, "stagingDir", stagingDir, "modelsDir", modelsDir, "dataDir", dataDir)
@@ -119,6 +122,8 @@ func handleUpload(w http.ResponseWriter, r *http.Request, stagingDir, modelsDir,
r.Body = http.MaxBytesReader(w, r.Body, maxUploadSize)
}
xlog.Info("Receiving file upload", "key", key, "contentLength", r.ContentLength, "remote", r.RemoteAddr)
// Route keyed files to the appropriate directory
targetDir, relName := resolveKeyToDir(key, stagingDir, modelsDir, dataDir)
@@ -144,11 +149,12 @@ func handleUpload(w http.ResponseWriter, r *http.Request, stagingDir, modelsDir,
n, err := io.Copy(f, r.Body)
if err != nil {
os.Remove(dstPath)
xlog.Error("File upload failed", "key", key, "bytesReceived", n, "contentLength", r.ContentLength, "remote", r.RemoteAddr, "error", err)
http.Error(w, fmt.Sprintf("writing file: %v", err), http.StatusInternalServerError)
return
}
xlog.Debug("HTTP file upload complete", "key", key, "path", dstPath, "size", n)
xlog.Info("File upload complete", "key", key, "path", dstPath, "size", n)
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(map[string]string{"local_path": dstPath}); err != nil {

View File

@@ -368,7 +368,7 @@ func (r *SmartRouter) buildClientForAddr(node *BackendNode, addr string, paralle
// simply remove the {ModelsPath}/{trackingKey}/ directory.
func (r *SmartRouter) stageModelFiles(ctx context.Context, node *BackendNode, opts *pb.ModelOptions, trackingKey string) (*pb.ModelOptions, error) {
opts = proto.Clone(opts).(*pb.ModelOptions)
xlog.Debug("Staging model files for remote node", "node", node.Name, "modelFile", opts.ModelFile, "trackingKey", trackingKey)
xlog.Info("Staging model files for remote node", "node", node.Name, "modelFile", opts.ModelFile, "trackingKey", trackingKey)
// Derive the frontend models directory from ModelFile and Model.
// Example: ModelFile="/models/sd-cpp/models/flux.gguf", Model="sd-cpp/models/flux.gguf"
@@ -419,6 +419,7 @@ func (r *SmartRouter) stageModelFiles(ctx context.Context, node *BackendNode, op
if err != nil {
// ModelFile is required — fail the whole operation
if f.name == "ModelFile" {
xlog.Error("Failed to stage model file for remote node", "node", node.Name, "field", f.name, "path", localPath, "error", err)
return nil, fmt.Errorf("staging model file: %w", err)
}
// Optional files: clear the path so the backend doesn't try a non-existent frontend path