From ab1f62e3adc875f4b06034bf249b2ee4dc01715c Mon Sep 17 00:00:00 2001 From: Julio Lopez <1953782+julio-lopez@users.noreply.github.com> Date: Thu, 29 May 2025 08:26:55 -0700 Subject: [PATCH] refactor(general): misc cleanups (#4615) - nit: rename var to packCountByPrefix - leverage impossible package - use maps.Clone - unexport indirectObjectID - unexport compressed - rename function to flushBufferLocked - add checklocks annotations to functions that must be called under w.mu --- cli/json_output.go | 5 ++--- internal/crypto/key_derivation.go | 8 +++++--- repo/compression/compressor.go | 6 +++--- repo/maintenance/content_rewrite.go | 10 +++++----- repo/manifest/committed_manifest_manager.go | 7 ++++--- repo/manifest/manifest_manager.go | 20 +++----------------- repo/object/object_manager.go | 2 +- repo/object/object_writer.go | 12 +++++++----- repo/object/objectid.go | 8 ++++---- 9 files changed, 34 insertions(+), 44 deletions(-) diff --git a/cli/json_output.go b/cli/json_output.go index 6ab849ead..c801d4b85 100644 --- a/cli/json_output.go +++ b/cli/json_output.go @@ -7,6 +7,7 @@ "github.com/alecthomas/kingpin/v2" + "github.com/kopia/kopia/internal/impossible" "github.com/kopia/kopia/snapshot" ) @@ -80,9 +81,7 @@ func (c *jsonOutput) jsonIndentedBytes(v any, indent string) []byte { b, err = json.Marshal(v) } - if err != nil { - panic("error serializing JSON, that should not happen: " + err.Error()) - } + impossible.PanicOnError(err) return b } diff --git a/internal/crypto/key_derivation.go b/internal/crypto/key_derivation.go index 48c018a70..c8edbc30a 100644 --- a/internal/crypto/key_derivation.go +++ b/internal/crypto/key_derivation.go @@ -5,6 +5,8 @@ "io" "golang.org/x/crypto/hkdf" + + "github.com/kopia/kopia/internal/impossible" ) // DeriveKeyFromMasterKey computes a key for a specific purpose and length using HKDF based on the master key. @@ -16,9 +18,9 @@ func DeriveKeyFromMasterKey(masterKey, salt, purpose []byte, length int) []byte key := make([]byte, length) k := hkdf.New(sha256.New, masterKey, salt, purpose) - if _, err := io.ReadFull(k, key); err != nil { - panic("unable to derive key from master key, this should never happen") - } + _, err := io.ReadFull(k, key) + + impossible.PanicOnError(err) return key } diff --git a/repo/compression/compressor.go b/repo/compression/compressor.go index 858d1d68e..52e10856f 100644 --- a/repo/compression/compressor.go +++ b/repo/compression/compressor.go @@ -8,6 +8,8 @@ "io" "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/impossible" ) const compressionHeaderSize = 4 @@ -80,9 +82,7 @@ func DecompressByHeader(output io.Writer, input io.Reader) error { } func mustSucceed(err error) { - if err != nil { - panic("unexpected error: " + err.Error()) - } + impossible.PanicOnError(err) } func verifyCompressionHeader(reader io.Reader, want []byte) error { diff --git a/repo/maintenance/content_rewrite.go b/repo/maintenance/content_rewrite.go index 3c737a573..360f96d63 100644 --- a/repo/maintenance/content_rewrite.go +++ b/repo/maintenance/content_rewrite.go @@ -187,8 +187,8 @@ func findContentInShortPacks(ctx context.Context, rep repo.DirectRepository, ch } var ( - packNumberByPrefix = map[blob.ID]int{} - firstPackByPrefix = map[blob.ID]content.PackInfo{} + packCountByPrefix = map[blob.ID]int{} + firstPackByPrefix = map[blob.ID]content.PackInfo{} ) err := rep.ContentReader().IteratePacks( @@ -205,16 +205,16 @@ func(pi content.PackInfo) error { prefix := pi.PackID[0:1] - packNumberByPrefix[prefix]++ + packCountByPrefix[prefix]++ - if packNumberByPrefix[prefix] == 1 { + if packCountByPrefix[prefix] == 1 { // do not immediately compact the first pack, in case it's the only pack. firstPackByPrefix[prefix] = pi return nil } //nolint:mnd - if packNumberByPrefix[prefix] == 2 { + if packCountByPrefix[prefix] == 2 { // when we encounter the 2nd pack, emit contents from the first one too. for _, ci := range firstPackByPrefix[prefix].ContentInfos { ch <- contentInfoOrError{Info: ci} diff --git a/repo/manifest/committed_manifest_manager.go b/repo/manifest/committed_manifest_manager.go index 62afda9b9..90e320dee 100644 --- a/repo/manifest/committed_manifest_manager.go +++ b/repo/manifest/committed_manifest_manager.go @@ -14,6 +14,7 @@ "github.com/pkg/errors" "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/internal/impossible" "github.com/kopia/kopia/repo/compression" "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/repo/content/index" @@ -114,9 +115,9 @@ func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entri defer buf.Close() gz := gzip.NewWriter(&buf) - mustSucceed(json.NewEncoder(gz).Encode(man)) - mustSucceed(gz.Flush()) - mustSucceed(gz.Close()) + impossible.PanicOnError(json.NewEncoder(gz).Encode(man)) + impossible.PanicOnError(gz.Flush()) + impossible.PanicOnError(gz.Close()) // TODO: Configure manifest metadata compression with Policy setting contentID, err := m.b.WriteContent(ctx, buf.Bytes(), ContentPrefix, compression.HeaderZstdFastest) diff --git a/repo/manifest/manifest_manager.go b/repo/manifest/manifest_manager.go index 2d6c5755c..ba2fb5441 100644 --- a/repo/manifest/manifest_manager.go +++ b/repo/manifest/manifest_manager.go @@ -6,6 +6,7 @@ "crypto/rand" "encoding/hex" "encoding/json" + "maps" "sort" "sync" "time" @@ -86,7 +87,7 @@ func (m *Manager) Put(_ context.Context, labels map[string]string, payload any) e := &manifestEntry{ ID: ID(hex.EncodeToString(random)), ModTime: m.timeNow().UTC(), - Labels: copyLabels(labels), + Labels: maps.Clone(labels), Content: b, } @@ -192,7 +193,7 @@ func (m *Manager) Find(ctx context.Context, labels map[string]string) ([]*EntryM func cloneEntryMetadata(e *manifestEntry) *EntryMetadata { return &EntryMetadata{ ID: e.ID, - Labels: copyLabels(e.Labels), + Labels: maps.Clone(e.Labels), Length: len(e.Content), ModTime: e.ModTime, } @@ -222,12 +223,6 @@ func (m *Manager) Flush(ctx context.Context) error { return err } -func mustSucceed(e error) { - if e != nil { - panic("unexpected failure: " + e.Error()) - } -} - // Delete marks the specified manifest ID for deletion. func (m *Manager) Delete(ctx context.Context, id ID) error { com, err := m.committed.getCommittedEntryOrNil(ctx, id) @@ -278,15 +273,6 @@ func IDsFromStrings(input []string) []ID { return result } -func copyLabels(m map[string]string) map[string]string { - r := map[string]string{} - for k, v := range m { - r[k] = v - } - - return r -} - // ManagerOptions are optional parameters for Manager creation. type ManagerOptions struct { TimeNow func() time.Time // Time provider diff --git a/repo/object/object_manager.go b/repo/object/object_manager.go index 0e088a601..d04b4acb3 100644 --- a/repo/object/object_manager.go +++ b/repo/object/object_manager.go @@ -148,7 +148,7 @@ func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID, metadataComp return EmptyID, errors.Wrap(err, "error writing concatenated index") } - return IndirectObjectID(concatID), nil + return indirectObjectID(concatID), nil } func appendIndexEntriesForObject(ctx context.Context, cr contentReader, indexEntries []IndirectObjectEntry, startingLength int64, objectID ID) (result []IndirectObjectEntry, totalLength int64, _ error) { diff --git a/repo/object/object_writer.go b/repo/object/object_writer.go index 85b51c328..1d60009fc 100644 --- a/repo/object/object_writer.go +++ b/repo/object/object_writer.go @@ -128,7 +128,7 @@ func (w *objectWriter) Write(data []byte) (n int, err error) { // found a split point after `n` bytes, write first n bytes then flush and repeat with the remainder. w.buffer.Append(data[0:n]) - if err := w.flushBuffer(); err != nil { + if err := w.flushBufferLocked(); err != nil { return 0, err } @@ -138,7 +138,8 @@ func (w *objectWriter) Write(data []byte) (n int, err error) { return dataLen, nil } -func (w *objectWriter) flushBuffer() error { +// +checklocks:w.mu +func (w *objectWriter) flushBufferLocked() error { length := w.buffer.Length() // hold a lock as we may grow the index @@ -239,7 +240,7 @@ func maybeCompressedObjectID(contentID content.ID, isCompressed bool) ID { oid := DirectObjectID(contentID) if isCompressed { - oid = Compressed(oid) + oid = compressed(oid) } return oid @@ -266,7 +267,7 @@ func (w *objectWriter) Result() (ID, error) { // no need to hold a lock on w.indirectIndexGrowMutex, since growing index only happens synchronously // and never in parallel with calling Result() if w.buffer.Length() > 0 || len(w.indirectIndex) == 0 { - if err := w.flushBuffer(); err != nil { + if err := w.flushBufferLocked(); err != nil { return EmptyID, err } } @@ -283,6 +284,7 @@ func (w *objectWriter) Checkpoint() (ID, error) { return w.checkpointLocked() } +// +checklocks:w.mu func (w *objectWriter) checkpointLocked() (ID, error) { // wait for any in-flight asynchronous writes to finish w.asyncWritesWG.Wait() @@ -325,7 +327,7 @@ func (w *objectWriter) checkpointLocked() (ID, error) { return EmptyID, err } - return IndirectObjectID(oid), nil + return indirectObjectID(oid), nil } func writeIndirectObject(w io.Writer, entries []IndirectObjectEntry) error { diff --git a/repo/object/objectid.go b/repo/object/objectid.go index db8f53aa9..8d8f14735 100644 --- a/repo/object/objectid.go +++ b/repo/object/objectid.go @@ -148,14 +148,14 @@ func DirectObjectID(contentID content.ID) ID { return ID{cid: contentID} } -// Compressed returns object ID with 'Z' prefix indicating it's compressed. -func Compressed(objectID ID) ID { +// compressed returns object ID with 'Z' prefix indicating it's compressed. +func compressed(objectID ID) ID { objectID.compression = true return objectID } -// IndirectObjectID returns indirect object ID based on the underlying index object ID. -func IndirectObjectID(indexObjectID ID) ID { +// indirectObjectID returns indirect object ID based on the underlying index object ID. +func indirectObjectID(indexObjectID ID) ID { indexObjectID.indirection++ return indexObjectID }