mirror of
https://github.com/syncthing/syncthing.git
synced 2025-12-23 22:18:14 -05:00
Merge branch 'infrastructure'
* infrastructure: chore(stdiscosrv): use log/slog chore(stdiscosrv): larger write buffer
This commit is contained in:
@@ -10,7 +10,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log/slog"
|
||||||
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
"github.com/thejerf/suture/v4"
|
"github.com/thejerf/suture/v4"
|
||||||
@@ -173,7 +173,7 @@ func (s *amqpReceiver) Serve(ctx context.Context) error {
|
|||||||
id, err = protocol.DeviceIDFromString(string(rec.Key))
|
id, err = protocol.DeviceIDFromString(string(rec.Key))
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("Replication device ID:", err)
|
slog.Warn("Failed to parse replication device ID", "error", err)
|
||||||
replicationRecvsTotal.WithLabelValues("error").Inc()
|
replicationRecvsTotal.WithLabelValues("error").Inc()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
io "io"
|
io "io"
|
||||||
"log"
|
"log"
|
||||||
|
"log/slog"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
@@ -93,7 +94,7 @@ func (s *apiSrv) Serve(ctx context.Context) error {
|
|||||||
if s.useHTTP {
|
if s.useHTTP {
|
||||||
listener, err := net.Listen("tcp", s.addr)
|
listener, err := net.Listen("tcp", s.addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("Listen:", err)
|
slog.ErrorContext(ctx, "Failed to listen", "error", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s.listener = listener
|
s.listener = listener
|
||||||
@@ -107,7 +108,7 @@ func (s *apiSrv) Serve(ctx context.Context) error {
|
|||||||
|
|
||||||
tlsListener, err := tls.Listen("tcp", s.addr, tlsCfg)
|
tlsListener, err := tls.Listen("tcp", s.addr, tlsCfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("Listen:", err)
|
slog.ErrorContext(ctx, "Failed to listen", "error", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s.listener = tlsListener
|
s.listener = tlsListener
|
||||||
@@ -132,7 +133,7 @@ func (s *apiSrv) Serve(ctx context.Context) error {
|
|||||||
|
|
||||||
err := srv.Serve(s.listener)
|
err := srv.Serve(s.listener)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("Serve:", err)
|
slog.ErrorContext(ctx, "Failed to serve", "error", err)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -151,9 +152,7 @@ func (s *apiSrv) handler(w http.ResponseWriter, req *http.Request) {
|
|||||||
reqID := requestID(rand.Int63())
|
reqID := requestID(rand.Int63())
|
||||||
req = req.WithContext(context.WithValue(req.Context(), idKey, reqID))
|
req = req.WithContext(context.WithValue(req.Context(), idKey, reqID))
|
||||||
|
|
||||||
if debug {
|
slog.Debug("Handling request", "id", reqID, "method", req.Method, "url", req.URL, "proto", req.Proto)
|
||||||
log.Println(reqID, req.Method, req.URL, req.Proto)
|
|
||||||
}
|
|
||||||
|
|
||||||
remoteAddr := &net.TCPAddr{
|
remoteAddr := &net.TCPAddr{
|
||||||
IP: nil,
|
IP: nil,
|
||||||
@@ -174,7 +173,7 @@ func (s *apiSrv) handler(w http.ResponseWriter, req *http.Request) {
|
|||||||
var err error
|
var err error
|
||||||
remoteAddr, err = net.ResolveTCPAddr("tcp", req.RemoteAddr)
|
remoteAddr, err = net.ResolveTCPAddr("tcp", req.RemoteAddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("remoteAddr:", err)
|
slog.Warn("Failed to resolve remote address", "address", req.RemoteAddr, "error", err)
|
||||||
lw.Header().Set("Retry-After", errorRetryAfterString())
|
lw.Header().Set("Retry-After", errorRetryAfterString())
|
||||||
http.Error(lw, "Internal Server Error", http.StatusInternalServerError)
|
http.Error(lw, "Internal Server Error", http.StatusInternalServerError)
|
||||||
apiRequestsTotal.WithLabelValues("no_remote_addr").Inc()
|
apiRequestsTotal.WithLabelValues("no_remote_addr").Inc()
|
||||||
@@ -197,9 +196,7 @@ func (s *apiSrv) handleGET(w http.ResponseWriter, req *http.Request) {
|
|||||||
|
|
||||||
deviceID, err := protocol.DeviceIDFromString(req.URL.Query().Get("device"))
|
deviceID, err := protocol.DeviceIDFromString(req.URL.Query().Get("device"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if debug {
|
slog.Debug("Request with bad device param", "id", reqID, "error", err)
|
||||||
log.Println(reqID, "bad device param:", err)
|
|
||||||
}
|
|
||||||
lookupRequestsTotal.WithLabelValues("bad_request").Inc()
|
lookupRequestsTotal.WithLabelValues("bad_request").Inc()
|
||||||
w.Header().Set("Retry-After", errorRetryAfterString())
|
w.Header().Set("Retry-After", errorRetryAfterString())
|
||||||
http.Error(w, "Bad Request", http.StatusBadRequest)
|
http.Error(w, "Bad Request", http.StatusBadRequest)
|
||||||
@@ -259,9 +256,7 @@ func (s *apiSrv) handlePOST(remoteAddr *net.TCPAddr, w http.ResponseWriter, req
|
|||||||
|
|
||||||
rawCert, err := certificateBytes(req)
|
rawCert, err := certificateBytes(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if debug {
|
slog.Debug("Request without certificates", "id", reqID, "error", err)
|
||||||
log.Println(reqID, "no certificates:", err)
|
|
||||||
}
|
|
||||||
announceRequestsTotal.WithLabelValues("no_certificate").Inc()
|
announceRequestsTotal.WithLabelValues("no_certificate").Inc()
|
||||||
w.Header().Set("Retry-After", errorRetryAfterString())
|
w.Header().Set("Retry-After", errorRetryAfterString())
|
||||||
http.Error(w, "Forbidden", http.StatusForbidden)
|
http.Error(w, "Forbidden", http.StatusForbidden)
|
||||||
@@ -270,9 +265,7 @@ func (s *apiSrv) handlePOST(remoteAddr *net.TCPAddr, w http.ResponseWriter, req
|
|||||||
|
|
||||||
var ann announcement
|
var ann announcement
|
||||||
if err := json.NewDecoder(req.Body).Decode(&ann); err != nil {
|
if err := json.NewDecoder(req.Body).Decode(&ann); err != nil {
|
||||||
if debug {
|
slog.Debug("Failed to decode request", "id", reqID, "error", err)
|
||||||
log.Println(reqID, "decode:", err)
|
|
||||||
}
|
|
||||||
announceRequestsTotal.WithLabelValues("bad_request").Inc()
|
announceRequestsTotal.WithLabelValues("bad_request").Inc()
|
||||||
w.Header().Set("Retry-After", errorRetryAfterString())
|
w.Header().Set("Retry-After", errorRetryAfterString())
|
||||||
http.Error(w, "Bad Request", http.StatusBadRequest)
|
http.Error(w, "Bad Request", http.StatusBadRequest)
|
||||||
@@ -283,9 +276,7 @@ func (s *apiSrv) handlePOST(remoteAddr *net.TCPAddr, w http.ResponseWriter, req
|
|||||||
|
|
||||||
addresses := fixupAddresses(remoteAddr, ann.Addresses)
|
addresses := fixupAddresses(remoteAddr, ann.Addresses)
|
||||||
if len(addresses) == 0 {
|
if len(addresses) == 0 {
|
||||||
if debug {
|
slog.Debug("Request without addresses", "id", reqID, "error", err)
|
||||||
log.Println(reqID, "no addresses")
|
|
||||||
}
|
|
||||||
announceRequestsTotal.WithLabelValues("bad_request").Inc()
|
announceRequestsTotal.WithLabelValues("bad_request").Inc()
|
||||||
w.Header().Set("Retry-After", errorRetryAfterString())
|
w.Header().Set("Retry-After", errorRetryAfterString())
|
||||||
http.Error(w, "Bad Request", http.StatusBadRequest)
|
http.Error(w, "Bad Request", http.StatusBadRequest)
|
||||||
@@ -293,9 +284,7 @@ func (s *apiSrv) handlePOST(remoteAddr *net.TCPAddr, w http.ResponseWriter, req
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := s.handleAnnounce(deviceID, addresses); err != nil {
|
if err := s.handleAnnounce(deviceID, addresses); err != nil {
|
||||||
if debug {
|
slog.Debug("Failed to handle request", "id", reqID, "error", err)
|
||||||
log.Println(reqID, "handle:", err)
|
|
||||||
}
|
|
||||||
announceRequestsTotal.WithLabelValues("internal_error").Inc()
|
announceRequestsTotal.WithLabelValues("internal_error").Inc()
|
||||||
w.Header().Set("Retry-After", errorRetryAfterString())
|
w.Header().Set("Retry-After", errorRetryAfterString())
|
||||||
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
|
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
|
||||||
@@ -306,9 +295,7 @@ func (s *apiSrv) handlePOST(remoteAddr *net.TCPAddr, w http.ResponseWriter, req
|
|||||||
|
|
||||||
w.Header().Set("Reannounce-After", reannounceAfterString())
|
w.Header().Set("Reannounce-After", reannounceAfterString())
|
||||||
w.WriteHeader(http.StatusNoContent)
|
w.WriteHeader(http.StatusNoContent)
|
||||||
if debug {
|
slog.Debug("Device announced", "id", reqID, "device", deviceID, "addresses", addresses)
|
||||||
log.Println(reqID, "announced", deviceID, addresses)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *apiSrv) Stop() {
|
func (s *apiSrv) Stop() {
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ import (
|
|||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"runtime"
|
"runtime"
|
||||||
@@ -74,24 +74,24 @@ func newInMemoryStore(dir string, flushInterval time.Duration, blobs blob.Store)
|
|||||||
// Try to read from blob storage
|
// Try to read from blob storage
|
||||||
latestKey, cerr := blobs.LatestKey(context.Background())
|
latestKey, cerr := blobs.LatestKey(context.Background())
|
||||||
if cerr != nil {
|
if cerr != nil {
|
||||||
log.Println("Error finding database from blob storage:", cerr)
|
slog.Error("Failed to find database in blob storage", "error", cerr)
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
fd, cerr := os.Create(path.Join(s.dir, "records.db"))
|
fd, cerr := os.Create(path.Join(s.dir, "records.db"))
|
||||||
if cerr != nil {
|
if cerr != nil {
|
||||||
log.Println("Error creating database file:", cerr)
|
slog.Error("Failed to create database file", "error", cerr)
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
if cerr := blobs.Download(context.Background(), latestKey, fd); cerr != nil {
|
if cerr := blobs.Download(context.Background(), latestKey, fd); cerr != nil {
|
||||||
log.Printf("Error downloading database from blob storage: %v", cerr)
|
slog.Error("Failed to download database from blob storage", "error", cerr)
|
||||||
}
|
}
|
||||||
_ = fd.Close()
|
_ = fd.Close()
|
||||||
nr, err = s.read()
|
nr, err = s.read()
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("Error reading database:", err)
|
slog.Error("Failed to read database", "error", err)
|
||||||
}
|
}
|
||||||
log.Printf("Read %d records from database", nr)
|
slog.Info("Loaded database", "records", nr)
|
||||||
s.expireAndCalculateStatistics()
|
s.expireAndCalculateStatistics()
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
@@ -153,13 +153,13 @@ loop:
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
log.Println("Calculating statistics")
|
slog.InfoContext(ctx, "Calculating statistics")
|
||||||
s.expireAndCalculateStatistics()
|
s.expireAndCalculateStatistics()
|
||||||
log.Println("Flushing database")
|
slog.InfoContext(ctx, "Flushing database")
|
||||||
if err := s.write(); err != nil {
|
if err := s.write(); err != nil {
|
||||||
log.Println("Error writing database:", err)
|
slog.ErrorContext(ctx, "Failed to write database", "error", err)
|
||||||
}
|
}
|
||||||
log.Println("Finished flushing database")
|
slog.InfoContext(ctx, "Finished flushing database")
|
||||||
t.Reset(s.flushInterval)
|
t.Reset(s.flushInterval)
|
||||||
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
@@ -256,7 +256,7 @@ func (s *inMemoryStore) write() (err error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
bw := bufio.NewWriter(fd)
|
bw := bufio.NewWriterSize(fd, 1<<20)
|
||||||
|
|
||||||
var buf []byte
|
var buf []byte
|
||||||
var rangeErr error
|
var rangeErr error
|
||||||
@@ -310,18 +310,24 @@ func (s *inMemoryStore) write() (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if info, err := os.Lstat(dbf); err == nil {
|
||||||
|
slog.Info("Saved database", "name", dbf, "size", info.Size(), "modtime", info.ModTime())
|
||||||
|
} else {
|
||||||
|
slog.Warn("Failed to stat database after save", "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Upload to blob storage
|
// Upload to blob storage
|
||||||
if s.blobs != nil {
|
if s.blobs != nil {
|
||||||
fd, err = os.Open(dbf)
|
fd, err = os.Open(dbf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Error uploading database to blob storage: %v", err)
|
slog.Error("Failed to upload database to blob storage", "error", err)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
defer fd.Close()
|
defer fd.Close()
|
||||||
if err := s.blobs.Upload(context.Background(), s.objKey, fd); err != nil {
|
if err := s.blobs.Upload(context.Background(), s.objKey, fd); err != nil {
|
||||||
log.Printf("Error uploading database to blob storage: %v", err)
|
slog.Error("Failed to upload database to blob storage", "error", err)
|
||||||
}
|
}
|
||||||
log.Println("Finished uploading database")
|
slog.Info("Finished uploading database")
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@@ -360,7 +366,7 @@ func (s *inMemoryStore) read() (int, error) {
|
|||||||
key, err = protocol.DeviceIDFromString(string(rec.Key))
|
key, err = protocol.DeviceIDFromString(string(rec.Key))
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Println("Bad device ID:", err)
|
slog.Error("Got bad device ID while reading database", "error", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -9,9 +9,8 @@ package main
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"log"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
_ "net/http/pprof"
|
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"runtime"
|
"runtime"
|
||||||
@@ -24,6 +23,7 @@ import (
|
|||||||
"github.com/syncthing/syncthing/internal/blob"
|
"github.com/syncthing/syncthing/internal/blob"
|
||||||
"github.com/syncthing/syncthing/internal/blob/azureblob"
|
"github.com/syncthing/syncthing/internal/blob/azureblob"
|
||||||
"github.com/syncthing/syncthing/internal/blob/s3"
|
"github.com/syncthing/syncthing/internal/blob/s3"
|
||||||
|
"github.com/syncthing/syncthing/internal/slogutil"
|
||||||
_ "github.com/syncthing/syncthing/lib/automaxprocs"
|
_ "github.com/syncthing/syncthing/lib/automaxprocs"
|
||||||
"github.com/syncthing/syncthing/lib/build"
|
"github.com/syncthing/syncthing/lib/build"
|
||||||
"github.com/syncthing/syncthing/lib/protocol"
|
"github.com/syncthing/syncthing/lib/protocol"
|
||||||
@@ -32,8 +32,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
addressExpiryTime = 2 * time.Hour
|
addressExpiryTime = 2 * time.Hour
|
||||||
databaseStatisticsInterval = 5 * time.Minute
|
|
||||||
|
|
||||||
// Reannounce-After is set to reannounceAfterSeconds +
|
// Reannounce-After is set to reannounceAfterSeconds +
|
||||||
// random(reannounzeFuzzSeconds), similar for Retry-After
|
// random(reannounzeFuzzSeconds), similar for Retry-After
|
||||||
@@ -88,13 +87,16 @@ type CLI struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
log.SetOutput(os.Stdout)
|
|
||||||
|
|
||||||
var cli CLI
|
var cli CLI
|
||||||
kong.Parse(&cli)
|
kong.Parse(&cli)
|
||||||
debug = cli.Debug
|
|
||||||
|
|
||||||
log.Println(build.LongVersionFor("stdiscosrv"))
|
level := slog.LevelInfo
|
||||||
|
if cli.Debug {
|
||||||
|
level = slog.LevelDebug
|
||||||
|
}
|
||||||
|
slogutil.SetDefaultLevel(level)
|
||||||
|
|
||||||
|
slog.Info(build.LongVersionFor("stdiscosrv"))
|
||||||
if cli.Version {
|
if cli.Version {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -106,16 +108,18 @@ func main() {
|
|||||||
var err error
|
var err error
|
||||||
cert, err = tls.LoadX509KeyPair(cli.Cert, cli.Key)
|
cert, err = tls.LoadX509KeyPair(cli.Cert, cli.Key)
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
log.Println("Failed to load keypair. Generating one, this might take a while...")
|
slog.Info("Failed to load keypair. Generating one, this might take a while...")
|
||||||
cert, err = tlsutil.NewCertificate(cli.Cert, cli.Key, "stdiscosrv", 20*365, false)
|
cert, err = tlsutil.NewCertificate(cli.Cert, cli.Key, "stdiscosrv", 20*365, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalln("Failed to generate X509 key pair:", err)
|
slog.Error("Failed to generate X509 key pair", "error", err)
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
log.Fatalln("Failed to load keypair:", err)
|
slog.Error("Failed to load keypair", "error", err)
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
devID := protocol.NewDeviceID(cert.Certificate[0])
|
devID := protocol.NewDeviceID(cert.Certificate[0])
|
||||||
log.Println("Server device ID is", devID)
|
slog.Info("Loaded certificate keypair", "deviceID", devID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Root of the service tree.
|
// Root of the service tree.
|
||||||
@@ -133,7 +137,8 @@ func main() {
|
|||||||
blobs, err = azureblob.NewBlobStore(cli.DBAzureBlobAccount, cli.DBAzureBlobKey, cli.DBAzureBlobContainer)
|
blobs, err = azureblob.NewBlobStore(cli.DBAzureBlobAccount, cli.DBAzureBlobKey, cli.DBAzureBlobContainer)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Failed to create blob store: %v", err)
|
slog.Error("Failed to create blob store", "error", err)
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start the database.
|
// Start the database.
|
||||||
@@ -158,7 +163,9 @@ func main() {
|
|||||||
go func() {
|
go func() {
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
mux.Handle("/metrics", promhttp.Handler())
|
mux.Handle("/metrics", promhttp.Handler())
|
||||||
log.Fatal(http.ListenAndServe(cli.MetricsListen, mux))
|
err := http.ListenAndServe(cli.MetricsListen, mux)
|
||||||
|
slog.Error("Failed to serve", "error", err)
|
||||||
|
os.Exit(1)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -170,7 +177,7 @@ func main() {
|
|||||||
signal.Notify(signalChan, os.Interrupt)
|
signal.Notify(signalChan, os.Interrupt)
|
||||||
go func() {
|
go func() {
|
||||||
sig := <-signalChan
|
sig := <-signalChan
|
||||||
log.Printf("Received signal %s; shutting down", sig)
|
slog.Info("Received signal; shutting down", "signal", sig)
|
||||||
cancel()
|
cancel()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user