mirror of
https://github.com/kopia/kopia.git
synced 2026-04-19 21:47:56 -04:00
Removed manifest manager refresh + server improvements (#835)
* manifest: removed explicit refresh Instead, content manager is exposing a revision counter that changes on each mutation or index change. Manifest manager will be invalidated whenever this is encountered. * server: refactored initialization API * server: added unit tests for repository server APIs (HTTP and REST) * server: ensure we don't upload contents that already exist This saves bandwidth, since the client can compute hash locally and ask the server whether the object exists before starting the upload.
This commit is contained in:
@@ -18,7 +18,6 @@
|
||||
"github.com/pkg/errors"
|
||||
prom "github.com/prometheus/client_golang/prometheus"
|
||||
htpasswd "github.com/tg123/go-htpasswd"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/kopia/kopia/internal/auth"
|
||||
"github.com/kopia/kopia/internal/clock"
|
||||
@@ -110,26 +109,11 @@ func runServer(ctx context.Context, rep repo.Repository) error {
|
||||
var handler http.Handler = mux
|
||||
|
||||
if *serverStartGRPC {
|
||||
grpcServer := grpc.NewServer(
|
||||
grpc.MaxSendMsgSize(repo.MaxGRPCMessageSize),
|
||||
grpc.MaxRecvMsgSize(repo.MaxGRPCMessageSize),
|
||||
)
|
||||
srv.RegisterGRPCHandlers(grpcServer)
|
||||
|
||||
log(ctx).Debugf("starting GRPC/HTTP server...")
|
||||
|
||||
httpServer.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
|
||||
grpcServer.ServeHTTP(w, r)
|
||||
} else {
|
||||
handler.ServeHTTP(w, r)
|
||||
}
|
||||
})
|
||||
} else {
|
||||
log(ctx).Debugf("starting HTTP-only server...")
|
||||
httpServer.Handler = handler
|
||||
handler = srv.GRPCRouterHandler(handler)
|
||||
}
|
||||
|
||||
httpServer.Handler = handler
|
||||
|
||||
if *serverStartShutdownWhenStdinClosed {
|
||||
log(ctx).Infof("Server will close when stdin is closed...")
|
||||
|
||||
|
||||
@@ -44,8 +44,8 @@ func (c *KopiaAPIClient) Put(ctx context.Context, urlSuffix string, reqPayload,
|
||||
|
||||
// Delete is a helper that performs HTTP DELETE on a URL with the specified body from reqPayload and decodes the response
|
||||
// onto respPayload which must be a pointer to byte slice or JSON-serializable structure.
|
||||
func (c *KopiaAPIClient) Delete(ctx context.Context, urlSuffix string, reqPayload, respPayload interface{}) error {
|
||||
return c.runRequest(ctx, http.MethodDelete, c.BaseURL+urlSuffix, nil, reqPayload, respPayload)
|
||||
func (c *KopiaAPIClient) Delete(ctx context.Context, urlSuffix string, onNotFound error, reqPayload, respPayload interface{}) error {
|
||||
return c.runRequest(ctx, http.MethodDelete, c.BaseURL+urlSuffix, onNotFound, reqPayload, respPayload)
|
||||
}
|
||||
|
||||
func (c *KopiaAPIClient) runRequest(ctx context.Context, method, url string, notFoundError error, reqPayload, respPayload interface{}) error {
|
||||
|
||||
@@ -76,13 +76,13 @@ func (e *Environment) Setup(t *testing.T, opts ...Options) *Environment {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if err = repo.Connect(ctx, e.configFile(), st, masterPassword, nil); err != nil {
|
||||
if err = repo.Connect(ctx, e.ConfigFile(), st, masterPassword, nil); err != nil {
|
||||
t.Fatalf("can't connect: %v", err)
|
||||
}
|
||||
|
||||
e.connected = true
|
||||
|
||||
rep, err := repo.Open(ctx, e.configFile(), masterPassword, openOpt)
|
||||
rep, err := repo.Open(ctx, e.ConfigFile(), masterPassword, openOpt)
|
||||
if err != nil {
|
||||
t.Fatalf("can't open: %v", err)
|
||||
}
|
||||
@@ -106,7 +106,7 @@ func (e *Environment) Close(ctx context.Context, t *testing.T) {
|
||||
}
|
||||
|
||||
if e.connected {
|
||||
if err := repo.Disconnect(ctx, e.configFile()); err != nil {
|
||||
if err := repo.Disconnect(ctx, e.ConfigFile()); err != nil {
|
||||
t.Errorf("error disconnecting: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -117,7 +117,8 @@ func (e *Environment) Close(ctx context.Context, t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Environment) configFile() string {
|
||||
// ConfigFile returns the name of the config file.
|
||||
func (e *Environment) ConfigFile() string {
|
||||
return filepath.Join(e.configDir, "kopia.config")
|
||||
}
|
||||
|
||||
@@ -132,7 +133,7 @@ func (e *Environment) MustReopen(t *testing.T, openOpts ...func(*repo.Options))
|
||||
t.Fatalf("close error: %v", err)
|
||||
}
|
||||
|
||||
rep, err := repo.Open(ctx, e.configFile(), masterPassword, repoOptions(openOpts))
|
||||
rep, err := repo.Open(ctx, e.ConfigFile(), masterPassword, repoOptions(openOpts))
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
@@ -149,7 +150,7 @@ func (e *Environment) MustOpenAnother(t *testing.T) repo.RepositoryWriter {
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
|
||||
rep2, err := repo.Open(ctx, e.configFile(), masterPassword, &repo.Options{})
|
||||
rep2, err := repo.Open(ctx, e.ConfigFile(), masterPassword, &repo.Options{})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
@@ -191,7 +192,7 @@ func (e *Environment) MustConnectOpenAnother(t *testing.T, openOpts ...func(*rep
|
||||
t.Fatal("can't connect:", err)
|
||||
}
|
||||
|
||||
rep, err := repo.Open(ctx, e.configFile(), masterPassword, repoOptions(openOpts))
|
||||
rep, err := repo.Open(ctx, e.ConfigFile(), masterPassword, repoOptions(openOpts))
|
||||
if err != nil {
|
||||
t.Fatal("can't open:", err)
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -51,10 +52,6 @@ func (s *Server) authenticateGRPCSession(ctx context.Context) (string, error) {
|
||||
return "", status.Errorf(codes.PermissionDenied, "metadata not found in context")
|
||||
}
|
||||
|
||||
if s.authenticator == nil {
|
||||
return "", status.Errorf(codes.PermissionDenied, "no authenticator")
|
||||
}
|
||||
|
||||
if u, h, p := md.Get("kopia-username"), md.Get("kopia-hostname"), md.Get("kopia-password"); len(u) == 1 && len(p) == 1 && len(h) == 1 {
|
||||
username := u[0] + "@" + h[0]
|
||||
password := p[0]
|
||||
@@ -448,3 +445,22 @@ func makeGRPCServerState(maxConcurrency int) grpcServerState {
|
||||
sem: semaphore.NewWeighted(int64(maxConcurrency)),
|
||||
}
|
||||
}
|
||||
|
||||
// GRPCRouterHandler returns HTTP handler that supports GRPC services and
|
||||
// routes non-GRPC calls to the provided handler.
|
||||
func (s *Server) GRPCRouterHandler(handler http.Handler) http.Handler {
|
||||
grpcServer := grpc.NewServer(
|
||||
grpc.MaxSendMsgSize(repo.MaxGRPCMessageSize),
|
||||
grpc.MaxRecvMsgSize(repo.MaxGRPCMessageSize),
|
||||
)
|
||||
|
||||
s.RegisterGRPCHandlers(grpcServer)
|
||||
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
|
||||
grpcServer.ServeHTTP(w, r)
|
||||
} else {
|
||||
handler.ServeHTTP(w, r)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
288
internal/server/server_test.go
Normal file
288
internal/server/server_test.go
Normal file
@@ -0,0 +1,288 @@
|
||||
package server_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"io/ioutil"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
|
||||
"github.com/kopia/kopia/internal/auth"
|
||||
"github.com/kopia/kopia/internal/repotesting"
|
||||
"github.com/kopia/kopia/internal/server"
|
||||
"github.com/kopia/kopia/internal/testlogging"
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/repo/content"
|
||||
"github.com/kopia/kopia/repo/manifest"
|
||||
"github.com/kopia/kopia/repo/object"
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
)
|
||||
|
||||
const (
|
||||
testUsername = "foo"
|
||||
testHostname = "bar"
|
||||
testPassword = "123"
|
||||
testPathname = "/tmp/path"
|
||||
)
|
||||
|
||||
// nolint:thelper
|
||||
func startServer(ctx context.Context, t *testing.T) *repo.APIServerInfo {
|
||||
var env repotesting.Environment
|
||||
|
||||
env.Setup(t)
|
||||
t.Cleanup(func() { env.Close(ctx, t) })
|
||||
|
||||
s, err := server.New(ctx, server.Options{
|
||||
ConfigFile: env.ConfigFile(),
|
||||
Authorizer: auth.LegacyAuthorizerForUser,
|
||||
Authenticator: auth.AuthenticateSingleUser(testUsername+"@"+testHostname, testPassword),
|
||||
RefreshInterval: 1 * time.Minute,
|
||||
})
|
||||
|
||||
s.SetRepository(ctx, env.Repository)
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
hs := httptest.NewUnstartedServer(s.GRPCRouterHandler(s.APIHandlers(true)))
|
||||
hs.EnableHTTP2 = true
|
||||
hs.StartTLS()
|
||||
|
||||
t.Cleanup(hs.Close)
|
||||
|
||||
serverHash := sha256.Sum256(hs.Certificate().Raw)
|
||||
|
||||
return &repo.APIServerInfo{
|
||||
BaseURL: hs.URL,
|
||||
TrustedServerCertificateFingerprint: hex.EncodeToString(serverHash[:]),
|
||||
}
|
||||
}
|
||||
|
||||
func TestServer_REST(t *testing.T) {
|
||||
testServer(t, true)
|
||||
}
|
||||
|
||||
func TestServer_GRPC(t *testing.T) {
|
||||
testServer(t, false)
|
||||
}
|
||||
|
||||
// nolint:thelper
|
||||
func testServer(t *testing.T, disableGRPC bool) {
|
||||
ctx := testlogging.ContextWithLevel(t, testlogging.LevelDebug)
|
||||
apiServerInfo := startServer(ctx, t)
|
||||
|
||||
apiServerInfo.DisableGRPC = disableGRPC
|
||||
|
||||
rep, err := repo.OpenAPIServer(ctx, apiServerInfo, repo.ClientOptions{
|
||||
Username: testUsername,
|
||||
Hostname: testHostname,
|
||||
}, testPassword)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
defer rep.Close(ctx)
|
||||
|
||||
remoteRepositoryTest(ctx, t, rep)
|
||||
}
|
||||
|
||||
func TestGPRServer_AuthenticationError(t *testing.T) {
|
||||
ctx := testlogging.ContextWithLevel(t, testlogging.LevelDebug)
|
||||
apiServerInfo := startServer(ctx, t)
|
||||
|
||||
if _, err := repo.OpenGRPCAPIRepository(ctx, apiServerInfo, repo.ClientOptions{
|
||||
Username: "bad-username",
|
||||
Hostname: "bad-hostname",
|
||||
}, "bad-password"); err == nil {
|
||||
t.Fatal("unexpected success when connecting with invalid username")
|
||||
}
|
||||
}
|
||||
|
||||
// nolint:thelper
|
||||
func remoteRepositoryTest(ctx context.Context, t *testing.T, rep repo.Repository) {
|
||||
mustListSnapshotCount(ctx, t, rep, 0)
|
||||
mustGetObjectNotFound(ctx, t, rep, "abcd")
|
||||
mustGetManifestNotFound(ctx, t, rep, "mnosuchmanifest")
|
||||
|
||||
var (
|
||||
result object.ID
|
||||
manifestID, manifestID2 manifest.ID
|
||||
written = []byte{1, 2, 3}
|
||||
srcInfo = snapshot.SourceInfo{
|
||||
Host: testHostname,
|
||||
UserName: testUsername,
|
||||
Path: testPathname,
|
||||
}
|
||||
)
|
||||
|
||||
var uploaded int64
|
||||
|
||||
must(t, repo.WriteSession(ctx, rep, repo.WriteSessionOptions{
|
||||
Purpose: "write test",
|
||||
OnUpload: func(i int64) {
|
||||
uploaded += i
|
||||
},
|
||||
}, func(w repo.RepositoryWriter) error {
|
||||
mustGetObjectNotFound(ctx, t, w, "abcd")
|
||||
mustGetManifestNotFound(ctx, t, w, "mnosuchmanifest")
|
||||
mustManifestNotFound(t, w.DeleteManifest(ctx, manifestID2))
|
||||
mustListSnapshotCount(ctx, t, w, 0)
|
||||
|
||||
result = mustWriteObject(ctx, t, w, written)
|
||||
|
||||
if uploaded == 0 {
|
||||
t.Fatalf("did not report uploaded bytes")
|
||||
}
|
||||
|
||||
uploaded = 0
|
||||
result2 := mustWriteObject(ctx, t, w, written)
|
||||
if uploaded != 0 {
|
||||
t.Fatalf("unexpected upload when writing duplicate object")
|
||||
}
|
||||
|
||||
if result != result2 {
|
||||
t.Fatalf("two identical object with different IDs: %v vs %v", result, result2)
|
||||
}
|
||||
|
||||
// verify data is read back the same.
|
||||
mustReadObject(ctx, t, w, result, written)
|
||||
|
||||
ow := w.NewObjectWriter(ctx, object.WriterOptions{
|
||||
Prefix: content.ID(manifest.ContentPrefix),
|
||||
})
|
||||
|
||||
_, err := ow.Write([]byte{2, 3, 4})
|
||||
must(t, err)
|
||||
|
||||
_, err = ow.Result()
|
||||
if err == nil {
|
||||
t.Fatalf("unexpected success writing object with 'm' prefix")
|
||||
}
|
||||
|
||||
manifestID, err = snapshot.SaveSnapshot(ctx, w, &snapshot.Manifest{
|
||||
Source: srcInfo,
|
||||
Description: "written",
|
||||
})
|
||||
must(t, err)
|
||||
mustListSnapshotCount(ctx, t, w, 1)
|
||||
|
||||
manifestID2, err = snapshot.SaveSnapshot(ctx, w, &snapshot.Manifest{
|
||||
Source: srcInfo,
|
||||
Description: "written2",
|
||||
})
|
||||
must(t, err)
|
||||
mustListSnapshotCount(ctx, t, w, 2)
|
||||
|
||||
mustReadManifest(ctx, t, w, manifestID, "written")
|
||||
mustReadManifest(ctx, t, w, manifestID2, "written2")
|
||||
|
||||
must(t, w.DeleteManifest(ctx, manifestID2))
|
||||
mustListSnapshotCount(ctx, t, w, 1)
|
||||
|
||||
mustGetManifestNotFound(ctx, t, w, manifestID2)
|
||||
mustReadManifest(ctx, t, w, manifestID, "written")
|
||||
|
||||
return nil
|
||||
}))
|
||||
|
||||
// data and manifest written in a session can be read outside of it
|
||||
mustReadObject(ctx, t, rep, result, written)
|
||||
mustReadManifest(ctx, t, rep, manifestID, "written")
|
||||
mustGetManifestNotFound(ctx, t, rep, manifestID2)
|
||||
mustListSnapshotCount(ctx, t, rep, 1)
|
||||
}
|
||||
|
||||
func mustWriteObject(ctx context.Context, t *testing.T, w repo.RepositoryWriter, data []byte) object.ID {
|
||||
t.Helper()
|
||||
|
||||
ow := w.NewObjectWriter(ctx, object.WriterOptions{})
|
||||
|
||||
_, err := ow.Write(data)
|
||||
must(t, err)
|
||||
|
||||
result, err := ow.Result()
|
||||
must(t, err)
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func mustReadObject(ctx context.Context, t *testing.T, r repo.Repository, oid object.ID, want []byte) {
|
||||
t.Helper()
|
||||
|
||||
or, err := r.OpenObject(ctx, oid)
|
||||
must(t, err)
|
||||
|
||||
data, err := ioutil.ReadAll(or)
|
||||
must(t, err)
|
||||
|
||||
// verify data is read back the same.
|
||||
if diff := cmp.Diff(data, want); diff != "" {
|
||||
t.Fatalf("invalid object data, diff: %v", diff)
|
||||
}
|
||||
}
|
||||
|
||||
func mustReadManifest(ctx context.Context, t *testing.T, r repo.Repository, manID manifest.ID, want string) {
|
||||
t.Helper()
|
||||
|
||||
man, err := snapshot.LoadSnapshot(ctx, r, manID)
|
||||
must(t, err)
|
||||
|
||||
// verify data is read back the same.
|
||||
if diff := cmp.Diff(man.Description, want); diff != "" {
|
||||
t.Fatalf("invalid manifest data, diff: %v", diff)
|
||||
}
|
||||
}
|
||||
|
||||
func mustGetObjectNotFound(ctx context.Context, t *testing.T, r repo.Repository, oid object.ID) {
|
||||
t.Helper()
|
||||
|
||||
if _, err := r.OpenObject(ctx, oid); !errors.Is(err, object.ErrObjectNotFound) {
|
||||
t.Fatalf("unexpected non-existent object error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func mustGetManifestNotFound(ctx context.Context, t *testing.T, r repo.Repository, manID manifest.ID) {
|
||||
t.Helper()
|
||||
|
||||
_, err := r.GetManifest(ctx, manID, nil)
|
||||
mustManifestNotFound(t, err)
|
||||
}
|
||||
|
||||
func mustListSnapshotCount(ctx context.Context, t *testing.T, rep repo.Repository, wantCount int) {
|
||||
t.Helper()
|
||||
|
||||
snaps, err := snapshot.ListSnapshots(ctx, rep, snapshot.SourceInfo{
|
||||
UserName: testUsername,
|
||||
Host: testHostname,
|
||||
Path: testPathname,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if got, want := len(snaps), wantCount; got != want {
|
||||
t.Fatalf("unexpected number of snapshots: %v, want %v", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func must(t *testing.T, err error) {
|
||||
t.Helper()
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func mustManifestNotFound(t *testing.T, err error) {
|
||||
t.Helper()
|
||||
|
||||
if !errors.Is(err, manifest.ErrNotFound) {
|
||||
t.Fatalf("invalid error %v, wanted manifest not found", err)
|
||||
}
|
||||
}
|
||||
@@ -37,6 +37,7 @@ type apiServerRepository struct {
|
||||
objectFormat object.Format
|
||||
cliOpts ClientOptions
|
||||
omgr *object.Manager
|
||||
wso WriteSessionOptions
|
||||
}
|
||||
|
||||
func (r *apiServerRepository) APIServerURL() string {
|
||||
@@ -116,7 +117,7 @@ func (r *apiServerRepository) FindManifests(ctx context.Context, labels map[stri
|
||||
}
|
||||
|
||||
func (r *apiServerRepository) DeleteManifest(ctx context.Context, id manifest.ID) error {
|
||||
return errors.Wrap(r.cli.Delete(ctx, "manifests/"+string(id), nil, nil), "DeleteManifest")
|
||||
return errors.Wrap(r.cli.Delete(ctx, "manifests/"+string(id), manifest.ErrNotFound, nil, nil), "DeleteManifest")
|
||||
}
|
||||
|
||||
func (r *apiServerRepository) Time() time.Time {
|
||||
@@ -143,6 +144,7 @@ func (r *apiServerRepository) NewWriter(ctx context.Context, opt WriteSessionOpt
|
||||
}
|
||||
|
||||
w.omgr = omgr
|
||||
w.wso = opt
|
||||
|
||||
return w, nil
|
||||
}
|
||||
@@ -176,6 +178,14 @@ func (r *apiServerRepository) WriteContent(ctx context.Context, data []byte, pre
|
||||
|
||||
contentID := prefix + content.ID(hex.EncodeToString(r.h(hashOutput[:0], data)))
|
||||
|
||||
// avoid uploading the content body if it already exists.
|
||||
if _, err := r.ContentInfo(ctx, contentID); err == nil {
|
||||
// content already exists
|
||||
return contentID, nil
|
||||
}
|
||||
|
||||
r.wso.OnUpload(int64(len(data)))
|
||||
|
||||
if err := r.cli.Put(ctx, "contents/"+string(contentID), data, nil); err != nil {
|
||||
return "", errors.Wrapf(err, "error writing content %v", contentID)
|
||||
}
|
||||
@@ -214,6 +224,9 @@ func openRestAPIRepository(ctx context.Context, si *APIServerInfo, cliOpts Clien
|
||||
rr := &apiServerRepository{
|
||||
cli: cli,
|
||||
cliOpts: cliOpts,
|
||||
wso: WriteSessionOptions{
|
||||
OnUpload: func(i int64) {},
|
||||
},
|
||||
}
|
||||
|
||||
var p remoterepoapi.Parameters
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
"context"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
@@ -18,6 +19,7 @@
|
||||
const smallIndexEntryCountThreshold = 100
|
||||
|
||||
type committedContentIndex struct {
|
||||
rev int64
|
||||
cache committedContentIndexCache
|
||||
|
||||
mu sync.Mutex
|
||||
@@ -32,6 +34,10 @@ type committedContentIndexCache interface {
|
||||
expireUnused(ctx context.Context, used []blob.ID) error
|
||||
}
|
||||
|
||||
func (b *committedContentIndex) revision() int64 {
|
||||
return atomic.LoadInt64(&b.rev)
|
||||
}
|
||||
|
||||
func (b *committedContentIndex) getContent(contentID ID) (Info, error) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
@@ -49,6 +55,8 @@ func (b *committedContentIndex) getContent(contentID ID) (Info, error) {
|
||||
}
|
||||
|
||||
func (b *committedContentIndex) addContent(ctx context.Context, indexBlobID blob.ID, data []byte, use bool) error {
|
||||
atomic.AddInt64(&b.rev, 1)
|
||||
|
||||
if err := b.cache.addContentToCache(ctx, indexBlobID, data); err != nil {
|
||||
return errors.Wrap(err, "error adding content to cache")
|
||||
}
|
||||
@@ -108,6 +116,8 @@ func (b *committedContentIndex) use(ctx context.Context, packFiles []blob.ID) (b
|
||||
return false, nil
|
||||
}
|
||||
|
||||
atomic.AddInt64(&b.rev, 1)
|
||||
|
||||
var newMerged mergedIndex
|
||||
|
||||
newInUse := map[blob.ID]packIndex{}
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
@@ -72,6 +73,8 @@ type IndexBlobInfo struct {
|
||||
|
||||
// WriteManager builds content-addressable storage with encryption, deduplication and packaging on top of BLOB store.
|
||||
type WriteManager struct {
|
||||
revision int64 // changes on each local write
|
||||
|
||||
mu *sync.RWMutex
|
||||
cond *sync.Cond
|
||||
flushing bool
|
||||
@@ -103,6 +106,11 @@ type pendingPackInfo struct {
|
||||
finalized bool // indicates whether currentPackData has local index appended to it
|
||||
}
|
||||
|
||||
// Revision returns data revision number that changes on each write or refresh.
|
||||
func (bm *WriteManager) Revision() int64 {
|
||||
return atomic.LoadInt64(&bm.revision) + bm.committedContents.revision()
|
||||
}
|
||||
|
||||
// DeleteContent marks the given contentID as deleted.
|
||||
//
|
||||
// NOTE: To avoid race conditions only contents that cannot be possibly re-created
|
||||
@@ -112,6 +120,8 @@ func (bm *WriteManager) DeleteContent(ctx context.Context, contentID ID) error {
|
||||
bm.lock()
|
||||
defer bm.unlock()
|
||||
|
||||
atomic.AddInt64(&bm.revision, 1)
|
||||
|
||||
formatLog(ctx).Debugf("delete-content %v", contentID)
|
||||
|
||||
// remove from all pending packs
|
||||
@@ -210,6 +220,8 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat
|
||||
|
||||
bm.lock()
|
||||
|
||||
atomic.AddInt64(&bm.revision, 1)
|
||||
|
||||
// do not start new uploads while flushing
|
||||
for bm.flushing {
|
||||
formatLog(ctx).Debugf("wait-before-flush")
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/url"
|
||||
@@ -525,6 +526,20 @@ func (r *grpcInnerSession) GetContent(ctx context.Context, contentID content.ID)
|
||||
}
|
||||
|
||||
func (r *grpcRepositoryClient) WriteContent(ctx context.Context, data []byte, prefix content.ID) (content.ID, error) {
|
||||
if err := content.ValidatePrefix(prefix); err != nil {
|
||||
return "", errors.Wrap(err, "invalid prefix")
|
||||
}
|
||||
|
||||
var hashOutput [128]byte
|
||||
|
||||
contentID := prefix + content.ID(hex.EncodeToString(r.h(hashOutput[:0], data)))
|
||||
|
||||
// avoid uploading the content body if it already exists.
|
||||
if _, err := r.ContentInfo(ctx, contentID); err == nil {
|
||||
// content already exists
|
||||
return contentID, nil
|
||||
}
|
||||
|
||||
r.opt.OnUpload(int64(len(data)))
|
||||
|
||||
v, err := r.inSessionWithoutRetry(ctx, func(ctx context.Context, sess *grpcInnerSession) (interface{}, error) {
|
||||
|
||||
@@ -17,8 +17,8 @@ type committedManifestManager struct {
|
||||
b contentManager
|
||||
|
||||
cmmu sync.Mutex
|
||||
lastRevision int64
|
||||
locked bool
|
||||
initialized bool
|
||||
committedEntries map[ID]*manifestEntry
|
||||
committedContentIDs map[content.ID]bool
|
||||
}
|
||||
@@ -91,17 +91,6 @@ func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entri
|
||||
return map[content.ID]bool{contentID: true}, nil
|
||||
}
|
||||
|
||||
func (m *committedManifestManager) invalidate() error {
|
||||
m.lock()
|
||||
defer m.unlock()
|
||||
|
||||
m.initialized = false
|
||||
m.committedContentIDs = map[content.ID]bool{}
|
||||
m.committedEntries = map[ID]*manifestEntry{}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *committedManifestManager) loadCommittedContentsLocked(ctx context.Context) error {
|
||||
m.verifyLocked()
|
||||
|
||||
@@ -258,7 +247,8 @@ func (m *committedManifestManager) ensureInitialized(ctx context.Context) error
|
||||
m.lock()
|
||||
defer m.unlock()
|
||||
|
||||
if m.initialized {
|
||||
rev := m.b.Revision()
|
||||
if m.lastRevision == rev {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -266,7 +256,10 @@ func (m *committedManifestManager) ensureInitialized(ctx context.Context) error
|
||||
return err
|
||||
}
|
||||
|
||||
m.initialized = true
|
||||
m.lastRevision = rev
|
||||
|
||||
// it is possible that the content manager revision has changed while we were reading it,
|
||||
// that's ok - we read __some__ consistent set of data and next time we will invalidate again.
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@
|
||||
const TypeLabelKey = "type"
|
||||
|
||||
type contentManager interface {
|
||||
Revision() int64
|
||||
GetContent(ctx context.Context, contentID content.ID) ([]byte, error)
|
||||
WriteContent(ctx context.Context, data []byte, prefix content.ID) (content.ID, error)
|
||||
DeleteContent(ctx context.Context, contentID content.ID) error
|
||||
@@ -239,11 +240,6 @@ func (m *Manager) Delete(ctx context.Context, id ID) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Refresh updates the committed contents from the underlying storage.
|
||||
func (m *Manager) Refresh(ctx context.Context) error {
|
||||
return m.committed.invalidate()
|
||||
}
|
||||
|
||||
// Compact performs compaction of manifest contents.
|
||||
func (m *Manager) Compact(ctx context.Context) error {
|
||||
return m.committed.compact(ctx)
|
||||
|
||||
@@ -103,10 +103,6 @@ func TestManifest(t *testing.T) {
|
||||
// still found in another
|
||||
verifyItem(ctx, t, mgr2, id3, labels3, item3)
|
||||
|
||||
if err := mgr2.Refresh(ctx); err != nil {
|
||||
t.Errorf("unable to load: %v", err)
|
||||
}
|
||||
|
||||
if err := mgr.Compact(ctx); err != nil {
|
||||
t.Errorf("can't compact: %v", err)
|
||||
}
|
||||
|
||||
@@ -275,10 +275,6 @@ func (r *directRepository) Refresh(ctx context.Context) error {
|
||||
return errors.Wrap(err, "error refreshing content index")
|
||||
}
|
||||
|
||||
if err := r.mmgr.Refresh(ctx); err != nil {
|
||||
return errors.Wrap(err, "error reloading manifests")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -320,7 +316,7 @@ func WriteSession(ctx context.Context, r Repository, opt WriteSessionOptions, cb
|
||||
return errors.Wrap(err, "unable to create writer")
|
||||
}
|
||||
|
||||
return handleWriteSessionResult(ctx, r, w, opt, cb(w))
|
||||
return handleWriteSessionResult(ctx, w, opt, cb(w))
|
||||
}
|
||||
|
||||
// DirectWriteSession executes the provided callback in a DirectRepositoryWriter created for the purpose and flushes writes.
|
||||
@@ -330,10 +326,10 @@ func DirectWriteSession(ctx context.Context, r DirectRepository, opt WriteSessio
|
||||
return errors.Wrap(err, "unable to create direct writer")
|
||||
}
|
||||
|
||||
return handleWriteSessionResult(ctx, r, w, opt, cb(w))
|
||||
return handleWriteSessionResult(ctx, w, opt, cb(w))
|
||||
}
|
||||
|
||||
func handleWriteSessionResult(ctx context.Context, r Repository, w RepositoryWriter, opt WriteSessionOptions, resultErr error) error {
|
||||
func handleWriteSessionResult(ctx context.Context, w RepositoryWriter, opt WriteSessionOptions, resultErr error) error {
|
||||
defer func() {
|
||||
if err := w.Close(ctx); err != nil {
|
||||
log(ctx).Warningf("error closing writer: %v", err)
|
||||
@@ -346,10 +342,6 @@ func handleWriteSessionResult(ctx context.Context, r Repository, w RepositoryWri
|
||||
}
|
||||
}
|
||||
|
||||
if err := r.Refresh(ctx); err != nil {
|
||||
return errors.Wrap(err, "error refreshing repository")
|
||||
}
|
||||
|
||||
return resultErr
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user