feat(server): emit notifications as JSON to stderr when running under KopiaUI (#4322)

* feat(server): emit notifications as JSON to stderr when running under KopiaUI

* added tests
This commit is contained in:
Jarek Kowalski
2024-12-30 15:06:11 -08:00
committed by GitHub
parent d6b9254a4c
commit ef01650665
7 changed files with 182 additions and 11 deletions

View File

@@ -2,15 +2,19 @@
import (
"bytes"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/internal/testutil"
"github.com/kopia/kopia/notification/sender"
"github.com/kopia/kopia/tests/testenv"
)
@@ -48,21 +52,32 @@ func TestServerNotifications(t *testing.T) {
var sp testutil.ServerParameters
env.SetLogOutput(true, "server")
jsonNotificationsReceived := make(chan string, 100)
wait, kill := env.RunAndProcessStderr(t, sp.ProcessOutput,
"server", "start",
wait, kill := env.RunAndProcessStderrAsync(t, sp.ProcessOutput, func(line string) {
const prefix = "NOTIFICATION: "
if strings.HasPrefix(line, prefix) {
t.Logf("JSON notification received: %v", line)
jsonNotificationsReceived <- line[len(prefix):]
}
}, "server", "start",
"--address=localhost:0",
"--insecure",
"--random-server-control-password",
"--kopiaui-notifications",
"--shutdown-grace-period", "100ms",
)
defer func() {
kill()
wait()
}()
// trigger server snapshot
env.RunAndExpectSuccess(t, "server", "snapshot", "--address", sp.BaseURL, "--server-control-password", sp.ServerControlPassword, dir1)
t.Logf("triggered")
select {
case not := <-notificationsReceived:
t.Logf("notification received: %v", not)
@@ -71,6 +86,18 @@ func TestServerNotifications(t *testing.T) {
t.Error("notification not received in time")
}
kill()
wait()
select {
case not := <-jsonNotificationsReceived:
// make sure we received a valid sender.Message JSON
dec := json.NewDecoder(strings.NewReader(not))
dec.DisallowUnknownFields()
var msg sender.Message
require.NoError(t, dec.Decode(&msg))
require.Contains(t, msg.Subject, "Kopia success")
case <-time.After(5 * time.Second):
t.Error("notification not received in time")
}
}

View File

@@ -21,6 +21,8 @@
"github.com/kopia/kopia/internal/auth"
"github.com/kopia/kopia/internal/server"
"github.com/kopia/kopia/notification"
"github.com/kopia/kopia/notification/sender/jsonsender"
"github.com/kopia/kopia/repo"
)
@@ -68,7 +70,8 @@ type commandServerStart struct {
debugScheduler bool
minMaintenanceInterval time.Duration
shutdownGracePeriod time.Duration
shutdownGracePeriod time.Duration
kopiauiNotifications bool
logServerRequests bool
@@ -123,6 +126,8 @@ func (c *commandServerStart) setup(svc advancedAppServices, parent commandParent
cmd.Flag("shutdown-grace-period", "Grace period for shutting down the server").Default("5s").DurationVar(&c.shutdownGracePeriod)
cmd.Flag("kopiaui-notifications", "Enable notifications to be printed to stdout for KopiaUI").BoolVar(&c.kopiauiNotifications)
c.sf.setup(svc, cmd)
c.co.setup(svc, cmd)
c.svc = svc
@@ -271,6 +276,15 @@ func (c *commandServerStart) run(ctx context.Context) (reterr error) {
onExternalConfigReloadRequest(srv.Refresh)
// enable notification to be printed to stderr where KopiaUI will pick it up
if c.kopiauiNotifications {
notification.AdditionalSenders = append(notification.AdditionalSenders,
jsonsender.NewJSONSender(
"NOTIFICATION: ",
c.out.stderr(),
notification.SeverityVerbose))
}
return c.startServerWithOptionalTLS(ctx, httpServer)
}

View File

@@ -17,7 +17,7 @@ func TestTerminate(t *testing.T) {
var sp testutil.ServerParameters
wait, interrupt := env.RunAndProcessStderrInt(t, sp.ProcessOutput, "server", "start",
wait, interrupt := env.RunAndProcessStderrInt(t, sp.ProcessOutput, nil, "server", "start",
"--address=localhost:0",
"--insecure")

View File

@@ -19,6 +19,11 @@
"github.com/kopia/kopia/repo/logging"
)
// AdditionalSenders is a list of additional senders that will be used in addition to the senders configured in the repository.
//
//nolint:gochecknoglobals
var AdditionalSenders []sender.Sender
var log = logging.Module("notification")
// TemplateArgs represents the arguments passed to the notification template when rendering.
@@ -134,6 +139,8 @@ func SendInternal(ctx context.Context, rep repo.Repository, templateName string,
return errors.Wrap(err, "unable to get notification senders")
}
senders = append(senders, AdditionalSenders...)
var resultErr error
for _, s := range senders {

View File

@@ -0,0 +1,58 @@
// Package jsonsender provides a notification sender that writes messages in JSON format to the provided writer.
package jsonsender
import (
"bytes"
"context"
"encoding/json"
"io"
"github.com/pkg/errors"
"github.com/kopia/kopia/notification/sender"
)
type jsonSender struct {
prefix string
out io.Writer
minSeverity sender.Severity
}
func (p *jsonSender) Send(ctx context.Context, msg *sender.Message) error {
if msg.Severity < p.minSeverity {
return nil
}
var buf bytes.Buffer
buf.WriteString(p.prefix)
if err := json.NewEncoder(&buf).Encode(msg); err != nil {
return errors.Wrap(err, "unable to encode JSON")
}
_, err := p.out.Write(buf.Bytes())
return err //nolint:wrapcheck
}
func (p *jsonSender) Summary() string {
return "JSON sender"
}
func (p *jsonSender) Format() string {
return sender.FormatPlainText
}
func (p *jsonSender) ProfileName() string {
return "jsonsender"
}
// NewJSONSender creates a new JSON sender that writes messages to the provided writer.
func NewJSONSender(prefix string, out io.Writer, minSeverity sender.Severity) sender.Sender {
return &jsonSender{
prefix: prefix,
out: out,
minSeverity: minSeverity,
}
}

View File

@@ -0,0 +1,49 @@
package jsonsender_test
import (
"bytes"
"strings"
"testing"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/notification"
"github.com/kopia/kopia/notification/sender"
"github.com/kopia/kopia/notification/sender/jsonsender"
)
func TestJSONSender(t *testing.T) {
ctx := testlogging.Context(t)
var buf bytes.Buffer
p := jsonsender.NewJSONSender("NOTIFICATION:", &buf, notification.SeverityWarning)
m1 := &sender.Message{
Subject: "test subject 1",
Body: "test body 1",
Severity: notification.SeverityVerbose,
}
m2 := &sender.Message{
Subject: "test subject 2",
Body: "test body 2",
Severity: notification.SeverityWarning,
}
m3 := &sender.Message{
Subject: "test subject 3",
Body: "test body 3",
Severity: notification.SeverityError,
}
require.NoError(t, p.Send(ctx, m1)) // will be ignored
require.NoError(t, p.Send(ctx, m2))
require.NoError(t, p.Send(ctx, m3))
lines := strings.Split(strings.TrimSpace(buf.String()), "\n")
require.Equal(t,
[]string{
"NOTIFICATION:{\"subject\":\"test subject 2\",\"severity\":10,\"body\":\"test body 2\"}",
"NOTIFICATION:{\"subject\":\"test subject 3\",\"severity\":20,\"body\":\"test body 3\"}",
}, lines)
}

View File

@@ -176,7 +176,19 @@ func (e *CLITest) getLogOutputPrefix() (string, bool) {
func (e *CLITest) RunAndProcessStderr(t *testing.T, callback func(line string) bool, args ...string) (wait func() error, kill func()) {
t.Helper()
wait, interrupt := e.RunAndProcessStderrInt(t, callback, args...)
wait, interrupt := e.RunAndProcessStderrInt(t, callback, nil, args...)
kill = func() {
interrupt(os.Kill)
}
return wait, kill
}
// RunAndProcessStderrAsync runs the given command, and streams its output line-by-line to a given function until it returns false.
func (e *CLITest) RunAndProcessStderrAsync(t *testing.T, callback func(line string) bool, asyncCallback func(line string), args ...string) (wait func() error, kill func()) {
t.Helper()
wait, interrupt := e.RunAndProcessStderrInt(t, callback, asyncCallback, args...)
kill = func() {
interrupt(os.Kill)
}
@@ -186,7 +198,7 @@ func (e *CLITest) RunAndProcessStderr(t *testing.T, callback func(line string) b
// RunAndProcessStderrInt runs the given command, and streams its output
// line-by-line to outputCallback until it returns false.
func (e *CLITest) RunAndProcessStderrInt(t *testing.T, outputCallback func(line string) bool, args ...string) (wait func() error, interrupt func(os.Signal)) {
func (e *CLITest) RunAndProcessStderrInt(t *testing.T, outputCallback func(line string) bool, asyncCallback func(line string), args ...string) (wait func() error, interrupt func(os.Signal)) {
t.Helper()
stdout, stderr, wait, interrupt := e.Runner.Start(t, e.RunContext, e.cmdArgs(args), e.Environment)
@@ -214,6 +226,10 @@ func (e *CLITest) RunAndProcessStderrInt(t *testing.T, outputCallback func(line
// complete the scan in background without processing lines.
go func() {
for scanner.Scan() {
if asyncCallback != nil {
asyncCallback(scanner.Text())
}
if prefix, ok := e.getLogOutputPrefix(); ok {
t.Logf("[%vstderr] %v", prefix, scanner.Text())
}