From ef016506657fcdf2882ca162cf78cc735dca892c Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Mon, 30 Dec 2024 15:06:11 -0800 Subject: [PATCH] feat(server): emit notifications as JSON to stderr when running under KopiaUI (#4322) * feat(server): emit notifications as JSON to stderr when running under KopiaUI * added tests --- cli/command_server_notifications_test.go | 41 ++++++++++--- cli/command_server_start.go | 16 ++++- cli/terminate_signal_test.go | 2 +- notification/notification_send.go | 7 +++ notification/sender/jsonsender/jsonsender.go | 58 +++++++++++++++++++ .../sender/jsonsender/jsonsender_test.go | 49 ++++++++++++++++ tests/testenv/cli_test_env.go | 20 ++++++- 7 files changed, 182 insertions(+), 11 deletions(-) create mode 100644 notification/sender/jsonsender/jsonsender.go create mode 100644 notification/sender/jsonsender/jsonsender_test.go diff --git a/cli/command_server_notifications_test.go b/cli/command_server_notifications_test.go index 8c8267d60..47e62d8f9 100644 --- a/cli/command_server_notifications_test.go +++ b/cli/command_server_notifications_test.go @@ -2,15 +2,19 @@ import ( "bytes" + "encoding/json" "io" "net/http" "net/http/httptest" + "strings" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/kopia/kopia/internal/testutil" + "github.com/kopia/kopia/notification/sender" "github.com/kopia/kopia/tests/testenv" ) @@ -48,21 +52,32 @@ func TestServerNotifications(t *testing.T) { var sp testutil.ServerParameters - env.SetLogOutput(true, "server") + jsonNotificationsReceived := make(chan string, 100) - wait, kill := env.RunAndProcessStderr(t, sp.ProcessOutput, - "server", "start", + wait, kill := env.RunAndProcessStderrAsync(t, sp.ProcessOutput, func(line string) { + const prefix = "NOTIFICATION: " + + if strings.HasPrefix(line, prefix) { + t.Logf("JSON notification received: %v", line) + + jsonNotificationsReceived <- line[len(prefix):] + } + }, "server", "start", "--address=localhost:0", "--insecure", "--random-server-control-password", + "--kopiaui-notifications", "--shutdown-grace-period", "100ms", ) + defer func() { + kill() + wait() + }() + // trigger server snapshot env.RunAndExpectSuccess(t, "server", "snapshot", "--address", sp.BaseURL, "--server-control-password", sp.ServerControlPassword, dir1) - t.Logf("triggered") - select { case not := <-notificationsReceived: t.Logf("notification received: %v", not) @@ -71,6 +86,18 @@ func TestServerNotifications(t *testing.T) { t.Error("notification not received in time") } - kill() - wait() + select { + case not := <-jsonNotificationsReceived: + // make sure we received a valid sender.Message JSON + dec := json.NewDecoder(strings.NewReader(not)) + dec.DisallowUnknownFields() + + var msg sender.Message + + require.NoError(t, dec.Decode(&msg)) + require.Contains(t, msg.Subject, "Kopia success") + + case <-time.After(5 * time.Second): + t.Error("notification not received in time") + } } diff --git a/cli/command_server_start.go b/cli/command_server_start.go index cb5094905..dad834830 100644 --- a/cli/command_server_start.go +++ b/cli/command_server_start.go @@ -21,6 +21,8 @@ "github.com/kopia/kopia/internal/auth" "github.com/kopia/kopia/internal/server" + "github.com/kopia/kopia/notification" + "github.com/kopia/kopia/notification/sender/jsonsender" "github.com/kopia/kopia/repo" ) @@ -68,7 +70,8 @@ type commandServerStart struct { debugScheduler bool minMaintenanceInterval time.Duration - shutdownGracePeriod time.Duration + shutdownGracePeriod time.Duration + kopiauiNotifications bool logServerRequests bool @@ -123,6 +126,8 @@ func (c *commandServerStart) setup(svc advancedAppServices, parent commandParent cmd.Flag("shutdown-grace-period", "Grace period for shutting down the server").Default("5s").DurationVar(&c.shutdownGracePeriod) + cmd.Flag("kopiaui-notifications", "Enable notifications to be printed to stdout for KopiaUI").BoolVar(&c.kopiauiNotifications) + c.sf.setup(svc, cmd) c.co.setup(svc, cmd) c.svc = svc @@ -271,6 +276,15 @@ func (c *commandServerStart) run(ctx context.Context) (reterr error) { onExternalConfigReloadRequest(srv.Refresh) + // enable notification to be printed to stderr where KopiaUI will pick it up + if c.kopiauiNotifications { + notification.AdditionalSenders = append(notification.AdditionalSenders, + jsonsender.NewJSONSender( + "NOTIFICATION: ", + c.out.stderr(), + notification.SeverityVerbose)) + } + return c.startServerWithOptionalTLS(ctx, httpServer) } diff --git a/cli/terminate_signal_test.go b/cli/terminate_signal_test.go index 5669f508b..4bffaab37 100644 --- a/cli/terminate_signal_test.go +++ b/cli/terminate_signal_test.go @@ -17,7 +17,7 @@ func TestTerminate(t *testing.T) { var sp testutil.ServerParameters - wait, interrupt := env.RunAndProcessStderrInt(t, sp.ProcessOutput, "server", "start", + wait, interrupt := env.RunAndProcessStderrInt(t, sp.ProcessOutput, nil, "server", "start", "--address=localhost:0", "--insecure") diff --git a/notification/notification_send.go b/notification/notification_send.go index 200f34d19..a2954612b 100644 --- a/notification/notification_send.go +++ b/notification/notification_send.go @@ -19,6 +19,11 @@ "github.com/kopia/kopia/repo/logging" ) +// AdditionalSenders is a list of additional senders that will be used in addition to the senders configured in the repository. +// +//nolint:gochecknoglobals +var AdditionalSenders []sender.Sender + var log = logging.Module("notification") // TemplateArgs represents the arguments passed to the notification template when rendering. @@ -134,6 +139,8 @@ func SendInternal(ctx context.Context, rep repo.Repository, templateName string, return errors.Wrap(err, "unable to get notification senders") } + senders = append(senders, AdditionalSenders...) + var resultErr error for _, s := range senders { diff --git a/notification/sender/jsonsender/jsonsender.go b/notification/sender/jsonsender/jsonsender.go new file mode 100644 index 000000000..faff566af --- /dev/null +++ b/notification/sender/jsonsender/jsonsender.go @@ -0,0 +1,58 @@ +// Package jsonsender provides a notification sender that writes messages in JSON format to the provided writer. +package jsonsender + +import ( + "bytes" + "context" + "encoding/json" + "io" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/notification/sender" +) + +type jsonSender struct { + prefix string + out io.Writer + minSeverity sender.Severity +} + +func (p *jsonSender) Send(ctx context.Context, msg *sender.Message) error { + if msg.Severity < p.minSeverity { + return nil + } + + var buf bytes.Buffer + + buf.WriteString(p.prefix) + + if err := json.NewEncoder(&buf).Encode(msg); err != nil { + return errors.Wrap(err, "unable to encode JSON") + } + + _, err := p.out.Write(buf.Bytes()) + + return err //nolint:wrapcheck +} + +func (p *jsonSender) Summary() string { + return "JSON sender" +} + +func (p *jsonSender) Format() string { + return sender.FormatPlainText +} + +func (p *jsonSender) ProfileName() string { + return "jsonsender" +} + +// NewJSONSender creates a new JSON sender that writes messages to the provided writer. +func NewJSONSender(prefix string, out io.Writer, minSeverity sender.Severity) sender.Sender { + return &jsonSender{ + prefix: prefix, + out: out, + minSeverity: minSeverity, + } +} diff --git a/notification/sender/jsonsender/jsonsender_test.go b/notification/sender/jsonsender/jsonsender_test.go new file mode 100644 index 000000000..80ea05671 --- /dev/null +++ b/notification/sender/jsonsender/jsonsender_test.go @@ -0,0 +1,49 @@ +package jsonsender_test + +import ( + "bytes" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/kopia/kopia/internal/testlogging" + "github.com/kopia/kopia/notification" + "github.com/kopia/kopia/notification/sender" + "github.com/kopia/kopia/notification/sender/jsonsender" +) + +func TestJSONSender(t *testing.T) { + ctx := testlogging.Context(t) + + var buf bytes.Buffer + + p := jsonsender.NewJSONSender("NOTIFICATION:", &buf, notification.SeverityWarning) + + m1 := &sender.Message{ + Subject: "test subject 1", + Body: "test body 1", + Severity: notification.SeverityVerbose, + } + m2 := &sender.Message{ + Subject: "test subject 2", + Body: "test body 2", + Severity: notification.SeverityWarning, + } + m3 := &sender.Message{ + Subject: "test subject 3", + Body: "test body 3", + Severity: notification.SeverityError, + } + require.NoError(t, p.Send(ctx, m1)) // will be ignored + require.NoError(t, p.Send(ctx, m2)) + require.NoError(t, p.Send(ctx, m3)) + + lines := strings.Split(strings.TrimSpace(buf.String()), "\n") + + require.Equal(t, + []string{ + "NOTIFICATION:{\"subject\":\"test subject 2\",\"severity\":10,\"body\":\"test body 2\"}", + "NOTIFICATION:{\"subject\":\"test subject 3\",\"severity\":20,\"body\":\"test body 3\"}", + }, lines) +} diff --git a/tests/testenv/cli_test_env.go b/tests/testenv/cli_test_env.go index 9972de658..52904dea6 100644 --- a/tests/testenv/cli_test_env.go +++ b/tests/testenv/cli_test_env.go @@ -176,7 +176,19 @@ func (e *CLITest) getLogOutputPrefix() (string, bool) { func (e *CLITest) RunAndProcessStderr(t *testing.T, callback func(line string) bool, args ...string) (wait func() error, kill func()) { t.Helper() - wait, interrupt := e.RunAndProcessStderrInt(t, callback, args...) + wait, interrupt := e.RunAndProcessStderrInt(t, callback, nil, args...) + kill = func() { + interrupt(os.Kill) + } + + return wait, kill +} + +// RunAndProcessStderrAsync runs the given command, and streams its output line-by-line to a given function until it returns false. +func (e *CLITest) RunAndProcessStderrAsync(t *testing.T, callback func(line string) bool, asyncCallback func(line string), args ...string) (wait func() error, kill func()) { + t.Helper() + + wait, interrupt := e.RunAndProcessStderrInt(t, callback, asyncCallback, args...) kill = func() { interrupt(os.Kill) } @@ -186,7 +198,7 @@ func (e *CLITest) RunAndProcessStderr(t *testing.T, callback func(line string) b // RunAndProcessStderrInt runs the given command, and streams its output // line-by-line to outputCallback until it returns false. -func (e *CLITest) RunAndProcessStderrInt(t *testing.T, outputCallback func(line string) bool, args ...string) (wait func() error, interrupt func(os.Signal)) { +func (e *CLITest) RunAndProcessStderrInt(t *testing.T, outputCallback func(line string) bool, asyncCallback func(line string), args ...string) (wait func() error, interrupt func(os.Signal)) { t.Helper() stdout, stderr, wait, interrupt := e.Runner.Start(t, e.RunContext, e.cmdArgs(args), e.Environment) @@ -214,6 +226,10 @@ func (e *CLITest) RunAndProcessStderrInt(t *testing.T, outputCallback func(line // complete the scan in background without processing lines. go func() { for scanner.Scan() { + if asyncCallback != nil { + asyncCallback(scanner.Text()) + } + if prefix, ok := e.getLogOutputPrefix(); ok { t.Logf("[%vstderr] %v", prefix, scanner.Text()) }