From dd3376e0a9bea4411bd5e43e40e18dd688dc3346 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 30 Mar 2026 17:26:55 +0200 Subject: [PATCH] chore(workers): improve logging, set header timeouts (#9171) Signed-off-by: Ettore Di Giacinto --- core/cli/worker.go | 29 ++- core/services/nodes/file_stager_http.go | 221 ++++++++++++++++++-- core/services/nodes/file_transfer_server.go | 10 +- core/services/nodes/router.go | 3 +- 4 files changed, 242 insertions(+), 21 deletions(-) diff --git a/core/cli/worker.go b/core/cli/worker.go index 4042ae352..f4c707e0c 100644 --- a/core/cli/worker.go +++ b/core/cli/worker.go @@ -476,9 +476,22 @@ func (s *backendSupervisor) startBackend(backend, backendPath string) (string, e return clientAddr, nil } cancel() + + // Check if the process died (e.g. OOM, CUDA error, missing libs) + if !proc.IsAlive() { + stderrTail := readLastLinesFromFile(proc.StderrPath(), 20) + xlog.Warn("Backend process died during startup", "backend", backend, "stderr", stderrTail) + s.mu.Lock() + delete(s.processes, backend) + s.freePorts = append(s.freePorts, port) + s.mu.Unlock() + return "", fmt.Errorf("backend process %s died during startup. Last stderr:\n%s", backend, stderrTail) + } } - xlog.Warn("Backend gRPC server not ready after waiting, proceeding anyway", "backend", backend, "addr", clientAddr) + // Log stderr to help diagnose why the backend isn't responding + stderrTail := readLastLinesFromFile(proc.StderrPath(), 20) + xlog.Warn("Backend gRPC server not ready after waiting, proceeding anyway", "backend", backend, "addr", clientAddr, "stderr", stderrTail) return clientAddr, nil } @@ -525,6 +538,20 @@ func (s *backendSupervisor) stopAllBackends() { } } +// readLastLinesFromFile reads the last n lines from a file. +// Returns an empty string if the file cannot be read. 
+func readLastLinesFromFile(path string, n int) string { + data, err := os.ReadFile(path) + if err != nil { + return "" + } + lines := strings.Split(strings.TrimRight(string(data), "\n"), "\n") + if len(lines) > n { + lines = lines[len(lines)-n:] + } + return strings.Join(lines, "\n") +} + // isRunning returns whether a specific backend process is currently running. func (s *backendSupervisor) isRunning(backend string) bool { s.mu.Lock() diff --git a/core/services/nodes/file_stager_http.go b/core/services/nodes/file_stager_http.go index 53804ebbe..f22ea0f1f 100644 --- a/core/services/nodes/file_stager_http.go +++ b/core/services/nodes/file_stager_http.go @@ -3,12 +3,17 @@ package nodes import ( "context" "encoding/json" + "errors" "fmt" "io" + "net" "net/http" "os" "path/filepath" + "strconv" "strings" + "sync" + "syscall" "time" "github.com/mudler/LocalAI/core/services/storage" @@ -19,25 +24,58 @@ import ( // Files are transferred between the frontend and backend nodes over a small // HTTP server running alongside the gRPC backend process. type HTTPFileStager struct { - httpAddrFor func(nodeID string) (string, error) - token string - client *http.Client + httpAddrFor func(nodeID string) (string, error) + token string + client *http.Client + responseTimeout time.Duration // timeout waiting for server response after upload + maxRetries int // number of retry attempts for transient failures } // NewHTTPFileStager creates a new HTTP file stager. // httpAddrFor should return the HTTP address (host:port) for the given node ID. // token is the registration token used for authentication. 
func NewHTTPFileStager(httpAddrFor func(nodeID string) (string, error), token string) *HTTPFileStager { - timeout := 30 * time.Minute + responseTimeout := 30 * time.Minute if v := os.Getenv("LOCALAI_FILE_TRANSFER_TIMEOUT"); v != "" { if d, err := time.ParseDuration(v); err == nil { - timeout = d + responseTimeout = d } } + + maxRetries := 3 + if v := os.Getenv("LOCALAI_FILE_TRANSFER_RETRIES"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n >= 0 { + maxRetries = n + } + } + + transport := &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 15 * time.Second, // aggressive keepalive for LAN transfers + }).DialContext, + ForceAttemptHTTP2: false, // HTTP/2 flow control can stall large uploads + MaxIdleConns: 10, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + ResponseHeaderTimeout: responseTimeout, // bound only the wait for response headers after the upload + WriteBufferSize: 256 << 10, // 256 KB + ReadBufferSize: 256 << 10, // 256 KB + } + return &HTTPFileStager{ httpAddrFor: httpAddrFor, token: token, - client: &http.Client{Timeout: timeout}, + client: &http.Client{ + // No Timeout set — for large uploads, http.Client.Timeout covers the + // entire request lifecycle including the body upload. If it fires + // mid-write, Go closes the connection causing "connection reset by peer" + // on the server. Instead we set ResponseHeaderTimeout on the transport to cover only the wait-for-server-response phase. 
+ Transport: transport, + }, + responseTimeout: responseTimeout, + maxRetries: maxRetries, } } @@ -49,25 +87,69 @@ func (h *HTTPFileStager) EnsureRemote(ctx context.Context, nodeID, localPath, ke return "", fmt.Errorf("resolving HTTP address for node %s: %w", nodeID, err) } + fi, err := os.Stat(localPath) + if err != nil { + return "", fmt.Errorf("stat local file %s: %w", localPath, err) + } + fileSize := fi.Size() + + url := fmt.Sprintf("http://%s/v1/files/%s", addr, key) + xlog.Info("Uploading file to remote node", "node", nodeID, "file", filepath.Base(localPath), "size", humanFileSize(fileSize), "url", url) + + var lastErr error + attempts := h.maxRetries + 1 // maxRetries=3 means 4 total attempts (1 initial + 3 retries) + for attempt := 1; attempt <= attempts; attempt++ { + if attempt > 1 { + backoff := time.Duration(5<<(attempt-2)) * time.Second // 5s, 10s, 20s + xlog.Warn("Retrying file upload", "node", nodeID, "file", filepath.Base(localPath), + "attempt", attempt, "of", attempts, "backoff", backoff, "lastError", lastErr) + select { + case <-ctx.Done(): + return "", fmt.Errorf("upload cancelled during retry backoff: %w", ctx.Err()) + case <-time.After(backoff): + } + } + + result, err := h.doUpload(ctx, addr, nodeID, localPath, key, url, fileSize) + if err == nil { + if attempt > 1 { + xlog.Info("File upload succeeded after retry", "node", nodeID, "file", filepath.Base(localPath), "attempt", attempt) + } + return result, nil + } + lastErr = err + + if !isTransientError(err) { + xlog.Error("File upload failed with non-transient error", "node", nodeID, "file", filepath.Base(localPath), "error", err) + return "", err + } + xlog.Warn("File upload failed with transient error", "node", nodeID, "file", filepath.Base(localPath), + "attempt", attempt, "of", attempts, "error", err) + } + + return "", fmt.Errorf("uploading %s to node %s failed after %d attempts: %w", localPath, nodeID, attempts, lastErr) +} + +// doUpload performs a single upload attempt. 
+func (h *HTTPFileStager) doUpload(ctx context.Context, addr, nodeID, localPath, key, url string, fileSize int64) (string, error) { f, err := os.Open(localPath) if err != nil { return "", fmt.Errorf("opening local file %s: %w", localPath, err) } defer f.Close() - fi, _ := f.Stat() - var fileSize int64 - if fi != nil { - fileSize = fi.Size() + var body io.Reader = f + // For files > 100MB, wrap with progress logging + const progressThreshold = 100 << 20 + if fileSize > progressThreshold { + body = newProgressReader(f, fileSize, filepath.Base(localPath), nodeID) } - url := fmt.Sprintf("http://%s/v1/files/%s", addr, key) - xlog.Debug("HTTP upload starting", "node", nodeID, "url", url, "localPath", localPath, "fileSize", fileSize) - - req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, f) + req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, body) if err != nil { return "", fmt.Errorf("creating request: %w", err) } + req.ContentLength = fileSize // explicit Content-Length for progress tracking req.Header.Set("Content-Type", "application/octet-stream") if h.token != "" { req.Header.Set("Authorization", "Bearer "+h.token) @@ -75,13 +157,15 @@ func (h *HTTPFileStager) EnsureRemote(ctx context.Context, nodeID, localPath, ke resp, err := h.client.Do(req) if err != nil { + xlog.Error("File upload failed", "node", nodeID, "file", filepath.Base(localPath), "size", humanFileSize(fileSize), "error", err) return "", fmt.Errorf("uploading %s to node %s: %w", localPath, nodeID, err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - return "", fmt.Errorf("upload to node %s failed with status %d: %s", nodeID, resp.StatusCode, string(body)) + respBody, _ := io.ReadAll(resp.Body) + xlog.Error("File upload rejected by remote node", "node", nodeID, "file", filepath.Base(localPath), "status", resp.StatusCode, "response", string(respBody)) + return "", fmt.Errorf("upload to node %s failed with status %d: %s", 
nodeID, resp.StatusCode, string(respBody)) } var result struct { @@ -91,10 +175,113 @@ func (h *HTTPFileStager) EnsureRemote(ctx context.Context, nodeID, localPath, ke return "", fmt.Errorf("decoding upload response: %w", err) } - xlog.Debug("HTTP upload complete", "node", nodeID, "remotePath", result.LocalPath, "fileSize", fileSize) + xlog.Info("File upload complete", "node", nodeID, "file", filepath.Base(localPath), "size", humanFileSize(fileSize), "remotePath", result.LocalPath) return result.LocalPath, nil } +// isTransientError returns true if the error is likely transient and worth retrying. +func isTransientError(err error) bool { + if err == nil { + return false + } + // Connection reset by peer + if errors.Is(err, syscall.ECONNRESET) { + return true + } + // Broken pipe + if errors.Is(err, syscall.EPIPE) { + return true + } + // Connection refused (worker might be restarting) + if errors.Is(err, syscall.ECONNREFUSED) { + return true + } + // Context deadline exceeded (but not cancelled — cancelled means the caller gave up) + if errors.Is(err, context.DeadlineExceeded) { + return true + } + // net.Error timeout + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + return true + } + // Check for "connection reset" in the error string as a fallback + // (some wrapped errors lose the syscall.Errno) + msg := err.Error() + if strings.Contains(msg, "connection reset") || + strings.Contains(msg, "broken pipe") || + strings.Contains(msg, "connection refused") || + strings.Contains(msg, "EOF") { + return true + } + return false +} + +// progressReader wraps an io.Reader and logs upload progress periodically. 
+type progressReader struct { + reader io.Reader + total int64 + read int64 + file string + node string + lastLog time.Time + lastPct int + start time.Time + mu sync.Mutex +} + +func newProgressReader(r io.Reader, total int64, file, node string) *progressReader { + return &progressReader{ + reader: r, + total: total, + file: file, + node: node, + start: time.Now(), + lastLog: time.Now(), + } +} + +func (pr *progressReader) Read(p []byte) (int, error) { + n, err := pr.reader.Read(p) + if n > 0 { + pr.mu.Lock() + pr.read += int64(n) + pct := int(pr.read * 100 / pr.total) + now := time.Now() + // Log every 10% or every 30 seconds + if pct/10 > pr.lastPct/10 || now.Sub(pr.lastLog) >= 30*time.Second { + elapsed := now.Sub(pr.start) + var speed string + if elapsed > 0 { + bytesPerSec := float64(pr.read) / elapsed.Seconds() + speed = humanFileSize(int64(bytesPerSec)) + "/s" + } + xlog.Info("Upload progress", "node", pr.node, "file", pr.file, + "progress", fmt.Sprintf("%d%%", pct), + "sent", humanFileSize(pr.read), "total", humanFileSize(pr.total), + "speed", speed) + pr.lastLog = now + pr.lastPct = pct + } + pr.mu.Unlock() + } + return n, err +} + +// humanFileSize returns a human-readable file size string. +func humanFileSize(b int64) string { + const unit = 1024 + if b < unit { + return fmt.Sprintf("%d B", b) + } + div, exp := int64(unit), 0 + for n := b / unit; n >= unit; n /= unit { + div *= unit + exp++ + } + return fmt.Sprintf("%.1f %cB", float64(b)/float64(div), "KMGTPE"[exp]) +} + func (h *HTTPFileStager) FetchRemote(ctx context.Context, nodeID, remotePath, localDst string) error { // For staging files (not under models/ or data/), the worker's file transfer // server resolves the key as a relative path under its staging directory. 
diff --git a/core/services/nodes/file_transfer_server.go b/core/services/nodes/file_transfer_server.go index 76eca8530..b60713c16 100644 --- a/core/services/nodes/file_transfer_server.go +++ b/core/services/nodes/file_transfer_server.go @@ -97,7 +97,10 @@ func StartFileTransferServerWithListener(lis net.Listener, stagingDir, modelsDir } addr := lis.Addr().String() - server := &http.Server{Handler: mux} + server := &http.Server{ + Handler: mux, + ReadHeaderTimeout: 30 * time.Second, // prevent slowloris; does not affect body reads + } go func() { xlog.Info("HTTP file transfer server started", "addr", addr, "stagingDir", stagingDir, "modelsDir", modelsDir, "dataDir", dataDir) @@ -119,6 +122,8 @@ func handleUpload(w http.ResponseWriter, r *http.Request, stagingDir, modelsDir, r.Body = http.MaxBytesReader(w, r.Body, maxUploadSize) } + xlog.Info("Receiving file upload", "key", key, "contentLength", r.ContentLength, "remote", r.RemoteAddr) + // Route keyed files to the appropriate directory targetDir, relName := resolveKeyToDir(key, stagingDir, modelsDir, dataDir) @@ -144,11 +149,12 @@ func handleUpload(w http.ResponseWriter, r *http.Request, stagingDir, modelsDir, n, err := io.Copy(f, r.Body) if err != nil { os.Remove(dstPath) + xlog.Error("File upload failed", "key", key, "bytesReceived", n, "contentLength", r.ContentLength, "remote", r.RemoteAddr, "error", err) http.Error(w, fmt.Sprintf("writing file: %v", err), http.StatusInternalServerError) return } - xlog.Debug("HTTP file upload complete", "key", key, "path", dstPath, "size", n) + xlog.Info("File upload complete", "key", key, "path", dstPath, "size", n) w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(map[string]string{"local_path": dstPath}); err != nil { diff --git a/core/services/nodes/router.go b/core/services/nodes/router.go index 042290c9b..c61a11d91 100644 --- a/core/services/nodes/router.go +++ b/core/services/nodes/router.go @@ -368,7 +368,7 @@ func (r *SmartRouter) 
buildClientForAddr(node *BackendNode, addr string, paralle // simply remove the {ModelsPath}/{trackingKey}/ directory. func (r *SmartRouter) stageModelFiles(ctx context.Context, node *BackendNode, opts *pb.ModelOptions, trackingKey string) (*pb.ModelOptions, error) { opts = proto.Clone(opts).(*pb.ModelOptions) - xlog.Debug("Staging model files for remote node", "node", node.Name, "modelFile", opts.ModelFile, "trackingKey", trackingKey) + xlog.Info("Staging model files for remote node", "node", node.Name, "modelFile", opts.ModelFile, "trackingKey", trackingKey) // Derive the frontend models directory from ModelFile and Model. // Example: ModelFile="/models/sd-cpp/models/flux.gguf", Model="sd-cpp/models/flux.gguf" @@ -419,6 +419,7 @@ func (r *SmartRouter) stageModelFiles(ctx context.Context, node *BackendNode, op if err != nil { // ModelFile is required — fail the whole operation if f.name == "ModelFile" { + xlog.Error("Failed to stage model file for remote node", "node", node.Name, "field", f.name, "path", localPath, "error", err) return nil, fmt.Errorf("staging model file: %w", err) } // Optional files: clear the path so the backend doesn't try a non-existent frontend path