mirror of
https://github.com/kopia/kopia.git
synced 2026-03-28 02:53:05 -04:00
fixed deadlock caused by attempt to write asynchronous pack data
This commit is contained in:
@@ -71,7 +71,7 @@ func (r *ObjectManager) NewWriter(opt WriterOptions) ObjectWriter {
|
||||
splitter: r.newSplitter(),
|
||||
description: opt.Description,
|
||||
prefix: opt.BlockNamePrefix,
|
||||
disablePacking: opt.disablePacking,
|
||||
isPackInternalObject: opt.isPackInternalObject,
|
||||
packGroup: opt.PackGroup,
|
||||
}
|
||||
|
||||
@@ -196,7 +196,7 @@ func newObjectManager(s blob.Storage, f config.RepositoryObjectFormat, opts *Opt
|
||||
// hashEncryptAndWrite computes hash of a given buffer, optionally encrypts and writes it to storage.
|
||||
// The write is not guaranteed to complete synchronously in case write-back is used, but by the time
|
||||
// Repository.Close() returns all writes are guaranteed be over.
|
||||
func (r *ObjectManager) hashEncryptAndWrite(packGroup string, buffer *bytes.Buffer, prefix string, disablePacking bool) (ObjectID, error) {
|
||||
func (r *ObjectManager) hashEncryptAndWrite(packGroup string, buffer *bytes.Buffer, prefix string, isPackInternalObject bool) (ObjectID, error) {
|
||||
var data []byte
|
||||
if buffer != nil {
|
||||
data = buffer.Bytes()
|
||||
@@ -208,7 +208,7 @@ func (r *ObjectManager) hashEncryptAndWrite(packGroup string, buffer *bytes.Buff
|
||||
atomic.AddInt32(&r.stats.HashedBlocks, 1)
|
||||
atomic.AddInt64(&r.stats.HashedBytes, int64(len(data)))
|
||||
|
||||
if !disablePacking && r.packMgr.enabled() && r.format.MaxPackedContentLength > 0 && len(data) <= r.format.MaxPackedContentLength {
|
||||
if !isPackInternalObject && r.packMgr.enabled() && r.format.MaxPackedContentLength > 0 && len(data) <= r.format.MaxPackedContentLength {
|
||||
packOID, err := r.packMgr.AddToPack(packGroup, prefix+objectID.StorageBlock, data)
|
||||
return packOID, err
|
||||
}
|
||||
|
||||
@@ -59,8 +59,8 @@ type objectWriter struct {
|
||||
blockTracker *blockTracker
|
||||
splitter objectSplitter
|
||||
|
||||
disablePacking bool
|
||||
packGroup string
|
||||
isPackInternalObject bool
|
||||
packGroup string
|
||||
|
||||
pendingBlocksWG sync.WaitGroup
|
||||
|
||||
@@ -102,7 +102,7 @@ func (w *objectWriter) flushBuffer() error {
|
||||
w.buffer.Reset()
|
||||
|
||||
do := func() {
|
||||
objectID, err := w.repo.hashEncryptAndWrite(w.packGroup, &b2, w.prefix, w.disablePacking)
|
||||
objectID, err := w.repo.hashEncryptAndWrite(w.packGroup, &b2, w.prefix, w.isPackInternalObject)
|
||||
w.repo.trace("OBJECT_WRITER(%q) stored %v (%v bytes)", w.description, objectID, length)
|
||||
if err != nil {
|
||||
w.err.add(fmt.Errorf("error when flushing chunk %d of %s: %v", chunkID, w.description, err))
|
||||
@@ -113,7 +113,9 @@ func (w *objectWriter) flushBuffer() error {
|
||||
w.blockIndex[chunkID].Object = objectID
|
||||
}
|
||||
|
||||
if w.repo.async {
|
||||
// When writing pack internal object don't use asynchronous write, since we're already under the semaphore
|
||||
// and it may lead to a deadlock.
|
||||
if w.repo.async && !w.isPackInternalObject {
|
||||
w.repo.writeBackSemaphore.Lock()
|
||||
w.pendingBlocksWG.Add(1)
|
||||
w.repo.writeBackWG.Add(1)
|
||||
@@ -153,8 +155,8 @@ func (w *objectWriter) Result() (ObjectID, error) {
|
||||
blockTracker: w.blockTracker,
|
||||
splitter: w.repo.newSplitter(),
|
||||
|
||||
disablePacking: w.disablePacking,
|
||||
packGroup: w.packGroup,
|
||||
isPackInternalObject: w.isPackInternalObject,
|
||||
packGroup: w.packGroup,
|
||||
}
|
||||
|
||||
jw := jsonstream.NewWriter(iw, indirectStreamType)
|
||||
@@ -180,6 +182,6 @@ type WriterOptions struct {
|
||||
Description string
|
||||
PackGroup string
|
||||
|
||||
splitter objectSplitter
|
||||
disablePacking bool
|
||||
splitter objectSplitter
|
||||
isPackInternalObject bool
|
||||
}
|
||||
|
||||
@@ -176,10 +176,10 @@ func (p *packManager) flushPackIndexesLocked() error {
|
||||
|
||||
func (p *packManager) writePackIndexes(ndx packIndexes) error {
|
||||
w := p.objectManager.NewWriter(WriterOptions{
|
||||
disablePacking: true,
|
||||
Description: "pack index",
|
||||
BlockNamePrefix: packObjectPrefix,
|
||||
splitter: newNeverSplitter(),
|
||||
isPackInternalObject: true,
|
||||
Description: "pack index",
|
||||
BlockNamePrefix: packObjectPrefix,
|
||||
splitter: newNeverSplitter(),
|
||||
})
|
||||
defer w.Close()
|
||||
|
||||
@@ -210,9 +210,9 @@ func (p *packManager) finishPackLocked(g *packInfo) error {
|
||||
return nil
|
||||
}
|
||||
w := p.objectManager.NewWriter(WriterOptions{
|
||||
Description: fmt.Sprintf("pack:%v", g.currentPackID),
|
||||
splitter: newNeverSplitter(),
|
||||
disablePacking: true,
|
||||
Description: fmt.Sprintf("pack:%v", g.currentPackID),
|
||||
splitter: newNeverSplitter(),
|
||||
isPackInternalObject: true,
|
||||
})
|
||||
defer w.Close()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user