mirror of
https://github.com/rclone/rclone.git
synced 2026-03-26 03:12:36 -04:00
fs/log: fix data race on OutputHandler.format field
The format field was read in Handle() without synchronization while setFormat() could write it concurrently from InitLogging(). This caused a data race detected by the race detector, failing TestListBucketsAuthProxy in cmd/serve/s3. Fix by protecting all access to format with the existing mutex.
This commit is contained in:
@@ -125,7 +125,7 @@ type OutputHandler struct {
|
||||
mu sync.Mutex
|
||||
output []outputFn // log to writer if empty or the last item
|
||||
outputExtra []outputExtra // log to all these additional places
|
||||
format logFormat
|
||||
format logFormat // protected by mu
|
||||
jsonBuf bytes.Buffer
|
||||
jsonHandler *slog.JSONHandler
|
||||
}
|
||||
@@ -139,6 +139,13 @@ type outputExtra struct {
|
||||
// Define the type of the override logger
|
||||
type outputFn func(level slog.Level, text string)
|
||||
|
||||
// getFormat returns the current log format flags under the mutex.
|
||||
func (h *OutputHandler) getFormat() logFormat {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
return h.format
|
||||
}
|
||||
|
||||
// NewOutputHandler creates a new OutputHandler with the specified flags.
|
||||
//
|
||||
// This is designed to use log/slog but produce output which is
|
||||
@@ -215,16 +222,22 @@ func (h *OutputHandler) setWriter(writer io.Writer) {
|
||||
|
||||
// Set the format flags to that passed in.
|
||||
func (h *OutputHandler) setFormat(format logFormat) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.format = format
|
||||
}
|
||||
|
||||
// clear format flags that this output type doesn't want
|
||||
func (h *OutputHandler) clearFormatFlags(bitMask logFormat) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.format &^= bitMask
|
||||
}
|
||||
|
||||
// set format flags that this output type requires
|
||||
func (h *OutputHandler) setFormatFlags(bitMask logFormat) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.format |= bitMask
|
||||
}
|
||||
|
||||
@@ -238,37 +251,37 @@ func (h *OutputHandler) Enabled(_ context.Context, level slog.Level) bool {
|
||||
}
|
||||
|
||||
// Create a log header in Go standard log format.
|
||||
func (h *OutputHandler) formatStdLogHeader(buf *bytes.Buffer, level slog.Level, t time.Time, object string, lineInfo string) {
|
||||
func (h *OutputHandler) formatStdLogHeader(buf *bytes.Buffer, format logFormat, level slog.Level, t time.Time, object string, lineInfo string) {
|
||||
// Add time in Go standard format if requested
|
||||
if h.format&(logFormatDate|logFormatTime|logFormatMicroseconds) != 0 {
|
||||
if h.format&logFormatUTC != 0 {
|
||||
if format&(logFormatDate|logFormatTime|logFormatMicroseconds) != 0 {
|
||||
if format&logFormatUTC != 0 {
|
||||
t = t.UTC()
|
||||
}
|
||||
if h.format&logFormatDate != 0 {
|
||||
if format&logFormatDate != 0 {
|
||||
year, month, day := t.Date()
|
||||
fmt.Fprintf(buf, "%04d/%02d/%02d ", year, month, day)
|
||||
}
|
||||
if h.format&(logFormatTime|logFormatMicroseconds) != 0 {
|
||||
if format&(logFormatTime|logFormatMicroseconds) != 0 {
|
||||
hour, min, sec := t.Clock()
|
||||
fmt.Fprintf(buf, "%02d:%02d:%02d", hour, min, sec)
|
||||
if h.format&logFormatMicroseconds != 0 {
|
||||
if format&logFormatMicroseconds != 0 {
|
||||
fmt.Fprintf(buf, ".%06d", t.Nanosecond()/1e3)
|
||||
}
|
||||
buf.WriteByte(' ')
|
||||
}
|
||||
}
|
||||
// Add source code filename:line if requested
|
||||
if h.format&(logFormatShortFile|logFormatLongFile) != 0 && lineInfo != "" {
|
||||
if format&(logFormatShortFile|logFormatLongFile) != 0 && lineInfo != "" {
|
||||
buf.WriteString(lineInfo)
|
||||
buf.WriteByte(':')
|
||||
buf.WriteByte(' ')
|
||||
}
|
||||
// Add PID if requested
|
||||
if h.format&logFormatPid != 0 {
|
||||
if format&logFormatPid != 0 {
|
||||
fmt.Fprintf(buf, "[%d] ", os.Getpid())
|
||||
}
|
||||
// Add log level if required
|
||||
if h.format&logFormatNoLevel == 0 {
|
||||
if format&logFormatNoLevel == 0 {
|
||||
levelStr := slogLevelToString(level)
|
||||
fmt.Fprintf(buf, "%-6s: ", levelStr)
|
||||
}
|
||||
@@ -281,9 +294,9 @@ func (h *OutputHandler) formatStdLogHeader(buf *bytes.Buffer, level slog.Level,
|
||||
}
|
||||
|
||||
// Create a log in standard Go log format into buf.
|
||||
func (h *OutputHandler) textLog(ctx context.Context, buf *bytes.Buffer, r slog.Record) error {
|
||||
func (h *OutputHandler) textLog(ctx context.Context, buf *bytes.Buffer, format logFormat, r slog.Record) error {
|
||||
var lineInfo string
|
||||
if h.format&(logFormatShortFile|logFormatLongFile) != 0 {
|
||||
if format&(logFormatShortFile|logFormatLongFile) != 0 {
|
||||
lineInfo = getCaller(2)
|
||||
}
|
||||
|
||||
@@ -296,7 +309,7 @@ func (h *OutputHandler) textLog(ctx context.Context, buf *bytes.Buffer, r slog.R
|
||||
return true
|
||||
})
|
||||
|
||||
h.formatStdLogHeader(buf, r.Level, r.Time, object, lineInfo)
|
||||
h.formatStdLogHeader(buf, format, r.Level, r.Time, object, lineInfo)
|
||||
buf.WriteString(r.Message)
|
||||
if buf.Len() == 0 || buf.Bytes()[buf.Len()-1] != '\n' { // Ensure newline
|
||||
buf.WriteByte('\n')
|
||||
@@ -305,13 +318,13 @@ func (h *OutputHandler) textLog(ctx context.Context, buf *bytes.Buffer, r slog.R
|
||||
}
|
||||
|
||||
// Create a log in JSON format into buf.
|
||||
func (h *OutputHandler) jsonLog(ctx context.Context, buf *bytes.Buffer, r slog.Record) (err error) {
|
||||
func (h *OutputHandler) jsonLog(ctx context.Context, buf *bytes.Buffer, format logFormat, r slog.Record) (err error) {
|
||||
// Call the JSON handler to create the JSON in buf
|
||||
r.AddAttrs(
|
||||
slog.String("source", getCaller(2)),
|
||||
)
|
||||
// Add PID if requested
|
||||
if h.format&logFormatPid != 0 {
|
||||
if format&logFormatPid != 0 {
|
||||
r.AddAttrs(slog.Int("pid", os.Getpid()))
|
||||
}
|
||||
h.mu.Lock()
|
||||
@@ -331,8 +344,15 @@ func (h *OutputHandler) Handle(ctx context.Context, r slog.Record) (err error) {
|
||||
buf *bytes.Buffer
|
||||
)
|
||||
|
||||
// Read the format under the mutex once so it is consistent
|
||||
// throughout this call. The mutex is released before calling
|
||||
// jsonLog/textLog (which may re-acquire it) to avoid deadlock.
|
||||
h.mu.Lock()
|
||||
format := h.format
|
||||
h.mu.Unlock()
|
||||
|
||||
// Check whether we need to build Text or JSON logs or both
|
||||
needJSON := h.format&logFormatJSON != 0
|
||||
needJSON := format&logFormatJSON != 0
|
||||
needText := !needJSON
|
||||
for _, out := range h.outputExtra {
|
||||
if out.json {
|
||||
@@ -345,7 +365,7 @@ func (h *OutputHandler) Handle(ctx context.Context, r slog.Record) (err error) {
|
||||
if needJSON {
|
||||
var bufJSONBack [256]byte
|
||||
bufJSON = bytes.NewBuffer(bufJSONBack[:0])
|
||||
err = h.jsonLog(ctx, bufJSON, r)
|
||||
err = h.jsonLog(ctx, bufJSON, format, r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -354,7 +374,7 @@ func (h *OutputHandler) Handle(ctx context.Context, r slog.Record) (err error) {
|
||||
if needText {
|
||||
var bufTextBack [256]byte
|
||||
bufText = bytes.NewBuffer(bufTextBack[:0])
|
||||
err = h.textLog(ctx, bufText, r)
|
||||
err = h.textLog(ctx, bufText, format, r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -364,7 +384,7 @@ func (h *OutputHandler) Handle(ctx context.Context, r slog.Record) (err error) {
|
||||
defer h.mu.Unlock()
|
||||
|
||||
// Do the log, either to the default destination or to the alternate logging system
|
||||
if h.format&logFormatJSON != 0 {
|
||||
if format&logFormatJSON != 0 {
|
||||
buf = bufJSON
|
||||
} else {
|
||||
buf = bufText
|
||||
@@ -390,13 +410,13 @@ func (h *OutputHandler) Handle(ctx context.Context, r slog.Record) (err error) {
|
||||
// WithAttrs creates a new handler with the same writer, options, and flags.
|
||||
// Attributes are ignored for the output format of this specific handler.
|
||||
func (h *OutputHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
|
||||
return NewOutputHandler(h.writer, &h.opts, h.format)
|
||||
return NewOutputHandler(h.writer, &h.opts, h.getFormat())
|
||||
}
|
||||
|
||||
// WithGroup creates a new handler with the same writer, options, and flags.
|
||||
// Groups are ignored for the output format of this specific handler.
|
||||
func (h *OutputHandler) WithGroup(name string) slog.Handler {
|
||||
return NewOutputHandler(h.writer, &h.opts, h.format)
|
||||
return NewOutputHandler(h.writer, &h.opts, h.getFormat())
|
||||
}
|
||||
|
||||
// Check interface
|
||||
|
||||
@@ -4,9 +4,11 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -91,9 +93,9 @@ func TestFormatStdLogHeader(t *testing.T) {
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
h := &OutputHandler{format: tc.format}
|
||||
h := NewOutputHandler(&bytes.Buffer{}, nil, tc.format)
|
||||
buf := &bytes.Buffer{}
|
||||
h.formatStdLogHeader(buf, slog.LevelInfo, t0, tc.object, tc.lineInfo)
|
||||
h.formatStdLogHeader(buf, tc.format, slog.LevelInfo, t0, tc.object, tc.lineInfo)
|
||||
if !strings.HasPrefix(buf.String(), tc.wantPrefix) {
|
||||
t.Errorf("%s: got %q; want prefix %q", tc.name, buf.String(), tc.wantPrefix)
|
||||
}
|
||||
@@ -114,13 +116,13 @@ func TestEnabled(t *testing.T) {
|
||||
|
||||
// Test clearFormatFlags and setFormatFlags bitwise ops.
|
||||
func TestClearSetFormatFlags(t *testing.T) {
|
||||
h := &OutputHandler{format: logFormatDate | logFormatTime}
|
||||
h := NewOutputHandler(&bytes.Buffer{}, nil, logFormatDate|logFormatTime)
|
||||
|
||||
h.clearFormatFlags(logFormatTime)
|
||||
assert.True(t, h.format&logFormatTime == 0)
|
||||
assert.True(t, h.getFormat()&logFormatTime == 0)
|
||||
|
||||
h.setFormatFlags(logFormatMicroseconds)
|
||||
assert.True(t, h.format&logFormatMicroseconds != 0)
|
||||
assert.True(t, h.getFormat()&logFormatMicroseconds != 0)
|
||||
}
|
||||
|
||||
// Test SetOutput and ResetOutput override the default writer.
|
||||
@@ -231,7 +233,7 @@ func TestTextLogAndJsonLog(t *testing.T) {
|
||||
|
||||
// textLog
|
||||
bufText := &bytes.Buffer{}
|
||||
require.NoError(t, h.textLog(context.Background(), bufText, r))
|
||||
require.NoError(t, h.textLog(context.Background(), bufText, h.getFormat(), r))
|
||||
out := bufText.String()
|
||||
if !strings.Contains(out, "WARNING") || !strings.Contains(out, "obj:") || !strings.HasSuffix(out, "\n") {
|
||||
t.Errorf("textLog output = %q", out)
|
||||
@@ -239,13 +241,110 @@ func TestTextLogAndJsonLog(t *testing.T) {
|
||||
|
||||
// jsonLog
|
||||
bufJSON := &bytes.Buffer{}
|
||||
require.NoError(t, h.jsonLog(context.Background(), bufJSON, r))
|
||||
require.NoError(t, h.jsonLog(context.Background(), bufJSON, h.getFormat(), r))
|
||||
j := bufJSON.String()
|
||||
if !strings.Contains(j, `"level":"warning"`) || !strings.Contains(j, `"msg":"msg!"`) {
|
||||
t.Errorf("jsonLog output = %q", j)
|
||||
}
|
||||
}
|
||||
|
||||
// Test concurrent access to the handler does not race or deadlock.
|
||||
func TestOutputHandlerConcurrency(t *testing.T) {
|
||||
h := NewOutputHandler(io.Discard, nil, logFormatDate|logFormatTime)
|
||||
ctx := context.Background()
|
||||
|
||||
const goroutines = 10
|
||||
const iterations = 500
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Goroutines calling Handle (text format)
|
||||
for i := 0; i < goroutines; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for j := 0; j < iterations; j++ {
|
||||
r := slog.NewRecord(t0, slog.LevelInfo, "concurrent text", 0)
|
||||
r.AddAttrs(slog.String("object", "obj"))
|
||||
_ = h.Handle(ctx, r)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Goroutines calling setFormat (switching between text and JSON)
|
||||
for i := 0; i < 2; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for j := 0; j < iterations; j++ {
|
||||
if j%2 == 0 {
|
||||
h.setFormat(logFormatDate | logFormatTime)
|
||||
} else {
|
||||
h.setFormat(logFormatJSON)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Goroutines calling setFormatFlags / clearFormatFlags
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for j := 0; j < iterations; j++ {
|
||||
h.setFormatFlags(logFormatPid | logFormatMicroseconds)
|
||||
h.clearFormatFlags(logFormatPid | logFormatMicroseconds)
|
||||
}
|
||||
}()
|
||||
|
||||
// Goroutines calling SetLevel
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for j := 0; j < iterations; j++ {
|
||||
if j%2 == 0 {
|
||||
h.SetLevel(slog.LevelDebug)
|
||||
} else {
|
||||
h.SetLevel(slog.LevelInfo)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Goroutines calling SetOutput / ResetOutput
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
noop := func(_ slog.Level, _ string) {}
|
||||
for j := 0; j < iterations; j++ {
|
||||
h.SetOutput(noop)
|
||||
h.ResetOutput()
|
||||
}
|
||||
}()
|
||||
|
||||
// Goroutines calling WithAttrs / WithGroup (reads format)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for j := 0; j < iterations; j++ {
|
||||
_ = h.WithAttrs(nil)
|
||||
_ = h.WithGroup("g")
|
||||
}
|
||||
}()
|
||||
|
||||
// Use a channel with a timeout to detect deadlocks
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
// success
|
||||
case <-time.After(30 * time.Second):
|
||||
t.Fatal("timed out waiting for concurrent goroutines — probable deadlock")
|
||||
}
|
||||
}
|
||||
|
||||
// Table-driven test for JSON vs text Handle behavior.
|
||||
func TestHandleFormatFlags(t *testing.T) {
|
||||
r := slog.NewRecord(t0, slog.LevelInfo, "hi", 0)
|
||||
|
||||
Reference in New Issue
Block a user