mirror of
https://github.com/tailscale/tailscale.git
synced 2026-04-04 06:36:01 -04:00
Updates rotateLocked so that we hold the activeStderrWriteForTest write lock around the dup2Stderr call, rather than acquiring it only after dup2 was already compelete. This ensures no stderrWriteForTest calls can race with the dup2 syscall. The now unused waitIdleStderrForTest has been removed. On macOS, dup2 and write on the same file descriptor are not atomic with respect to each other, when rotateLocked called dup2Stderr to redirect the stderr fd to a new file, concurrent goroutines calling stderrWriteForTest could observe the fd in a transiently invalid state, resulting in the bad file descripter. Fixes tailscale/corp#36953 Signed-off-by: James Scott <jim@tailscale.com>
409 lines
12 KiB
Go
409 lines
12 KiB
Go
// Copyright (c) Tailscale Inc & contributors
|
|
// SPDX-License-Identifier: BSD-3-Clause
|
|
|
|
package filch
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"math/rand/v2"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
jsonv2 "github.com/go-json-experiment/json"
|
|
"tailscale.com/tstest"
|
|
"tailscale.com/util/must"
|
|
)
|
|
|
|
func init() { alwaysStatForTests = true }
|
|
|
|
type filchTest struct {
|
|
*Filch
|
|
|
|
filePrefix string
|
|
}
|
|
|
|
func newForTest(t *testing.T, filePrefix string, opts Options) *filchTest {
|
|
t.Helper()
|
|
if filePrefix == "" {
|
|
filePrefix = filepath.Join(t.TempDir(), "testlog")
|
|
}
|
|
f, err := New(filePrefix, opts)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Cleanup(func() {
|
|
if err := f.Close(); err != nil {
|
|
t.Errorf("Close error: %v", err)
|
|
}
|
|
})
|
|
return &filchTest{Filch: f, filePrefix: filePrefix}
|
|
}
|
|
|
|
func (f *filchTest) read(t *testing.T, want []byte) {
|
|
t.Helper()
|
|
if got, err := f.TryReadLine(); err != nil {
|
|
t.Fatalf("TryReadLine error: %v", err)
|
|
} else if string(got) != string(want) {
|
|
t.Errorf("TryReadLine = %q, want %q", got, want)
|
|
}
|
|
}
|
|
|
|
func TestNew(t *testing.T) {
|
|
const want1 = "Lorem\nipsum\ndolor\nsit\namet,\nconsectetur\nadipiscing\nelit,\nsed\n"
|
|
const want2 = "do\neiusmod\ntempor\nincididunt\nut\nlabore\net\ndolore\nmagna\naliqua.\n"
|
|
filePrefix := filepath.Join(t.TempDir(), "testlog")
|
|
checkLinesAndCleanup := func() {
|
|
t.Helper()
|
|
defer os.Remove(filepath.Join(filePrefix + ".log1.txt"))
|
|
defer os.Remove(filepath.Join(filePrefix + ".log2.txt"))
|
|
f := newForTest(t, filePrefix, Options{})
|
|
var got []byte
|
|
for {
|
|
b := must.Get(f.TryReadLine())
|
|
if b == nil {
|
|
break
|
|
}
|
|
got = append(got, b...)
|
|
}
|
|
if string(got) != want1+want2 {
|
|
t.Errorf("got %q\nwant %q", got, want1+want2)
|
|
}
|
|
}
|
|
now := time.Now()
|
|
|
|
must.Do(os.WriteFile(filePrefix+".log1.txt", []byte(want1+want2), 0600))
|
|
checkLinesAndCleanup()
|
|
|
|
must.Do(os.WriteFile(filePrefix+".log2.txt", []byte(want1+want2), 0600))
|
|
checkLinesAndCleanup()
|
|
|
|
must.Do(os.WriteFile(filePrefix+".log1.txt", []byte(want1), 0600))
|
|
os.Chtimes(filePrefix+".log1.txt", now.Add(-time.Minute), now.Add(-time.Minute))
|
|
must.Do(os.WriteFile(filePrefix+".log2.txt", []byte(want2), 0600))
|
|
os.Chtimes(filePrefix+".log2.txt", now.Add(+time.Minute), now.Add(+time.Minute))
|
|
checkLinesAndCleanup()
|
|
|
|
must.Do(os.WriteFile(filePrefix+".log1.txt", []byte(want2), 0600))
|
|
os.Chtimes(filePrefix+".log1.txt", now.Add(+time.Minute), now.Add(+time.Minute))
|
|
must.Do(os.WriteFile(filePrefix+".log2.txt", []byte(want1), 0600))
|
|
os.Chtimes(filePrefix+".log2.txt", now.Add(-time.Minute), now.Add(-time.Minute))
|
|
checkLinesAndCleanup()
|
|
}
|
|
|
|
func setupStderr(t *testing.T) {
|
|
t.Helper()
|
|
pipeR, pipeW, err := os.Pipe()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Cleanup(func() { pipeR.Close() })
|
|
t.Cleanup(func() {
|
|
switch b, err := io.ReadAll(pipeR); {
|
|
case err != nil:
|
|
t.Fatalf("ReadAll error: %v", err)
|
|
case len(b) > 0:
|
|
t.Errorf("unexpected write to fake stderr: %s", b)
|
|
}
|
|
})
|
|
t.Cleanup(func() { pipeW.Close() })
|
|
tstest.Replace(t, &stderrFD, int(pipeW.Fd()))
|
|
tstest.Replace(t, &os.Stderr, pipeW)
|
|
}
|
|
|
|
func TestConcurrentWriteAndRead(t *testing.T) {
|
|
if replaceStderrSupportedForTest {
|
|
setupStderr(t)
|
|
}
|
|
|
|
const numWriters = 10
|
|
const linesPerWriter = 1000
|
|
opts := Options{ReplaceStderr: replaceStderrSupportedForTest, MaxFileSize: math.MaxInt32}
|
|
f := newForTest(t, "", opts)
|
|
|
|
// Concurrently write many lines.
|
|
var draining sync.RWMutex
|
|
var group sync.WaitGroup
|
|
defer group.Wait()
|
|
data := bytes.Repeat([]byte("X"), 1000)
|
|
var runningWriters atomic.Int64
|
|
for i := range numWriters {
|
|
runningWriters.Add(+1)
|
|
group.Go(func() {
|
|
defer runningWriters.Add(-1)
|
|
var b []byte
|
|
for j := range linesPerWriter {
|
|
b = fmt.Appendf(b[:0], `{"Index":%d,"Count":%d,"Data":"%s"}`+"\n", i+1, j+1, data[:rand.IntN(len(data))])
|
|
draining.RLock()
|
|
if i%2 == 0 && opts.ReplaceStderr {
|
|
stderrWriteForTest(b)
|
|
} else {
|
|
must.Get(f.Write(b))
|
|
}
|
|
draining.RUnlock()
|
|
runtime.Gosched()
|
|
}
|
|
})
|
|
}
|
|
|
|
// Verify that we can read back the lines in an ordered manner.
|
|
var lines int
|
|
var entry struct{ Index, Count int }
|
|
state := make(map[int]int)
|
|
checkLine := func() (ok bool) {
|
|
b := must.Get(f.TryReadLine())
|
|
if len(b) == 0 {
|
|
return false
|
|
}
|
|
entry.Index, entry.Count = 0, 0
|
|
if err := jsonv2.Unmarshal(b, &entry); err != nil {
|
|
t.Fatalf("json.Unmarshal error: %v", err)
|
|
}
|
|
if wantCount := state[entry.Index] + 1; entry.Count != wantCount {
|
|
t.Fatalf("Index:%d, Count = %d, want %d", entry.Index, entry.Count, wantCount)
|
|
}
|
|
state[entry.Index] = entry.Count
|
|
lines++
|
|
return true
|
|
}
|
|
for lines < numWriters*linesPerWriter {
|
|
writersDone := runningWriters.Load() == 0
|
|
for range rand.IntN(100) {
|
|
runtime.Gosched() // bias towards more writer operations
|
|
}
|
|
|
|
if rand.IntN(100) == 0 {
|
|
// Asynchronous read of a single line.
|
|
if !checkLine() && writersDone {
|
|
t.Fatal("failed to read all lines after all writers done")
|
|
}
|
|
} else {
|
|
// Synchronous reading of all lines.
|
|
draining.Lock()
|
|
for checkLine() {
|
|
}
|
|
draining.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Test that the
|
|
func TestBufferCapacity(t *testing.T) {
|
|
f := newForTest(t, "", Options{})
|
|
b := bytes.Repeat([]byte("X"), 1000)
|
|
for range 1000 {
|
|
must.Get(f.Write(b[:rand.IntN(len(b))]))
|
|
}
|
|
for must.Get(f.TryReadLine()) != nil {
|
|
}
|
|
if !(10*len(b) < cap(f.rdBuf) && cap(f.rdBuf) < 20*len(b)) {
|
|
t.Errorf("cap(rdBuf) = %v, want within [%v:%v]", cap(f.rdBuf), 10*len(b), 20*len(b))
|
|
}
|
|
|
|
must.Get(f.Write(bytes.Repeat([]byte("X"), DefaultMaxLineSize-1)))
|
|
must.Get(f.TryReadLine())
|
|
wrCap, rdCap := cap(f.wrBuf), cap(f.rdBuf)
|
|
|
|
// Force another rotation. Buffers should not be GC'd yet.
|
|
must.Get(f.TryReadLine())
|
|
if cap(f.wrBuf) != wrCap {
|
|
t.Errorf("cap(f.wrBuf) = %v, want %v", cap(f.wrBuf), wrCap)
|
|
}
|
|
if cap(f.rdBuf) != rdCap {
|
|
t.Errorf("cap(f.rdBuf) = %v, want %v", cap(f.rdBuf), rdCap)
|
|
}
|
|
|
|
// Force many rotations. Buffers should be GC'd.
|
|
for range 64 {
|
|
t.Logf("cap(f.wrBuf), cap(f.rdBuf) = %d, %d", cap(f.wrBuf), cap(f.rdBuf))
|
|
must.Get(f.TryReadLine())
|
|
}
|
|
if cap(f.wrBuf) != 0 {
|
|
t.Errorf("cap(f.wrBuf) = %v, want %v", cap(f.wrBuf), 0)
|
|
}
|
|
if cap(f.rdBuf) != 0 {
|
|
t.Errorf("cap(f.rdBuf) = %v, want %v", cap(f.rdBuf), 0)
|
|
}
|
|
}
|
|
|
|
func TestMaxLineSize(t *testing.T) {
|
|
const maxLineSize = 1000
|
|
f := newForTest(t, "", Options{MaxLineSize: maxLineSize})
|
|
|
|
// Test writing.
|
|
b0 := []byte(strings.Repeat("X", maxLineSize-len("\n")) + "\n")
|
|
must.Get(f.Write(b0))
|
|
b1 := []byte(strings.Repeat("X", maxLineSize))
|
|
if _, err := f.Write(b1); err != errTooLong {
|
|
t.Errorf("Write error = %v, want errTooLong", err)
|
|
}
|
|
b2 := bytes.Repeat(b0, 2)
|
|
must.Get(f.Write(b2))
|
|
if f.storedBytesForTest() != int64(len(b0)+len(b2)) {
|
|
t.Errorf("storedBytes = %v, want %v", f.storedBytesForTest(), int64(len(b0)+len(b2)))
|
|
}
|
|
|
|
// Test reading.
|
|
f.read(t, b0)
|
|
f.read(t, b0)
|
|
f.read(t, b0)
|
|
f.read(t, nil) // should trigger rotate
|
|
if f.storedBytesForTest() != 0 {
|
|
t.Errorf("storedBytes = %v, want 0", f.storedBytesForTest())
|
|
}
|
|
|
|
// Test writing
|
|
must.Get(f.Write([]byte("hello")))
|
|
must.Get(f.Write(b0))
|
|
must.Get(f.Write([]byte("goodbye")))
|
|
|
|
// Test reading.
|
|
f.Close()
|
|
f = newForTest(t, f.filePrefix, Options{MaxLineSize: 10})
|
|
f.read(t, []byte("hello\n"))
|
|
if _, err := f.TryReadLine(); err != errTooLong {
|
|
t.Errorf("Write error = %v, want errTooLong", err)
|
|
}
|
|
f.read(t, []byte("goodbye\n"))
|
|
|
|
// Check that the read buffer does not need to be as long
|
|
// as the overly long line to skip over it.
|
|
if cap(f.rdBuf) >= maxLineSize/2 {
|
|
t.Errorf("cap(rdBuf) = %v, want <%v", cap(f.rdBuf), maxLineSize/2)
|
|
}
|
|
}
|
|
|
|
func TestMaxFileSize(t *testing.T) {
|
|
if replaceStderrSupportedForTest {
|
|
t.Run("ReplaceStderr:true", func(t *testing.T) { testMaxFileSize(t, true) })
|
|
}
|
|
t.Run("ReplaceStderr:false", func(t *testing.T) { testMaxFileSize(t, false) })
|
|
}
|
|
|
|
func testMaxFileSize(t *testing.T, replaceStderr bool) {
|
|
if replaceStderr {
|
|
setupStderr(t)
|
|
}
|
|
|
|
opts := Options{ReplaceStderr: replaceStderr, MaxFileSize: 1000}
|
|
f := newForTest(t, "", opts)
|
|
|
|
// Write lots of data.
|
|
const calls = 1000
|
|
var group sync.WaitGroup
|
|
var filchedBytes, writeBytes int64
|
|
group.Go(func() {
|
|
if !opts.ReplaceStderr {
|
|
return
|
|
}
|
|
var b []byte
|
|
for i := range calls {
|
|
b = fmt.Appendf(b[:0], `{"FilchIndex":%d}`+"\n", i+1)
|
|
filchedBytes += int64(stderrWriteForTest(b))
|
|
}
|
|
})
|
|
group.Go(func() {
|
|
var b []byte
|
|
for i := range calls {
|
|
b = fmt.Appendf(b[:0], `{"WriteIndex":%d}`+"\n", i+1)
|
|
writeBytes += int64(must.Get(f.Write(b)))
|
|
}
|
|
})
|
|
group.Wait()
|
|
f.statAndUpdateBytes()
|
|
droppedBytes := filchedBytes + writeBytes - f.storedBytes.Value()
|
|
|
|
switch {
|
|
case f.writeCalls.Value() != calls:
|
|
t.Errorf("writeCalls = %v, want %d", f.writeCalls.Value(), calls)
|
|
case f.readCalls.Value() != 0:
|
|
t.Errorf("readCalls = %v, want 0", f.readCalls.Value())
|
|
case f.rotateCalls.Value() == 0:
|
|
t.Errorf("rotateCalls = 0, want >0")
|
|
case f.callErrors.Value() != 0:
|
|
t.Errorf("callErrors = %v, want 0", f.callErrors.Value())
|
|
case f.writeBytes.Value() != writeBytes+filchedBytes:
|
|
t.Errorf("writeBytes = %v, want %d", f.writeBytes.Value(), writeBytes+filchedBytes)
|
|
case f.readBytes.Value() != 0:
|
|
t.Errorf("readBytes = %v, want 0", f.readBytes.Value())
|
|
case f.filchedBytes.Value() != filchedBytes:
|
|
t.Errorf("filchedBytes = %v, want %d", f.filchedBytes.Value(), filchedBytes)
|
|
case f.droppedBytes.Value() != droppedBytes:
|
|
t.Errorf("droppedBytes = %v, want %d", f.droppedBytes.Value(), droppedBytes)
|
|
case f.droppedBytes.Value() == 0:
|
|
t.Errorf("droppedBytes = 0, want >0")
|
|
case f.storedBytes.Value() != f.storedBytesForTest():
|
|
t.Errorf("storedBytes = %v, want %d", f.storedBytes.Value(), f.storedBytesForTest())
|
|
case f.storedBytes.Value() > int64(opts.MaxFileSize) && !opts.ReplaceStderr:
|
|
// If ReplaceStderr, it is impossible for MaxFileSize to be
|
|
// strictly adhered to since asynchronous os.Stderr.Write calls
|
|
// do not trigger any checks to enforce maximum file size.
|
|
t.Errorf("storedBytes = %v, want <=%d", f.storedBytes.Value(), opts.MaxFileSize)
|
|
}
|
|
|
|
// Read back the data and verify that the entries are in order.
|
|
var readBytes, lastFilchIndex, lastWriteIndex int64
|
|
for {
|
|
b := must.Get(f.TryReadLine())
|
|
if len(b) == 0 {
|
|
break
|
|
}
|
|
var entry struct{ FilchIndex, WriteIndex int64 }
|
|
must.Do(json.Unmarshal(b, &entry))
|
|
if entry.FilchIndex == 0 && entry.WriteIndex == 0 {
|
|
t.Errorf("both indexes are zero")
|
|
}
|
|
if entry.FilchIndex > 0 {
|
|
if entry.FilchIndex <= lastFilchIndex {
|
|
t.Errorf("FilchIndex = %d, want >%d", entry.FilchIndex, lastFilchIndex)
|
|
}
|
|
lastFilchIndex = entry.FilchIndex
|
|
}
|
|
if entry.WriteIndex > 0 {
|
|
if entry.WriteIndex <= lastWriteIndex {
|
|
t.Errorf("WriteIndex = %d, want >%d", entry.WriteIndex, lastWriteIndex)
|
|
}
|
|
lastWriteIndex = entry.WriteIndex
|
|
}
|
|
readBytes += int64(len(b))
|
|
}
|
|
|
|
if f.readBytes.Value() != readBytes {
|
|
t.Errorf("readBytes = %v, want %v", f.readBytes.Value(), readBytes)
|
|
}
|
|
}
|
|
|
|
// TestConcurrentSameFile tests that concurrent Filch operations on the same
|
|
// set of log files does not result in a panic.
|
|
// The exact behavior is undefined, but we should at least avoid a panic.
|
|
func TestConcurrentSameFile(t *testing.T) {
|
|
filePrefix := filepath.Join(t.TempDir(), "testlog")
|
|
f1 := must.Get(New(filePrefix, Options{MaxFileSize: 1000}))
|
|
defer f1.Close()
|
|
f2 := must.Get(New(filePrefix, Options{MaxFileSize: 1000}))
|
|
defer f2.Close()
|
|
var group sync.WaitGroup
|
|
for _, f := range []*Filch{f1, f2} {
|
|
group.Go(func() {
|
|
for range 1000 {
|
|
for range rand.IntN(10) {
|
|
f.Write([]byte("hello, world"))
|
|
}
|
|
for range rand.IntN(10) {
|
|
f.TryReadLine()
|
|
}
|
|
}
|
|
})
|
|
}
|
|
group.Wait()
|
|
}
|