diff --git a/logtail/filch/filch.go b/logtail/filch/filch.go index d00206dd5..12ac647c4 100644 --- a/logtail/filch/filch.go +++ b/logtail/filch/filch.go @@ -1,148 +1,420 @@ // Copyright (c) Tailscale Inc & AUTHORS // SPDX-License-Identifier: BSD-3-Clause +//go:build !ts_omit_logtail + // Package filch is a file system queue that pilfers your stderr. // (A FILe CHannel that filches.) package filch import ( - "bufio" "bytes" + "cmp" + "errors" + "expvar" "fmt" "io" "os" + "slices" "sync" + + "tailscale.com/util/must" ) var stderrFD = 2 // a variable for testing -const defaultMaxFileSize = 50 << 20 +var errTooLong = errors.New("filch: line too long") +var errClosed = errors.New("filch: buffer is closed") + +const DefaultMaxLineSize = 64 << 10 +const DefaultMaxFileSize = 50 << 20 type Options struct { - ReplaceStderr bool // dup over fd 2 so everything written to stderr comes here - MaxFileSize int + // ReplaceStderr specifies whether to filch [os.Stderr] such that + // everything written there appears in the [Filch] buffer instead. + // In order to write to stderr instead of writing to [Filch], + // then use [Filch.OrigStderr]. + ReplaceStderr bool + + // MaxLineSize is the maximum line size that could be encountered, + // including the trailing newline. This is enforced as a hard limit. + // Writes larger than this will be rejected. Reads larger than this + // will report an error and skip over the long line. + // If zero, the [DefaultMaxLineSize] is used. + MaxLineSize int + + // MaxFileSize specifies the maximum space on disk to use for logs. + // This is not enforced as a hard limit, but rather a soft limit. + // If zero, then [DefaultMaxFileSize] is used. + MaxFileSize int } // A Filch uses two alternating files as a simplistic ring buffer. type Filch struct { + // OrigStderr is the original [os.Stderr] if [Options.ReplaceStderr] is specified. + // Writing directly to this avoids writing into the Filch buffer. + // Otherwise, it is nil. 
OrigStderr *os.File - mu sync.Mutex - cur *os.File - alt *os.File - altscan *bufio.Scanner - recovered int64 + // maxLineSize specifies the maximum line size to use. + maxLineSize int // immutable once set - maxFileSize int64 - writeCounter int + // maxFileSize specifies the max space either newer and older should use. + maxFileSize int64 // immutable once set - // buf is an initial buffer for altscan. - // As of August 2021, 99.96% of all log lines - // are below 4096 bytes in length. - // Since this cutoff is arbitrary, instead of using 4096, - // we subtract off the size of the rest of the struct - // so that the whole struct takes 4096 bytes - // (less on 32 bit platforms). - // This reduces allocation waste. - buf [4096 - 64]byte + mu sync.Mutex + newer *os.File // newer logs data; writes are appended to the end + older *os.File // older logs data; reads are consumed from the start + + newlyWrittenBytes int64 // bytes written directly to newer; reset upon rotation + newlyFilchedBytes int64 // bytes filched indirectly to newer; reset upon rotation + + wrBuf []byte // temporary buffer for writing; only used for writes without trailing newline + wrBufMaxLen int // maximum length of wrBuf; reduced upon every rotation + + rdBufIdx int // index into rdBuf for the next unread bytes + rdBuf []byte // temporary buffer for reading + rdBufMaxLen int // maximum length of rdBuf; reduced upon every rotation + + // Metrics (see [Filch.ExpVar] for details). + writeCalls expvar.Int + readCalls expvar.Int + rotateCalls expvar.Int + callErrors expvar.Int + writeBytes expvar.Int + readBytes expvar.Int + filchedBytes expvar.Int + droppedBytes expvar.Int + storedBytes expvar.Int +} + +// ExpVar reports metrics about the buffer. +// +// - counter_write_calls: Total number of calls to [Filch.Write] +// (excludes calls when file is closed). +// +// - counter_read_calls: Total number of calls to [Filch.TryReadLine] +// (excludes calls when file is closed or no bytes). 
+// +// - counter_rotate_calls: Total number of calls to rotate the log files +// (excludes calls when there is nothing to rotate to). +// +// - counter_call_errors: Total number of calls returning errors. +// +// - counter_write_bytes: Total number of bytes written +// (includes bytes filched from stderr). +// +// - counter_read_bytes: Total number of bytes read +// (includes bytes filched from stderr). +// +// - counter_filched_bytes: Total number of bytes filched from stderr. +// +// - counter_dropped_bytes: Total number of bytes dropped +// (includes bytes filched from stderr and lines too long to read). +// +// - gauge_stored_bytes: Current number of bytes stored on disk. +func (f *Filch) ExpVar() expvar.Var { + m := new(expvar.Map) + m.Set("counter_write_calls", &f.writeCalls) + m.Set("counter_read_calls", &f.readCalls) + m.Set("counter_rotate_calls", &f.rotateCalls) + m.Set("counter_call_errors", &f.callErrors) + m.Set("counter_write_bytes", &f.writeBytes) + m.Set("counter_read_bytes", &f.readBytes) + m.Set("counter_filched_bytes", &f.filchedBytes) + m.Set("counter_dropped_bytes", &f.droppedBytes) + m.Set("gauge_stored_bytes", &f.storedBytes) + return m +} + +func (f *Filch) unreadReadBuffer() []byte { + return f.rdBuf[f.rdBufIdx:] +} +func (f *Filch) availReadBuffer() []byte { + return f.rdBuf[len(f.rdBuf):cap(f.rdBuf)] +} +func (f *Filch) resetReadBuffer() { + f.rdBufIdx, f.rdBuf = 0, f.rdBuf[:0] +} +func (f *Filch) moveReadBufferToFront() { + f.rdBufIdx, f.rdBuf = 0, f.rdBuf[:copy(f.rdBuf, f.rdBuf[f.rdBufIdx:])] +} +func (f *Filch) growReadBuffer() { + f.rdBuf = slices.Grow(f.rdBuf, cap(f.rdBuf)+1) +} +func (f *Filch) consumeReadBuffer(n int) { + f.rdBufIdx += n +} +func (f *Filch) appendReadBuffer(n int) { + f.rdBuf = f.rdBuf[:len(f.rdBuf)+n] + f.rdBufMaxLen = max(f.rdBufMaxLen, len(f.rdBuf)) } // TryReadline implements the logtail.Buffer interface. 
-func (f *Filch) TryReadLine() ([]byte, error) { +func (f *Filch) TryReadLine() (b []byte, err error) { f.mu.Lock() defer f.mu.Unlock() - - if f.altscan != nil { - if b, err := f.scan(); b != nil || err != nil { - return b, err - } + if f.older == nil { + return nil, io.EOF } - f.cur, f.alt = f.alt, f.cur - if f.OrigStderr != nil { - if err := dup2Stderr(f.cur); err != nil { + var tooLong bool // whether we are in a line that is too long + defer func() { + f.consumeReadBuffer(len(b)) + if tooLong || len(b) > f.maxLineSize { + f.droppedBytes.Add(int64(len(b))) + b, err = nil, cmp.Or(err, errTooLong) + } else { + f.readBytes.Add(int64(len(b))) + } + if len(b) != 0 || err != nil { + f.readCalls.Add(1) + } + if err != nil { + f.callErrors.Add(1) + } + }() + + for { + // Check if unread buffer already has the next line. + unread := f.unreadReadBuffer() + if i := bytes.IndexByte(unread, '\n') + len("\n"); i > 0 { + return unread[:i], nil + } + + // Check whether to make space for more data to read. + avail := f.availReadBuffer() + if len(avail) == 0 { + switch { + case len(unread) > f.maxLineSize: + tooLong = true + f.droppedBytes.Add(int64(len(unread))) + f.resetReadBuffer() + case len(unread) < cap(f.rdBuf)/10: + f.moveReadBufferToFront() + default: + f.growReadBuffer() + } + avail = f.availReadBuffer() // invariant: len(avail) > 0 + } + + // Read data into the available buffer. 
+ n, err := f.older.Read(avail) + f.appendReadBuffer(n) + if err != nil { + if err == io.EOF { + unread = f.unreadReadBuffer() + if len(unread) == 0 { + if err := f.rotateLocked(); err != nil { + return nil, err + } + if f.storedBytes.Value() == 0 { + return nil, nil + } + continue + } + return unread, nil + } return nil, err } } - if _, err := f.alt.Seek(0, io.SeekStart); err != nil { - return nil, err - } - f.altscan = bufio.NewScanner(f.alt) - f.altscan.Buffer(f.buf[:], bufio.MaxScanTokenSize) - f.altscan.Split(splitLines) - return f.scan() } -func (f *Filch) scan() ([]byte, error) { - if f.altscan.Scan() { - return f.altscan.Bytes(), nil - } - err := f.altscan.Err() - err2 := f.alt.Truncate(0) - _, err3 := f.alt.Seek(0, io.SeekStart) - f.altscan = nil - if err != nil { - return nil, err - } - if err2 != nil { - return nil, err2 - } - if err3 != nil { - return nil, err3 - } - return nil, nil -} +var alwaysStatForTests bool // Write implements the logtail.Buffer interface. -func (f *Filch) Write(b []byte) (int, error) { +func (f *Filch) Write(b []byte) (n int, err error) { f.mu.Lock() defer f.mu.Unlock() - if f.writeCounter == 100 { - // Check the file size every 100 writes. - f.writeCounter = 0 - fi, err := f.cur.Stat() + if f.newer == nil { + return 0, errClosed + } + + defer func() { + f.writeCalls.Add(1) if err != nil { - return 0, err + f.callErrors.Add(1) } - if fi.Size() >= f.maxFileSize { - // This most likely means we are not draining. - // To limit the amount of space we use, throw away the old logs. - if err := moveContents(f.alt, f.cur); err != nil { + }() + + // To make sure we do not write data to disk unbounded + // (in the event that we are not draining fast enough) + // check whether we exceeded maxFileSize. + // If so, then force a file rotation. 
+ if f.newlyWrittenBytes+f.newlyFilchedBytes > f.maxFileSize || f.writeCalls.Value()%100 == 0 || alwaysStatForTests { + f.statAndUpdateBytes() + if f.newlyWrittenBytes+f.newlyFilchedBytes > f.maxFileSize { + if err := f.rotateLocked(); err != nil { return 0, err } } } - f.writeCounter++ + // Write the log entry (appending a newline character if needed). + var newline string if len(b) == 0 || b[len(b)-1] != '\n' { - bnl := make([]byte, len(b)+1) - copy(bnl, b) - bnl[len(bnl)-1] = '\n' - return f.cur.Write(bnl) + newline = "\n" + f.wrBuf = append(append(f.wrBuf[:0], b...), newline...) + f.wrBufMaxLen = max(f.wrBufMaxLen, len(f.wrBuf)) + b = f.wrBuf } - return f.cur.Write(b) + if len(b) > f.maxLineSize { + for line := range bytes.Lines(b) { + if len(line) > f.maxLineSize { + return 0, errTooLong + } + } + } + n, err = f.newer.Write(b) + f.writeBytes.Add(int64(n)) + f.storedBytes.Add(int64(n)) + f.newlyWrittenBytes += int64(n) + return n - len(newline), err // subtract possibly appended newline } -// Close closes the Filch, releasing all os resources. -func (f *Filch) Close() (err error) { +func (f *Filch) statAndUpdateBytes() { + if fi, err := f.newer.Stat(); err == nil { + prevSize := f.newlyWrittenBytes + f.newlyFilchedBytes + filchedBytes := max(0, fi.Size()-prevSize) + f.writeBytes.Add(filchedBytes) + f.filchedBytes.Add(filchedBytes) + f.storedBytes.Add(filchedBytes) + f.newlyFilchedBytes += filchedBytes + } +} + +func (f *Filch) storedBytesForTest() int64 { + return must.Get(f.newer.Stat()).Size() + must.Get(f.older.Stat()).Size() +} + +var activeStderrWriteForTest sync.RWMutex + +// stderrWriteForTest calls [os.Stderr.Write], but respects calls to [waitIdleStderrForTest]. +func stderrWriteForTest(b []byte) int { + activeStderrWriteForTest.RLock() + defer activeStderrWriteForTest.RUnlock() + return must.Get(os.Stderr.Write(b)) +} + +// waitIdleStderrForTest waits until there are no active stderrWriteForTest calls. 
+func waitIdleStderrForTest() { + activeStderrWriteForTest.Lock() + defer activeStderrWriteForTest.Unlock() +} + +// rotateLocked swaps f.newer and f.older such that: +// +// - f.newer will be truncated and future writes will be appended to the end. +// - if [Options.ReplaceStderr], then stderr writes will redirect to f.newer +// - f.older will contain historical data, reads will consume from the start. +// - f.older is guaranteed to be immutable. +// +// There are two reasons for rotating: +// +// - The reader finished reading f.older. +// No data should be lost under this condition. +// +// - The writer exceeded a limit for f.newer. +// Data may be lost under this condition. +func (f *Filch) rotateLocked() error { + f.rotateCalls.Add(1) + + // Truncate the older file. + if fi, err := f.older.Stat(); err != nil { + return err + } else if fi.Size() > 0 { + // Update dropped bytes. + if pos, err := f.older.Seek(0, io.SeekCurrent); err == nil { + rdPos := pos - int64(len(f.unreadReadBuffer())) // adjust for data already read into the read buffer + f.droppedBytes.Add(max(0, fi.Size()-rdPos)) + } + f.resetReadBuffer() + + // Truncate the older file and write relative to the start. + if err := f.older.Truncate(0); err != nil { + return err + } + if _, err := f.older.Seek(0, io.SeekStart); err != nil { + return err + } + } + + // Swap newer and older. + f.newer, f.older = f.older, f.newer + + // If necessary, filch stderr into newer instead of older. + // This must be done after truncation otherwise + // we might lose some stderr data asynchronously written + // right in the middle of a rotation. + // Note that mutex does not prevent stderr writes. + prevSize := f.newlyWrittenBytes + f.newlyFilchedBytes + f.newlyWrittenBytes, f.newlyFilchedBytes = 0, 0 + if f.OrigStderr != nil { + if err := dup2Stderr(f.newer); err != nil { + return err + } + } + + // Update filched bytes and stored bytes metrics. 
+ // This must be done after filching to newer + // so that f.older.Stat is *mostly* stable. + // + // NOTE: Unfortunately, an asynchronous os.Stderr.Write call + // that is already in progress when we called dup2Stderr + // will still write to the previous FD and + // may not be immediately observable by this Stat call. + // This is fundamentally unsolvable with the current design + // as we cannot synchronize all other os.Stderr.Write calls. + // In rare cases, it is possible that [Filch.TryReadLine] consumes + // the entire older file before the write commits, + // leading to dropped stderr lines. + waitIdleStderrForTest() + if fi, err := f.older.Stat(); err != nil { + return err + } else { + filchedBytes := max(0, fi.Size()-prevSize) + f.writeBytes.Add(filchedBytes) + f.filchedBytes.Add(filchedBytes) + f.storedBytes.Set(fi.Size()) // newer has been truncated, so only older matters + } + + // Start reading from the start of older. + if _, err := f.older.Seek(0, io.SeekStart); err != nil { + return err + } + + // Garbage collect unnecessarily large buffers. + mayGarbageCollect := func(b []byte, maxLen int) ([]byte, int) { + if cap(b)/4 > maxLen { // if less than 25% utilized + b = slices.Grow([]byte(nil), 2*maxLen) + } + maxLen = 3 * (maxLen / 4) // reduce by 25% + return b, maxLen + } + f.wrBuf, f.wrBufMaxLen = mayGarbageCollect(f.wrBuf, f.wrBufMaxLen) + f.rdBuf, f.rdBufMaxLen = mayGarbageCollect(f.rdBuf, f.rdBufMaxLen) + + return nil +} + +// Close closes the Filch, releasing all resources. 
+func (f *Filch) Close() error { f.mu.Lock() defer f.mu.Unlock() - + var errUnsave, errCloseNew, errCloseOld error if f.OrigStderr != nil { - if err2 := unsaveStderr(f.OrigStderr); err == nil { - err = err2 - } + errUnsave = unsaveStderr(f.OrigStderr) f.OrigStderr = nil } - - if err2 := f.cur.Close(); err == nil { - err = err2 + if f.newer != nil { + errCloseNew = f.newer.Close() + f.newer = nil } - if err2 := f.alt.Close(); err == nil { - err = err2 + if f.older != nil { + errCloseOld = f.older.Close() + f.older = nil } - - return err + return errors.Join(errUnsave, errCloseNew, errCloseOld) } // New creates a new filch around two log files, each starting with filePrefix. @@ -181,14 +453,10 @@ func New(filePrefix string, opts Options) (f *Filch, err error) { return nil, err } - mfs := defaultMaxFileSize - if opts.MaxFileSize > 0 { - mfs = opts.MaxFileSize - } - f = &Filch{ - OrigStderr: os.Stderr, // temporary, for past logs recovery - maxFileSize: int64(mfs), - } + f = new(Filch) + f.maxLineSize = int(cmp.Or(max(0, opts.MaxLineSize), DefaultMaxLineSize)) + f.maxFileSize = int64(cmp.Or(max(0, opts.MaxFileSize), DefaultMaxFileSize)) + f.maxFileSize /= 2 // since there are two log files that combine to equal MaxFileSize // Neither, either, or both files may exist and contain logs from // the last time the process ran. The three cases are: @@ -198,35 +466,22 @@ func New(filePrefix string, opts Options) (f *Filch, err error) { // - both: the files were swapped and were starting to be // read out, while new logs streamed into the other // file, but the read out did not complete - if n := fi1.Size() + fi2.Size(); n > 0 { - f.recovered = n - } switch { case fi1.Size() > 0 && fi2.Size() == 0: - f.cur, f.alt = f2, f1 + f.newer, f.older = f2, f1 // use empty file as newer case fi2.Size() > 0 && fi1.Size() == 0: - f.cur, f.alt = f1, f2 - case fi1.Size() > 0 && fi2.Size() > 0: // both - // We need to pick one of the files to be the elder, - // which we do using the mtime. 
- var older, newer *os.File - if fi1.ModTime().Before(fi2.ModTime()) { - older, newer = f1, f2 - } else { - older, newer = f2, f1 - } - if err := moveContents(older, newer); err != nil { - fmt.Fprintf(f.OrigStderr, "filch: recover move failed: %v\n", err) - fmt.Fprintf(older, "filch: recover move failed: %v\n", err) - } - f.cur, f.alt = newer, older + f.newer, f.older = f1, f2 // use empty file as newer + case fi1.ModTime().Before(fi2.ModTime()): + f.newer, f.older = f2, f1 // use older file as older + case fi2.ModTime().Before(fi1.ModTime()): + f.newer, f.older = f1, f2 // use newer file as newer default: - f.cur, f.alt = f1, f2 // does not matter + f.newer, f.older = f1, f2 // does not matter } - if f.recovered > 0 { - f.altscan = bufio.NewScanner(f.alt) - f.altscan.Buffer(f.buf[:], bufio.MaxScanTokenSize) - f.altscan.Split(splitLines) + f.writeBytes.Set(fi1.Size() + fi2.Size()) + f.storedBytes.Set(fi1.Size() + fi2.Size()) + if fi, err := f.newer.Stat(); err == nil { + f.newlyWrittenBytes = fi.Size() } f.OrigStderr = nil @@ -235,50 +490,10 @@ func New(filePrefix string, opts Options) (f *Filch, err error) { if err != nil { return nil, err } - if err := dup2Stderr(f.cur); err != nil { + if err := dup2Stderr(f.newer); err != nil { return nil, err } } return f, nil } - -func moveContents(dst, src *os.File) (err error) { - defer func() { - _, err2 := src.Seek(0, io.SeekStart) - err3 := src.Truncate(0) - _, err4 := dst.Seek(0, io.SeekStart) - if err == nil { - err = err2 - } - if err == nil { - err = err3 - } - if err == nil { - err = err4 - } - }() - if _, err := src.Seek(0, io.SeekStart); err != nil { - return err - } - if _, err := dst.Seek(0, io.SeekStart); err != nil { - return err - } - if _, err := io.Copy(dst, src); err != nil { - return err - } - return nil -} - -func splitLines(data []byte, atEOF bool) (advance int, token []byte, err error) { - if atEOF && len(data) == 0 { - return 0, nil, nil - } - if i := bytes.IndexByte(data, '\n'); i >= 0 { - return i + 
1, data[0 : i+1], nil - } - if atEOF { - return len(data), data, nil - } - return 0, nil, nil -} diff --git a/logtail/filch/filch_omit.go b/logtail/filch/filch_omit.go new file mode 100644 index 000000000..898978e21 --- /dev/null +++ b/logtail/filch/filch_omit.go @@ -0,0 +1,34 @@ +// Copyright (c) Tailscale Inc & AUTHORS +// SPDX-License-Identifier: BSD-3-Clause + +//go:build ts_omit_logtail + +package filch + +import "os" + +type Options struct { + ReplaceStderr bool + MaxLineSize int + MaxFileSize int +} + +type Filch struct { + OrigStderr *os.File +} + +func (*Filch) TryReadLine() ([]byte, error) { + return nil, nil +} + +func (*Filch) Write(b []byte) (int, error) { + return len(b), nil +} + +func (f *Filch) Close() error { + return nil +} + +func New(string, Options) (*Filch, error) { + return new(Filch), nil +} diff --git a/logtail/filch/filch_stub.go b/logtail/filch/filch_stub.go index 3bb82b190..f2aeeb9b9 100644 --- a/logtail/filch/filch_stub.go +++ b/logtail/filch/filch_stub.go @@ -1,13 +1,13 @@ // Copyright (c) Tailscale Inc & AUTHORS // SPDX-License-Identifier: BSD-3-Clause -//go:build wasm || plan9 || tamago +//go:build !ts_omit_logtail && (wasm || plan9 || tamago) package filch -import ( - "os" -) +import "os" + +const replaceStderrSupportedForTest = false func saveStderr() (*os.File, error) { return os.Stderr, nil diff --git a/logtail/filch/filch_test.go b/logtail/filch/filch_test.go index 6b7b88414..1e3347180 100644 --- a/logtail/filch/filch_test.go +++ b/logtail/filch/filch_test.go @@ -4,207 +4,380 @@ package filch import ( + "bytes" + "encoding/json" "fmt" "io" + "math" + "math/rand/v2" "os" + "path/filepath" "runtime" "strings" + "sync" + "sync/atomic" "testing" - "unicode" - "unsafe" + "time" + jsonv2 "github.com/go-json-experiment/json" "tailscale.com/tstest" + "tailscale.com/util/must" ) +func init() { alwaysStatForTests = true } + type filchTest struct { *Filch + + filePrefix string } -func newFilchTest(t *testing.T, filePrefix string, opts 
Options) *filchTest { +func newForTest(t *testing.T, filePrefix string, opts Options) *filchTest { + t.Helper() + if filePrefix == "" { + filePrefix = filepath.Join(t.TempDir(), "testlog") + } f, err := New(filePrefix, opts) if err != nil { t.Fatal(err) } - return &filchTest{Filch: f} + t.Cleanup(func() { + if err := f.Close(); err != nil { + t.Errorf("Close error: %v", err) + } + }) + return &filchTest{Filch: f, filePrefix: filePrefix} } -func (f *filchTest) write(t *testing.T, s string) { +func (f *filchTest) read(t *testing.T, want []byte) { t.Helper() - if _, err := f.Write([]byte(s)); err != nil { - t.Fatal(err) + if got, err := f.TryReadLine(); err != nil { + t.Fatalf("TryReadLine error: %v", err) + } else if string(got) != string(want) { + t.Errorf("TryReadLine = %q, want %q", got, want) } } -func (f *filchTest) read(t *testing.T, want string) { - t.Helper() - if b, err := f.TryReadLine(); err != nil { - t.Fatalf("r.ReadLine() err=%v", err) - } else if got := strings.TrimRightFunc(string(b), unicode.IsSpace); got != want { - t.Errorf("r.ReadLine()=%q, want %q", got, want) - } -} - -func (f *filchTest) readEOF(t *testing.T) { - t.Helper() - if b, err := f.TryReadLine(); b != nil || err != nil { - t.Fatalf("r.ReadLine()=%q err=%v, want nil slice", string(b), err) - } -} - -func (f *filchTest) close(t *testing.T) { - t.Helper() - if err := f.Close(); err != nil { - t.Fatal(err) - } -} - -func TestDropOldLogs(t *testing.T) { - const line1 = "123456789" // 10 bytes (9+newline) - tests := []struct { - write, read int - }{ - {10, 10}, - {100, 100}, - {200, 200}, - {250, 150}, - {500, 200}, - } - for _, tc := range tests { - t.Run(fmt.Sprintf("w%d-r%d", tc.write, tc.read), func(t *testing.T) { - filePrefix := t.TempDir() - f := newFilchTest(t, filePrefix, Options{ReplaceStderr: false, MaxFileSize: 1000}) - defer f.close(t) - // Make filch rotate the logs 3 times - for range tc.write { - f.write(t, line1) +func TestNew(t *testing.T) { + const want1 = 
"Lorem\nipsum\ndolor\nsit\namet,\nconsectetur\nadipiscing\nelit,\nsed\n" + const want2 = "do\neiusmod\ntempor\nincididunt\nut\nlabore\net\ndolore\nmagna\naliqua.\n" + filePrefix := filepath.Join(t.TempDir(), "testlog") + checkLinesAndCleanup := func() { + t.Helper() + defer os.Remove(filepath.Join(filePrefix + ".log1.txt")) + defer os.Remove(filepath.Join(filePrefix + ".log2.txt")) + f := newForTest(t, filePrefix, Options{}) + var got []byte + for { + b := must.Get(f.TryReadLine()) + if b == nil { + break } - // We should only be able to read the last 150 lines - for i := range tc.read { - f.read(t, line1) - if t.Failed() { - t.Logf("could only read %d lines", i) - break - } - } - f.readEOF(t) - }) + got = append(got, b...) + } + if string(got) != want1+want2 { + t.Errorf("got %q\nwant %q", got, want1+want2) + } } + now := time.Now() + + must.Do(os.WriteFile(filePrefix+".log1.txt", []byte(want1+want2), 0600)) + checkLinesAndCleanup() + + must.Do(os.WriteFile(filePrefix+".log2.txt", []byte(want1+want2), 0600)) + checkLinesAndCleanup() + + must.Do(os.WriteFile(filePrefix+".log1.txt", []byte(want1), 0600)) + os.Chtimes(filePrefix+".log1.txt", now.Add(-time.Minute), now.Add(-time.Minute)) + must.Do(os.WriteFile(filePrefix+".log2.txt", []byte(want2), 0600)) + os.Chtimes(filePrefix+".log2.txt", now.Add(+time.Minute), now.Add(+time.Minute)) + checkLinesAndCleanup() + + must.Do(os.WriteFile(filePrefix+".log1.txt", []byte(want2), 0600)) + os.Chtimes(filePrefix+".log1.txt", now.Add(+time.Minute), now.Add(+time.Minute)) + must.Do(os.WriteFile(filePrefix+".log2.txt", []byte(want1), 0600)) + os.Chtimes(filePrefix+".log2.txt", now.Add(-time.Minute), now.Add(-time.Minute)) + checkLinesAndCleanup() } -func TestQueue(t *testing.T) { - filePrefix := t.TempDir() - f := newFilchTest(t, filePrefix, Options{ReplaceStderr: false}) - - f.readEOF(t) - const line1 = "Hello, World!" - const line2 = "This is a test." - const line3 = "Of filch." 
- f.write(t, line1) - f.write(t, line2) - f.read(t, line1) - f.write(t, line3) - f.read(t, line2) - f.read(t, line3) - f.readEOF(t) - f.write(t, line1) - f.read(t, line1) - f.readEOF(t) - f.close(t) -} - -func TestRecover(t *testing.T) { - t.Run("empty", func(t *testing.T) { - filePrefix := t.TempDir() - f := newFilchTest(t, filePrefix, Options{ReplaceStderr: false}) - f.write(t, "hello") - f.read(t, "hello") - f.readEOF(t) - f.close(t) - - f = newFilchTest(t, filePrefix, Options{ReplaceStderr: false}) - f.readEOF(t) - f.close(t) - }) - - t.Run("cur", func(t *testing.T) { - filePrefix := t.TempDir() - f := newFilchTest(t, filePrefix, Options{ReplaceStderr: false}) - f.write(t, "hello") - f.close(t) - - f = newFilchTest(t, filePrefix, Options{ReplaceStderr: false}) - f.read(t, "hello") - f.readEOF(t) - f.close(t) - }) - - t.Run("alt", func(t *testing.T) { - t.Skip("currently broken on linux, passes on macOS") - /* --- FAIL: TestRecover/alt (0.00s) - filch_test.go:128: r.ReadLine()="world", want "hello" - filch_test.go:129: r.ReadLine()="hello", want "world" - */ - - filePrefix := t.TempDir() - f := newFilchTest(t, filePrefix, Options{ReplaceStderr: false}) - f.write(t, "hello") - f.read(t, "hello") - f.write(t, "world") - f.close(t) - - f = newFilchTest(t, filePrefix, Options{ReplaceStderr: false}) - // TODO(crawshaw): The "hello" log is replayed in recovery. - // We could reduce replays by risking some logs loss. - // What should our policy here be? - f.read(t, "hello") - f.read(t, "world") - f.readEOF(t) - f.close(t) - }) -} - -func TestFilchStderr(t *testing.T) { - if runtime.GOOS == "windows" { - // TODO(bradfitz): this is broken on Windows but not - // fully sure why. Investigate. But notably, the - // stderrFD variable (defined in filch.go) and set - // below is only ever read in filch_unix.go. So just - // skip this for test for now. 
- t.Skip("test broken on Windows") - } +func setupStderr(t *testing.T) { + t.Helper() pipeR, pipeW, err := os.Pipe() if err != nil { t.Fatal(err) } - defer pipeR.Close() - defer pipeW.Close() - + t.Cleanup(func() { pipeR.Close() }) + t.Cleanup(func() { + switch b, err := io.ReadAll(pipeR); { + case err != nil: + t.Fatalf("ReadAll error: %v", err) + case len(b) > 0: + t.Errorf("unexpected write to fake stderr: %s", b) + } + }) + t.Cleanup(func() { pipeW.Close() }) tstest.Replace(t, &stderrFD, int(pipeW.Fd())) + tstest.Replace(t, &os.Stderr, pipeW) +} - filePrefix := t.TempDir() - f := newFilchTest(t, filePrefix, Options{ReplaceStderr: true}) - f.write(t, "hello") - if _, err := fmt.Fprintf(pipeW, "filch\n"); err != nil { - t.Fatal(err) +func TestConcurrentWriteAndRead(t *testing.T) { + if replaceStderrSupportedForTest { + setupStderr(t) } - f.read(t, "hello") - f.read(t, "filch") - f.readEOF(t) - f.close(t) - pipeW.Close() - b, err := io.ReadAll(pipeR) - if err != nil { - t.Fatal(err) + const numWriters = 10 + const linesPerWriter = 1000 + opts := Options{ReplaceStderr: replaceStderrSupportedForTest, MaxFileSize: math.MaxInt32} + f := newForTest(t, "", opts) + + // Concurrently write many lines. + var draining sync.RWMutex + var group sync.WaitGroup + defer group.Wait() + data := bytes.Repeat([]byte("X"), 1000) + var runningWriters atomic.Int64 + for i := range numWriters { + runningWriters.Add(+1) + group.Go(func() { + defer runningWriters.Add(-1) + var b []byte + for j := range linesPerWriter { + b = fmt.Appendf(b[:0], `{"Index":%d,"Count":%d,"Data":"%s"}`+"\n", i+1, j+1, data[:rand.IntN(len(data))]) + draining.RLock() + if i%2 == 0 && opts.ReplaceStderr { + stderrWriteForTest(b) + } else { + must.Get(f.Write(b)) + } + draining.RUnlock() + runtime.Gosched() + } + }) } - if len(b) > 0 { - t.Errorf("unexpected write to fake stderr: %s", b) + + // Verify that we can read back the lines in an ordered manner. 
+ var lines int + var entry struct{ Index, Count int } + state := make(map[int]int) + checkLine := func() (ok bool) { + b := must.Get(f.TryReadLine()) + if len(b) == 0 { + return false + } + entry.Index, entry.Count = 0, 0 + if err := jsonv2.Unmarshal(b, &entry); err != nil { + t.Fatalf("json.Unmarshal error: %v", err) + } + if wantCount := state[entry.Index] + 1; entry.Count != wantCount { + t.Fatalf("Index:%d, Count = %d, want %d", entry.Index, entry.Count, wantCount) + } + state[entry.Index] = entry.Count + lines++ + return true + } + for lines < numWriters*linesPerWriter { + writersDone := runningWriters.Load() == 0 + for range rand.IntN(100) { + runtime.Gosched() // bias towards more writer operations + } + + if rand.IntN(100) == 0 { + // Asynchronous read of a single line. + if !checkLine() && writersDone { + t.Fatal("failed to read all lines after all writers done") + } + } else { + // Synchronous reading of all lines. + draining.Lock() + for checkLine() { + } + draining.Unlock() + } } } -func TestSizeOf(t *testing.T) { - s := unsafe.Sizeof(Filch{}) - if s > 4096 { - t.Fatalf("Filch{} has size %d on %v, decrease size of buf field", s, runtime.GOARCH) +// Test that the read and write buffers grow to a bounded capacity and are garbage collected after many rotations. +func TestBufferCapacity(t *testing.T) { + f := newForTest(t, "", Options{}) + b := bytes.Repeat([]byte("X"), 1000) + for range 1000 { + must.Get(f.Write(b[:rand.IntN(len(b))])) + } + for must.Get(f.TryReadLine()) != nil { + } + if !(10*len(b) < cap(f.rdBuf) && cap(f.rdBuf) < 20*len(b)) { + t.Errorf("cap(rdBuf) = %v, want within [%v:%v]", cap(f.rdBuf), 10*len(b), 20*len(b)) + } + + must.Get(f.Write(bytes.Repeat([]byte("X"), DefaultMaxLineSize-1))) + must.Get(f.TryReadLine()) + wrCap, rdCap := cap(f.wrBuf), cap(f.rdBuf) + + // Force another rotation. Buffers should not be GC'd yet. 
+ must.Get(f.TryReadLine()) + if cap(f.wrBuf) != wrCap { + t.Errorf("cap(f.wrBuf) = %v, want %v", cap(f.wrBuf), wrCap) + } + if cap(f.rdBuf) != rdCap { + t.Errorf("cap(f.rdBuf) = %v, want %v", cap(f.rdBuf), rdCap) + } + + // Force many rotations. Buffers should be GC'd. + for range 64 { + t.Logf("cap(f.wrBuf), cap(f.rdBuf) = %d, %d", cap(f.wrBuf), cap(f.rdBuf)) + must.Get(f.TryReadLine()) + } + if cap(f.wrBuf) != 0 { + t.Errorf("cap(f.wrBuf) = %v, want %v", cap(f.wrBuf), 0) + } + if cap(f.rdBuf) != 0 { + t.Errorf("cap(f.rdBuf) = %v, want %v", cap(f.rdBuf), 0) + } +} + +func TestMaxLineSize(t *testing.T) { + const maxLineSize = 1000 + f := newForTest(t, "", Options{MaxLineSize: maxLineSize}) + + // Test writing. + b0 := []byte(strings.Repeat("X", maxLineSize-len("\n")) + "\n") + must.Get(f.Write(b0)) + b1 := []byte(strings.Repeat("X", maxLineSize)) + if _, err := f.Write(b1); err != errTooLong { + t.Errorf("Write error = %v, want errTooLong", err) + } + b2 := bytes.Repeat(b0, 2) + must.Get(f.Write(b2)) + if f.storedBytesForTest() != int64(len(b0)+len(b2)) { + t.Errorf("storedBytes = %v, want %v", f.storedBytesForTest(), int64(len(b0)+len(b2))) + } + + // Test reading. + f.read(t, b0) + f.read(t, b0) + f.read(t, b0) + f.read(t, nil) // should trigger rotate + if f.storedBytesForTest() != 0 { + t.Errorf("storedBytes = %v, want 0", f.storedBytesForTest()) + } + + // Test writing + must.Get(f.Write([]byte("hello"))) + must.Get(f.Write(b0)) + must.Get(f.Write([]byte("goodbye"))) + + // Test reading. + f.Close() + f = newForTest(t, f.filePrefix, Options{MaxLineSize: 10}) + f.read(t, []byte("hello\n")) + if _, err := f.TryReadLine(); err != errTooLong { + t.Errorf("Write error = %v, want errTooLong", err) + } + f.read(t, []byte("goodbye\n")) + + // Check that the read buffer does not need to be as long + // as the overly long line to skip over it. 
+ if cap(f.rdBuf) >= maxLineSize/2 { + t.Errorf("cap(rdBuf) = %v, want <%v", cap(f.rdBuf), maxLineSize/2) + } +} + +func TestMaxFileSize(t *testing.T) { + if replaceStderrSupportedForTest { + t.Run("ReplaceStderr:true", func(t *testing.T) { testMaxFileSize(t, true) }) + } + t.Run("ReplaceStderr:false", func(t *testing.T) { testMaxFileSize(t, false) }) +} + +func testMaxFileSize(t *testing.T, replaceStderr bool) { + if replaceStderr { + setupStderr(t) + } + + opts := Options{ReplaceStderr: replaceStderr, MaxFileSize: 1000} + f := newForTest(t, "", opts) + + // Write lots of data. + const calls = 1000 + var group sync.WaitGroup + var filchedBytes, writeBytes int64 + group.Go(func() { + if !opts.ReplaceStderr { + return + } + var b []byte + for i := range calls { + b = fmt.Appendf(b[:0], `{"FilchIndex":%d}`+"\n", i+1) + filchedBytes += int64(stderrWriteForTest(b)) + } + }) + group.Go(func() { + var b []byte + for i := range calls { + b = fmt.Appendf(b[:0], `{"WriteIndex":%d}`+"\n", i+1) + writeBytes += int64(must.Get(f.Write(b))) + } + }) + group.Wait() + f.statAndUpdateBytes() + droppedBytes := filchedBytes + writeBytes - f.storedBytes.Value() + + switch { + case f.writeCalls.Value() != calls: + t.Errorf("writeCalls = %v, want %d", f.writeCalls.Value(), calls) + case f.readCalls.Value() != 0: + t.Errorf("readCalls = %v, want 0", f.readCalls.Value()) + case f.rotateCalls.Value() == 0: + t.Errorf("rotateCalls = 0, want >0") + case f.callErrors.Value() != 0: + t.Errorf("callErrors = %v, want 0", f.callErrors.Value()) + case f.writeBytes.Value() != writeBytes+filchedBytes: + t.Errorf("writeBytes = %v, want %d", f.writeBytes.Value(), writeBytes+filchedBytes) + case f.readBytes.Value() != 0: + t.Errorf("readBytes = %v, want 0", f.readBytes.Value()) + case f.filchedBytes.Value() != filchedBytes: + t.Errorf("filchedBytes = %v, want %d", f.filchedBytes.Value(), filchedBytes) + case f.droppedBytes.Value() != droppedBytes: + t.Errorf("droppedBytes = %v, want %d", 
f.droppedBytes.Value(), droppedBytes) + case f.droppedBytes.Value() == 0: + t.Errorf("droppedBytes = 0, want >0") + case f.storedBytes.Value() != f.storedBytesForTest(): + t.Errorf("storedBytes = %v, want %d", f.storedBytes.Value(), f.storedBytesForTest()) + case f.storedBytes.Value() > int64(opts.MaxFileSize) && !opts.ReplaceStderr: + // If ReplaceStderr, it is impossible for MaxFileSize to be + // strictly adhered to since asynchronous os.Stderr.Write calls + // do not trigger any checks to enforce maximum file size. + t.Errorf("storedBytes = %v, want <=%d", f.storedBytes.Value(), opts.MaxFileSize) + } + + // Read back the data and verify that the entries are in order. + var readBytes, lastFilchIndex, lastWriteIndex int64 + for { + b := must.Get(f.TryReadLine()) + if len(b) == 0 { + break + } + var entry struct{ FilchIndex, WriteIndex int64 } + must.Do(json.Unmarshal(b, &entry)) + if entry.FilchIndex == 0 && entry.WriteIndex == 0 { + t.Errorf("both indexes are zero") + } + if entry.FilchIndex > 0 { + if entry.FilchIndex <= lastFilchIndex { + t.Errorf("FilchIndex = %d, want >%d", entry.FilchIndex, lastFilchIndex) + } + lastFilchIndex = entry.FilchIndex + } + if entry.WriteIndex > 0 { + if entry.WriteIndex <= lastWriteIndex { + t.Errorf("WriteIndex = %d, want >%d", entry.WriteIndex, lastWriteIndex) + } + lastWriteIndex = entry.WriteIndex + } + readBytes += int64(len(b)) + } + + if f.readBytes.Value() != readBytes { + t.Errorf("readBytes = %v, want %v", f.readBytes.Value(), readBytes) } } diff --git a/logtail/filch/filch_unix.go b/logtail/filch/filch_unix.go index 2eae70ace..27f1d02ee 100644 --- a/logtail/filch/filch_unix.go +++ b/logtail/filch/filch_unix.go @@ -1,7 +1,7 @@ // Copyright (c) Tailscale Inc & AUTHORS // SPDX-License-Identifier: BSD-3-Clause -//go:build !windows && !wasm && !plan9 && !tamago +//go:build !ts_omit_logtail && !windows && !wasm && !plan9 && !tamago package filch @@ -11,6 +11,8 @@ "golang.org/x/sys/unix" ) +const 
replaceStderrSupportedForTest = true + func saveStderr() (*os.File, error) { fd, err := unix.Dup(stderrFD) if err != nil { diff --git a/logtail/filch/filch_windows.go b/logtail/filch/filch_windows.go index d60514bf0..b08b64db3 100644 --- a/logtail/filch/filch_windows.go +++ b/logtail/filch/filch_windows.go @@ -1,6 +1,8 @@ // Copyright (c) Tailscale Inc & AUTHORS // SPDX-License-Identifier: BSD-3-Clause +//go:build !ts_omit_logtail && windows + package filch import ( @@ -9,6 +11,8 @@ "syscall" ) +const replaceStderrSupportedForTest = true + var kernel32 = syscall.MustLoadDLL("kernel32.dll") var procSetStdHandle = kernel32.MustFindProc("SetStdHandle")