diff --git a/cli/app.go b/cli/app.go index b7892cdeb..b719857c5 100644 --- a/cli/app.go +++ b/cli/app.go @@ -45,6 +45,7 @@ indexCommands = app.Command("index", "Commands to manipulate content index.").Hidden() benchmarkCommands = app.Command("benchmark", "Commands to test performance of algorithms.").Hidden() maintenanceCommands = app.Command("maintenance", "Maintenance commands.").Hidden().Alias("gc") + sessionCommands = app.Command("session", "Session commands.").Hidden() ) func helpFullAction(ctx *kingpin.ParseContext) error { diff --git a/cli/command_blob_gc.go b/cli/command_blob_gc.go index cf8d7c14d..69885da37 100644 --- a/cli/command_blob_gc.go +++ b/cli/command_blob_gc.go @@ -11,21 +11,23 @@ ) var ( - blobGarbageCollectCommand = blobCommands.Command("gc", "Garbage-collect unused blobs") - blobGarbageCollectCommandDelete = blobGarbageCollectCommand.Flag("delete", "Whether to delete unused blobs").String() - blobGarbageCollectParallel = blobGarbageCollectCommand.Flag("parallel", "Number of parallel blob scans").Default("16").Int() - blobGarbageCollectMinAge = blobGarbageCollectCommand.Flag("min-age", "Garbage-collect blobs with minimum age").Default("24h").Duration() - blobGarbageCollectPrefix = blobGarbageCollectCommand.Flag("prefix", "Only GC blobs with given prefix").String() + blobGarbageCollectCommand = blobCommands.Command("gc", "Garbage-collect unused blobs") + blobGarbageCollectCommandDelete = blobGarbageCollectCommand.Flag("delete", "Whether to delete unused blobs").String() + blobGarbageCollectParallel = blobGarbageCollectCommand.Flag("parallel", "Number of parallel blob scans").Default("16").Int() + blobGarbageCollectMinAge = blobGarbageCollectCommand.Flag("min-age", "Garbage-collect blobs with minimum age").Default("24h").Duration() + blobGarbageCollectSessionExpirationAge = blobGarbageCollectCommand.Flag("session-expiration-age", "Garbage-collect blobs belonging to sessions that have not been updated recently").Default("96h").Duration() + blobGarbageCollectPrefix = blobGarbageCollectCommand.Flag("prefix", "Only GC blobs with given prefix").String() ) func runBlobGarbageCollectCommand(ctx context.Context, rep *repo.DirectRepository) error { advancedCommand(ctx) opts := maintenance.DeleteUnreferencedBlobsOptions{ - DryRun: *blobGarbageCollectCommandDelete != "yes", - MinAge: *blobGarbageCollectMinAge, - Parallel: *blobGarbageCollectParallel, - Prefix: blob.ID(*blobGarbageCollectPrefix), + DryRun: *blobGarbageCollectCommandDelete != "yes", + MinAge: *blobGarbageCollectMinAge, + SessionExpirationAge: *blobGarbageCollectSessionExpirationAge, + Parallel: *blobGarbageCollectParallel, + Prefix: blob.ID(*blobGarbageCollectPrefix), } n, err := maintenance.DeleteUnreferencedBlobs(ctx, rep, opts) diff --git a/cli/command_session_list.go b/cli/command_session_list.go new file mode 100644 index 000000000..0f6e4d4f9 --- /dev/null +++ b/cli/command_session_list.go @@ -0,0 +1,28 @@ +package cli + +import ( + "context" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/repo" +) + +var sessionListCommand = sessionCommands.Command("list", "List sessions").Alias("ls") + +func runSessionList(ctx context.Context, rep *repo.DirectRepository) error { + sessions, err := rep.ListActiveSessions(ctx) + if err != nil { + return errors.Wrap(err, "error listing sessions") + } + + for _, s := range sessions { + printStdout("%v %v@%v %v %v\n", s.ID, s.User, s.Host, formatTimestamp(s.StartTime), formatTimestamp(s.CheckpointTime)) + } + + return nil +} + +func init() { + 
sessionListCommand.Action(directRepositoryAction(runSessionList)) +} diff --git a/internal/blobtesting/faulty.go b/internal/blobtesting/faulty.go index 644cbb020..183b0db3a 100644 --- a/internal/blobtesting/faulty.go +++ b/internal/blobtesting/faulty.go @@ -3,6 +3,7 @@ import ( "context" "sync" + "testing" "time" "github.com/kopia/kopia/repo/blob" @@ -117,12 +118,18 @@ func (s *FaultyStorage) getNextFault(ctx context.Context, method string, args .. return nil } + log(ctx).Infof("got fault for %v %v", method, faults[0]) + f := faults[0] if f.Repeat > 0 { f.Repeat-- log(ctx).Debugf("will repeat %v more times the fault for %v %v", f.Repeat, method, args) } else { - s.Faults[method] = faults[1:] + if remaining := faults[1:]; len(remaining) > 0 { + s.Faults[method] = remaining + } else { + delete(s.Faults, method) + } } s.mu.Unlock() @@ -149,4 +156,11 @@ func (s *FaultyStorage) getNextFault(ctx context.Context, method string, args .. return f.Err } +// VerifyAllFaultsExercised fails the test if some faults have not been exercised. +func (s *FaultyStorage) VerifyAllFaultsExercised(t *testing.T) { + if len(s.Faults) != 0 { + t.Fatalf("not all defined faults have been hit: %#v", s.Faults) + } +} + var _ blob.Storage = (*FaultyStorage)(nil) diff --git a/repo/blob/filesystem/filesystem_storage.go b/repo/blob/filesystem/filesystem_storage.go index b95ed6eb1..8492e579c 100644 --- a/repo/blob/filesystem/filesystem_storage.go +++ b/repo/blob/filesystem/filesystem_storage.go @@ -128,11 +128,6 @@ func (fs *fsImpl) GetBlobFromPath(ctx context.Context, dirPath, path string, off func (fs *fsImpl) GetMetadataFromPath(ctx context.Context, dirPath, path string) (blob.Metadata, error) { fi, err := os.Stat(path) - if err != nil { - // nolint:wrapcheck - return blob.Metadata{}, err - } - if err != nil { if os.IsNotExist(err) { return blob.Metadata{}, blob.ErrBlobNotFound diff --git a/repo/content/blob_crypto.go b/repo/content/blob_crypto.go new file mode 100644 index 000000000..880bb1c5c --- /dev/null +++ b/repo/content/blob_crypto.go @@ -0,0 +1,82 @@ +package content + +import ( + "bytes" + "crypto/aes" + "encoding/hex" + "strings" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/encryption" + "github.com/kopia/kopia/repo/hashing" +) + +func getIndexBlobIV(s blob.ID) ([]byte, error) { + if p := strings.Index(string(s), "-"); p >= 0 { // nolint:gocritic + s = s[0:p] + } + + if len(s) < 2*aes.BlockSize { + return nil, errors.Errorf("blob id too short: %v", s) + } + + return hex.DecodeString(string(s[len(s)-(aes.BlockSize*2):])) +} + +func encryptFullBlob(h hashing.HashFunc, enc encryption.Encryptor, data []byte, prefix blob.ID, sessionID SessionID) (blob.ID, []byte, error) { + var hashOutput [maxHashSize]byte + + hash := h(hashOutput[:0], data) + blobID := prefix + blob.ID(hex.EncodeToString(hash)) + + if sessionID != "" { + blobID += blob.ID("-" + sessionID) + } + + iv, err := getIndexBlobIV(blobID) + if err != nil { + return "", nil, err + } + + data2, err := enc.Encrypt(nil, data, iv) + if err != nil { + return "", nil, errors.Wrapf(err, "error encrypting blob %v", blobID) + } + + return blobID, data2, nil +} + +func decryptFullBlob(h hashing.HashFunc, enc encryption.Encryptor, payload []byte, blobID blob.ID) ([]byte, error) { + iv, err := getIndexBlobIV(blobID) + if err != nil { + return nil, errors.Wrap(err, "unable to get index blob IV") + } + + payload, err = enc.Decrypt(nil, payload, iv) + if err != nil { + return nil, errors.Wrap(err, "decrypt error") + } + 
+ // Since the encryption key is a function of data, we must be able to generate exactly the same key + // after decrypting the content. This serves as a checksum. + if err := verifyChecksum(h, payload, iv); err != nil { + return nil, err + } + + return payload, nil +} + +func verifyChecksum(h hashing.HashFunc, data, iv []byte) error { + var hashOutput [maxHashSize]byte + + expected := h(hashOutput[:0], data) + expected = expected[len(expected)-aes.BlockSize:] + + if !bytes.HasSuffix(iv, expected) { + return errors.Errorf("invalid checksum for blob %x, expected %x", iv, expected) + } + + return nil +} diff --git a/repo/content/content_formatter_test.go b/repo/content/content_formatter_test.go index 2de43ba43..14496ccb8 100644 --- a/repo/content/content_formatter_test.go +++ b/repo/content/content_formatter_test.go @@ -10,7 +10,6 @@ "time" "github.com/kopia/kopia/internal/blobtesting" - "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/internal/testlogging" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/encryption" @@ -104,7 +103,7 @@ func verifyEndToEndFormatter(ctx context.Context, t *testing.T, hashAlgo, encryp MaxPackSize: maxPackSize, MasterKey: make([]byte, 32), // zero key, does not matter Version: 1, - }, nil, &ManagerOptions{TimeNow: clock.Now}) + }, nil, nil) if err != nil { t.Errorf("can't create content manager with hash %v and encryption %v: %v", hashAlgo, encryptionAlgo, err.Error()) return diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index a68b07fc2..b36ac9e94 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -78,6 +78,12 @@ type Manager struct { cond *sync.Cond flushing bool + sessionUser string // user@host to report as session owners + sessionHost string + + currentSessionInfo SessionInfo + sessionMarkerBlobIDs []blob.ID // session marker blobs written so far + pendingPacks map[blob.ID]*pendingPackInfo writingPacks []*pendingPackInfo // list of packs that are being written failedPacks []*pendingPackInfo // list of packs that failed to write, will be retried @@ -121,13 +127,13 @@ func (bm *Manager) DeleteContent(ctx context.Context, contentID ID) error { // remove from all packs that are being written, since they will be committed to index soon for _, pp := range bm.writingPacks { if bi, ok := pp.currentPackItems[contentID]; ok && !bi.Deleted { - return bm.deletePreexistingContent(bi) + return bm.deletePreexistingContent(ctx, bi) } } // if found in committed index, add another entry that's marked for deletion if bi, ok := bm.packIndexBuilder[contentID]; ok { - return bm.deletePreexistingContent(*bi) + return bm.deletePreexistingContent(ctx, *bi) } // see if the block existed before @@ -136,17 +142,17 @@ func (bm *Manager) DeleteContent(ctx context.Context, contentID ID) error { return err } - return bm.deletePreexistingContent(bi) + return bm.deletePreexistingContent(ctx, bi) } // Intentionally passing bi by value. 
// nolint:gocritic -func (bm *Manager) deletePreexistingContent(ci Info) error { +func (bm *Manager) deletePreexistingContent(ctx context.Context, ci Info) error { if ci.Deleted { return nil } - pp, err := bm.getOrCreatePendingPackInfoLocked(packPrefixForContentID(ci.ID)) + pp, err := bm.getOrCreatePendingPackInfoLocked(ctx, packPrefixForContentID(ci.ID)) if err != nil { return errors.Wrap(err, "unable to create pack") } @@ -158,7 +164,51 @@ func (bm *Manager) deletePreexistingContent(ci Info) error { return nil } +func (bm *Manager) maybeFlushBasedOnTimeUnlocked(ctx context.Context) error { + bm.lock() + shouldFlush := bm.timeNow().After(bm.flushPackIndexesAfter) + bm.unlock() + + if !shouldFlush { + return nil + } + + return bm.Flush(ctx) +} + +func (bm *Manager) maybeRetryWritingFailedPacksUnlocked(ctx context.Context) error { + bm.lock() + defer bm.unlock() + + // do not start new uploads while flushing + for bm.flushing { + formatLog(ctx).Debugf("wait-before-retry") + bm.cond.Wait() + } + + // see if we have any packs that have failed previously + // retry writing them now. + // + // we're making a copy of bm.failedPacks since bm.writePackAndAddToIndex() + // will remove from it on success. + fp := append([]*pendingPackInfo(nil), bm.failedPacks...) + for _, pp := range fp { + formatLog(ctx).Debugf("retry-write %v", pp.packBlobID) + + if err := bm.writePackAndAddToIndex(ctx, pp, true); err != nil { + return errors.Wrap(err, "error writing previously failed pack") + } + } + + return nil +} + func (bm *Manager) addToPackUnlocked(ctx context.Context, contentID ID, data []byte, isDeleted bool) error { + // see if the current index is old enough to cause automatic flush. + if err := bm.maybeFlushBasedOnTimeUnlocked(ctx); err != nil { + return errors.Wrap(err, "unable to flush old pending writes") + } + prefix := packPrefixForContentID(contentID) bm.lock() @@ -176,21 +226,17 @@ func (bm *Manager) addToPackUnlocked(ctx context.Context, contentID ID, data []b // will remove from it on success. fp := append([]*pendingPackInfo(nil), bm.failedPacks...) for _, pp := range fp { + formatLog(ctx).Debugf("retry-write %v", pp.packBlobID) + if err := bm.writePackAndAddToIndex(ctx, pp, true); err != nil { bm.unlock() return errors.Wrap(err, "error writing previously failed pack") } } - if bm.timeNow().After(bm.flushPackIndexesAfter) { - if err := bm.flushPackIndexesLocked(ctx); err != nil { - bm.unlock() - return err - } - } - - pp, err := bm.getOrCreatePendingPackInfoLocked(prefix) + pp, err := bm.getOrCreatePendingPackInfoLocked(ctx, prefix) if err != nil { + bm.unlock() return errors.Wrap(err, "unable to create pending pack") } @@ -311,11 +357,17 @@ func (bm *Manager) flushPackIndexesLocked(ctx context.Context) error { data := b.Bytes() dataCopy := append([]byte(nil), data...) - indexBlobMD, err := bm.indexBlobManager.writeIndexBlob(ctx, data) + indexBlobMD, err := bm.indexBlobManager.writeIndexBlob(ctx, data, bm.currentSessionInfo.ID) if err != nil { return errors.Wrap(err, "error writing index blob") } + if err := bm.commitSession(ctx); err != nil { + return errors.Wrap(err, "unable to commit session") + } + + // if we managed to commit the session marker blobs, the index is now fully committed + // and will be visible to others, including blob GC. 
if err := bm.committedContents.addContent(ctx, indexBlobMD.BlobID, dataCopy, true); err != nil { return errors.Wrap(err, "unable to add committed content") } @@ -383,7 +435,7 @@ func (bm *Manager) prepareAndWritePackInternal(ctx context.Context, pp *pendingP if pp.currentPackData.Length() > 0 { if err := bm.writePackFileNotLocked(ctx, pp.packBlobID, pp.currentPackData.Bytes); err != nil { formatLog(ctx).Debugf("failed-pack %v %v", pp.packBlobID, err) - return nil, errors.Wrap(err, "can't save pack data content") + return nil, errors.Wrapf(err, "can't save pack data blob %v", pp.packBlobID) } formatLog(ctx).Debugf("wrote-pack %v %v", pp.packBlobID, pp.currentPackData.Length()) @@ -439,6 +491,20 @@ func (bm *Manager) Flush(ctx context.Context) error { bm.cond.Broadcast() }() + // see if we have any packs that have failed previously + // retry writing them now. + // + // we're making a copy of bm.failedPacks since bm.writePackAndAddToIndex() + // will remove from it on success. + fp := append([]*pendingPackInfo(nil), bm.failedPacks...) + for _, pp := range fp { + formatLog(ctx).Debugf("retry-write %v", pp.packBlobID) + + if err := bm.writePackAndAddToIndex(ctx, pp, true); err != nil { + return errors.Wrap(err, "error writing previously failed pack") + } + } + for len(bm.writingPacks) > 0 { log(ctx).Debugf("waiting for %v in-progress packs to finish", len(bm.writingPacks)) @@ -506,28 +572,35 @@ func packPrefixForContentID(contentID ID) blob.ID { return PackBlobIDPrefixRegular } -func (bm *Manager) getOrCreatePendingPackInfoLocked(prefix blob.ID) (*pendingPackInfo, error) { - if bm.pendingPacks[prefix] == nil { - b := gather.NewWriteBuffer() +func (bm *Manager) getOrCreatePendingPackInfoLocked(ctx context.Context, prefix blob.ID) (*pendingPackInfo, error) { + if pp := bm.pendingPacks[prefix]; pp != nil { + return pp, nil + } - contentID := make([]byte, 16) - if _, err := cryptorand.Read(contentID); err != nil { - return nil, errors.Wrap(err, "unable to read crypto bytes") - } + b := gather.NewWriteBuffer() - b.Append(bm.repositoryFormatBytes) + sessionID, err := bm.getOrStartSessionLocked(ctx) + if err != nil { + return nil, errors.Wrap(err, "unable to get session ID") + } - // nolint:gosec - if err := writeRandomBytesToBuffer(b, rand.Intn(bm.maxPreambleLength-bm.minPreambleLength+1)+bm.minPreambleLength); err != nil { - return nil, errors.Wrap(err, "unable to prepare content preamble") - } + blobID := make([]byte, 16) + if _, err := cryptorand.Read(blobID); err != nil { + return nil, errors.Wrap(err, "unable to read crypto bytes") + } - bm.pendingPacks[prefix] = &pendingPackInfo{ - prefix: prefix, - packBlobID: blob.ID(fmt.Sprintf("%v%x", prefix, contentID)), - currentPackItems: map[ID]Info{}, - currentPackData: b, - } + b.Append(bm.repositoryFormatBytes) + + // nolint:gosec + if err := writeRandomBytesToBuffer(b, rand.Intn(bm.maxPreambleLength-bm.minPreambleLength+1)+bm.minPreambleLength); err != nil { + return nil, errors.Wrap(err, "unable to prepare content preamble") + } + + bm.pendingPacks[prefix] = &pendingPackInfo{ + prefix: prefix, + packBlobID: blob.ID(fmt.Sprintf("%v%x-%v", prefix, blobID, sessionID)), + currentPackItems: map[ID]Info{}, + currentPackData: b, } return bm.pendingPacks[prefix], nil @@ -536,6 +609,10 @@ func (bm *Manager) getOrCreatePendingPackInfoLocked(prefix blob.ID) (*pendingPac // WriteContent saves a given content of data to a pack group with a provided name and returns a contentID // that's based on the contents of data written. 
func (bm *Manager) WriteContent(ctx context.Context, data []byte, prefix ID) (ID, error) { + if err := bm.maybeRetryWritingFailedPacksUnlocked(ctx); err != nil { + return "", err + } + stats.Record(ctx, metricContentWriteContentCount.M(1)) stats.Record(ctx, metricContentWriteContentBytes.M(int64(len(data)))) @@ -682,6 +759,8 @@ func (bm *Manager) DecryptBlob(ctx context.Context, blobID blob.ID) ([]byte, err type ManagerOptions struct { RepositoryFormatBytes []byte TimeNow func() time.Time // Time provider + SessionUser string // optional username to report as session owner + SessionHost string // optional hostname to report as session owner ownWritesCache ownWritesCache // test hook to allow overriding own-writes cache } @@ -736,5 +815,7 @@ func newManagerWithReadManager(ctx context.Context, f *FormattingOptions, readMa flushPackIndexesAfter: options.TimeNow().Add(flushPackIndexTimeout), pendingPacks: map[blob.ID]*pendingPackInfo{}, packIndexBuilder: make(packIndexBuilder), + sessionUser: options.SessionUser, + sessionHost: options.SessionHost, } } diff --git a/repo/content/content_manager_indexes.go b/repo/content/content_manager_indexes.go index 50d5ba425..758ebc8eb 100644 --- a/repo/content/content_manager_indexes.go +++ b/repo/content/content_manager_indexes.go @@ -109,7 +109,7 @@ func (bm *Manager) compactIndexBlobs(ctx context.Context, indexBlobs []IndexBlob return errors.Wrap(err, "unable to build an index") } - compactedIndexBlob, err := bm.indexBlobManager.writeIndexBlob(ctx, buf.Bytes()) + compactedIndexBlob, err := bm.indexBlobManager.writeIndexBlob(ctx, buf.Bytes(), "") if err != nil { return errors.Wrap(err, "unable to write compacted indexes") } diff --git a/repo/content/content_manager_test.go b/repo/content/content_manager_test.go index 39e11f341..15757b5b5 100644 --- a/repo/content/content_manager_test.go +++ b/repo/content/content_manager_test.go @@ -16,6 +16,7 @@ "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/pkg/errors" "github.com/kopia/kopia/internal/blobtesting" @@ -105,15 +106,10 @@ func TestContentManagerSmallContentWrites(t *testing.T) { writeContentAndVerify(ctx, t, bm, seededRandomData(i, 10)) } - if got, want := len(data), 0; got != want { - t.Errorf("unexpected number of contents: %v, wanted %v", got, want) - } - + verifyBlobCount(t, data, map[blob.ID]int{"s": 1}) bm.Flush(ctx) - if got, want := len(data), 2; got != want { - t.Errorf("unexpected number of contents: %v, wanted %v", got, want) - } + verifyBlobCount(t, data, map[blob.ID]int{"n": 1, "p": 1}) } func TestContentManagerDedupesPendingContents(t *testing.T) { @@ -128,15 +124,13 @@ func TestContentManagerDedupesPendingContents(t *testing.T) { writeContentAndVerify(ctx, t, bm, seededRandomData(0, maxPackCapacity/2)) } - if got, want := len(data), 0; got != want { - t.Errorf("unexpected number of contents: %v, wanted %v", got, want) - } + // expect one blob which is a session marker. + verifyBlobCount(t, data, map[blob.ID]int{"s": 1}) bm.Flush(ctx) - if got, want := len(data), 2; got != want { - t.Errorf("unexpected number of contents: %v, wanted %v", got, want) - } + // session marker will be deleted and replaced with data + index. + verifyBlobCount(t, data, map[blob.ID]int{"n": 1, "p": 1}) } func TestContentManagerDedupesPendingAndUncommittedContents(t *testing.T) { @@ -151,30 +145,26 @@ func TestContentManagerDedupesPendingAndUncommittedContents(t *testing.T) { contentSize := maxPackCapacity/3 - encryptionOverhead - 1 // no writes here, all data fits in a single pack. 
+	// but we will have a session marker.
 	writeContentAndVerify(ctx, t, bm, seededRandomData(0, contentSize))
 	writeContentAndVerify(ctx, t, bm, seededRandomData(1, contentSize))
 	writeContentAndVerify(ctx, t, bm, seededRandomData(2, contentSize))
 
-	if got, want := len(data), 0; got != want {
-		t.Errorf("unexpected number of contents: %v, wanted %v", got, want)
-	}
+	// expect one blob which is a session marker.
+	verifyBlobCount(t, data, map[blob.ID]int{"s": 1})
 
 	// no writes here
 	writeContentAndVerify(ctx, t, bm, seededRandomData(0, contentSize))
 	writeContentAndVerify(ctx, t, bm, seededRandomData(1, contentSize))
 	writeContentAndVerify(ctx, t, bm, seededRandomData(2, contentSize))
 
-	if got, want := len(data), 0; got != want {
-		t.Errorf("unexpected number of contents: %v, wanted %v", got, want)
-	}
+	// expect one blob which is a session marker.
+	verifyBlobCount(t, data, map[blob.ID]int{"s": 1})
 
 	bm.Flush(ctx)
 
-	// this flushes the pack content + index blob
-	if got, want := len(data), 2; got != want {
-		dumpContentManagerData(ctx, t, data)
-		t.Errorf("unexpected number of contents: %v, wanted %v", got, want)
-	}
+	// this flushes the pack content + index blob and deletes the session marker.
+	verifyBlobCount(t, data, map[blob.ID]int{"n": 1, "p": 1})
 }
 
 func TestContentManagerEmpty(t *testing.T) {
@@ -197,9 +187,7 @@ func TestContentManagerEmpty(t *testing.T) {
 		t.Errorf("unexpected error when getting non-existent content info: %v, %v", bi, err)
 	}
 
-	if got, want := len(data), 0; got != want {
-		t.Errorf("unexpected number of contents: %v, wanted %v", got, want)
-	}
+	verifyBlobCount(t, data, map[blob.ID]int{})
 }
 
 func verifyActiveIndexBlobCount(ctx context.Context, t *testing.T, bm *Manager, expected int) {
@@ -231,10 +219,8 @@ func TestContentManagerInternalFlush(t *testing.T) {
 		writeContentAndVerify(ctx, t, bm, b)
 	}
 
-	// 1 data content written, but no index yet.
-	if got, want := len(data), 1; got != want {
-		t.Errorf("unexpected number of contents: %v, wanted %v", got, want)
-	}
+	// 1 data blob + session marker written, but no index yet.
+	verifyBlobCount(t, data, map[blob.ID]int{"s": 1, "p": 1})
 
 	// do it again - should be 2 blobs + some bytes pending.
 	for i := 0; i < itemsToOverflow; i++ {
@@ -243,18 +229,13 @@ func TestContentManagerInternalFlush(t *testing.T) {
 		writeContentAndVerify(ctx, t, bm, b)
 	}
 
-	// 2 data contents written, but no index yet.
-	if got, want := len(data), 2; got != want {
-		t.Errorf("unexpected number of contents: %v, wanted %v", got, want)
-	}
+	// 2 data blobs written + session marker, but no index yet.
+	verifyBlobCount(t, data, map[blob.ID]int{"s": 1, "p": 2})
 
 	bm.Flush(ctx)
 
-	// third content gets written, followed by index.
-	if got, want := len(data), 4; got != want {
-		dumpContentManagerData(ctx, t, data)
-		t.Errorf("unexpected number of contents: %v, wanted %v", got, want)
-	}
+	// third data blob gets written, followed by the index, and the session marker gets deleted.
+ verifyBlobCount(t, data, map[blob.ID]int{"n": 1, "p": 3}) } func TestContentManagerWriteMultiple(t *testing.T) { @@ -315,6 +296,8 @@ func TestContentManagerFailedToWritePack(t *testing.T) { } st = faulty + ta := faketime.NewTimeAdvance(fakeTime, 0) + bm, err := NewManager(testlogging.Context(t), st, &FormattingOptions{ Version: 1, Hash: "HMAC-SHA256-128", @@ -322,29 +305,54 @@ func TestContentManagerFailedToWritePack(t *testing.T) { MaxPackSize: maxPackSize, HMACSecret: []byte("foo"), MasterKey: []byte("0123456789abcdef0123456789abcdef"), - }, nil, &ManagerOptions{TimeNow: faketime.Frozen(fakeTime)}) + }, nil, &ManagerOptions{TimeNow: ta.NowFunc()}) if err != nil { t.Fatalf("can't create bm: %v", err) } defer bm.Close(ctx) + sessionPutErr := errors.New("booboo0") + firstPutErr := errors.New("booboo1") + secondPutErr := errors.New("booboo2") faulty.Faults = map[string][]*blobtesting.Fault{ - "PutContent": { - {Err: errors.New("booboo")}, + "PutBlob": { + {Err: sessionPutErr}, + {Err: firstPutErr}, + {Err: secondPutErr}, }, } + _, err = bm.WriteContent(ctx, seededRandomData(1, 10), "") + if !errors.Is(err, sessionPutErr) { + t.Fatalf("can't create first content: %v", err) + } + b1, err := bm.WriteContent(ctx, seededRandomData(1, 10), "") if err != nil { t.Fatalf("can't create content: %v", err) } - if err := bm.Flush(ctx); err != nil { + // advance time enough to cause auto-flush, which will fail (firstPutErr) + ta.Advance(1 * time.Hour) + + if _, err := bm.WriteContent(ctx, seededRandomData(2, 10), ""); !errors.Is(err, firstPutErr) { + t.Fatalf("can't create 2nd content: %v", err) + } + + // manual flush will fail because we're unable to write the blob (secondPutErr) + if err := bm.Flush(ctx); !errors.Is(err, secondPutErr) { t.Logf("expected flush error: %v", err) } + // flush will now succeed. + if err := bm.Flush(ctx); err != nil { + t.Logf("unexpected 2nd flush error: %v", err) + } + verifyContent(ctx, t, bm, b1, seededRandomData(1, 10)) + + faulty.VerifyAllFaultsExercised(t) } func TestIndexCompactionDropsContent(t *testing.T) { @@ -1083,6 +1091,58 @@ func TestFlushResumesWriters(t *testing.T) { }) } +func TestFlushWaitsForAllPendingWriters(t *testing.T) { + t.Parallel() + + ctx := testlogging.Context(t) + + data := blobtesting.DataMap{} + keyTime := map[blob.ID]time.Time{} + st := blobtesting.NewMapStorage(data, keyTime, nil) + + fs := &blobtesting.FaultyStorage{ + Base: st, + Faults: map[string][]*blobtesting.Fault{ + "PutBlob": { + // first write is fast (session ID blobs) + {}, + // second write is slow + {Sleep: 2 * time.Second}, + }, + }, + } + + bm := newTestContentManagerWithStorage(t, fs, nil) + defer bm.Close(ctx) + + // write one content in another goroutine + // 'fs' is configured so that blob write takes several seconds to complete. + go writeContentAndVerify(ctx, t, bm, seededRandomData(1, maxPackSize)) + + // wait enough time for the goroutine to start writing. + time.Sleep(100 * time.Millisecond) + + // write second short content + writeContentAndVerify(ctx, t, bm, seededRandomData(1, maxPackSize/4)) + + // flush will wait for both writes to complete. 
+	t.Logf(">>> start of flushing")
+	bm.Flush(ctx)
+	t.Logf("<<< end of flushing")
+
+	verifyBlobCount(t, data, map[blob.ID]int{
+		PackBlobIDPrefixRegular: 2,
+		indexBlobPrefix:         1,
+	})
+
+	bm.Flush(ctx)
+
+	verifyBlobCount(t, data, map[blob.ID]int{
+		PackBlobIDPrefixRegular: 2,
+		indexBlobPrefix:         1,
+	})
+}
+
 func verifyAllDataPresent(ctx context.Context, t *testing.T, data map[blob.ID][]byte, contentIDs map[ID]bool) {
 	bm := newTestContentManager(t, data, nil, nil)
 	defer bm.Close(ctx)
@@ -1097,8 +1157,6 @@ func verifyAllDataPresent(ctx context.Context, t *testing.T, data map[blob.ID][]
 }
 
 func TestHandleWriteErrors(t *testing.T) {
-	ctx := testlogging.Context(t)
-
 	// genFaults(S0,F0,S1,F1,...,) generates a list of faults
 	// where success is returned Sn times followed by failure returned Fn times
 	genFaults := func(counts ...int) []*blobtesting.Fault {
@@ -1106,14 +1164,18 @@
 		for i, cnt := range counts {
 			if i%2 == 0 {
-				result = append(result, &blobtesting.Fault{
-					Repeat: cnt - 1,
-				})
+				if cnt > 0 {
+					result = append(result, &blobtesting.Fault{
+						Repeat: cnt - 1,
+					})
+				}
 			} else {
-				result = append(result, &blobtesting.Fault{
-					Repeat: cnt - 1,
-					Err: errors.Errorf("some write error"),
-				})
+				if cnt > 0 {
+					result = append(result, &blobtesting.Fault{
+						Repeat: cnt - 1,
+						Err: errors.Errorf("some write error"),
+					})
+				}
 			}
 		}
@@ -1125,23 +1187,32 @@
 	// also, verify that all the data is durable
 	cases := []struct {
 		faults []*blobtesting.Fault // failures to similuate
-		numContents int // how many contents to write
-		contentSize int // size of each content
+		contentSizes []int // sizes of contents to write
+		expectedWriteRetries []int
 		expectedFlushRetries int
-		expectedWriteRetries int
 	}{
-		{faults: genFaults(0, 10, 10, 10, 10, 10, 10, 10, 10, 10), numContents: 5, contentSize: maxPackSize, expectedWriteRetries: 10, expectedFlushRetries: 0},
-		{faults: genFaults(1, 2), numContents: 1, contentSize: maxPackSize, expectedWriteRetries: 0, expectedFlushRetries: 2},
-		{faults: genFaults(1, 2), numContents: 10, contentSize: maxPackSize, expectedWriteRetries: 2, expectedFlushRetries: 0},
-		// 2 failures, 2 successes (pack blobs), 1 failure (flush), 1 success (flush)
-		{faults: genFaults(0, 2, 2, 1, 1, 1, 1), numContents: 2, contentSize: maxPackSize, expectedWriteRetries: 2, expectedFlushRetries: 1},
-		{faults: genFaults(0, 2, 2, 1, 1, 1, 1), numContents: 4, contentSize: maxPackSize / 2, expectedWriteRetries: 2, expectedFlushRetries: 1},
+		// write 3 packs of maxPackSize
+		// PutBlob: {1 x SUCCESS (session marker), 5 x FAILURE, 3 x SUCCESS, 9 x FAILURE }
+		{faults: genFaults(1, 5, 3, 9), contentSizes: []int{maxPackSize, maxPackSize, maxPackSize}, expectedWriteRetries: []int{5, 0, 0}, expectedFlushRetries: 9},
+
+		// write 1 content which succeeds, then flush which will fail 5 times before succeeding.
+		{faults: genFaults(2, 5), contentSizes: []int{maxPackSize}, expectedWriteRetries: []int{0}, expectedFlushRetries: 5},
+
+		// write 4 contents, first write succeeds, next one fails 7 times, then all successes.
+		{faults: genFaults(2, 7), contentSizes: []int{maxPackSize, maxPackSize, maxPackSize, maxPackSize}, expectedWriteRetries: []int{0, 7, 0, 0}, expectedFlushRetries: 0},
+
+		// first flush will fail on pack write, the next 3 will fail on index writes.
+ {faults: genFaults(1, 1, 0, 3), contentSizes: []int{maxPackSize / 2}, expectedWriteRetries: []int{0}, expectedFlushRetries: 4}, + + // second write will be retried 5 times, flush will be retried 3 times. + {faults: genFaults(1, 5, 1, 3), contentSizes: []int{maxPackSize / 2, maxPackSize / 2}, expectedWriteRetries: []int{0, 5}, expectedFlushRetries: 3}, } for n, tc := range cases { tc := tc t.Run(fmt.Sprintf("case-%v", n), func(t *testing.T) { + ctx := testlogging.Context(t) data := blobtesting.DataMap{} keyTime := map[blob.ID]time.Time{} st := blobtesting.NewMapStorage(data, keyTime, nil) @@ -1157,26 +1228,29 @@ func TestHandleWriteErrors(t *testing.T) { bm := newTestContentManagerWithStorage(t, fs, nil) defer bm.Close(ctx) - writeRetries := 0 + var writeRetries []int var cids []ID - for i := 0; i < tc.numContents; i++ { - cid, retries := writeContentWithRetriesAndVerify(ctx, t, bm, seededRandomData(i, tc.contentSize)) - writeRetries += retries + for i, size := range tc.contentSizes { + t.Logf(">>>> writing %v", i) + cid, retries := writeContentWithRetriesAndVerify(ctx, t, bm, seededRandomData(i, size)) + writeRetries = append(writeRetries, retries) cids = append(cids, cid) } if got, want := flushWithRetries(ctx, t, bm), tc.expectedFlushRetries; got != want { t.Errorf("invalid # of flush retries %v, wanted %v", got, want) } - if got, want := writeRetries, tc.expectedWriteRetries; got != want { - t.Errorf("invalid # of write retries %v, wanted %v", got, want) + if diff := cmp.Diff(writeRetries, tc.expectedWriteRetries); diff != "" { + t.Errorf("invalid # of write retries (-got,+want): %v", diff) } bm2 := newTestContentManagerWithStorage(t, st, nil) defer bm2.Close(ctx) for i, cid := range cids { - verifyContent(ctx, t, bm2, cid, seededRandomData(i, tc.contentSize)) + verifyContent(ctx, t, bm2, cid, seededRandomData(i, tc.contentSizes[i])) } + + fs.VerifyAllFaultsExercised(t) }) } } @@ -1970,11 +2044,13 @@ func flushWithRetries(ctx context.Context, t *testing.T, bm *Manager) int { func writeContentWithRetriesAndVerify(ctx context.Context, t *testing.T, bm *Manager, b []byte) (contentID ID, retryCount int) { t.Helper() + log(ctx).Infof("*** starting writeContentWithRetriesAndVerify") + contentID, err := bm.WriteContent(ctx, b, "") for i := 0; err != nil && i < maxRetries; i++ { retryCount++ - log(ctx).Warningf("WriteContent failed %v, retrying", err) + log(ctx).Infof("*** try %v", retryCount) contentID, err = bm.WriteContent(ctx, b, "") } @@ -1988,6 +2064,7 @@ func writeContentWithRetriesAndVerify(ctx context.Context, t *testing.T, bm *Man } verifyContent(ctx, t, bm, contentID, b) + log(ctx).Infof("*** finished after %v retries", retryCount) return contentID, retryCount } @@ -2059,3 +2136,17 @@ func must(t *testing.T, err error) { t.Fatal(err) } } + +func verifyBlobCount(t *testing.T, data blobtesting.DataMap, want map[blob.ID]int) { + t.Helper() + + got := map[blob.ID]int{} + + for k := range data { + got[k[0:1]]++ + } + + if !cmp.Equal(got, want) { + t.Fatalf("unexpected blob count %v, want %v", got, want) + } +} diff --git a/repo/content/index_blob_manager.go b/repo/content/index_blob_manager.go index a1ce76cc1..b991525c2 100644 --- a/repo/content/index_blob_manager.go +++ b/repo/content/index_blob_manager.go @@ -1,12 +1,8 @@ package content import ( - "bytes" "context" - "crypto/aes" - "encoding/hex" "encoding/json" - "strings" "time" "github.com/pkg/errors" @@ -19,7 +15,7 @@ // indexBlobManager is the API of index blob manager as used by content manager. 
type indexBlobManager interface { - writeIndexBlob(ctx context.Context, data []byte) (blob.Metadata, error) + writeIndexBlob(ctx context.Context, data []byte, sessionID SessionID) (blob.Metadata, error) listIndexBlobs(ctx context.Context, includeInactive bool) ([]IndexBlobInfo, error) getIndexBlob(ctx context.Context, blobID blob.ID) ([]byte, error) registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata) error @@ -135,7 +131,7 @@ func (m *indexBlobManagerImpl) registerCompaction(ctx context.Context, inputs, o return errors.Wrap(err, "unable to marshal log entry bytes") } - compactionLogBlobMetadata, err := m.encryptAndWriteBlob(ctx, logEntryBytes, compactionLogBlobPrefix) + compactionLogBlobMetadata, err := m.encryptAndWriteBlob(ctx, logEntryBytes, compactionLogBlobPrefix, "") if err != nil { return errors.Wrap(err, "unable to write compaction log") } @@ -167,57 +163,17 @@ func (m *indexBlobManagerImpl) getEncryptedBlob(ctx context.Context, blobID blob return nil, errors.Wrap(err, "getContent") } - iv, err := getIndexBlobIV(blobID) - if err != nil { - return nil, errors.Wrap(err, "unable to get index blob IV") - } - - payload, err = m.encryptor.Decrypt(nil, payload, iv) - - if err != nil { - return nil, errors.Wrap(err, "decrypt error") - } - - // Since the encryption key is a function of data, we must be able to generate exactly the same key - // after decrypting the content. This serves as a checksum. - if err := m.verifyChecksum(payload, iv); err != nil { - return nil, err - } - - return payload, nil + return decryptFullBlob(m.hasher, m.encryptor, payload, blobID) } -func (m *indexBlobManagerImpl) verifyChecksum(data, contentID []byte) error { - var hashOutput [maxHashSize]byte - - expected := m.hasher(hashOutput[:0], data) - expected = expected[len(expected)-aes.BlockSize:] - - if !bytes.HasSuffix(contentID, expected) { - return errors.Errorf("invalid checksum for blob %x, expected %x", contentID, expected) - } - - return nil +func (m *indexBlobManagerImpl) writeIndexBlob(ctx context.Context, data []byte, sessionID SessionID) (blob.Metadata, error) { + return m.encryptAndWriteBlob(ctx, data, indexBlobPrefix, sessionID) } -func (m *indexBlobManagerImpl) writeIndexBlob(ctx context.Context, data []byte) (blob.Metadata, error) { - return m.encryptAndWriteBlob(ctx, data, indexBlobPrefix) -} - -func (m *indexBlobManagerImpl) encryptAndWriteBlob(ctx context.Context, data []byte, prefix blob.ID) (blob.Metadata, error) { - var hashOutput [maxHashSize]byte - - hash := m.hasher(hashOutput[:0], data) - blobID := prefix + blob.ID(hex.EncodeToString(hash)) - - iv, err := getIndexBlobIV(blobID) +func (m *indexBlobManagerImpl) encryptAndWriteBlob(ctx context.Context, data []byte, prefix blob.ID, sessionID SessionID) (blob.Metadata, error) { + blobID, data2, err := encryptFullBlob(m.hasher, m.encryptor, data, prefix, sessionID) if err != nil { - return blob.Metadata{}, err - } - - data2, err := m.encryptor.Encrypt(nil, data, iv) - if err != nil { - return blob.Metadata{}, errors.Wrapf(err, "error encrypting blob %v", blobID) + return blob.Metadata{}, errors.Wrap(err, "error encrypting") } m.listCache.deleteListCache(prefix) @@ -395,7 +351,7 @@ func (m *indexBlobManagerImpl) delayCleanupBlobs(ctx context.Context, blobIDs [] return errors.Wrap(err, "unable to marshal cleanup log bytes") } - if _, err := m.encryptAndWriteBlob(ctx, payload, cleanupBlobPrefix); err != nil { + if _, err := m.encryptAndWriteBlob(ctx, payload, cleanupBlobPrefix, ""); err != nil { return errors.Wrap(err, 
"unable to cleanup log") } @@ -468,14 +424,6 @@ func blobsOlderThan(m []blob.Metadata, cutoffTime time.Time) []blob.Metadata { return res } -func getIndexBlobIV(s blob.ID) ([]byte, error) { - if p := strings.Index(string(s), "-"); p >= 0 { // nolint:gocritic - s = s[0:p] - } - - return hex.DecodeString(string(s[len(s)-(aes.BlockSize*2):])) -} - func removeCompactedIndexes(ctx context.Context, m map[blob.ID]*IndexBlobInfo, compactionLogs map[blob.ID]*compactionLogEntry, markAsSuperseded bool) { var validCompactionLogs []*compactionLogEntry diff --git a/repo/content/index_blob_manager_test.go b/repo/content/index_blob_manager_test.go index 305f1ee64..04c4e26b0 100644 --- a/repo/content/index_blob_manager_test.go +++ b/repo/content/index_blob_manager_test.go @@ -605,7 +605,7 @@ func writeFakeIndex(ctx context.Context, m indexBlobManager, ndx map[string]fake return blob.Metadata{}, errors.Wrap(err, "json error") } - bm, err := m.writeIndexBlob(ctx, j) + bm, err := m.writeIndexBlob(ctx, j, "") if err != nil { return blob.Metadata{}, errors.Wrap(err, "error writing blob") } @@ -702,7 +702,7 @@ func mustRegisterCompaction(t *testing.T, m indexBlobManager, inputs, outputs [] func mustWriteIndexBlob(t *testing.T, m indexBlobManager, data string) blob.Metadata { t.Logf("writing index blob %q", data) - blobMD, err := m.writeIndexBlob(testlogging.Context(t), []byte(data)) + blobMD, err := m.writeIndexBlob(testlogging.Context(t), []byte(data), "") if err != nil { t.Fatalf("failed to write index blob: %v", err) } diff --git a/repo/content/sessions.go b/repo/content/sessions.go new file mode 100644 index 000000000..b7ea95eab --- /dev/null +++ b/repo/content/sessions.go @@ -0,0 +1,179 @@ +package content + +import ( + "context" + cryptorand "crypto/rand" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/repo/blob" +) + +// BlobIDPrefixSession is the prefix for blob IDs indicating active sessions. +// Each blob ID will consist of {sessionID}.{suffix}. +const BlobIDPrefixSession blob.ID = "s" + +// SessionID represents identifier of a session. +type SessionID string + +// SessionInfo describes a particular session and is persisted in Session blob. +type SessionInfo struct { + ID SessionID `json:"id"` + StartTime time.Time `json:"startTime"` + CheckpointTime time.Time `json:"checkpointTime"` + User string `json:"username"` + Host string `json:"hostname"` +} + +var ( + sessionIDEpochStartTime = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC) + sessionIDEpochGranularity = 30 * 24 * time.Hour +) + +// generateSessionID generates a random session identifier. +func generateSessionID(now time.Time) (SessionID, error) { + // generate session ID as {random-64-bit}{epoch-number} + // where epoch number is roughly the number of months since 2000-01-01 + // so our 64-bit number only needs to be unique per month. + // Given number of seconds per month, this allows >1000 sessions per + // second before significant probability of collision while keeping the + // session identifiers relatively short. 
+ r := make([]byte, 8) + if _, err := cryptorand.Read(r); err != nil { + return "", errors.Wrap(err, "unable to read crypto bytes") + } + + epochNumber := int(now.Sub(sessionIDEpochStartTime) / sessionIDEpochGranularity) + + return SessionID(fmt.Sprintf("%v%016x%x", BlobIDPrefixSession, r, epochNumber)), nil +} + +func (bm *Manager) getOrStartSessionLocked(ctx context.Context) (SessionID, error) { + if bm.currentSessionInfo.ID != "" { + return bm.currentSessionInfo.ID, nil + } + + id, err := generateSessionID(bm.timeNow()) + if err != nil { + return "", errors.Wrap(err, "unable to generate session ID") + } + + bm.currentSessionInfo = SessionInfo{ + ID: id, + StartTime: bm.timeNow(), + User: bm.sessionUser, + Host: bm.sessionHost, + } + + bm.sessionMarkerBlobIDs = nil + if err := bm.writeSessionMarkerLocked(ctx); err != nil { + return "", errors.Wrap(err, "unable to write session marker") + } + + return id, nil +} + +// commitSession commits the current session by deleting all session marker blobs +// that got written. +func (bm *Manager) commitSession(ctx context.Context) error { + for _, b := range bm.sessionMarkerBlobIDs { + if err := bm.st.DeleteBlob(ctx, b); err != nil && !errors.Is(err, blob.ErrBlobNotFound) { + return errors.Wrapf(err, "failed to delete session marker %v", b) + } + } + + bm.currentSessionInfo.ID = "" + bm.sessionMarkerBlobIDs = nil + + return nil +} + +// writeSessionMarkerLocked writes a session marker indicating last time the session +// was known to be alive. +// TODO(jkowalski): write this periodically when sessions span the duration of an upload. +func (bm *Manager) writeSessionMarkerLocked(ctx context.Context) error { + cp := bm.currentSessionInfo + cp.CheckpointTime = bm.timeNow() + + js, err := json.Marshal(cp) + if err != nil { + return errors.Wrap(err, "unable to serialize session marker payload") + } + + sessionBlobID, encrypted, err := encryptFullBlob(bm.hasher, bm.encryptor, js, BlobIDPrefixSession, bm.currentSessionInfo.ID) + if err != nil { + return errors.Wrap(err, "unable to encrypt session marker") + } + + if err := bm.st.PutBlob(ctx, sessionBlobID, gather.FromSlice(encrypted)); err != nil { + return errors.Wrapf(err, "unable to write session marker: %v", string(sessionBlobID)) + } + + bm.sessionMarkerBlobIDs = append(bm.sessionMarkerBlobIDs, sessionBlobID) + + return nil +} + +// SessionIDFromBlobID returns session ID from a given blob ID or empty string if it's not a session blob ID. +func SessionIDFromBlobID(b blob.ID) SessionID { + parts := strings.Split(string(b), "-") + if len(parts) == 1 { + return "" + } + + for _, sid := range parts[1:] { + if strings.HasPrefix(sid, string(BlobIDPrefixSession)) { + return SessionID(sid) + } + } + + return "" +} + +// ListActiveSessions returns a set of all active sessions in a given storage. 
+func (bm *Manager) ListActiveSessions(ctx context.Context) (map[SessionID]*SessionInfo, error) { + blobs, err := blob.ListAllBlobs(ctx, bm.st, BlobIDPrefixSession) + if err != nil { + return nil, errors.Wrap(err, "unable to list session blobs") + } + + m := map[SessionID]*SessionInfo{} + + for _, b := range blobs { + sid := SessionIDFromBlobID(b.BlobID) + if sid == "" { + return nil, errors.Errorf("found invalid session blob %v", b.BlobID) + } + + si := &SessionInfo{} + + payload, err := bm.st.GetBlob(ctx, b.BlobID, 0, -1) + if err != nil { + if errors.Is(err, blob.ErrBlobNotFound) { + continue + } + + return nil, errors.Wrapf(err, "error loading session: %v", b.BlobID) + } + + payload, err = decryptFullBlob(bm.hasher, bm.encryptor, payload, b.BlobID) + if err != nil { + return nil, errors.Wrapf(err, "error decrypting session: %v", b.BlobID) + } + + if err := json.Unmarshal(payload, si); err != nil { + return nil, errors.Wrapf(err, "error parsing session: %v", b.BlobID) + } + + if old := m[sid]; old == nil || si.CheckpointTime.After(old.CheckpointTime) { + m[sid] = si + } + } + + return m, nil +} diff --git a/repo/content/sessions_test.go b/repo/content/sessions_test.go new file mode 100644 index 000000000..c6b76fcf6 --- /dev/null +++ b/repo/content/sessions_test.go @@ -0,0 +1,57 @@ +package content + +import ( + "testing" + + "github.com/kopia/kopia/internal/clock" + "github.com/kopia/kopia/repo/blob" +) + +func TestGenerateSessionID(t *testing.T) { + n := clock.Now() + + s1, err := generateSessionID(n) + if err != nil { + t.Fatal(err) + } + + s2, err := generateSessionID(n) + if err != nil { + t.Fatal(err) + } + + s3, err := generateSessionID(n) + if err != nil { + t.Fatal(err) + } + + m := map[SessionID]bool{ + s1: true, + s2: true, + s3: true, + } + + if len(m) != 3 { + t.Fatalf("session IDs were not unique: %v", m) + } +} + +func TestSessionIDFromBlobID(t *testing.T) { + cases := []struct { + blobID blob.ID + sessionID SessionID + }{ + {"pdeadbeef", ""}, + {"pdeadbeef-", ""}, + {"pdeadbeef-whatever", ""}, + {"pdeadbeef-s01", "s01"}, + {"pdeadbeef-s01", "s01"}, + {"sdeadbeef-s01", "s01"}, + } + + for _, tc := range cases { + if got, want := SessionIDFromBlobID(tc.blobID), tc.sessionID; got != want { + t.Errorf("invalid result for %v: %v, want %v", tc.blobID, got, want) + } + } +} diff --git a/repo/maintenance/blob_gc.go b/repo/maintenance/blob_gc.go index 87a1d3ed5..896fa6e50 100644 --- a/repo/maintenance/blob_gc.go +++ b/repo/maintenance/blob_gc.go @@ -10,6 +10,7 @@ "github.com/kopia/kopia/internal/stats" "github.com/kopia/kopia/internal/units" "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/content" ) // defaultBlobGCMinAge is a default MinAge for blob GC. @@ -17,15 +18,20 @@ // will periodically flush its indexes more frequently than 1/hour. const defaultBlobGCMinAge = 2 * time.Hour +// treat sessions that have not been updated in 4 days as expired. +const defaultSessionExpirationAge = 4 * 24 * time.Hour + // DeleteUnreferencedBlobsOptions provides option for blob garbage collection algorithm. type DeleteUnreferencedBlobsOptions struct { - Parallel int - Prefix blob.ID - MinAge time.Duration - DryRun bool + Parallel int + Prefix blob.ID + MinAge time.Duration + SessionExpirationAge time.Duration // treat sessions that have not been written for more than X time as expired + DryRun bool } // DeleteUnreferencedBlobs deletes old blobs that are no longer referenced by index entries. 
+// nolint:gocyclo func DeleteUnreferencedBlobs(ctx context.Context, rep MaintainableRepository, opt DeleteUnreferencedBlobsOptions) (int, error) { if opt.Parallel == 0 { opt.Parallel = 16 @@ -35,6 +41,10 @@ func DeleteUnreferencedBlobs(ctx context.Context, rep MaintainableRepository, op opt.MinAge = defaultBlobGCMinAge } + if opt.SessionExpirationAge == 0 { + opt.SessionExpirationAge = defaultSessionExpirationAge + } + const deleteQueueSize = 100 var unreferenced, deleted stats.CountSum @@ -68,14 +78,31 @@ func DeleteUnreferencedBlobs(ctx context.Context, rep MaintainableRepository, op var prefixes []blob.ID if p := opt.Prefix; p != "" { prefixes = append(prefixes, p) + } else { + prefixes = append(prefixes, content.PackBlobIDPrefixRegular, content.PackBlobIDPrefixSpecial, content.BlobIDPrefixSession) } + activeSessions, err := rep.ListActiveSessions(ctx) + if err != nil { + return 0, errors.Wrap(err, "unable to load active sessions") + } + + // iterate all pack blobs + session blobs and keep ones that are too young or + // belong to alive sessions. if err := rep.ContentManager().IterateUnreferencedBlobs(ctx, prefixes, opt.Parallel, func(bm blob.Metadata) error { if age := rep.Time().Sub(bm.Timestamp); age < opt.MinAge { log(ctx).Debugf(" preserving %v because it's too new (age: %v)", bm.BlobID, age) return nil } + sid := content.SessionIDFromBlobID(bm.BlobID) + if s, ok := activeSessions[sid]; ok { + if age := rep.Time().Sub(s.CheckpointTime); age < opt.SessionExpirationAge { + log(ctx).Debugf(" preserving %v because it's part of an active session (%v)", bm.BlobID, sid) + return nil + } + } + unreferenced.Add(bm.Length) if !opt.DryRun { diff --git a/repo/maintenance/blob_gc_test.go b/repo/maintenance/blob_gc_test.go new file mode 100644 index 000000000..04bff20be --- /dev/null +++ b/repo/maintenance/blob_gc_test.go @@ -0,0 +1,207 @@ +package maintenance + +import ( + "crypto/hmac" + "crypto/sha256" + "encoding/json" + "fmt" + "io" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/faketime" + "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/internal/repotesting" + "github.com/kopia/kopia/internal/testlogging" + "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/content" + "github.com/kopia/kopia/repo/object" +) + +var testHMACSecret = []byte{1, 2, 3} + +func TestDeleteUnreferencedBlobs(t *testing.T) { + ctx := testlogging.Context(t) + + var env repotesting.Environment + + ta := faketime.NewTimeAdvance(time.Now(), 1*time.Second) + + // setup repository without encryption and without HMAC so we can implant session blobs + defer env.Setup(t, repotesting.Options{ + OpenOptions: func(o *repo.Options) { + o.TimeNowFunc = ta.NowFunc() + }, + NewRepositoryOptions: func(nro *repo.NewRepositoryOptions) { + nro.BlockFormat.Encryption = "NONE" + nro.BlockFormat.Hash = "HMAC-SHA256" + nro.BlockFormat.HMACSecret = testHMACSecret + }, + }).Close(ctx, t) + + w := env.Repository.NewObjectWriter(ctx, object.WriterOptions{}) + io.WriteString(w, "hello world!") + w.Result() + w.Close() + + env.Repository.Flush(ctx) + + blobsBefore, err := blob.ListAllBlobs(ctx, env.Repository.Blobs, "") + if err != nil { + t.Fatal(err) + } + + if got, want := len(blobsBefore), 3; got != want { + t.Fatalf("unexpected number of blobs after writing: %v", blobsBefore) + } + + // add some more unreferenced blobs + const ( + extraBlobID1 blob.ID = "pdeadbeef1" + extraBlobID2 blob.ID = "pdeadbeef2" + 
)
+
+	mustPutDummyBlob(t, env.Repository.Blobs, extraBlobID1)
+	mustPutDummyBlob(t, env.Repository.Blobs, extraBlobID2)
+	verifyBlobExists(t, env.Repository.Blobs, extraBlobID1)
+	verifyBlobExists(t, env.Repository.Blobs, extraBlobID2)
+
+	// new blobs will not be deleted because of the minimum age requirement
+	if _, err = DeleteUnreferencedBlobs(ctx, env.Repository, DeleteUnreferencedBlobsOptions{
+		MinAge: 1 * time.Hour,
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	verifyBlobExists(t, env.Repository.Blobs, extraBlobID1)
+	verifyBlobExists(t, env.Repository.Blobs, extraBlobID2)
+
+	// new blobs will be deleted
+	if _, err = DeleteUnreferencedBlobs(ctx, env.Repository, DeleteUnreferencedBlobsOptions{
+		MinAge: 1,
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	verifyBlobNotFound(t, env.Repository.Blobs, extraBlobID1)
+	verifyBlobNotFound(t, env.Repository.Blobs, extraBlobID2)
+
+	// add blobs again, this time belonging to sessions
+	const (
+		extraBlobIDWithSession1 blob.ID = "pdeadbeef1-s01"
+		extraBlobIDWithSession2 blob.ID = "pdeadbeef2-s01"
+		extraBlobIDWithSession3 blob.ID = "pdeadbeef3-s02"
+	)
+
+	mustPutDummyBlob(t, env.Repository.Blobs, extraBlobIDWithSession1)
+	mustPutDummyBlob(t, env.Repository.Blobs, extraBlobIDWithSession2)
+	mustPutDummyBlob(t, env.Repository.Blobs, extraBlobIDWithSession3)
+
+	session1Marker := mustPutDummySessionBlob(t, env.Repository.Blobs, "s01", &content.SessionInfo{
+		CheckpointTime: ta.NowFunc()(),
+	})
+	session2Marker := mustPutDummySessionBlob(t, env.Repository.Blobs, "s02", &content.SessionInfo{
+		CheckpointTime: ta.NowFunc()(),
+	})
+
+	if _, err = DeleteUnreferencedBlobs(ctx, env.Repository, DeleteUnreferencedBlobsOptions{
+		MinAge: 1,
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	verifyBlobExists(t, env.Repository.Blobs, extraBlobIDWithSession1)
+	verifyBlobExists(t, env.Repository.Blobs, extraBlobIDWithSession2)
+	verifyBlobExists(t, env.Repository.Blobs, extraBlobIDWithSession3)
+	verifyBlobExists(t, env.Repository.Blobs, session1Marker)
+	verifyBlobExists(t, env.Repository.Blobs, session2Marker)
+
+	// now finish session 2
+	env.Repository.Blobs.DeleteBlob(ctx, session2Marker)
+
+	if _, err = DeleteUnreferencedBlobs(ctx, env.Repository, DeleteUnreferencedBlobsOptions{
+		MinAge: 1,
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	verifyBlobExists(t, env.Repository.Blobs, extraBlobIDWithSession1)
+	verifyBlobExists(t, env.Repository.Blobs, extraBlobIDWithSession2)
+	verifyBlobNotFound(t, env.Repository.Blobs, extraBlobIDWithSession3)
+	verifyBlobExists(t, env.Repository.Blobs, session1Marker)
+	verifyBlobNotFound(t, env.Repository.Blobs, session2Marker)
+
+	// now move time into the future so that session 1 times out
+	ta.Advance(7 * 24 * time.Hour)
+
+	if _, err = DeleteUnreferencedBlobs(ctx, env.Repository, DeleteUnreferencedBlobsOptions{
+		MinAge: 1,
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	verifyBlobNotFound(t, env.Repository.Blobs, extraBlobIDWithSession1)
+	verifyBlobNotFound(t, env.Repository.Blobs, extraBlobIDWithSession2)
+	verifyBlobNotFound(t, env.Repository.Blobs, extraBlobIDWithSession3)
+	verifyBlobNotFound(t, env.Repository.Blobs, session1Marker)
+	verifyBlobNotFound(t, env.Repository.Blobs, session2Marker)
+
+	// make sure we're back to the starting point.
+ + blobsAfter, err := blob.ListAllBlobs(ctx, env.Repository.Blobs, "") + if err != nil { + t.Fatal(err) + } + + if diff := cmp.Diff(blobsBefore, blobsAfter); diff != "" { + t.Errorf("unexpected diff: %v", diff) + } +} + +func verifyBlobExists(t *testing.T, st blob.Storage, blobID blob.ID) { + t.Helper() + + if _, err := st.GetMetadata(testlogging.Context(t), blobID); err != nil { + t.Fatalf("expected blob %v to exist, got %v", blobID, err) + } +} + +func verifyBlobNotFound(t *testing.T, st blob.Storage, blobID blob.ID) { + t.Helper() + + if _, err := st.GetMetadata(testlogging.Context(t), blobID); !errors.Is(err, blob.ErrBlobNotFound) { + t.Fatalf("expected blob %v to be not found, got %v", blobID, err) + } +} + +func mustPutDummyBlob(t *testing.T, st blob.Storage, blobID blob.ID) { + t.Helper() + + if err := st.PutBlob(testlogging.Context(t), blobID, gather.FromSlice([]byte{1, 2, 3})); err != nil { + t.Fatal(err) + } +} + +func mustPutDummySessionBlob(t *testing.T, st blob.Storage, sessionIDSuffix blob.ID, si *content.SessionInfo) blob.ID { + t.Helper() + + j, err := json.Marshal(si) + if err != nil { + t.Fatal(err) + } + + h := hmac.New(sha256.New, testHMACSecret) + h.Write(j) + + blobID := blob.ID(fmt.Sprintf("s%x-%v", h.Sum(nil)[16:32], sessionIDSuffix)) + + if err := st.PutBlob(testlogging.Context(t), blobID, gather.FromSlice(j)); err != nil { + t.Fatal(err) + } + + return blobID +} diff --git a/repo/maintenance/maintenance_run.go b/repo/maintenance/maintenance_run.go index 28e38931c..2ab88a9a2 100644 --- a/repo/maintenance/maintenance_run.go +++ b/repo/maintenance/maintenance_run.go @@ -42,6 +42,8 @@ type MaintainableRepository interface { repo.Writer DeriveKey(purpose []byte, keyLength int) []byte + + ListActiveSessions(ctx context.Context) (map[content.SessionID]*content.SessionInfo, error) } // Supported maintenance modes. diff --git a/repo/open.go b/repo/open.go index 088b271a4..2b37b9afa 100644 --- a/repo/open.go +++ b/repo/open.go @@ -164,6 +164,8 @@ func OpenWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw cmOpts := &content.ManagerOptions{ RepositoryFormatBytes: fb, TimeNow: defaultTime(options.TimeNowFunc), + SessionUser: lc.ClientOptions.Username, + SessionHost: lc.ClientOptions.Hostname, } cm, err := content.NewManager(ctx, st, fo, caching, cmOpts) diff --git a/repo/repository.go b/repo/repository.go index 8059aec1d..f23ab8aa2 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -133,6 +133,11 @@ func (r *DirectRepository) DeleteManifest(ctx context.Context, id manifest.ID) e return r.Manifests.Delete(ctx, id) } +// ListActiveSessions returns the map of active sessions. +func (r *DirectRepository) ListActiveSessions(ctx context.Context) (map[content.SessionID]*content.SessionInfo, error) { + return r.Content.ListActiveSessions(ctx) +} + // UpdateDescription updates the description of a connected repository. 
func (r *DirectRepository) UpdateDescription(d string) { r.cliOpts.Description = d diff --git a/tests/end_to_end_test/restore_fail_test.go b/tests/end_to_end_test/restore_fail_test.go index 7d702e701..0f8739948 100644 --- a/tests/end_to_end_test/restore_fail_test.go +++ b/tests/end_to_end_test/restore_fail_test.go @@ -69,7 +69,7 @@ func TestRestoreFail(t *testing.T) { func findPackBlob(blobIDs []string) string { // Pattern to match "p" followed by hexadecimal digits // Ex) "pd4c69d72b75a9d3d7d9da21096c6b60a" - patternStr := fmt.Sprintf("^%s[0-9a-f]+$", content.PackBlobIDPrefixRegular) + patternStr := fmt.Sprintf("^%s[0-9a-f]+", content.PackBlobIDPrefixRegular) pattern := regexp.MustCompile(patternStr) for _, blobID := range blobIDs {
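
Note on the blob-ID convention this change introduces: pack and index blobs written inside a session carry a "-{sessionID}" suffix, and a separate "s..." session marker blob stays in place until the index is committed, which is how blob GC distinguishes in-flight data from garbage. The standalone sketch below (not part of the patch) mirrors the naming scheme of generateSessionID and SessionIDFromBlobID in repo/content/sessions.go using only the standard library; the package name and the main driver are illustrative assumptions, not Kopia APIs.

// Minimal sketch, assuming only the standard library; mirrors the session
// naming scheme used by generateSessionID / SessionIDFromBlobID above.
package main

import (
	"crypto/rand"
	"fmt"
	"strings"
	"time"
)

var (
	epochStart       = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
	epochGranularity = 30 * 24 * time.Hour // one "epoch" is roughly a month
)

// newSessionID builds "s{random-64-bit-hex}{epoch-number-hex}"; the random part
// only needs to be unique within a single epoch.
func newSessionID(now time.Time) (string, error) {
	r := make([]byte, 8)
	if _, err := rand.Read(r); err != nil {
		return "", err
	}

	epoch := int(now.Sub(epochStart) / epochGranularity)

	return fmt.Sprintf("s%016x%x", r, epoch), nil
}

// sessionIDFromBlobID extracts the "s..." suffix from a blob ID such as
// "pdeadbeef...-s0123" and returns "" for blobs that do not belong to a session.
func sessionIDFromBlobID(blobID string) string {
	parts := strings.Split(blobID, "-")
	for _, p := range parts[1:] {
		if strings.HasPrefix(p, "s") {
			return p
		}
	}

	return ""
}

func main() {
	sid, err := newSessionID(time.Now())
	if err != nil {
		panic(err)
	}

	packBlobID := "pdeadbeef-" + sid

	fmt.Println("session ID:       ", sid)
	fmt.Println("pack blob ID:     ", packBlobID)
	fmt.Println("recovered session:", sessionIDFromBlobID(packBlobID))
	fmt.Println("no session:       ", sessionIDFromBlobID("pdeadbeef")) // empty string
}

Running the sketch prints a fresh session ID, a pack blob ID carrying that ID as a suffix, and the session ID recovered from it; a blob ID without a "-s..." suffix yields an empty session ID, which is what lets blob GC treat it as a plain, session-less blob.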