Files
kopia/repo/object/object_manager.go
Jarek Kowalski 9bf9cac7fb refactor(repository): ensure we always parse content.ID and object.ID (#1960)
* refactor(repository): ensure we always parse content.ID and object.ID

This changes the types to be incompatible with string to prevent direct
conversion to and from string.

This has the additional benefit of reducing number of memory allocations
and bytes for all IDs.

content.ID went from 2 allocations to 1:
   typical case 32 characters + 16 bytes per-string overhead
   worst-case 65 characters + 16 bytes per-string overhead
   now: 34 bytes

object.ID went from 2 allocations to 1:
   typical case 32 characters + 16 bytes per-string overhead
   worst-case 65 characters + 16 bytes per-string overhead
   now: 36 bytes

* move index.{ID,IDRange} methods to separate files

* replaced index.IDFromHash with content.IDFromHash externally

* minor tweaks and additional tests

* Update repo/content/index/id_test.go

Co-authored-by: Julio Lopez <1953782+julio-lopez@users.noreply.github.com>

* Update repo/content/index/id_test.go

Co-authored-by: Julio Lopez <1953782+julio-lopez@users.noreply.github.com>

* pr feedback

* post-merge fixes

* pr feedback

* pr feedback

* fixed subtle regression in sortedContents()

This was actually not producing invalid results because of how base36
works, just not sorting as efficiently as it could.

Co-authored-by: Julio Lopez <1953782+julio-lopez@users.noreply.github.com>
2022-05-25 14:15:56 +00:00

230 lines
7.2 KiB
Go

// Package object implements repository support for content-addressable objects of arbitrary size.
package object
import (
"context"
"io"
"sync"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/splitter"
)
// ErrObjectNotFound is returned when an object cannot be found.
// Callers should detect it with errors.Is, since it may be wrapped.
var ErrObjectNotFound = errors.New("object not found")
// Reader allows reading, seeking, getting the length of and closing of a repository object.
type Reader interface {
	io.Reader
	io.Seeker
	io.Closer

	// Length returns the total length of the object in bytes.
	Length() int64
}
// contentReader is the read-only subset of the content store used by this package.
type contentReader interface {
	// ContentInfo returns metadata for a single content.
	ContentInfo(ctx context.Context, contentID content.ID) (content.Info, error)

	// GetContent returns the bytes of a single content.
	GetContent(ctx context.Context, contentID content.ID) ([]byte, error)

	// PrefetchContents brings the given contents into the cache and returns
	// the IDs that were prefetched.
	PrefetchContents(ctx context.Context, contentIDs []content.ID, prefetchHint string) []content.ID
}
// contentManager extends contentReader with the ability to write new contents.
type contentManager interface {
	contentReader

	// SupportsContentCompression reports whether the content store can store
	// compressed contents.
	SupportsContentCompression() bool

	// WriteContent writes data under the given ID prefix with the given
	// compression header and returns the resulting content ID.
	WriteContent(ctx context.Context, data gather.Bytes, prefix content.IDPrefix, comp compression.HeaderID) (content.ID, error)
}
// Format describes the format of objects in a repository.
type Format struct {
	// Splitter used to break objects into pieces of content.
	// An empty value is treated as "FIXED" (see NewObjectManager).
	Splitter string `json:"splitter,omitempty"`
}
// Manager implements a content-addressable storage on top of blob storage.
type Manager struct {
	Format Format // format of the objects managed by this Manager

	contentMgr  contentManager   // underlying content store
	newSplitter splitter.Factory // produces splitters used to chunk written objects
	writerPool  sync.Pool        // pool of *objectWriter reused by NewWriter/closedWriter
}
// NewWriter creates an ObjectWriter for writing to the repository.
func (om *Manager) NewWriter(ctx context.Context, opt WriterOptions) Writer {
	ow, _ := om.writerPool.Get().(*objectWriter)

	ow.ctx = ctx
	ow.om = om

	ow.splitter = om.newSplitter()
	ow.description = opt.Description
	ow.prefix = opt.Prefix
	ow.compressor = compression.ByName[opt.Compressor]

	ow.totalLength = 0
	ow.currentPosition = 0

	// point the slice at the embedded array, so that we avoid allocations most of the time
	ow.indirectIndex = ow.indirectIndexBuf[:0]

	switch {
	case opt.AsyncWrites <= 0:
		ow.asyncWritesSemaphore = nil
	case len(ow.asyncWritesSemaphore) != 0 || cap(ow.asyncWritesSemaphore) != opt.AsyncWrites:
		// pooled semaphore cannot be reused: wrong capacity or tokens still pending
		ow.asyncWritesSemaphore = make(chan struct{}, opt.AsyncWrites)
	}

	ow.buffer.Reset()
	ow.contentWriteError = nil

	return ow
}
// closedWriter returns a no-longer-used objectWriter to the pool so that
// NewWriter can reuse it and avoid allocations.
func (om *Manager) closedWriter(ow *objectWriter) {
	om.writerPool.Put(ow)
}
// Concatenate creates an object that's a result of concatenation of other objects. This is more efficient than reading
// and rewriting the objects because Concatenate can efficiently merge index entries without reading the underlying
// contents.
//
// This function exists primarily to facilitate efficient parallel uploads of very large files (>1GB). Due to the bottleneck of
// splitting which is inherently sequential, we can only use one CPU core for each Writer, which limits throughput.
//
// For example when uploading a 100 GB file it is beneficial to independently upload sections of [0..25GB),
// [25..50GB), [50GB..75GB) and [75GB..100GB) and concatenate them together as this allows us to run four splitters
// in parallel utilizing more CPU cores. Because some split points now start at fixed boundaries and not content-specific,
// this causes some slight loss of deduplication at concatenation points (typically 1-2 contents, usually <10MB),
// so this method should only be used for very large files where this overhead is relatively small.
func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error) {
	if len(objectIDs) == 0 {
		return EmptyID, errors.Errorf("empty list of objects")
	}

	// single object - nothing to concatenate
	if len(objectIDs) == 1 {
		return objectIDs[0], nil
	}

	var (
		concatenatedEntries []indirectObjectEntry
		totalLength         int64
		err                 error
	)

	// merge index entries of all source objects, shifting offsets as we go
	for _, objectID := range objectIDs {
		concatenatedEntries, totalLength, err = appendIndexEntriesForObject(ctx, om.contentMgr, concatenatedEntries, totalLength, objectID)
		if err != nil {
			return EmptyID, errors.Wrapf(err, "error appending %v", objectID)
		}
	}

	log(ctx).Debugf("concatenated: %v total: %v", concatenatedEntries, totalLength)

	// write the merged index as a new indirect object
	w := om.NewWriter(ctx, WriterOptions{
		Prefix:      indirectContentPrefix,
		Description: "CONCATENATED INDEX",
	})
	defer w.Close() //nolint:errcheck

	if werr := writeIndirectObject(w, concatenatedEntries); werr != nil {
		return EmptyID, werr
	}

	concatID, err := w.Result()
	if err != nil {
		return EmptyID, errors.Wrap(err, "error writing concatenated index")
	}

	return IndirectObjectID(concatID), nil
}
// appendIndexEntriesForObject appends the index entries describing objectID
// (with Start offsets shifted by startingLength) to indexEntries and returns
// the extended slice along with the new running total length.
func appendIndexEntriesForObject(ctx context.Context, cr contentReader, indexEntries []indirectObjectEntry, startingLength int64, objectID ID) (result []indirectObjectEntry, totalLength int64, _ error) {
	indexObjectID, ok := objectID.IndexObjectID()
	if !ok {
		// Non-index object - the precise length of the object cannot be determined
		// from content due to compression and padding, so we must open the object
		// to read its length.
		r, err := Open(ctx, cr, objectID)
		if err != nil {
			return nil, 0, errors.Wrapf(err, "error opening %v", objectID)
		}
		defer r.Close() //nolint:errcheck

		entry := indirectObjectEntry{
			Start:  0,
			Length: r.Length(),
			Object: objectID,
		}

		indexEntries, totalLength = appendIndexEntries(indexEntries, startingLength, entry)

		return indexEntries, totalLength, nil
	}

	// Index object - merge its seek table without reading the underlying contents.
	ndx, err := loadSeekTable(ctx, cr, indexObjectID)
	if err != nil {
		return nil, 0, errors.Wrapf(err, "error reading index of %v", objectID)
	}

	indexEntries, totalLength = appendIndexEntries(indexEntries, startingLength, ndx...)

	return indexEntries, totalLength, nil
}
// appendIndexEntries appends the incoming entries to indexEntries, shifting each
// entry's Start by startingLength, and returns the extended slice together with
// the running total (startingLength plus the lengths of all incoming entries).
func appendIndexEntries(indexEntries []indirectObjectEntry, startingLength int64, incoming ...indirectObjectEntry) (result []indirectObjectEntry, totalLength int64) {
	totalLength = startingLength

	for i := range incoming {
		e := incoming[i] // copy, so the caller's slice is not modified
		e.Start += startingLength

		indexEntries = append(indexEntries, e)
		totalLength += e.Length
	}

	return indexEntries, totalLength
}
// noop is a per-content callback that does nothing; used when only content-ID
// tracking is needed.
func noop(_ content.ID) error { return nil }
// PrefetchBackingContents attempts to bring the contents backing the provided
// object IDs into the cache. This may succeed only partially due to cache size
// limits and other constraints. Returns the list of content IDs prefetched.
func PrefetchBackingContents(ctx context.Context, contentMgr contentManager, objectIDs []ID, hint string) ([]content.ID, error) {
	tracker := &contentIDTracker{}

	for _, objectID := range objectIDs {
		err := iterateBackingContents(ctx, contentMgr, objectID, tracker, noop)

		switch {
		case err == nil:
		case errors.Is(err, ErrObjectNotFound), errors.Is(err, content.ErrContentNotFound):
			// missing objects/contents are tolerated - prefetch whatever we can
		default:
			return nil, err
		}
	}

	return contentMgr.PrefetchContents(ctx, tracker.contentIDs(), hint), nil
}
// NewObjectManager creates an ObjectManager with the specified content manager and format.
func NewObjectManager(ctx context.Context, bm contentManager, f Format) (*Manager, error) {
	om := &Manager{
		contentMgr: bm,
		Format:     f,
	}

	// Pool objectWriter instances so NewWriter avoids an allocation per call.
	om.writerPool = sync.Pool{
		New: func() interface{} {
			return new(objectWriter)
		},
	}

	// An unset splitter defaults to "FIXED".
	splitterID := f.Splitter
	if splitterID == "" {
		splitterID = "FIXED"
	}

	os := splitter.GetFactory(splitterID)
	if os == nil {
		// Report the identifier we actually looked up (splitterID), so that a
		// defaulted empty f.Splitter produces `unsupported splitter "FIXED"`
		// rather than the misleading `unsupported splitter ""`.
		return nil, errors.Errorf("unsupported splitter %q", splitterID)
	}

	om.newSplitter = splitter.Pooled(os)

	return om, nil
}