From 94fbb03352f7705120b58394c03f03ea234c55a2 Mon Sep 17 00:00:00 2001 From: James Scott Date: Mon, 15 Jun 2026 13:27:49 -0700 Subject: [PATCH] logtail: add stateless generic UploadLogs (#20005) Add UploadLogs, a stateless alternative to NewLogger for callers that want to push a batch of log entries without the background uploader, ring buffer, stderr echoing, or network-up gating that a Logger provides. Entries are encoded, batched up to the server's maximum upload size, and POSTed synchronously; unlike Logger it does not retry. The Logger construction is split into a new unexported newLogger so the connection/encode/upload machinery is shared without starting the background goroutine. Log entries are modeled as a generic LogEntry[T] whose Value is inlined (via go-json-experiment) alongside the reserved "logtail" metadata member. T may be a struct (or pointer), a map with a string key, or a jsontext.Value; use jsontext.Value to mix differently-shaped payloads in a single upload. UploadLogs fills in client_time/proc_id/proc_seq from the Config where the caller leaves them zero. Updates tailscale/corp#40908 Change-Id: Idbf23cd0eb8233082fbdb9abed0f6f153b9225ba Signed-off-by: James Scott --- logtail/logtail.go | 148 ++++++++++++++++++++- logtail/logtail_omit.go | 17 +++ logtail/logtail_test.go | 283 ++++++++++++++++++++++++++++++++++++---- 3 files changed, 421 insertions(+), 27 deletions(-) diff --git a/logtail/logtail.go b/logtail/logtail.go index fd89656e8..a441c2306 100644 --- a/logtail/logtail.go +++ b/logtail/logtail.go @@ -15,6 +15,7 @@ "expvar" "fmt" "io" + "iter" "log" mrand "math/rand/v2" "net/http" @@ -27,6 +28,7 @@ "time" "github.com/creachadair/msync/trigger" + jsonv2 "github.com/go-json-experiment/json" "github.com/go-json-experiment/json/jsontext" "tailscale.com/envknob" "tailscale.com/metrics" @@ -57,7 +59,10 @@ // but not too large to be a notable waste of memory if retained forever. 
const bufferSize = 4 << 10 -func NewLogger(cfg Config, logf tslogger.Logf) *Logger { +// newLogger constructs a *Logger from cfg, applying defaults, but does not start +// the background uploading goroutine. It is shared by [NewLogger] and the +// stateless [UploadLogs]. +func newLogger(cfg Config) *Logger { if cfg.BaseURL == "" { cfg.BaseURL = "https://" + DefaultHost } @@ -134,6 +139,14 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger { logger.compressLogs = cfg.CompressLogs logger.disabled.Store(cfg.Disabled) + return logger +} + +// NewLogger returns a new Logger that splits logs as configured between local +// logging facilities and uploading to a log server in the background. +func NewLogger(cfg Config, logf tslogger.Logf) *Logger { + logger := newLogger(cfg) + ctx, cancel := context.WithCancel(context.Background()) logger.uploadCancel = cancel @@ -144,6 +157,139 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger { return logger } +// Logtail is the reserved "logtail" metadata member of a [LogEntry]. +// +// Zero-valued fields are omitted when an entry is uploaded. [UploadLogs] fills +// in any fields the caller leaves zero from its [Config] (e.g. ClientTime and +// ProcID), so most callers can leave this empty. +type Logtail struct { + // ClientTime is the time the entry was generated. If zero, [UploadLogs] + // fills it with the current time unless Config.SkipClientTime is set. + ClientTime time.Time `json:"client_time,omitzero"` + // ProcID is an ephemeral process identifier; see Config.IncludeProcID. + ProcID uint32 `json:"proc_id,omitzero"` + // ProcSeq is an ephemeral per-process sequence number; see + // Config.IncludeProcSequence. + ProcSeq uint64 `json:"proc_seq,omitzero"` +} + +// LogEntry is a single log entry to be uploaded via [UploadLogs]. 
+// +// It marshals to a JSON object whose reserved "logtail" member carries the +// metadata in Logtail and whose remaining members are taken from Value, which +// is inlined at the top level alongside "logtail". +// +// Value must marshal to a JSON object: T must be a Go struct (or pointer to +// one), a Go map with a string key, or a [jsontext.Value] holding an object +// (for example jsontext.Value(`{"text":"hello"}`)). This is enforced when the +// entry is uploaded, not at compile time. Use T = [jsontext.Value] to mix +// differently-shaped payloads in a single upload. +type LogEntry[T any] struct { + Logtail Logtail `json:"logtail,omitzero"` + Value T `json:",inline"` +} + +// UploadLogs uploads entries to the log server described by conf and returns +// once they have all been sent (or an upload fails). It returns the number of +// entries that were successfully uploaded. +// +// It is a stateless alternative to [NewLogger] for callers that just want to +// push a batch of log entries without the background uploader, ring buffer, +// stderr echoing, or network-up gating that a [Logger] provides. Each entry is +// marshaled to JSON, its [Logtail] metadata is filled in from conf where the +// caller left it zero (honoring conf.SkipClientTime, conf.IncludeProcID, and +// conf.IncludeProcSequence), and entries are batched up to the server's maximum +// upload size and POSTed synchronously (compressed when conf.CompressLogs is +// set). +// +// Unlike [Logger], UploadLogs does not retry: if a batch fails to upload it +// returns the count of entries sent in prior batches and the error immediately, +// and any remaining entries are not sent. The conf.Stderr and conf.Bus fields +// are ignored. 
+func UploadLogs[T any](ctx context.Context, conf Config, entries iter.Seq[LogEntry[T]]) (int, error) { + conf.Stderr = io.Discard // pure uploader: never echo to stderr + conf.Bus = nil // no netmon/eventbus subscription for a one-shot + lg := newLogger(conf) + + maxLen := cmp.Or(lg.maxUploadSize, maxSize) + if lg.lowMem { + maxLen /= lowMemRatio + } + + // body accumulates a JSON array of encoded entries: "[e1,e2,...]". + // The framing mirrors Logger.drainPending. + body := make([]byte, 0, bufferSize) // reused across batches + body = append(body, '[') + + // sent counts entries confirmed uploaded; pending counts entries + // accumulated in body but not yet uploaded. A successful sendBatch moves + // pending into sent. + var sent, pending int + sendBatch := func() error { + if len(body) <= len("[") { + return nil + } + out := bytes.TrimRight(body, ",") + out = append(out, ']') + origlen := -1 // sentinel value: uncompressed + // Don't attempt to compress tiny bodies; not worth the CPU cycles. + if lg.compressLogs && len(out) > 256 { + zbody := zstdframe.AppendEncode(nil, out, + zstdframe.FastestCompression, zstdframe.LowMemory(true)) + // Only send it compressed if the bandwidth savings are sufficient. + if len(out)-len(zbody) > 64 { + origlen = len(out) + out = zbody + } + } + // upload is synchronous, so it is safe to reuse body's backing array + // for the next batch once upload returns. + if _, err := lg.upload(ctx, out, origlen); err != nil { + return err + } + body = body[:len("[")] + sent += pending + pending = 0 + return nil + } + + var procSeq uint64 + for e := range entries { + if err := ctx.Err(); err != nil { + return sent, err + } + + // Fill in metadata from conf, preserving any caller-set values.
+ if e.Logtail.ClientTime.IsZero() && !lg.skipClientTime { + e.Logtail.ClientTime = lg.clock.Now().UTC() + } + if e.Logtail.ProcID == 0 { + e.Logtail.ProcID = lg.procID + } + procSeq++ + if lg.includeProcSequence && e.Logtail.ProcSeq == 0 { // only when the caller left it zero + e.Logtail.ProcSeq = procSeq + } + + enc, err := jsonv2.Marshal(e) + if err != nil { + return sent, fmt.Errorf("logtail: encoding entry %d: %w", procSeq, err) + } + + // Flush the current batch before adding an entry that would overflow it. + if len(body) > len("[") && len(body)+len(enc) > maxLen { + if err := sendBatch(); err != nil { + return sent, err + } + } + body = append(body, enc...) + body = append(body, ',') + pending++ + } + err := sendBatch() + return sent, err +} + // Logger writes logs, splitting them as configured between local // logging facilities and uploading to a log server. type Logger struct { diff --git a/logtail/logtail_omit.go b/logtail/logtail_omit.go index 98f1c6a0e..34cb10337 100644 --- a/logtail/logtail_omit.go +++ b/logtail/logtail_omit.go @@ -7,6 +7,8 @@ import ( "context" + "iter" + "time" tslogger "tailscale.com/types/logger" "tailscale.com/types/logid" @@ -18,6 +20,21 @@ type Logger struct{ type Buffer any +type Logtail struct { + ClientTime time.Time `json:"client_time,omitzero"` + ProcID uint32 `json:"proc_id,omitzero"` + ProcSeq uint64 `json:"proc_seq,omitzero"` +} + +type LogEntry[T any] struct { + Logtail Logtail `json:"logtail,omitzero"` + Value T `json:",inline"` +} + +func UploadLogs[T any](ctx context.Context, conf Config, entries iter.Seq[LogEntry[T]]) (int, error) { + return 0, nil // no-op stub: nothing is uploaded, so zero entries are reported as sent +} + func Disable() {} func (*Logger) SetEnabled(enabled bool) {} diff --git a/logtail/logtail_test.go b/logtail/logtail_test.go index 0e9d0f262..2fe11af48 100644 --- a/logtail/logtail_test.go +++ b/logtail/logtail_test.go @@ -12,6 +12,7 @@ "net" "net/http" "os" + "slices" "strings" "sync" "testing" @@ -70,34 +71,12 @@ type LogtailTestServer struct { func newTestLogtailServer(t *testing.T) (*LogtailTestServer, *Logger) { // Enable the
logtail started message envknob.Setenv("TS_DEBUG_LOGTAIL", "1") - ts := &LogtailTestServer{ - // max channel backlog = 1 "started" + #logLines x "log line" + 1 "closed" - uploaded: make(chan []byte, 2+logLines), - } - handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - body, err := io.ReadAll(r.Body) - if err != nil { - t.Error("failed to read HTTP request") - } - ts.uploaded <- body - }) + conf, uploaded := newCaptureServer(t, 0) + conf.Bus = eventbustest.NewBus(t) + ts := &LogtailTestServer{uploaded: uploaded} - ln := memnet.Listen("logtail-test:0") - httpsrv := &http.Server{Handler: handler} - go httpsrv.Serve(ln) - t.Cleanup(func() { - httpsrv.Close() - ln.Close() - }) - - logger := NewLogger(Config{ - BaseURL: "http://" + ln.Addr().String(), - Bus: eventbustest.NewBus(t), - HTTPC: &http.Client{ - Transport: &http.Transport{DialContext: ln.Dial}, - }, - }, t.Logf) + logger := NewLogger(conf, t.Logf) // There is always an initial "logtail started" message. body := <-ts.uploaded @@ -264,6 +243,258 @@ func unmarshalOne(t *testing.T, body []byte) map[string]any { return entries[0] } +// newCaptureServer wires up an in-memory HTTP server (via memnet) that sends +// each uploaded request body to the returned channel and responds with +// respStatus (0 means 200 OK), returning a Config whose HTTPC dials it. 
+func newCaptureServer(t *testing.T, respStatus int) (Config, chan []byte) { + t.Helper() + // max channel backlog = 1 "started" + #logLines x "log line" + 1 "closed" + uploaded := make(chan []byte, 2+logLines) + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + t.Errorf("failed to read HTTP request: %v", err) + } + uploaded <- body + if respStatus != 0 { + w.WriteHeader(respStatus) + } + }) + + ln := memnet.Listen("logtail-test:0") + httpsrv := &http.Server{Handler: handler} + go httpsrv.Serve(ln) + t.Cleanup(func() { + httpsrv.Close() + ln.Close() + }) + + conf := Config{ + BaseURL: "http://" + ln.Addr().String(), + HTTPC: &http.Client{Transport: &http.Transport{DialContext: ln.Dial}}, + } + return conf, uploaded +} + +func TestUploadLogs(t *testing.T) { + t.Run("InlinesValueAlongsideLogtail", func(t *testing.T) { + conf, got := newCaptureServer(t, 0) + entries := []LogEntry[jsontext.Value]{ + {Value: jsontext.Value(`{"text":"first line"}`)}, + {Value: jsontext.Value(`{"text":"second line","extra":42}`)}, + } + if n, err := UploadLogs(context.Background(), conf, slices.Values(entries)); err != nil { + t.Fatal(err) + } else if n != len(entries) { + t.Errorf("uploaded %d entries, want %d", n, len(entries)) + } + + var all []map[string]any + body := <-got + if err := json.Unmarshal(body, &all); err != nil { + t.Fatalf("unmarshal %q: %v", body, err) + } + if len(all) != 2 { + t.Fatalf("got %d entries, want 2", len(all)) + } + if got, want := all[0]["text"], "first line"; got != want { + t.Errorf("entry 0 text = %v; want %q", got, want) + } + if got, want := all[1]["text"], "second line"; got != want { + t.Errorf("entry 1 text = %v; want %q", got, want) + } + if got, want := all[1]["extra"], float64(42); got != want { + t.Errorf("entry 1 extra = %v; want %v", got, want) + } + for i, e := range all { + lt, ok := e["logtail"].(map[string]any) + if !ok { + t.Errorf("entry %d missing logtail 
metadata", i) + continue + } + if _, ok := lt["client_time"]; !ok { + t.Errorf("entry %d missing client_time", i) + } + } + }) + + t.Run("TypedStructPayload", func(t *testing.T) { + conf, got := newCaptureServer(t, 0) + type record struct { + Text string `json:"text"` + Count int `json:"count"` + } + entries := []LogEntry[record]{ + {Value: record{Text: "hi", Count: 3}}, + } + if n, err := UploadLogs(context.Background(), conf, slices.Values(entries)); err != nil { + t.Fatal(err) + } else if n != len(entries) { + t.Errorf("uploaded %d entries, want %d", n, len(entries)) + } + e := unmarshalOne(t, <-got) + if got, want := e["text"], "hi"; got != want { + t.Errorf("text = %v; want %q", got, want) + } + if got, want := e["count"], float64(3); got != want { + t.Errorf("count = %v; want %v", got, want) + } + if _, ok := e["logtail"].(map[string]any); !ok { + t.Errorf("missing logtail metadata") + } + }) + + t.Run("MapPayload", func(t *testing.T) { + conf, got := newCaptureServer(t, 0) + entries := []LogEntry[map[string]any]{ + {Value: map[string]any{"text": "m", "n": 5}}, + } + if n, err := UploadLogs(context.Background(), conf, slices.Values(entries)); err != nil { + t.Fatal(err) + } else if n != len(entries) { + t.Errorf("uploaded %d entries, want %d", n, len(entries)) + } + e := unmarshalOne(t, <-got) + if got, want := e["text"], "m"; got != want { + t.Errorf("text = %v; want %q", got, want) + } + if got, want := e["n"], float64(5); got != want { + t.Errorf("n = %v; want %v", got, want) + } + }) + + t.Run("MapWithNamedStringKey", func(t *testing.T) { + type label string // ~string key + conf, got := newCaptureServer(t, 0) + entries := []LogEntry[map[label]int]{ + {Value: map[label]int{"a": 1, "b": 2}}, + } + if n, err := UploadLogs(context.Background(), conf, slices.Values(entries)); err != nil { + t.Fatal(err) + } else if n != len(entries) { + t.Errorf("uploaded %d entries, want %d", n, len(entries)) + } + e := unmarshalOne(t, <-got) + if got, want := e["a"], 
float64(1); got != want { + t.Errorf("a = %v; want %v", got, want) + } + if got, want := e["b"], float64(2); got != want { + t.Errorf("b = %v; want %v", got, want) + } + }) + + t.Run("PointerToStructPayload", func(t *testing.T) { + type record struct { + Text string `json:"text"` + } + conf, got := newCaptureServer(t, 0) + entries := []LogEntry[*record]{ + {Value: &record{Text: "ptr"}}, + } + if n, err := UploadLogs(context.Background(), conf, slices.Values(entries)); err != nil { + t.Fatal(err) + } else if n != len(entries) { + t.Errorf("uploaded %d entries, want %d", n, len(entries)) + } + e := unmarshalOne(t, <-got) + if got, want := e["text"], "ptr"; got != want { + t.Errorf("text = %v; want %q", got, want) + } + }) + + t.Run("NilPointerPayloadOmitsInlinedValue", func(t *testing.T) { + type record struct { + Text string `json:"text"` + } + conf, got := newCaptureServer(t, 0) + entries := []LogEntry[*record]{ + {Value: nil}, + } + if n, err := UploadLogs(context.Background(), conf, slices.Values(entries)); err != nil { + t.Fatal(err) + } else if n != len(entries) { + t.Errorf("uploaded %d entries, want %d", n, len(entries)) + } + e := unmarshalOne(t, <-got) + if _, ok := e["text"]; ok { + t.Errorf("expected no inlined members for nil pointer payload, got %v", e) + } + if _, ok := e["logtail"].(map[string]any); !ok { + t.Errorf("missing logtail metadata") + } + }) + + t.Run("RejectsNon-objectPayload", func(t *testing.T) { + conf, _ := newCaptureServer(t, 0) + entries := []LogEntry[int]{{Value: 5}} + if n, err := UploadLogs(context.Background(), conf, slices.Values(entries)); err == nil { + t.Fatal("expected an error marshaling a non-object inline payload") + } else if n != 0 { + t.Errorf("uploaded %d entries, want 0", n) + } + }) + + t.Run("PreservesCaller-setLogtailFields", func(t *testing.T) { + conf, got := newCaptureServer(t, 0) + entries := []LogEntry[jsontext.Value]{ + {Logtail: Logtail{ProcID: 1234}, Value: jsontext.Value(`{"text":"x"}`)}, + } + if n, err 
:= UploadLogs(context.Background(), conf, slices.Values(entries)); err != nil { + t.Fatal(err) + } else if n != len(entries) { + t.Errorf("uploaded %d entries, want %d", n, len(entries)) + } + e := unmarshalOne(t, <-got) + lt := e["logtail"].(map[string]any) + if got, want := lt["proc_id"], float64(1234); got != want { + t.Errorf("proc_id = %v; want %v", got, want) + } + }) + + t.Run("UploadErrorOnNon-200", func(t *testing.T) { + conf, _ := newCaptureServer(t, http.StatusInternalServerError) + entries := []LogEntry[jsontext.Value]{{Value: jsontext.Value(`{"text":"x"}`)}} + if n, err := UploadLogs(context.Background(), conf, slices.Values(entries)); err == nil { + t.Fatal("expected an error when the server responds non-200") + } else if n != 0 { + t.Errorf("uploaded %d entries, want 0", n) + } + }) + + t.Run("IncludesIncrementingProc_seq", func(t *testing.T) { + conf, got := newCaptureServer(t, 0) + conf.IncludeProcSequence = true + entries := []LogEntry[jsontext.Value]{ + {Value: jsontext.Value(`{"text":"one"}`)}, + {Value: jsontext.Value(`{"text":"two"}`)}, + {Value: jsontext.Value(`{"text":"three"}`)}, + } + if n, err := UploadLogs(context.Background(), conf, slices.Values(entries)); err != nil { + t.Fatal(err) + } else if n != len(entries) { + t.Errorf("uploaded %d entries, want %d", n, len(entries)) + } + + var all []map[string]any + if err := json.Unmarshal(<-got, &all); err != nil { + t.Fatal(err) + } + if len(all) != 3 { + t.Fatalf("got %d entries, want 3", len(all)) + } + for i, e := range all { + lt := e["logtail"].(map[string]any) + seq, ok := lt["proc_seq"].(float64) + if !ok { + t.Fatalf("entry %d missing proc_seq", i) + } + if int(seq) != i+1 { + t.Errorf("entry %d proc_seq = %v; want %d", i, seq, i+1) + } + } + }) +} + type simpleMemBuf struct { Buffer buf bytes.Buffer