UR now web scale

This commit is contained in:
Audrius Butkevicius
2020-06-20 21:29:39 +01:00
parent 65d0cc6423
commit c5a3ae314c
3 changed files with 108 additions and 201 deletions

View File

@@ -175,13 +175,13 @@ func aggregateVersionSummary(db *sql.DB, since time.Time) (int64, error) {
res, err := db.Exec(`INSERT INTO VersionSummary (
SELECT
DATE_TRUNC('day', Received) AS Day,
SUBSTRING(Version FROM '^v\d.\d+') AS Ver,
SUBSTRING(Report->>'version' FROM '^v\d.\d+') AS Ver,
COUNT(*) AS Count
FROM Reports
FROM ReportsJson
WHERE
DATE_TRUNC('day', Received) > $1
AND DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW())
AND Version like 'v_.%'
AND Report->>'version' like 'v_.%'
GROUP BY Day, Ver
);
`, since)
@@ -195,11 +195,11 @@ func aggregateVersionSummary(db *sql.DB, since time.Time) (int64, error) {
func aggregateUserMovement(db *sql.DB) (int64, error) {
rows, err := db.Query(`SELECT
DATE_TRUNC('day', Received) AS Day,
UniqueID
FROM Reports
Report->>'uniqueID'
FROM ReportsJson
WHERE
DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW())
AND Version like 'v_.%'
AND Report->>'version' like 'v_.%'
ORDER BY Day
`)
if err != nil {
@@ -276,16 +276,16 @@ func aggregatePerformance(db *sql.DB, since time.Time) (int64, error) {
res, err := db.Exec(`INSERT INTO Performance (
SELECT
DATE_TRUNC('day', Received) AS Day,
AVG(TotFiles) As TotFiles,
AVG(TotMiB) As TotMiB,
AVG(SHA256Perf) As SHA256Perf,
AVG(MemorySize) As MemorySize,
AVG(MemoryUsageMiB) As MemoryUsageMiB
FROM Reports
AVG((Report->>'totFiles')::numeric) As TotFiles,
AVG((Report->>'totMiB')::numeric) As TotMiB,
AVG((Report->>'sha256Perf')::numeric) As SHA256Perf,
AVG((Report->>'memorySize')::numeric) As MemorySize,
AVG((Report->>'memoryUsageMiB')::numeric) As MemoryUsageMiB
FROM ReportsJson
WHERE
DATE_TRUNC('day', Received) > $1
AND DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW())
AND Version like 'v_.%'
AND Report->>'version' like 'v_.%'
GROUP BY Day
);
`, since)
@@ -303,22 +303,22 @@ func aggregateBlockStats(db *sql.DB, since time.Time) (int64, error) {
SELECT
DATE_TRUNC('day', Received) AS Day,
COUNT(1) As Reports,
SUM(BlocksTotal) AS Total,
SUM(BlocksRenamed) AS Renamed,
SUM(BlocksReused) AS Reused,
SUM(BlocksPulled) AS Pulled,
SUM(BlocksCopyOrigin) AS CopyOrigin,
SUM(BlocksCopyOriginShifted) AS CopyOriginShifted,
SUM(BlocksCopyElsewhere) AS CopyElsewhere
FROM Reports
SUM((Report->'blockStats'->>'total')::numeric) AS Total,
SUM((Report->'blockStats'->>'renamed')::numeric) AS Renamed,
SUM((Report->'blockStats'->>'reused')::numeric) AS Reused,
SUM((Report->'blockStats'->>'pulled')::numeric) AS Pulled,
SUM((Report->'blockStats'->>'copyOrigin')::numeric) AS CopyOrigin,
SUM((Report->'blockStats'->>'copyOriginShifted')::numeric) AS CopyOriginShifted,
SUM((Report->'blockStats'->>'copyElsewhere')::numeric) AS CopyElsewhere
FROM ReportsJson
WHERE
DATE_TRUNC('day', Received) > $1
AND DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW())
AND ReportVersion = 3
AND Version like 'v_.%'
AND Version NOT LIKE 'v0.14.40%'
AND Version NOT LIKE 'v0.14.39%'
AND Version NOT LIKE 'v0.14.38%'
AND (Report->>'urVersion')::numeric >= 3
AND Report->>'version' like 'v_.%'
AND Report->>'version' NOT LIKE 'v0.14.40%'
AND Report->>'version' NOT LIKE 'v0.14.39%'
AND Report->>'version' NOT LIKE 'v0.14.38%'
GROUP BY Day
);
`, since)

View File

@@ -11,7 +11,6 @@ import (
"crypto/tls"
"database/sql"
"encoding/json"
"fmt"
"html/template"
"io"
"io/ioutil"
@@ -27,7 +26,8 @@ import (
"time"
"unicode"
geoip2 "github.com/oschwald/geoip2-golang"
"github.com/lib/pq"
"github.com/oschwald/geoip2-golang"
"github.com/syncthing/syncthing/lib/ur/contract"
)
@@ -103,198 +103,100 @@ func getEnvDefault(key, def string) string {
}
func setupDB(db *sql.DB) error {
_, err := db.Exec(`CREATE TABLE IF NOT EXISTS Reports (
_, err := db.Exec(`CREATE TABLE IF NOT EXISTS ReportsJson (
Received TIMESTAMP NOT NULL,
UniqueID VARCHAR(32) NOT NULL,
Version VARCHAR(32) NOT NULL,
LongVersion VARCHAR(256) NOT NULL,
Platform VARCHAR(32) NOT NULL,
NumFolders INTEGER NOT NULL,
NumDevices INTEGER NOT NULL,
TotFiles INTEGER NOT NULL,
FolderMaxFiles INTEGER NOT NULL,
TotMiB INTEGER NOT NULL,
FolderMaxMiB INTEGER NOT NULL,
MemoryUsageMiB INTEGER NOT NULL,
SHA256Perf DOUBLE PRECISION NOT NULL,
MemorySize INTEGER NOT NULL,
Date VARCHAR(8) NOT NULL
Report JSONB NOT NULL
)`)
if err != nil {
return err
}
var t string
row := db.QueryRow(`SELECT 'UniqueIDIndex'::regclass`)
if err := row.Scan(&t); err != nil {
if _, err = db.Exec(`CREATE UNIQUE INDEX UniqueIDIndex ON Reports (Date, UniqueID)`); err != nil {
if err := db.QueryRow(`SELECT 'UniqueIDJsonIndex'::regclass`).Scan(&t); err != nil {
if _, err = db.Exec(`CREATE UNIQUE INDEX UniqueIDJsonIndex ON ReportsJson ((Report->>'date'), (Report->>'uniqueID'))`); err != nil {
return err
}
}
row = db.QueryRow(`SELECT 'ReceivedIndex'::regclass`)
if err := row.Scan(&t); err != nil {
if _, err = db.Exec(`CREATE INDEX ReceivedIndex ON Reports (Received)`); err != nil {
if err := db.QueryRow(`SELECT 'ReceivedJsonIndex'::regclass`).Scan(&t); err != nil {
if _, err = db.Exec(`CREATE INDEX ReceivedJsonIndex ON ReportsJson (Received)`); err != nil {
return err
}
}
// V2
if err := db.QueryRow(`SELECT 'ReportVersionJsonIndex'::regclass`).Scan(&t); err != nil {
if _, err = db.Exec(`CREATE INDEX ReportVersionJsonIndex ON ReportsJson (cast((Report->>'urVersion') as numeric))`); err != nil {
return err
}
}
row = db.QueryRow(`SELECT attname FROM pg_attribute WHERE attrelid = (SELECT oid FROM pg_class WHERE relname = 'reports') AND attname = 'reportversion'`)
if err := row.Scan(&t); err != nil {
// The ReportVersion column doesn't exist; add the new columns.
_, err = db.Exec(`ALTER TABLE Reports
ADD COLUMN ReportVersion INTEGER NOT NULL DEFAULT 0,
ADD COLUMN NumCPU INTEGER NOT NULL DEFAULT 0,
ADD COLUMN FolderRO INTEGER NOT NULL DEFAULT 0,
ADD COLUMN FolderIgnorePerms INTEGER NOT NULL DEFAULT 0,
ADD COLUMN FolderIgnoreDelete INTEGER NOT NULL DEFAULT 0,
ADD COLUMN FolderAutoNormalize INTEGER NOT NULL DEFAULT 0,
ADD COLUMN DeviceIntroducer INTEGER NOT NULL DEFAULT 0,
ADD COLUMN DeviceCustomCertName INTEGER NOT NULL DEFAULT 0,
ADD COLUMN DeviceCompressAlways INTEGER NOT NULL DEFAULT 0,
ADD COLUMN DeviceCompressMetadata INTEGER NOT NULL DEFAULT 0,
ADD COLUMN DeviceCompressNever INTEGER NOT NULL DEFAULT 0,
ADD COLUMN DeviceDynamicAddr INTEGER NOT NULL DEFAULT 0,
ADD COLUMN DeviceStaticAddr INTEGER NOT NULL DEFAULT 0,
ADD COLUMN AnnounceGlobalEnabled BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN AnnounceLocalEnabled BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN AnnounceDefaultServersDNS INTEGER NOT NULL DEFAULT 0,
ADD COLUMN AnnounceDefaultServersIP INTEGER NOT NULL DEFAULT 0,
ADD COLUMN AnnounceOtherServers INTEGER NOT NULL DEFAULT 0,
ADD COLUMN RelayEnabled BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN RelayDefaultServers INTEGER NOT NULL DEFAULT 0,
ADD COLUMN RelayOtherServers INTEGER NOT NULL DEFAULT 0,
ADD COLUMN RateLimitEnabled BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN UpgradeAllowedManual BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN UpgradeAllowedAuto BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN FolderSimpleVersioning INTEGER NOT NULL DEFAULT 0,
ADD COLUMN FolderExternalVersioning INTEGER NOT NULL DEFAULT 0,
ADD COLUMN FolderStaggeredVersioning INTEGER NOT NULL DEFAULT 0,
ADD COLUMN FolderTrashcanVersioning INTEGER NOT NULL DEFAULT 0
`)
// Migrate from old schema to new schema if the table exists.
var count uint64
if err := db.QueryRow(`SELECT COUNT(1) FROM Reports`).Scan(&count); err == nil && count > 0 {
tx, err := db.Begin()
if err != nil {
log.Println("sql:", err)
return err
}
}
defer tx.Rollback()
row = db.QueryRow(`SELECT 'ReportVersionIndex'::regclass`)
if err := row.Scan(&t); err != nil {
if _, err = db.Exec(`CREATE INDEX ReportVersionIndex ON Reports (ReportVersion)`); err != nil {
return err
}
}
// V2.5
row = db.QueryRow(`SELECT attname FROM pg_attribute WHERE attrelid = (SELECT oid FROM pg_class WHERE relname = 'reports') AND attname = 'upgradeallowedpre'`)
if err := row.Scan(&t); err != nil {
// The ReportVersion column doesn't exist; add the new columns.
_, err = db.Exec(`ALTER TABLE Reports
ADD COLUMN UpgradeAllowedPre BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN RescanIntvs INT[] NOT NULL DEFAULT '{}'
`)
stmt, err := tx.Prepare(pq.CopyIn("ReportsJson", "Received", "Report"))
if err != nil {
log.Println("sql:", err)
return err
}
}
// V3
var rep contract.Report
row = db.QueryRow(`SELECT attname FROM pg_attribute WHERE attrelid = (SELECT oid FROM pg_class WHERE relname = 'reports') AND attname = 'uptime'`)
if err := row.Scan(&t); err != nil {
// The Uptime column doesn't exist; add the new columns.
_, err = db.Exec(`ALTER TABLE Reports
ADD COLUMN Uptime INTEGER NOT NULL DEFAULT 0,
ADD COLUMN NATType VARCHAR(32) NOT NULL DEFAULT '',
ADD COLUMN AlwaysLocalNets BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN CacheIgnoredFiles BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN OverwriteRemoteDeviceNames BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN ProgressEmitterEnabled BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN CustomDefaultFolderPath BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN WeakHashSelection VARCHAR(32) NOT NULL DEFAULT '',
ADD COLUMN CustomTrafficClass BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN CustomTempIndexMinBlocks BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN TemporariesDisabled BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN TemporariesCustom BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN LimitBandwidthInLan BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN CustomReleaseURL BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN RestartOnWakeup BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN CustomStunServers BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN FolderScanProgressDisabled INTEGER NOT NULL DEFAULT 0,
ADD COLUMN FolderConflictsDisabled INTEGER NOT NULL DEFAULT 0,
ADD COLUMN FolderConflictsUnlimited INTEGER NOT NULL DEFAULT 0,
ADD COLUMN FolderConflictsOther INTEGER NOT NULL DEFAULT 0,
ADD COLUMN FolderDisableSparseFiles INTEGER NOT NULL DEFAULT 0,
ADD COLUMN FolderDisableTempIndexes INTEGER NOT NULL DEFAULT 0,
ADD COLUMN FolderAlwaysWeakHash INTEGER NOT NULL DEFAULT 0,
ADD COLUMN FolderCustomWeakHashThreshold INTEGER NOT NULL DEFAULT 0,
ADD COLUMN FolderFsWatcherEnabled INTEGER NOT NULL DEFAULT 0,
ADD COLUMN FolderPullOrder JSONB NOT NULL DEFAULT '{}',
ADD COLUMN FolderFilesystemType JSONB NOT NULL DEFAULT '{}',
ADD COLUMN FolderFsWatcherDelays INT[] NOT NULL DEFAULT '{}',
ADD COLUMN GUIEnabled INTEGER NOT NULL DEFAULT 0,
ADD COLUMN GUIUseTLS INTEGER NOT NULL DEFAULT 0,
ADD COLUMN GUIUseAuth INTEGER NOT NULL DEFAULT 0,
ADD COLUMN GUIInsecureAdminAccess INTEGER NOT NULL DEFAULT 0,
ADD COLUMN GUIDebugging INTEGER NOT NULL DEFAULT 0,
ADD COLUMN GUIInsecureSkipHostCheck INTEGER NOT NULL DEFAULT 0,
ADD COLUMN GUIInsecureAllowFrameLoading INTEGER NOT NULL DEFAULT 0,
ADD COLUMN GUIListenLocal INTEGER NOT NULL DEFAULT 0,
ADD COLUMN GUIListenUnspecified INTEGER NOT NULL DEFAULT 0,
ADD COLUMN GUITheme JSONB NOT NULL DEFAULT '{}',
ADD COLUMN BlocksTotal INTEGER NOT NULL DEFAULT 0,
ADD COLUMN BlocksRenamed INTEGER NOT NULL DEFAULT 0,
ADD COLUMN BlocksReused INTEGER NOT NULL DEFAULT 0,
ADD COLUMN BlocksPulled INTEGER NOT NULL DEFAULT 0,
ADD COLUMN BlocksCopyOrigin INTEGER NOT NULL DEFAULT 0,
ADD COLUMN BlocksCopyOriginShifted INTEGER NOT NULL DEFAULT 0,
ADD COLUMN BlocksCopyElsewhere INTEGER NOT NULL DEFAULT 0,
ADD COLUMN Transport JSONB NOT NULL DEFAULT '{}',
ADD COLUMN IgnoreLines INTEGER NOT NULL DEFAULT 0,
ADD COLUMN IgnoreInverts INTEGER NOT NULL DEFAULT 0,
ADD COLUMN IgnoreFolded INTEGER NOT NULL DEFAULT 0,
ADD COLUMN IgnoreDeletable INTEGER NOT NULL DEFAULT 0,
ADD COLUMN IgnoreRooted INTEGER NOT NULL DEFAULT 0,
ADD COLUMN IgnoreIncludes INTEGER NOT NULL DEFAULT 0,
ADD COLUMN IgnoreEscapedIncludes INTEGER NOT NULL DEFAULT 0,
ADD COLUMN IgnoreDoubleStars INTEGER NOT NULL DEFAULT 0,
ADD COLUMN IgnoreStars INTEGER NOT NULL DEFAULT 0
`)
rows, err := tx.Query(`DELETE FROM Reports RETURNING ` + strings.Join(rep.FieldNames(), ", "))
if err != nil {
log.Println("sql:", err)
return err
}
}
defer rows.Close()
// V3 added late in the RC
var done uint64
pct := count / 100
row = db.QueryRow(`SELECT attname FROM pg_attribute WHERE attrelid = (SELECT oid FROM pg_class WHERE relname = 'reports') AND attname = 'weakhashenabled'`)
if err := row.Scan(&t); err != nil {
// The WeakHashEnabled column doesn't exist; add the new columns.
_, err = db.Exec(`ALTER TABLE Reports
ADD COLUMN WeakHashEnabled BOOLEAN NOT NULL DEFAULT FALSE
ADD COLUMN Address VARCHAR(45) NOT NULL DEFAULT ''
`)
for rows.Next() {
err := rows.Scan(rep.FieldPointers()...)
if err != nil {
log.Println("sql scan:", err)
return err
}
_, err = stmt.Exec(rep.Received, rep)
if err != nil {
log.Println("sql insert:", err)
return err
}
done++
if done%pct == 0 {
log.Printf("Migration progress %d/%d (%d%%)", done, count, (100*done)/count)
}
}
// Tell the driver bulk copy is finished
_, err = stmt.Exec()
if err != nil {
log.Println("sql stmt exec:", err)
return err
}
}
// Receive only added ad-hoc
row = db.QueryRow(`SELECT attname FROM pg_attribute WHERE attrelid = (SELECT oid FROM pg_class WHERE relname = 'reports') AND attname = 'folderrecvonly'`)
if err := row.Scan(&t); err != nil {
// The RecvOnly column doesn't exist; add it.
_, err = db.Exec(`ALTER TABLE Reports
ADD COLUMN FolderRecvOnly INTEGER NOT NULL DEFAULT 0
`)
err = stmt.Close()
if err != nil {
log.Println("sql stmt close:", err)
return err
}
_, err = tx.Exec("DROP TABLE Reports")
if err != nil {
log.Println("sql drop:", err)
return err
}
err = tx.Commit()
if err != nil {
log.Println("sql commit:", err)
return err
}
}
@@ -303,15 +205,7 @@ func setupDB(db *sql.DB) error {
}
func insertReport(db *sql.DB, r contract.Report) error {
time := time.Now().UTC()
r.Received = &time
fields := r.FieldPointers()
params := make([]string, len(fields))
for i := range params {
params[i] = fmt.Sprintf("$%d", i+1)
}
query := "INSERT INTO Reports (" + strings.Join(r.FieldNames(), ", ") + ") VALUES (" + strings.Join(params, ", ") + ")"
_, err := db.Exec(query, fields...)
_, err := db.Exec("INSERT INTO ReportsJson (Report, Received) VALUES ($1, $2)", r, time.Now().UTC())
return err
}
@@ -319,9 +213,9 @@ func insertReport(db *sql.DB, r contract.Report) error {
type withDBFunc func(*sql.DB, http.ResponseWriter, *http.Request)
func withDB(db *sql.DB, f withDBFunc) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
f(db, w, r)
})
}
}
func main() {
@@ -744,7 +638,7 @@ func getReport(db *sql.DB) map[string]interface{} {
var rep contract.Report
rows, err := db.Query(`SELECT ` + strings.Join(rep.FieldNames(), ",") + ` FROM Reports WHERE Received > now() - '1 day'::INTERVAL`)
rows, err := db.Query(`SELECT Received, Report FROM ReportsJson WHERE Received > now() - '1 day'::INTERVAL`)
if err != nil {
log.Println("sql:", err)
return nil
@@ -752,7 +646,7 @@ func getReport(db *sql.DB) map[string]interface{} {
defer rows.Close()
for rows.Next() {
err := rows.Scan(rep.FieldPointers()...)
err := rows.Scan(&rep.Received, &rep)
if err != nil {
log.Println("sql:", err)

View File

@@ -42,9 +42,9 @@ func (p *IntMap) Scan(src interface{}) error {
type Report struct {
// Generated
Received *time.Time `json:"received,omitempty"` // Only from DB
Date string `json:"date,omitempty"`
Address string `json:"address,omitempty"`
Received time.Time // Only from DB
Date string `json:"date,omitempty"`
Address string `json:"address,omitempty"`
// v1 fields
@@ -409,6 +409,19 @@ func (r *Report) FieldNames() []string {
}
}
func (r Report) Value() (driver.Value, error) {
return json.Marshal(r)
}
func (r *Report) Scan(value interface{}) error {
b, ok := value.([]byte)
if !ok {
return errors.New("type assertion to []byte failed")
}
return json.Unmarshal(b, &r)
}
func clear(v interface{}, since int) error {
s := reflect.ValueOf(v).Elem()
t := s.Type()