From 675bf4e033b4a79e1252efbcec3d08ae8d0369bb Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Mon, 15 Feb 2021 23:55:58 -0800 Subject: [PATCH] Removed manifest manager refresh + server improvements (#835) * manifest: removed explicit refresh Instead, content manager is exposing a revision counter that changes on each mutation or index change. Manifest manager will be invalidated whenever this is encountered. * server: refactored initialization API * server: added unit tests for repository server APIs (HTTP and REST) * server: ensure we don't upload contents that already exist This saves bandwidth, since the client can compute hash locally and ask the server whether the object exists before starting the upload. --- cli/command_server_start.go | 22 +- internal/apiclient/apiclient.go | 4 +- internal/repotesting/repotesting.go | 15 +- internal/server/grpc_session.go | 24 +- internal/server/server_test.go | 288 ++++++++++++++++++++ repo/api_server_repository.go | 15 +- repo/content/committed_content_index.go | 10 + repo/content/content_manager.go | 12 + repo/grpc_repository_client.go | 15 + repo/manifest/committed_manifest_manager.go | 21 +- repo/manifest/manifest_manager.go | 6 +- repo/manifest/manifest_manager_test.go | 4 - repo/repository.go | 14 +- 13 files changed, 383 insertions(+), 67 deletions(-) create mode 100644 internal/server/server_test.go diff --git a/cli/command_server_start.go b/cli/command_server_start.go index 49c4dfb85..3d816bed3 100644 --- a/cli/command_server_start.go +++ b/cli/command_server_start.go @@ -18,7 +18,6 @@ "github.com/pkg/errors" prom "github.com/prometheus/client_golang/prometheus" htpasswd "github.com/tg123/go-htpasswd" - "google.golang.org/grpc" "github.com/kopia/kopia/internal/auth" "github.com/kopia/kopia/internal/clock" @@ -110,26 +109,11 @@ func runServer(ctx context.Context, rep repo.Repository) error { var handler http.Handler = mux if *serverStartGRPC { - grpcServer := grpc.NewServer( - grpc.MaxSendMsgSize(repo.MaxGRPCMessageSize), - grpc.MaxRecvMsgSize(repo.MaxGRPCMessageSize), - ) - srv.RegisterGRPCHandlers(grpcServer) - - log(ctx).Debugf("starting GRPC/HTTP server...") - - httpServer.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") { - grpcServer.ServeHTTP(w, r) - } else { - handler.ServeHTTP(w, r) - } - }) - } else { - log(ctx).Debugf("starting HTTP-only server...") - httpServer.Handler = handler + handler = srv.GRPCRouterHandler(handler) } + httpServer.Handler = handler + if *serverStartShutdownWhenStdinClosed { log(ctx).Infof("Server will close when stdin is closed...") diff --git a/internal/apiclient/apiclient.go b/internal/apiclient/apiclient.go index 99e098975..1a98908f6 100644 --- a/internal/apiclient/apiclient.go +++ b/internal/apiclient/apiclient.go @@ -44,8 +44,8 @@ func (c *KopiaAPIClient) Put(ctx context.Context, urlSuffix string, reqPayload, // Delete is a helper that performs HTTP DELETE on a URL with the specified body from reqPayload and decodes the response // onto respPayload which must be a pointer to byte slice or JSON-serializable structure. -func (c *KopiaAPIClient) Delete(ctx context.Context, urlSuffix string, reqPayload, respPayload interface{}) error { - return c.runRequest(ctx, http.MethodDelete, c.BaseURL+urlSuffix, nil, reqPayload, respPayload) +func (c *KopiaAPIClient) Delete(ctx context.Context, urlSuffix string, onNotFound error, reqPayload, respPayload interface{}) error { + return c.runRequest(ctx, http.MethodDelete, c.BaseURL+urlSuffix, onNotFound, reqPayload, respPayload) } func (c *KopiaAPIClient) runRequest(ctx context.Context, method, url string, notFoundError error, reqPayload, respPayload interface{}) error { diff --git a/internal/repotesting/repotesting.go b/internal/repotesting/repotesting.go index 2554f047b..48a8b8d63 100644 --- a/internal/repotesting/repotesting.go +++ b/internal/repotesting/repotesting.go @@ -76,13 +76,13 @@ func (e *Environment) Setup(t *testing.T, opts ...Options) *Environment { t.Fatalf("err: %v", err) } - if err = repo.Connect(ctx, e.configFile(), st, masterPassword, nil); err != nil { + if err = repo.Connect(ctx, e.ConfigFile(), st, masterPassword, nil); err != nil { t.Fatalf("can't connect: %v", err) } e.connected = true - rep, err := repo.Open(ctx, e.configFile(), masterPassword, openOpt) + rep, err := repo.Open(ctx, e.ConfigFile(), masterPassword, openOpt) if err != nil { t.Fatalf("can't open: %v", err) } @@ -106,7 +106,7 @@ func (e *Environment) Close(ctx context.Context, t *testing.T) { } if e.connected { - if err := repo.Disconnect(ctx, e.configFile()); err != nil { + if err := repo.Disconnect(ctx, e.ConfigFile()); err != nil { t.Errorf("error disconnecting: %v", err) } } @@ -117,7 +117,8 @@ func (e *Environment) Close(ctx context.Context, t *testing.T) { } } -func (e *Environment) configFile() string { +// ConfigFile returns the name of the config file. +func (e *Environment) ConfigFile() string { return filepath.Join(e.configDir, "kopia.config") } @@ -132,7 +133,7 @@ func (e *Environment) MustReopen(t *testing.T, openOpts ...func(*repo.Options)) t.Fatalf("close error: %v", err) } - rep, err := repo.Open(ctx, e.configFile(), masterPassword, repoOptions(openOpts)) + rep, err := repo.Open(ctx, e.ConfigFile(), masterPassword, repoOptions(openOpts)) if err != nil { t.Fatalf("err: %v", err) } @@ -149,7 +150,7 @@ func (e *Environment) MustOpenAnother(t *testing.T) repo.RepositoryWriter { ctx := testlogging.Context(t) - rep2, err := repo.Open(ctx, e.configFile(), masterPassword, &repo.Options{}) + rep2, err := repo.Open(ctx, e.ConfigFile(), masterPassword, &repo.Options{}) if err != nil { t.Fatalf("err: %v", err) } @@ -191,7 +192,7 @@ func (e *Environment) MustConnectOpenAnother(t *testing.T, openOpts ...func(*rep t.Fatal("can't connect:", err) } - rep, err := repo.Open(ctx, e.configFile(), masterPassword, repoOptions(openOpts)) + rep, err := repo.Open(ctx, e.ConfigFile(), masterPassword, repoOptions(openOpts)) if err != nil { t.Fatal("can't open:", err) } diff --git a/internal/server/grpc_session.go b/internal/server/grpc_session.go index 18c732d0a..acafe0468 100644 --- a/internal/server/grpc_session.go +++ b/internal/server/grpc_session.go @@ -3,6 +3,7 @@ import ( "context" "encoding/json" + "net/http" "runtime" "strings" "sync" @@ -51,10 +52,6 @@ func (s *Server) authenticateGRPCSession(ctx context.Context) (string, error) { return "", status.Errorf(codes.PermissionDenied, "metadata not found in context") } - if s.authenticator == nil { - return "", status.Errorf(codes.PermissionDenied, "no authenticator") - } - if u, h, p := md.Get("kopia-username"), md.Get("kopia-hostname"), md.Get("kopia-password"); len(u) == 1 && len(p) == 1 && len(h) == 1 { username := u[0] + "@" + h[0] password := p[0] @@ -448,3 +445,22 @@ func makeGRPCServerState(maxConcurrency int) grpcServerState { sem: semaphore.NewWeighted(int64(maxConcurrency)), } } + +// GRPCRouterHandler returns HTTP handler that supports GRPC services and +// routes non-GRPC calls to the provided handler. +func (s *Server) GRPCRouterHandler(handler http.Handler) http.Handler { + grpcServer := grpc.NewServer( + grpc.MaxSendMsgSize(repo.MaxGRPCMessageSize), + grpc.MaxRecvMsgSize(repo.MaxGRPCMessageSize), + ) + + s.RegisterGRPCHandlers(grpcServer) + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") { + grpcServer.ServeHTTP(w, r) + } else { + handler.ServeHTTP(w, r) + } + }) +} diff --git a/internal/server/server_test.go b/internal/server/server_test.go new file mode 100644 index 000000000..8b26c6f45 --- /dev/null +++ b/internal/server/server_test.go @@ -0,0 +1,288 @@ +package server_test + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "errors" + "io/ioutil" + "net/http/httptest" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + + "github.com/kopia/kopia/internal/auth" + "github.com/kopia/kopia/internal/repotesting" + "github.com/kopia/kopia/internal/server" + "github.com/kopia/kopia/internal/testlogging" + "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/content" + "github.com/kopia/kopia/repo/manifest" + "github.com/kopia/kopia/repo/object" + "github.com/kopia/kopia/snapshot" +) + +const ( + testUsername = "foo" + testHostname = "bar" + testPassword = "123" + testPathname = "/tmp/path" +) + +// nolint:thelper +func startServer(ctx context.Context, t *testing.T) *repo.APIServerInfo { + var env repotesting.Environment + + env.Setup(t) + t.Cleanup(func() { env.Close(ctx, t) }) + + s, err := server.New(ctx, server.Options{ + ConfigFile: env.ConfigFile(), + Authorizer: auth.LegacyAuthorizerForUser, + Authenticator: auth.AuthenticateSingleUser(testUsername+"@"+testHostname, testPassword), + RefreshInterval: 1 * time.Minute, + }) + + s.SetRepository(ctx, env.Repository) + + if err != nil { + t.Fatal(err) + } + + hs := httptest.NewUnstartedServer(s.GRPCRouterHandler(s.APIHandlers(true))) + hs.EnableHTTP2 = true + hs.StartTLS() + + t.Cleanup(hs.Close) + + serverHash := sha256.Sum256(hs.Certificate().Raw) + + return &repo.APIServerInfo{ + BaseURL: hs.URL, + TrustedServerCertificateFingerprint: hex.EncodeToString(serverHash[:]), + } +} + +func TestServer_REST(t *testing.T) { + testServer(t, true) +} + +func TestServer_GRPC(t *testing.T) { + testServer(t, false) +} + +// nolint:thelper +func testServer(t *testing.T, disableGRPC bool) { + ctx := testlogging.ContextWithLevel(t, testlogging.LevelDebug) + apiServerInfo := startServer(ctx, t) + + apiServerInfo.DisableGRPC = disableGRPC + + rep, err := repo.OpenAPIServer(ctx, apiServerInfo, repo.ClientOptions{ + Username: testUsername, + Hostname: testHostname, + }, testPassword) + if err != nil { + t.Fatal(err) + } + + defer rep.Close(ctx) + + remoteRepositoryTest(ctx, t, rep) +} + +func TestGPRServer_AuthenticationError(t *testing.T) { + ctx := testlogging.ContextWithLevel(t, testlogging.LevelDebug) + apiServerInfo := startServer(ctx, t) + + if _, err := repo.OpenGRPCAPIRepository(ctx, apiServerInfo, repo.ClientOptions{ + Username: "bad-username", + Hostname: "bad-hostname", + }, "bad-password"); err == nil { + t.Fatal("unexpected success when connecting with invalid username") + } +} + +// nolint:thelper +func remoteRepositoryTest(ctx context.Context, t *testing.T, rep repo.Repository) { + mustListSnapshotCount(ctx, t, rep, 0) + mustGetObjectNotFound(ctx, t, rep, "abcd") + mustGetManifestNotFound(ctx, t, rep, "mnosuchmanifest") + + var ( + result object.ID + manifestID, manifestID2 manifest.ID + written = []byte{1, 2, 3} + srcInfo = snapshot.SourceInfo{ + Host: testHostname, + UserName: testUsername, + Path: testPathname, + } + ) + + var uploaded int64 + + must(t, repo.WriteSession(ctx, rep, repo.WriteSessionOptions{ + Purpose: "write test", + OnUpload: func(i int64) { + uploaded += i + }, + }, func(w repo.RepositoryWriter) error { + mustGetObjectNotFound(ctx, t, w, "abcd") + mustGetManifestNotFound(ctx, t, w, "mnosuchmanifest") + mustManifestNotFound(t, w.DeleteManifest(ctx, manifestID2)) + mustListSnapshotCount(ctx, t, w, 0) + + result = mustWriteObject(ctx, t, w, written) + + if uploaded == 0 { + t.Fatalf("did not report uploaded bytes") + } + + uploaded = 0 + result2 := mustWriteObject(ctx, t, w, written) + if uploaded != 0 { + t.Fatalf("unexpected upload when writing duplicate object") + } + + if result != result2 { + t.Fatalf("two identical object with different IDs: %v vs %v", result, result2) + } + + // verify data is read back the same. + mustReadObject(ctx, t, w, result, written) + + ow := w.NewObjectWriter(ctx, object.WriterOptions{ + Prefix: content.ID(manifest.ContentPrefix), + }) + + _, err := ow.Write([]byte{2, 3, 4}) + must(t, err) + + _, err = ow.Result() + if err == nil { + t.Fatalf("unexpected success writing object with 'm' prefix") + } + + manifestID, err = snapshot.SaveSnapshot(ctx, w, &snapshot.Manifest{ + Source: srcInfo, + Description: "written", + }) + must(t, err) + mustListSnapshotCount(ctx, t, w, 1) + + manifestID2, err = snapshot.SaveSnapshot(ctx, w, &snapshot.Manifest{ + Source: srcInfo, + Description: "written2", + }) + must(t, err) + mustListSnapshotCount(ctx, t, w, 2) + + mustReadManifest(ctx, t, w, manifestID, "written") + mustReadManifest(ctx, t, w, manifestID2, "written2") + + must(t, w.DeleteManifest(ctx, manifestID2)) + mustListSnapshotCount(ctx, t, w, 1) + + mustGetManifestNotFound(ctx, t, w, manifestID2) + mustReadManifest(ctx, t, w, manifestID, "written") + + return nil + })) + + // data and manifest written in a session can be read outside of it + mustReadObject(ctx, t, rep, result, written) + mustReadManifest(ctx, t, rep, manifestID, "written") + mustGetManifestNotFound(ctx, t, rep, manifestID2) + mustListSnapshotCount(ctx, t, rep, 1) +} + +func mustWriteObject(ctx context.Context, t *testing.T, w repo.RepositoryWriter, data []byte) object.ID { + t.Helper() + + ow := w.NewObjectWriter(ctx, object.WriterOptions{}) + + _, err := ow.Write(data) + must(t, err) + + result, err := ow.Result() + must(t, err) + + return result +} + +func mustReadObject(ctx context.Context, t *testing.T, r repo.Repository, oid object.ID, want []byte) { + t.Helper() + + or, err := r.OpenObject(ctx, oid) + must(t, err) + + data, err := ioutil.ReadAll(or) + must(t, err) + + // verify data is read back the same. + if diff := cmp.Diff(data, want); diff != "" { + t.Fatalf("invalid object data, diff: %v", diff) + } +} + +func mustReadManifest(ctx context.Context, t *testing.T, r repo.Repository, manID manifest.ID, want string) { + t.Helper() + + man, err := snapshot.LoadSnapshot(ctx, r, manID) + must(t, err) + + // verify data is read back the same. + if diff := cmp.Diff(man.Description, want); diff != "" { + t.Fatalf("invalid manifest data, diff: %v", diff) + } +} + +func mustGetObjectNotFound(ctx context.Context, t *testing.T, r repo.Repository, oid object.ID) { + t.Helper() + + if _, err := r.OpenObject(ctx, oid); !errors.Is(err, object.ErrObjectNotFound) { + t.Fatalf("unexpected non-existent object error: %v", err) + } +} + +func mustGetManifestNotFound(ctx context.Context, t *testing.T, r repo.Repository, manID manifest.ID) { + t.Helper() + + _, err := r.GetManifest(ctx, manID, nil) + mustManifestNotFound(t, err) +} + +func mustListSnapshotCount(ctx context.Context, t *testing.T, rep repo.Repository, wantCount int) { + t.Helper() + + snaps, err := snapshot.ListSnapshots(ctx, rep, snapshot.SourceInfo{ + UserName: testUsername, + Host: testHostname, + Path: testPathname, + }) + if err != nil { + t.Fatal(err) + } + + if got, want := len(snaps), wantCount; got != want { + t.Fatalf("unexpected number of snapshots: %v, want %v", got, want) + } +} + +func must(t *testing.T, err error) { + t.Helper() + + if err != nil { + t.Fatal(err) + } +} + +func mustManifestNotFound(t *testing.T, err error) { + t.Helper() + + if !errors.Is(err, manifest.ErrNotFound) { + t.Fatalf("invalid error %v, wanted manifest not found", err) + } +} diff --git a/repo/api_server_repository.go b/repo/api_server_repository.go index e8aedd4e8..c1205d91d 100644 --- a/repo/api_server_repository.go +++ b/repo/api_server_repository.go @@ -37,6 +37,7 @@ type apiServerRepository struct { objectFormat object.Format cliOpts ClientOptions omgr *object.Manager + wso WriteSessionOptions } func (r *apiServerRepository) APIServerURL() string { @@ -116,7 +117,7 @@ func (r *apiServerRepository) FindManifests(ctx context.Context, labels map[stri } func (r *apiServerRepository) DeleteManifest(ctx context.Context, id manifest.ID) error { - return errors.Wrap(r.cli.Delete(ctx, "manifests/"+string(id), nil, nil), "DeleteManifest") + return errors.Wrap(r.cli.Delete(ctx, "manifests/"+string(id), manifest.ErrNotFound, nil, nil), "DeleteManifest") } func (r *apiServerRepository) Time() time.Time { @@ -143,6 +144,7 @@ func (r *apiServerRepository) NewWriter(ctx context.Context, opt WriteSessionOpt } w.omgr = omgr + w.wso = opt return w, nil } @@ -176,6 +178,14 @@ func (r *apiServerRepository) WriteContent(ctx context.Context, data []byte, pre contentID := prefix + content.ID(hex.EncodeToString(r.h(hashOutput[:0], data))) + // avoid uploading the content body if it already exists. + if _, err := r.ContentInfo(ctx, contentID); err == nil { + // content already exists + return contentID, nil + } + + r.wso.OnUpload(int64(len(data))) + if err := r.cli.Put(ctx, "contents/"+string(contentID), data, nil); err != nil { return "", errors.Wrapf(err, "error writing content %v", contentID) } @@ -214,6 +224,9 @@ func openRestAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts Clien rr := &apiServerRepository{ cli: cli, cliOpts: cliOpts, + wso: WriteSessionOptions{ + OnUpload: func(i int64) {}, + }, } var p remoterepoapi.Parameters diff --git a/repo/content/committed_content_index.go b/repo/content/committed_content_index.go index 32ef8f454..896da331a 100644 --- a/repo/content/committed_content_index.go +++ b/repo/content/committed_content_index.go @@ -5,6 +5,7 @@ "context" "path/filepath" "sync" + "sync/atomic" "github.com/pkg/errors" @@ -18,6 +19,7 @@ const smallIndexEntryCountThreshold = 100 type committedContentIndex struct { + rev int64 cache committedContentIndexCache mu sync.Mutex @@ -32,6 +34,10 @@ type committedContentIndexCache interface { expireUnused(ctx context.Context, used []blob.ID) error } +func (b *committedContentIndex) revision() int64 { + return atomic.LoadInt64(&b.rev) +} + func (b *committedContentIndex) getContent(contentID ID) (Info, error) { b.mu.Lock() defer b.mu.Unlock() @@ -49,6 +55,8 @@ func (b *committedContentIndex) getContent(contentID ID) (Info, error) { } func (b *committedContentIndex) addContent(ctx context.Context, indexBlobID blob.ID, data []byte, use bool) error { + atomic.AddInt64(&b.rev, 1) + if err := b.cache.addContentToCache(ctx, indexBlobID, data); err != nil { return errors.Wrap(err, "error adding content to cache") } @@ -108,6 +116,8 @@ func (b *committedContentIndex) use(ctx context.Context, packFiles []blob.ID) (b return false, nil } + atomic.AddInt64(&b.rev, 1) + var newMerged mergedIndex newInUse := map[blob.ID]packIndex{} diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index f463f8ea4..357ab1f02 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -9,6 +9,7 @@ "fmt" "math/rand" "sync" + "sync/atomic" "time" "github.com/pkg/errors" @@ -72,6 +73,8 @@ type IndexBlobInfo struct { // WriteManager builds content-addressable storage with encryption, deduplication and packaging on top of BLOB store. type WriteManager struct { + revision int64 // changes on each local write + mu *sync.RWMutex cond *sync.Cond flushing bool @@ -103,6 +106,11 @@ type pendingPackInfo struct { finalized bool // indicates whether currentPackData has local index appended to it } +// Revision returns data revision number that changes on each write or refresh. +func (bm *WriteManager) Revision() int64 { + return atomic.LoadInt64(&bm.revision) + bm.committedContents.revision() +} + // DeleteContent marks the given contentID as deleted. // // NOTE: To avoid race conditions only contents that cannot be possibly re-created @@ -112,6 +120,8 @@ func (bm *WriteManager) DeleteContent(ctx context.Context, contentID ID) error { bm.lock() defer bm.unlock() + atomic.AddInt64(&bm.revision, 1) + formatLog(ctx).Debugf("delete-content %v", contentID) // remove from all pending packs @@ -210,6 +220,8 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat bm.lock() + atomic.AddInt64(&bm.revision, 1) + // do not start new uploads while flushing for bm.flushing { formatLog(ctx).Debugf("wait-before-flush") diff --git a/repo/grpc_repository_client.go b/repo/grpc_repository_client.go index e51fa84e2..aa7e69607 100644 --- a/repo/grpc_repository_client.go +++ b/repo/grpc_repository_client.go @@ -2,6 +2,7 @@ import ( "context" + "encoding/hex" "encoding/json" "io" "net/url" @@ -525,6 +526,20 @@ func (r *grpcInnerSession) GetContent(ctx context.Context, contentID content.ID) } func (r *grpcRepositoryClient) WriteContent(ctx context.Context, data []byte, prefix content.ID) (content.ID, error) { + if err := content.ValidatePrefix(prefix); err != nil { + return "", errors.Wrap(err, "invalid prefix") + } + + var hashOutput [128]byte + + contentID := prefix + content.ID(hex.EncodeToString(r.h(hashOutput[:0], data))) + + // avoid uploading the content body if it already exists. + if _, err := r.ContentInfo(ctx, contentID); err == nil { + // content already exists + return contentID, nil + } + r.opt.OnUpload(int64(len(data))) v, err := r.inSessionWithoutRetry(ctx, func(ctx context.Context, sess *grpcInnerSession) (interface{}, error) { diff --git a/repo/manifest/committed_manifest_manager.go b/repo/manifest/committed_manifest_manager.go index b10d673d0..67e4186f7 100644 --- a/repo/manifest/committed_manifest_manager.go +++ b/repo/manifest/committed_manifest_manager.go @@ -17,8 +17,8 @@ type committedManifestManager struct { b contentManager cmmu sync.Mutex + lastRevision int64 locked bool - initialized bool committedEntries map[ID]*manifestEntry committedContentIDs map[content.ID]bool } @@ -91,17 +91,6 @@ func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entri return map[content.ID]bool{contentID: true}, nil } -func (m *committedManifestManager) invalidate() error { - m.lock() - defer m.unlock() - - m.initialized = false - m.committedContentIDs = map[content.ID]bool{} - m.committedEntries = map[ID]*manifestEntry{} - - return nil -} - func (m *committedManifestManager) loadCommittedContentsLocked(ctx context.Context) error { m.verifyLocked() @@ -258,7 +247,8 @@ func (m *committedManifestManager) ensureInitialized(ctx context.Context) error m.lock() defer m.unlock() - if m.initialized { + rev := m.b.Revision() + if m.lastRevision == rev { return nil } @@ -266,7 +256,10 @@ func (m *committedManifestManager) ensureInitialized(ctx context.Context) error return err } - m.initialized = true + m.lastRevision = rev + + // it is possible that the content manager revision has changed while we were reading it, + // that's ok - we read __some__ consistent set of data and next time we will invalidate again. return nil } diff --git a/repo/manifest/manifest_manager.go b/repo/manifest/manifest_manager.go index 06d1f43b8..4a849d4e5 100644 --- a/repo/manifest/manifest_manager.go +++ b/repo/manifest/manifest_manager.go @@ -34,6 +34,7 @@ const TypeLabelKey = "type" type contentManager interface { + Revision() int64 GetContent(ctx context.Context, contentID content.ID) ([]byte, error) WriteContent(ctx context.Context, data []byte, prefix content.ID) (content.ID, error) DeleteContent(ctx context.Context, contentID content.ID) error @@ -239,11 +240,6 @@ func (m *Manager) Delete(ctx context.Context, id ID) error { return nil } -// Refresh updates the committed contents from the underlying storage. -func (m *Manager) Refresh(ctx context.Context) error { - return m.committed.invalidate() -} - // Compact performs compaction of manifest contents. func (m *Manager) Compact(ctx context.Context) error { return m.committed.compact(ctx) diff --git a/repo/manifest/manifest_manager_test.go b/repo/manifest/manifest_manager_test.go index 359d8bd2e..ede31b348 100644 --- a/repo/manifest/manifest_manager_test.go +++ b/repo/manifest/manifest_manager_test.go @@ -103,10 +103,6 @@ func TestManifest(t *testing.T) { // still found in another verifyItem(ctx, t, mgr2, id3, labels3, item3) - if err := mgr2.Refresh(ctx); err != nil { - t.Errorf("unable to load: %v", err) - } - if err := mgr.Compact(ctx); err != nil { t.Errorf("can't compact: %v", err) } diff --git a/repo/repository.go b/repo/repository.go index cb76bef2a..b1e379506 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -275,10 +275,6 @@ func (r *directRepository) Refresh(ctx context.Context) error { return errors.Wrap(err, "error refreshing content index") } - if err := r.mmgr.Refresh(ctx); err != nil { - return errors.Wrap(err, "error reloading manifests") - } - return nil } @@ -320,7 +316,7 @@ func WriteSession(ctx context.Context, r Repository, opt WriteSessionOptions, cb return errors.Wrap(err, "unable to create writer") } - return handleWriteSessionResult(ctx, r, w, opt, cb(w)) + return handleWriteSessionResult(ctx, w, opt, cb(w)) } // DirectWriteSession executes the provided callback in a DirectRepositoryWriter created for the purpose and flushes writes. @@ -330,10 +326,10 @@ func DirectWriteSession(ctx context.Context, r DirectRepository, opt WriteSessio return errors.Wrap(err, "unable to create direct writer") } - return handleWriteSessionResult(ctx, r, w, opt, cb(w)) + return handleWriteSessionResult(ctx, w, opt, cb(w)) } -func handleWriteSessionResult(ctx context.Context, r Repository, w RepositoryWriter, opt WriteSessionOptions, resultErr error) error { +func handleWriteSessionResult(ctx context.Context, w RepositoryWriter, opt WriteSessionOptions, resultErr error) error { defer func() { if err := w.Close(ctx); err != nil { log(ctx).Warningf("error closing writer: %v", err) @@ -346,10 +342,6 @@ func handleWriteSessionResult(ctx context.Context, r Repository, w RepositoryWri } } - if err := r.Refresh(ctx); err != nil { - return errors.Wrap(err, "error refreshing repository") - } - return resultErr }