refactor(general): misc cleanups (#4615)

- nit: rename var to packCountByPrefix
- leverage impossible package
- use maps.Clone
- unexport indirectObjectID
- unexport compressed
- rename function to flushBufferLocked
- add checklocks annotations to functions that must be called under w.mu
This commit is contained in:
Julio Lopez
2025-05-29 08:26:55 -07:00
committed by GitHub
parent afc635c9c3
commit ab1f62e3ad
9 changed files with 34 additions and 44 deletions

View File

@@ -7,6 +7,7 @@
"github.com/alecthomas/kingpin/v2"
"github.com/kopia/kopia/internal/impossible"
"github.com/kopia/kopia/snapshot"
)
@@ -80,9 +81,7 @@ func (c *jsonOutput) jsonIndentedBytes(v any, indent string) []byte {
b, err = json.Marshal(v)
}
if err != nil {
panic("error serializing JSON, that should not happen: " + err.Error())
}
impossible.PanicOnError(err)
return b
}

View File

@@ -5,6 +5,8 @@
"io"
"golang.org/x/crypto/hkdf"
"github.com/kopia/kopia/internal/impossible"
)
// DeriveKeyFromMasterKey computes a key for a specific purpose and length using HKDF based on the master key.
@@ -16,9 +18,9 @@ func DeriveKeyFromMasterKey(masterKey, salt, purpose []byte, length int) []byte
key := make([]byte, length)
k := hkdf.New(sha256.New, masterKey, salt, purpose)
if _, err := io.ReadFull(k, key); err != nil {
panic("unable to derive key from master key, this should never happen")
}
_, err := io.ReadFull(k, key)
impossible.PanicOnError(err)
return key
}

View File

@@ -8,6 +8,8 @@
"io"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/impossible"
)
const compressionHeaderSize = 4
@@ -80,9 +82,7 @@ func DecompressByHeader(output io.Writer, input io.Reader) error {
}
func mustSucceed(err error) {
if err != nil {
panic("unexpected error: " + err.Error())
}
impossible.PanicOnError(err)
}
func verifyCompressionHeader(reader io.Reader, want []byte) error {

View File

@@ -187,8 +187,8 @@ func findContentInShortPacks(ctx context.Context, rep repo.DirectRepository, ch
}
var (
packNumberByPrefix = map[blob.ID]int{}
firstPackByPrefix = map[blob.ID]content.PackInfo{}
packCountByPrefix = map[blob.ID]int{}
firstPackByPrefix = map[blob.ID]content.PackInfo{}
)
err := rep.ContentReader().IteratePacks(
@@ -205,16 +205,16 @@ func(pi content.PackInfo) error {
prefix := pi.PackID[0:1]
packNumberByPrefix[prefix]++
packCountByPrefix[prefix]++
if packNumberByPrefix[prefix] == 1 {
if packCountByPrefix[prefix] == 1 {
// do not immediately compact the first pack, in case it's the only pack.
firstPackByPrefix[prefix] = pi
return nil
}
//nolint:mnd
if packNumberByPrefix[prefix] == 2 {
if packCountByPrefix[prefix] == 2 {
// when we encounter the 2nd pack, emit contents from the first one too.
for _, ci := range firstPackByPrefix[prefix].ContentInfos {
ch <- contentInfoOrError{Info: ci}

View File

@@ -14,6 +14,7 @@
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/impossible"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/content/index"
@@ -114,9 +115,9 @@ func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entri
defer buf.Close()
gz := gzip.NewWriter(&buf)
mustSucceed(json.NewEncoder(gz).Encode(man))
mustSucceed(gz.Flush())
mustSucceed(gz.Close())
impossible.PanicOnError(json.NewEncoder(gz).Encode(man))
impossible.PanicOnError(gz.Flush())
impossible.PanicOnError(gz.Close())
// TODO: Configure manifest metadata compression with Policy setting
contentID, err := m.b.WriteContent(ctx, buf.Bytes(), ContentPrefix, compression.HeaderZstdFastest)

View File

@@ -6,6 +6,7 @@
"crypto/rand"
"encoding/hex"
"encoding/json"
"maps"
"sort"
"sync"
"time"
@@ -86,7 +87,7 @@ func (m *Manager) Put(_ context.Context, labels map[string]string, payload any)
e := &manifestEntry{
ID: ID(hex.EncodeToString(random)),
ModTime: m.timeNow().UTC(),
Labels: copyLabels(labels),
Labels: maps.Clone(labels),
Content: b,
}
@@ -192,7 +193,7 @@ func (m *Manager) Find(ctx context.Context, labels map[string]string) ([]*EntryM
func cloneEntryMetadata(e *manifestEntry) *EntryMetadata {
return &EntryMetadata{
ID: e.ID,
Labels: copyLabels(e.Labels),
Labels: maps.Clone(e.Labels),
Length: len(e.Content),
ModTime: e.ModTime,
}
@@ -222,12 +223,6 @@ func (m *Manager) Flush(ctx context.Context) error {
return err
}
func mustSucceed(e error) {
if e != nil {
panic("unexpected failure: " + e.Error())
}
}
// Delete marks the specified manifest ID for deletion.
func (m *Manager) Delete(ctx context.Context, id ID) error {
com, err := m.committed.getCommittedEntryOrNil(ctx, id)
@@ -278,15 +273,6 @@ func IDsFromStrings(input []string) []ID {
return result
}
func copyLabels(m map[string]string) map[string]string {
r := map[string]string{}
for k, v := range m {
r[k] = v
}
return r
}
// ManagerOptions are optional parameters for Manager creation.
type ManagerOptions struct {
TimeNow func() time.Time // Time provider

View File

@@ -148,7 +148,7 @@ func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID, metadataComp
return EmptyID, errors.Wrap(err, "error writing concatenated index")
}
return IndirectObjectID(concatID), nil
return indirectObjectID(concatID), nil
}
func appendIndexEntriesForObject(ctx context.Context, cr contentReader, indexEntries []IndirectObjectEntry, startingLength int64, objectID ID) (result []IndirectObjectEntry, totalLength int64, _ error) {

View File

@@ -128,7 +128,7 @@ func (w *objectWriter) Write(data []byte) (n int, err error) {
// found a split point after `n` bytes, write first n bytes then flush and repeat with the remainder.
w.buffer.Append(data[0:n])
if err := w.flushBuffer(); err != nil {
if err := w.flushBufferLocked(); err != nil {
return 0, err
}
@@ -138,7 +138,8 @@ func (w *objectWriter) Write(data []byte) (n int, err error) {
return dataLen, nil
}
func (w *objectWriter) flushBuffer() error {
// +checklocks:w.mu
func (w *objectWriter) flushBufferLocked() error {
length := w.buffer.Length()
// hold a lock as we may grow the index
@@ -239,7 +240,7 @@ func maybeCompressedObjectID(contentID content.ID, isCompressed bool) ID {
oid := DirectObjectID(contentID)
if isCompressed {
oid = Compressed(oid)
oid = compressed(oid)
}
return oid
@@ -266,7 +267,7 @@ func (w *objectWriter) Result() (ID, error) {
// no need to hold a lock on w.indirectIndexGrowMutex, since growing index only happens synchronously
// and never in parallel with calling Result()
if w.buffer.Length() > 0 || len(w.indirectIndex) == 0 {
if err := w.flushBuffer(); err != nil {
if err := w.flushBufferLocked(); err != nil {
return EmptyID, err
}
}
@@ -283,6 +284,7 @@ func (w *objectWriter) Checkpoint() (ID, error) {
return w.checkpointLocked()
}
// +checklocks:w.mu
func (w *objectWriter) checkpointLocked() (ID, error) {
// wait for any in-flight asynchronous writes to finish
w.asyncWritesWG.Wait()
@@ -325,7 +327,7 @@ func (w *objectWriter) checkpointLocked() (ID, error) {
return EmptyID, err
}
return IndirectObjectID(oid), nil
return indirectObjectID(oid), nil
}
func writeIndirectObject(w io.Writer, entries []IndirectObjectEntry) error {

View File

@@ -148,14 +148,14 @@ func DirectObjectID(contentID content.ID) ID {
return ID{cid: contentID}
}
// Compressed returns object ID with 'Z' prefix indicating it's compressed.
func Compressed(objectID ID) ID {
// compressed returns object ID with 'Z' prefix indicating it's compressed.
func compressed(objectID ID) ID {
objectID.compression = true
return objectID
}
// IndirectObjectID returns indirect object ID based on the underlying index object ID.
func IndirectObjectID(indexObjectID ID) ID {
// indirectObjectID returns indirect object ID based on the underlying index object ID.
func indirectObjectID(indexObjectID ID) ID {
indexObjectID.indirection++
return indexObjectID
}