Files
kopia/cli/command_snapshot_create.go
Julio López d886957c7d feat(snapshots): reduce kernel page-cache pressure during snapshots (#5437)
Add fadvise hints to backup read path to reduce OS cache pollution

Issue FADV_SEQUENTIAL at open and FADV_DONTNEED at close for every file read during backup. This tells the kernel to read-ahead aggressively and then reclaim pages once the file is fully consumed, reducing cache pressure from large backups without adding measurable overhead.

Uses SyscallConn().Control() instead of os.File.Fd() to avoid switching file descriptors to blocking mode, which would remove them from Go's epoll poller and stall the upload pipeline.

Incremental FADV_DONTNEED during reads was tested and removed — it adds ~15% overhead on large-file workloads by fighting the kernel's own LRU. Restore write-path cache eviction was also tested and not included.

Authored-by: Rajat Gupta rajat.gupta@veeam.com
2026-06-26 19:29:13 -07:00

517 lines
16 KiB
Go

package cli
import (
"context"
"fmt"
"io"
"path/filepath"
"strings"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/fs"
"github.com/kopia/kopia/fs/localfs"
"github.com/kopia/kopia/fs/virtualfs"
"github.com/kopia/kopia/notification"
"github.com/kopia/kopia/notification/notifydata"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/upload"
)
const (
maxSnapshotDescriptionLength = 1024
timeFormat = "2006-01-02 15:04:05 MST"
)
type commandSnapshotCreate struct {
snapshotCreateSources []string
snapshotCreateAll bool
snapshotCreateDescription string
snapshotCreateCheckpointInterval time.Duration
snapshotCreateFailFast bool
snapshotCreateForceHash float64
snapshotCreateParallelUploads int
snapshotCreateStartTime string
snapshotCreateEndTime string
snapshotCreateForceEnableActions bool
snapshotCreateForceDisableActions bool
snapshotCreateStdinFileName string
snapshotCreateCheckpointUploadLimitMB int64
snapshotCreateTags []string
flushPerSource bool
sourceOverride string
sendSnapshotReport bool
snapshotCreateStreamingReads bool
pins []string
logDirDetail int
logEntryDetail int
jo jsonOutput
svc appServices
out textOutput
}
func (c *commandSnapshotCreate) setup(svc appServices, parent commandParent) {
cmd := parent.Command("create", "Creates a snapshot of local directory or file.")
cmd.Arg("source", "Files or directories to create snapshot(s) of.").StringsVar(&c.snapshotCreateSources)
cmd.Flag("all", "Create snapshots for files or directories previously backed up by this user on this computer. Cannot be used when a source path argument is also specified.").BoolVar(&c.snapshotCreateAll)
cmd.Flag("upload-limit-mb", "Stop the backup process after the specified amount of data (in MB) has been uploaded.").PlaceHolder("MB").Default("0").Int64Var(&c.snapshotCreateCheckpointUploadLimitMB)
cmd.Flag("checkpoint-interval", "Interval between periodic checkpoints (must be <= 45 minutes).").Hidden().DurationVar(&c.snapshotCreateCheckpointInterval)
cmd.Flag("description", "Free-form snapshot description.").StringVar(&c.snapshotCreateDescription)
cmd.Flag("fail-fast", "Fail fast when creating snapshot.").Envar(svc.EnvName("KOPIA_SNAPSHOT_FAIL_FAST")).BoolVar(&c.snapshotCreateFailFast)
cmd.Flag("force-hash", "Force hashing of source files for a given percentage of files [0.0 .. 100.0]").Default("0").Float64Var(&c.snapshotCreateForceHash)
cmd.Flag("parallel", "Upload N files in parallel").PlaceHolder("N").Default("0").IntVar(&c.snapshotCreateParallelUploads)
cmd.Flag("start-time", "Override snapshot start timestamp.").StringVar(&c.snapshotCreateStartTime)
cmd.Flag("end-time", "Override snapshot end timestamp.").StringVar(&c.snapshotCreateEndTime)
cmd.Flag("force-enable-actions", "Enable snapshot actions even if globally disabled on this client").Hidden().BoolVar(&c.snapshotCreateForceEnableActions)
cmd.Flag("force-disable-actions", "Disable snapshot actions even if globally enabled on this client").Hidden().BoolVar(&c.snapshotCreateForceDisableActions)
cmd.Flag("stdin-file", "File path to be used for stdin data snapshot.").StringVar(&c.snapshotCreateStdinFileName)
cmd.Flag("tags", "Tags applied on the snapshot. Must be provided in the <key>:<value> format.").StringsVar(&c.snapshotCreateTags)
cmd.Flag("pin", "Create a pinned snapshot that will not expire automatically").StringsVar(&c.pins)
cmd.Flag("flush-per-source", "Flush writes at the end of each source").Hidden().BoolVar(&c.flushPerSource)
cmd.Flag("override-source", "Override the source of the snapshot.").StringVar(&c.sourceOverride)
cmd.Flag("send-snapshot-report", "Send a snapshot report notification using configured notification profiles").Default("true").BoolVar(&c.sendSnapshotReport)
cmd.Flag("hint-streaming-reads", "Release file memory after reading to reduce backup memory footprint (Linux only, best-effort).").
Default("false").Hidden().BoolVar(&c.snapshotCreateStreamingReads)
c.logDirDetail = -1
c.logEntryDetail = -1
cmd.Flag("log-dir-detail", "Override log level for directories").IntVar(&c.logDirDetail)
cmd.Flag("log-entry-detail", "Override log level for entries").IntVar(&c.logEntryDetail)
c.jo.setup(svc, cmd)
c.out.setup(svc)
c.svc = svc
cmd.Action(svc.repositoryWriterAction(c.run))
}
//nolint:gocyclo
func (c *commandSnapshotCreate) run(ctx context.Context, rep repo.RepositoryWriter) error {
sources := c.snapshotCreateSources
if c.snapshotCreateAll && len(sources) > 0 {
return errors.New("cannot use --all when a source path argument is specified")
}
if err := maybeAutoUpgradeRepository(ctx, rep); err != nil {
return errors.Wrap(err, "error upgrading repository")
}
if c.snapshotCreateAll {
local, err := getLocalBackupPaths(ctx, rep)
if err != nil {
return err
}
sources = append(sources, local...)
}
if len(sources) == 0 {
return errors.New("no snapshot sources")
}
if err := validateStartEndTime(c.snapshotCreateStartTime, c.snapshotCreateEndTime); err != nil {
return err
}
if len(c.snapshotCreateDescription) > maxSnapshotDescriptionLength {
return errors.New("description too long")
}
localfsOpts := localfs.Options{StreamingReads: c.snapshotCreateStreamingReads}
u := c.setupUploader(rep)
var finalErrors []string
tags, err := getTags(c.snapshotCreateTags)
if err != nil {
return err
}
var st notifydata.MultiSnapshotStatus
for _, snapshotDir := range sources {
if u.IsCanceled() {
log(ctx).Info("Upload canceled")
break
}
fsEntry, sourceInfo, setManual, err := c.getContentToSnapshot(ctx, snapshotDir, rep, localfsOpts)
if err != nil {
finalErrors = append(finalErrors, fmt.Sprintf("failed to prepare source: %s", err))
}
if err := c.snapshotSingleSource(ctx, fsEntry, setManual, rep, u, sourceInfo, tags, &st); err != nil {
finalErrors = append(finalErrors, err.Error())
}
}
if c.sendSnapshotReport {
notification.Send(ctx, rep, "snapshot-report", st, c.reportSeverity(st), c.svc.notificationTemplateOptions())
}
// ensure we flush at least once in the session to properly close all pending buffers,
// otherwise the session will be reported as memory leak.
// by default the wrapper function does not flush on errors, which is what we want to do always.
if !c.flushPerSource {
if ferr := rep.Flush(ctx); ferr != nil {
return errors.Wrap(ferr, "flush error")
}
}
if len(finalErrors) == 0 {
return nil
}
if len(finalErrors) == 1 {
return errors.New(finalErrors[0])
}
return errors.Errorf("encountered %v errors:\n%v", len(finalErrors), strings.Join(finalErrors, "\n"))
}
func (c *commandSnapshotCreate) reportSeverity(st notifydata.MultiSnapshotStatus) notification.Severity {
switch st.OverallStatusCode() {
case notifydata.StatusCodeFatal:
return notification.SeverityError
case notifydata.StatusCodeWarnings:
return notification.SeverityWarning
default:
return notification.SeverityReport
}
}
func getTags(tagStrings []string) (map[string]string, error) {
numberOfPartsInTagString := 2
// tagKeyPrefix is the prefix for user defined tag keys.
tagKeyPrefix := "tag:"
tags := map[string]string{}
for _, tagkv := range tagStrings {
parts := strings.SplitN(tagkv, ":", numberOfPartsInTagString)
if len(parts) != numberOfPartsInTagString {
return nil, errors.Errorf("Invalid tag format (%s). Requires <key>:<value>", tagkv)
}
key := tagKeyPrefix + parts[0]
if _, ok := tags[key]; ok {
return nil, errors.Errorf("Duplicate tag <key> found. (%s)", parts[0])
}
tags[key] = parts[1]
}
return tags, nil
}
func validateStartEndTime(st, et string) error {
startTime, err := parseTimestamp(st)
if err != nil {
return errors.Wrap(err, "could not parse start-time")
}
endTime, err := parseTimestamp(et)
if err != nil {
return errors.Wrap(err, "could not parse end-time")
}
if startTimeAfterEndTime(startTime, endTime) {
return errors.New("start time override cannot be after the end time override")
}
return nil
}
func (c *commandSnapshotCreate) setupUploader(rep repo.RepositoryWriter) *upload.Uploader {
u := upload.NewUploader(rep)
u.MaxUploadBytes = c.snapshotCreateCheckpointUploadLimitMB << 20 //nolint:mnd
if c.snapshotCreateForceEnableActions {
u.EnableActions = true
}
if c.snapshotCreateForceDisableActions {
u.EnableActions = false
}
if l := c.logDirDetail; l != -1 {
ld := policy.LogDetail(l)
u.OverrideDirLogDetail = &ld
}
if l := c.logEntryDetail; l != -1 {
ld := policy.LogDetail(l)
u.OverrideEntryLogDetail = &ld
}
if interval := c.snapshotCreateCheckpointInterval; interval != 0 {
u.CheckpointInterval = interval
}
c.svc.onTerminate(u.Cancel)
u.ForceHashPercentage = c.snapshotCreateForceHash
u.ParallelUploads = c.snapshotCreateParallelUploads
u.FailFast = c.snapshotCreateFailFast
u.Progress = c.svc.getProgress()
return u
}
func parseTimestamp(timestamp string) (time.Time, error) {
if timestamp == "" {
return time.Time{}, nil
}
//nolint:wrapcheck
return time.Parse(timeFormat, timestamp)
}
func startTimeAfterEndTime(startTime, endTime time.Time) bool {
return !startTime.IsZero() &&
!endTime.IsZero() &&
startTime.After(endTime)
}
//nolint:gocyclo,funlen
func (c *commandSnapshotCreate) snapshotSingleSource(
ctx context.Context,
fsEntry fs.Entry,
setManual bool,
rep repo.RepositoryWriter,
u *upload.Uploader,
sourceInfo snapshot.SourceInfo,
tags map[string]string,
st *notifydata.MultiSnapshotStatus,
) (finalErr error) {
log(ctx).Infof("Snapshotting %v ...", sourceInfo)
var mwe notifydata.ManifestWithError
mwe.Manifest.Source = sourceInfo
st.Snapshots = append(st.Snapshots, &mwe)
defer func() {
if finalErr != nil {
mwe.Error = finalErr.Error()
}
}()
var previous []*snapshot.Manifest
previous, finalErr = snapshot.FindPreviousManifests(ctx, rep, sourceInfo, nil)
if finalErr != nil {
return errors.Wrap(finalErr, "unable to find previous manifests")
}
if len(previous) > 0 {
mwe.Previous = previous[0]
}
policyTree, finalErr := policy.TreeForSource(ctx, rep, sourceInfo)
if finalErr != nil {
return errors.Wrap(finalErr, "unable to get policy tree")
}
manifest, finalErr := u.Upload(ctx, fsEntry, policyTree, sourceInfo, previous...)
if finalErr != nil {
// fail-fast uploads will fail here without recording a manifest, other uploads will
// possibly fail later.
return errors.Wrap(finalErr, "upload error")
}
manifest.Description = c.snapshotCreateDescription
manifest.Tags = tags
manifest.UpdatePins(c.pins, nil)
startTimeOverride, _ := parseTimestamp(c.snapshotCreateStartTime)
endTimeOverride, _ := parseTimestamp(c.snapshotCreateEndTime)
if !startTimeOverride.IsZero() {
if endTimeOverride.IsZero() {
// Calculate the correct end time based on current duration if they're not specified
duration := manifest.EndTime.Sub(manifest.StartTime)
manifest.EndTime = fs.UTCTimestampFromTime(startTimeOverride).Add(duration)
}
manifest.StartTime = fs.UTCTimestampFromTime(startTimeOverride)
}
if !endTimeOverride.IsZero() {
if startTimeOverride.IsZero() {
inverseDuration := manifest.StartTime.Sub(manifest.EndTime)
manifest.StartTime = fs.UTCTimestampFromTime(endTimeOverride).Add(inverseDuration)
}
manifest.EndTime = fs.UTCTimestampFromTime(endTimeOverride)
}
mwe.Manifest = *manifest
ignoreIdenticalSnapshot := policyTree.EffectivePolicy().RetentionPolicy.IgnoreIdenticalSnapshots.OrDefault(false)
if ignoreIdenticalSnapshot && len(previous) > 0 {
if previous[0].RootObjectID() == manifest.RootObjectID() {
log(ctx).Info("\n Not saving snapshot because no files have been changed since previous snapshot")
return nil
}
}
if _, finalErr = snapshot.SaveSnapshot(ctx, rep, manifest); finalErr != nil {
return errors.Wrap(finalErr, "cannot save manifest")
}
if _, finalErr = policy.ApplyRetentionPolicy(ctx, rep, sourceInfo, true); finalErr != nil {
return errors.Wrap(finalErr, "unable to apply retention policy")
}
if setManual {
if finalErr = policy.SetManual(ctx, rep, sourceInfo); finalErr != nil {
return errors.Wrap(finalErr, "unable to set manual field in scheduling policy for source")
}
}
if c.flushPerSource {
if ferr := rep.Flush(ctx); ferr != nil {
return errors.Wrap(ferr, "flush error")
}
}
c.svc.getProgress().Finish()
return c.reportSnapshotStatus(ctx, manifest)
}
func (c *commandSnapshotCreate) reportSnapshotStatus(ctx context.Context, manifest *snapshot.Manifest) error {
var maybePartial string
if manifest.IncompleteReason != "" {
maybePartial = " partial"
}
sourceInfo := manifest.Source
snapID := manifest.ID
if c.jo.jsonOutput {
c.out.printStdout("%s\n", c.jo.jsonIndentedBytes(manifest, " "))
} else {
log(ctx).Infof("Created%v snapshot with root %v and ID %v in %v", maybePartial, manifest.RootObjectID(), snapID, manifest.EndTime.Sub(manifest.StartTime).Truncate(time.Second))
}
if ds := manifest.RootEntry.DirSummary; ds != nil {
if ds.IgnoredErrorCount > 0 {
log(ctx).Warnf("Ignored %v error(s) while snapshotting %v.", ds.IgnoredErrorCount, sourceInfo)
}
if ds.FatalErrorCount > 0 {
return errors.Errorf("Found %v fatal error(s) while snapshotting %v.", ds.FatalErrorCount, sourceInfo) //nolint:revive
}
}
return nil
}
func getLocalBackupPaths(ctx context.Context, rep repo.Repository) ([]string, error) {
log(ctx).Debugf("Looking for previous backups of '%v@%v'...", rep.ClientOptions().Hostname, rep.ClientOptions().Username)
sources, err := snapshot.ListSources(ctx, rep)
if err != nil {
return nil, errors.Wrap(err, "unable to list sources")
}
var result []string
for _, src := range sources {
// add all sources belonging to the repository user@host
// ignore sources that have Manual field set to true in the SchedulingPolicy
includeSource, err := shouldSnapshotSource(ctx, src, rep)
if err != nil {
return nil, err
}
if includeSource {
result = append(result, src.Path)
}
}
return result, nil
}
func shouldSnapshotSource(ctx context.Context, src snapshot.SourceInfo, rep repo.Repository) (bool, error) {
policyTree, err := policy.TreeForSource(ctx, rep, src)
if err != nil {
return false, errors.Wrapf(err, "unable to get policy tree for source %v", src)
}
return src.Host == rep.ClientOptions().Hostname &&
src.UserName == rep.ClientOptions().Username &&
!policy.IsManualSnapshot(policyTree), nil
}
// the setManual return value is true when a snapshot is manually created, such
// as when overriding the source info or snapshotting from stdin.
func (c *commandSnapshotCreate) getContentToSnapshot(ctx context.Context, dir string, rep repo.RepositoryWriter, opts localfs.Options) (fsEntry fs.Entry, info snapshot.SourceInfo, setManual bool, err error) {
var absDir string
absDir, err = filepath.Abs(dir)
if err != nil {
return nil, info, false, errors.Wrapf(err, "invalid source %v", dir)
}
if c.sourceOverride != "" {
info, err = parseFullSource(c.sourceOverride, rep.ClientOptions().Hostname, rep.ClientOptions().Username)
if err != nil {
return nil, info, false, errors.Wrapf(err, "invalid source override %v", c.sourceOverride)
}
setManual = true
} else {
info = snapshot.SourceInfo{
Path: filepath.Clean(absDir),
Host: rep.ClientOptions().Hostname,
UserName: rep.ClientOptions().Username,
}
}
if c.snapshotCreateStdinFileName != "" {
// stdin source will be snapshotted using a virtual static root directory with a single streaming file entry
// Create a new static directory with the given name and add a streaming file entry with os.Stdin reader
fsEntry = virtualfs.NewStaticDirectory(absDir, []fs.Entry{
virtualfs.StreamingFileFromReader(c.snapshotCreateStdinFileName, io.NopCloser(c.svc.stdin())),
})
setManual = true
} else {
fsEntry, err = getLocalFSEntry(ctx, absDir, opts)
if err != nil {
return nil, info, false, errors.Wrap(err, "unable to get local filesystem entry")
}
}
return fsEntry, info, setManual, nil
}
func parseFullSource(str, hostname, username string) (snapshot.SourceInfo, error) {
sourceInfo, err := snapshot.ParseSourceInfo(str, hostname, username)
if err != nil {
return snapshot.SourceInfo{}, errors.Wrapf(err, "not a valid source %v", str)
} else if sourceInfo.Host == "" || sourceInfo.UserName == "" || sourceInfo.Path == "" {
return snapshot.SourceInfo{}, errors.Errorf("source does not resolve into host, user and path: '%s'", str)
}
return sourceInfo, nil
}