Files
kopia/internal/server/source_manager.go

230 lines
5.8 KiB
Go

package server
import (
"context"
"sync"
"time"
"github.com/kopia/kopia/internal/serverapi"
"github.com/kopia/kopia/fs/localfs"
"github.com/kopia/kopia/snapshot"
"github.com/rs/zerolog/log"
)
// sourceManager manages the state machine of each source
// Possible states:
//
// - INITIALIZING - fetching configuration from repository
// - READY - waiting for next snapshot
// - PAUSED - inactive
// - FAILED - inactive
// - UPLOADING - uploading a snapshot
type sourceManager struct {
server *Server
src snapshot.SourceInfo
closed chan struct{}
mu sync.RWMutex
pol *snapshot.Policy
state string
nextSnapshotTime time.Time
lastCompleteSnapshot *snapshot.Manifest
lastSnapshot *snapshot.Manifest
// state of current upload
uploadPath string
uploadPathCompleted int64
uploadPathTotal int64
}
func (s *sourceManager) Status() serverapi.SourceStatus {
s.mu.RLock()
defer s.mu.RUnlock()
st := serverapi.SourceStatus{
Source: s.src,
Status: s.state,
LastSnapshotTime: s.lastSnapshot.StartTime,
NextSnapshotTime: s.nextSnapshotTime,
Policy: s.pol,
}
st.UploadStatus.UploadingPath = s.uploadPath
st.UploadStatus.UploadingPathCompleted = s.uploadPathCompleted
st.UploadStatus.UploadingPathTotal = s.uploadPathTotal
return st
}
func (s *sourceManager) setStatus(stat string) {
s.mu.Lock()
s.state = stat
s.mu.Unlock()
}
func (s *sourceManager) run() {
s.setStatus("INITIALIZING")
defer s.setStatus("STOPPED")
s.setStatus("RUNNING")
s.refreshStatus()
for {
var timeBeforeNextSnapshot time.Duration
if !s.nextSnapshotTime.IsZero() && s.server.hostname == s.src.Host {
timeBeforeNextSnapshot = time.Until(s.nextSnapshotTime)
log.Info().Msgf("time to next snapshot %v is %v", s.src, timeBeforeNextSnapshot)
} else {
timeBeforeNextSnapshot = 24 * time.Hour
}
select {
case <-s.closed:
return
case <-time.After(15 * time.Second):
case <-time.After(timeBeforeNextSnapshot):
log.Info().Msgf("snapshotting %v", s.src)
s.snapshot()
s.refreshStatus()
}
}
}
func (s *sourceManager) Progress(path string, pathCompleted, pathTotal int64, stats *snapshot.Stats) {
s.mu.Lock()
defer s.mu.Unlock()
s.uploadPath = path
s.uploadPathCompleted = pathCompleted
s.uploadPathTotal = pathTotal
log.Debug().Msgf("path: %v %v/%v", path, pathCompleted, pathTotal)
}
func (s *sourceManager) UploadFinished() {
s.mu.Lock()
defer s.mu.Unlock()
s.uploadPath = ""
s.uploadPathCompleted = 0
s.uploadPathTotal = 0
}
func (s *sourceManager) upload() serverapi.SourceActionResponse {
log.Info().Msgf("upload triggered via API: %v", s.src)
return serverapi.SourceActionResponse{Success: true}
}
func (s *sourceManager) cancel() serverapi.SourceActionResponse {
log.Info().Msgf("cancel triggered via API: %v", s.src)
return serverapi.SourceActionResponse{Success: true}
}
func (s *sourceManager) pause() serverapi.SourceActionResponse {
log.Info().Msgf("pause triggered via API: %v", s.src)
return serverapi.SourceActionResponse{Success: true}
}
func (s *sourceManager) resume() serverapi.SourceActionResponse {
log.Info().Msgf("resume triggered via API: %v", s.src)
return serverapi.SourceActionResponse{Success: true}
}
func (s *sourceManager) snapshot() {
s.server.beginUpload(s.src)
defer s.server.endUpload(s.src)
localEntry, err := localfs.NewEntry(s.src.Path, nil)
if err != nil {
log.Error().Msgf("unable to create local filesystem: %v", err)
return
}
u := snapshot.NewUploader(s.server.rep)
u.FilesPolicy = s.pol.FilesPolicy
u.Progress = s
ctx := context.Background()
log.Info().Msgf("starting upload of %v", s.src)
manifest, err := u.Upload(ctx, localEntry, s.src, s.lastSnapshot)
if err != nil {
log.Error().Msgf("upload error: %v", err)
return
}
snapshotID, err := s.server.snapshotManager.SaveSnapshot(manifest)
if err != nil {
log.Error().Msgf("unable to save snapshot: %v", err)
return
}
log.Info().Msgf("created snapshot %v", snapshotID)
if err := s.server.rep.Flush(ctx); err != nil {
log.Error().Msgf("unable to flush: %v", err)
return
}
}
func (s *sourceManager) findClosestNextSnapshotTime() time.Time {
nextSnapshotTime := time.Now().Add(24 * time.Hour)
if s.pol != nil {
// compute next snapshot time based on interval
if interval := s.pol.SchedulingPolicy.Interval; interval != nil {
nt := s.lastSnapshot.StartTime.Add(*interval).Truncate(*interval)
if nt.Before(nextSnapshotTime) {
nextSnapshotTime = nt
}
}
for _, tod := range s.pol.SchedulingPolicy.TimesOfDay {
nowLocalTime := time.Now().Local()
localSnapshotTime := time.Date(nowLocalTime.Year(), nowLocalTime.Month(), nowLocalTime.Day(), tod.Hour, tod.Minute, 0, 0, time.Local)
if tod.Hour < nowLocalTime.Hour() || (tod.Hour == nowLocalTime.Hour() && tod.Minute < nowLocalTime.Minute()) {
localSnapshotTime = localSnapshotTime.Add(24 * time.Hour)
}
if localSnapshotTime.Before(nextSnapshotTime) {
nextSnapshotTime = localSnapshotTime
}
}
}
return nextSnapshotTime
}
func (s *sourceManager) refreshStatus() {
log.Info().Msgf("refreshing state for %v", s.src)
pol, _, err := s.server.policyManager.GetEffectivePolicy(s.src)
if err != nil {
s.setStatus("FAILED")
return
}
s.pol = pol
snapshots, err := s.server.snapshotManager.ListSnapshots(s.src)
if err != nil {
s.setStatus("FAILED")
return
}
s.lastCompleteSnapshot = nil
snaps := snapshot.SortByTime(snapshots, true)
if len(snaps) > 0 {
s.lastSnapshot = snaps[0]
s.nextSnapshotTime = s.findClosestNextSnapshotTime()
} else {
s.nextSnapshotTime = time.Time{}
s.lastSnapshot = nil
}
}
func newSourceManager(src snapshot.SourceInfo, server *Server) *sourceManager {
m := &sourceManager{
src: src,
server: server,
state: "UNKNOWN",
closed: make(chan struct{}),
}
return m
}