Files
kde-linux/uploader/main.go
Harald Sitter 4c69b76046 uploader: do not get stuck when all channels are full
because we fed the calcs channel from the main thread we'd eventually
get stuck on the calcs workers waiting for space in the results channel
but there'd never be space because the results are processed on the main
thread and that is busy feeding calcs ... instead buffer the channels
and also make sure to feed them from a goroutine so we are starting
processing results while still producing calcs.

I feel like this may be an anti-pattern and maybe one should put
everything in a goroutine and synchronize them all in the main thread.
That would prevent this sort of nonsense at least.
2026-02-17 19:57:13 +01:00

199 lines
5.0 KiB
Go

// SPDX-License-Identifier: GPL-2.0-only OR GPL-3.0-only OR LicenseRef-KDE-Accepted-GPL
// SPDX-FileCopyrightText: 2024-2025 Harald Sitter <sitter@kde.org>
package main
import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"flag"
"io"
"log"
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)
// connectToMinIO creates a MinIO client for the given endpoint using static
// credentials read from the "default" section of the AWS config. Every
// validation or connection failure is fatal. Listing the buckets at the end
// doubles as a connectivity/authorization smoke test.
func connectToMinIO(endpoint string) *minio.Client {
	awsSection, err := readConfigAWS("default")
	if err != nil {
		log.Fatalln("Failed to read AWS config:", err)
	}
	accessKeyID := awsSection.AccessKeyId
	if accessKeyID == "" {
		log.Fatalln("AWS access key ID is empty")
	}
	secretAccessKey := awsSection.SecretKey
	if secretAccessKey == "" {
		log.Fatalln("AWS secret access key is empty")
	}
	sessionToken := awsSection.SessionToken
	// BUG FIX: this guard previously re-checked secretAccessKey, so an empty
	// session token slipped through validation silently.
	if sessionToken == "" {
		log.Fatalln("AWS session token is empty")
	}

	useSSL := true
	// Initialize minio client object.
	minioClient, err := minio.New(endpoint, &minio.Options{
		Creds:           credentials.NewStaticV4(accessKeyID, secretAccessKey, sessionToken),
		Secure:          useSSL,
		TrailingHeaders: true,
	})
	if err != nil {
		log.Fatalln("Failed to create MinIO client:", err)
	}

	// Listing the buckets proves the credentials actually work before we
	// start uploading.
	buckets, err := minioClient.ListBuckets(context.Background())
	if err != nil {
		log.Fatalln("Failed to list buckets:", err)
	}
	for _, bucket := range buckets {
		log.Println(bucket)
	}
	return minioClient
}
func sha256File(path string) string {
file, err := os.Open(path)
if err != nil {
log.Fatalf("unable to open file %s: %v", path, err)
}
defer file.Close()
hasher := sha256.New()
if _, err := io.Copy(hasher, file); err != nil {
log.Fatalf("unable to hash file %s: %v", path, err)
}
return hex.EncodeToString(hasher.Sum(nil))
}
// UploadObject describes one file queued for upload. It is created with only
// ObjectName and Path filled in; SHA256 is computed later by a worker before
// the object reaches the upload loop.
type UploadObject struct {
	ObjectName string // object key in the bucket (prefix + path relative to the upload tree)
	Path       string // local filesystem path of the file to upload
	SHA256     string // hex-encoded SHA-256 of the file; filled in by the calc workers
}
// upload walks the local "../upload-tree" directory and uploads every regular
// file to bucket, with each object name prefixed by objectNamePrefix. File
// hashes are computed concurrently and attached as X-KDE-SHA256 user
// metadata. Any error aborts the program.
func upload(client *minio.Client, bucket string, objectNamePrefix string) {
	dir := "../upload-tree"
	// Somewhat involved producer-consumer going on here.
	// - We start N sha256 calculation workers.
	// - We then feed the calcs channel with partial UploadObjects.
	// - Close the calcs channel to eventually let the workers drain and finish.
	// - The workers calculate the sha and then push the complete objects into the results channel.
	// - Once the calcs waitgroup is done we close the results channel as well to let the uploader drain and finish.
	calcs := make(chan UploadObject, 32) // mind that the size causes this channel to buffer which prevents deadlocks
	results := make(chan UploadObject, 32)
	calcsWg := sync.WaitGroup{}

	// The sha256 calculation workers. Each drains calcs until it is closed,
	// filling in SHA256 and forwarding the completed object.
	for i := 0; i < 4; i++ {
		calcsWg.Add(1)
		go func() {
			defer calcsWg.Done()
			for obj := range calcs {
				obj.SHA256 = sha256File(obj.Path)
				results <- obj
			}
		}()
	}

	// Once they are all done we can close the results channel. Workers are
	// the only senders on results, so closing here is safe.
	go func() {
		calcsWg.Wait()
		log.Println("All SHA256 calculations done, closing results channel")
		close(results)
	}()

	// The calcs channel gets fed by the walking of the upload tree.
	// NOTE: fed from a goroutine rather than the main thread so the upload
	// loop below can already consume results while objects are still being
	// produced — feeding from the main thread deadlocked once both buffered
	// channels filled up.
	go func() {
		err := filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error {
			if err != nil {
				return err
			}
			objectName, err := filepath.Rel(dir, path)
			if err != nil {
				return err
			}
			objectName = filepath.Join(objectNamePrefix, objectName)
			if objectName == "" {
				return errors.New("object name cannot be empty")
			}
			// Directories only contribute their children, never themselves.
			if d.IsDir() {
				return nil
			}
			calcs <- UploadObject{
				ObjectName: objectName,
				Path:       path,
			}
			return nil
		})
		// Close calcs before reporting any walk error so the workers can
		// drain and exit either way.
		log.Println("Finished walking upload tree, closing calcs channel")
		close(calcs)
		if err != nil {
			log.Fatalln(err)
		}
	}()

	// Upload. This loop only ends when results is closed and drained.
	for res := range results {
		log.Println("Uploading", res.ObjectName, "from", res.Path)
		_, err := client.FPutObject(context.Background(), bucket, res.ObjectName, res.Path, minio.PutObjectOptions{
			UserMetadata: map[string]string{
				"X-KDE-SHA256": res.SHA256,
			},
		})
		if err != nil {
			log.Fatalln(err)
		}
	}
}
// main parses the -remote flag (s3+https://storage.kde.org/bucket/path),
// validates scheme, host, and path, connects to MinIO, and uploads the local
// upload tree under the given path prefix.
func main() {
	remote := flag.String("remote", "", "remote url to upload to, e.g. s3+https://storage.kde.org/kde-linux/sysupdate/v2/store")
	flag.Parse()

	remoteURI, err := url.Parse(*remote)
	if err != nil {
		log.Fatalln("Failed to parse remote URL:", err)
	}
	if remoteURI.Scheme != "s3+https" {
		log.Fatalln("Unsupported remote scheme:", remoteURI.Scheme)
	}
	if remoteURI.Host != "storage.kde.org" {
		log.Fatalln("Unsupported remote host:", remoteURI.Host)
	}

	// BUG FIX: TrimPrefix instead of Path[1:] — the slice expression panicked
	// with an index out of range when the URL carried no path at all
	// (e.g. -remote s3+https://storage.kde.org). Now that case falls through
	// to the friendly fatal message below.
	parts := strings.SplitN(strings.TrimPrefix(remoteURI.Path, "/"), "/", 2)
	if len(parts) != 2 {
		log.Fatalln("Invalid remote path, expected format: /bucket/path")
	}
	bucket := parts[0]
	path := parts[1]
	if bucket == "" {
		log.Fatalln("Invalid remote path, expected format: /bucket/path")
	}
	if path == "" {
		log.Println("Warning: path is empty, uploading to bucket root")
		path = "/"
	}

	log.Println("Connecting to MinIO at", remoteURI.Host)
	minioClient := connectToMinIO(remoteURI.Host)
	log.Println("Uploading to bucket", bucket, "with path prefix", path)
	upload(minioClient, bucket, path)
}