From 9c7f92c81f5a3b54ba313cac1f384bf6716d6c7c Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Tue, 19 Aug 2025 19:37:46 +0200 Subject: [PATCH] feat(p2p): automatically sync installed models between instances (#6108) * feat(p2p): sync models between federated nodes This change makes sure that between federated nodes all the models are synced with each other. Note: this works exclusively with models belonging to a gallery. It does not sync files between the nodes, but rather it synces the node setup. E.g. All the nodes needs to have configured the same galleries and install models without any local editing. Signed-off-by: Ettore Di Giacinto * Make nodes stable Signed-off-by: Ettore Di Giacinto * Fixups on syncing Signed-off-by: Ettore Di Giacinto * ui: improve p2p view Signed-off-by: Ettore Di Giacinto --------- Signed-off-by: Ettore Di Giacinto --- core/application/application.go | 20 +- core/application/startup.go | 14 +- core/cli/api/p2p.go | 11 +- core/cli/run.go | 8 +- core/explorer/discovery.go | 3 +- core/http/app.go | 16 +- core/http/elements/p2p.go | 105 ++++--- core/http/endpoints/openai/realtime.go | 6 +- core/http/routes/openai.go | 18 +- core/http/views/p2p.html | 394 ++++++++++++++++++++++--- core/p2p/federated_server.go | 3 +- core/p2p/node.go | 38 +-- core/p2p/p2p.go | 15 +- core/p2p/sync.go | 102 +++++++ core/schema/localai.go | 21 +- 15 files changed, 623 insertions(+), 151 deletions(-) create mode 100644 core/p2p/sync.go diff --git a/core/application/application.go b/core/application/application.go index d49260eae..c852566d7 100644 --- a/core/application/application.go +++ b/core/application/application.go @@ -2,6 +2,7 @@ package application import ( "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/services" "github.com/mudler/LocalAI/core/templates" "github.com/mudler/LocalAI/pkg/model" ) @@ -11,6 +12,7 @@ type Application struct { modelLoader *model.ModelLoader applicationConfig *config.ApplicationConfig templatesEvaluator *templates.Evaluator + galleryService *services.GalleryService } func newApplication(appConfig *config.ApplicationConfig) *Application { @@ -22,7 +24,7 @@ func newApplication(appConfig *config.ApplicationConfig) *Application { } } -func (a *Application) BackendLoader() *config.ModelConfigLoader { +func (a *Application) ModelConfigLoader() *config.ModelConfigLoader { return a.backendLoader } @@ -37,3 +39,19 @@ func (a *Application) ApplicationConfig() *config.ApplicationConfig { func (a *Application) TemplatesEvaluator() *templates.Evaluator { return a.templatesEvaluator } + +func (a *Application) GalleryService() *services.GalleryService { + return a.galleryService +} + +func (a *Application) start() error { + galleryService := services.NewGalleryService(a.ApplicationConfig(), a.ModelLoader()) + err := galleryService.Start(a.ApplicationConfig().Context, a.ModelConfigLoader(), a.ApplicationConfig().SystemState) + if err != nil { + return err + } + + a.galleryService = galleryService + + return nil +} diff --git a/core/application/startup.go b/core/application/startup.go index 8ebd44071..69238d54f 100644 --- a/core/application/startup.go +++ b/core/application/startup.go @@ -68,7 +68,7 @@ func New(opts ...config.AppOption) (*Application, error) { configLoaderOpts := options.ToConfigLoaderOptions() - if err := application.BackendLoader().LoadModelConfigsFromPath(options.SystemState.Model.ModelsPath, configLoaderOpts...); err != nil { + if err := application.ModelConfigLoader().LoadModelConfigsFromPath(options.SystemState.Model.ModelsPath, configLoaderOpts...); err != nil { log.Error().Err(err).Msg("error loading config files") } @@ -77,12 +77,12 @@ func New(opts ...config.AppOption) (*Application, error) { } if options.ConfigFile != "" { - if err := application.BackendLoader().LoadMultipleModelConfigsSingleFile(options.ConfigFile, configLoaderOpts...); err != nil { + if err := application.ModelConfigLoader().LoadMultipleModelConfigsSingleFile(options.ConfigFile, configLoaderOpts...); err != nil { log.Error().Err(err).Msg("error loading config file") } } - if err := application.BackendLoader().Preload(options.SystemState.Model.ModelsPath); err != nil { + if err := application.ModelConfigLoader().Preload(options.SystemState.Model.ModelsPath); err != nil { log.Error().Err(err).Msg("error downloading models") } @@ -99,7 +99,7 @@ func New(opts ...config.AppOption) (*Application, error) { } if options.Debug { - for _, v := range application.BackendLoader().GetAllModelsConfigs() { + for _, v := range application.ModelConfigLoader().GetAllModelsConfigs() { log.Debug().Msgf("Model: %s (config: %+v)", v.Name, v) } } @@ -132,7 +132,7 @@ func New(opts ...config.AppOption) (*Application, error) { if options.LoadToMemory != nil && !options.SingleBackend { for _, m := range options.LoadToMemory { - cfg, err := application.BackendLoader().LoadModelConfigFileByNameDefaultOptions(m, options) + cfg, err := application.ModelConfigLoader().LoadModelConfigFileByNameDefaultOptions(m, options) if err != nil { return nil, err } @@ -152,6 +152,10 @@ func New(opts ...config.AppOption) (*Application, error) { // Watch the configuration directory startWatcher(options) + if err := application.start(); err != nil { + return nil, err + } + log.Info().Msg("core/startup process completed!") return application, nil } diff --git a/core/cli/api/p2p.go b/core/cli/api/p2p.go index a2ecfe3fe..9e94e94d6 100644 --- a/core/cli/api/p2p.go +++ b/core/cli/api/p2p.go @@ -7,13 +7,15 @@ import ( "os" "strings" + "github.com/mudler/LocalAI/core/application" "github.com/mudler/LocalAI/core/p2p" + "github.com/mudler/LocalAI/core/schema" "github.com/mudler/edgevpn/pkg/node" "github.com/rs/zerolog/log" ) -func StartP2PStack(ctx context.Context, address, token, networkID string, federated bool) error { +func StartP2PStack(ctx context.Context, address, token, networkID string, federated bool, app *application.Application) error { var n *node.Node // Here we are avoiding creating multiple nodes: // - if the federated mode is enabled, we create a federated node and expose a service @@ -39,6 +41,11 @@ func StartP2PStack(ctx context.Context, address, token, networkID string, federa } n = node + + // start node sync in the background + if err := p2p.Sync(ctx, node, app); err != nil { + return err + } } // If the p2p mode is enabled, we start the service discovery @@ -58,7 +65,7 @@ func StartP2PStack(ctx context.Context, address, token, networkID string, federa // Attach a ServiceDiscoverer to the p2p node log.Info().Msg("Starting P2P server discovery...") - if err := p2p.ServiceDiscoverer(ctx, n, token, p2p.NetworkID(networkID, p2p.WorkerID), func(serviceID string, node p2p.NodeData) { + if err := p2p.ServiceDiscoverer(ctx, n, token, p2p.NetworkID(networkID, p2p.WorkerID), func(serviceID string, node schema.NodeData) { var tunnelAddresses []string for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(networkID, p2p.WorkerID)) { if v.IsOnline() { diff --git a/core/cli/run.go b/core/cli/run.go index eb94becd7..96bb203a9 100644 --- a/core/cli/run.go +++ b/core/cli/run.go @@ -144,10 +144,6 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { backgroundCtx := context.Background() - if err := cli_api.StartP2PStack(backgroundCtx, r.Address, token, r.Peer2PeerNetworkID, r.Federated); err != nil { - return err - } - idleWatchDog := r.EnableWatchdogIdle busyWatchDog := r.EnableWatchdogBusy @@ -216,5 +212,9 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { return err } + if err := cli_api.StartP2PStack(backgroundCtx, r.Address, token, r.Peer2PeerNetworkID, r.Federated, app); err != nil { + return err + } + return appHTTP.Listen(r.Address) } diff --git a/core/explorer/discovery.go b/core/explorer/discovery.go index fe6470cb8..454614172 100644 --- a/core/explorer/discovery.go +++ b/core/explorer/discovery.go @@ -10,6 +10,7 @@ import ( "github.com/rs/zerolog/log" "github.com/mudler/LocalAI/core/p2p" + "github.com/mudler/LocalAI/core/schema" "github.com/mudler/edgevpn/pkg/blockchain" ) @@ -177,7 +178,7 @@ func (s *DiscoveryServer) retrieveNetworkData(c context.Context, ledger *blockch atLeastOneWorker := false DATA: for _, v := range data[d] { - nd := &p2p.NodeData{} + nd := &schema.NodeData{} if err := v.Unmarshal(nd); err != nil { continue DATA } diff --git a/core/http/app.go b/core/http/app.go index c73e94308..09f068834 100644 --- a/core/http/app.go +++ b/core/http/app.go @@ -197,21 +197,15 @@ func API(application *application.Application) (*fiber.App, error) { router.Use(csrf.New()) } - galleryService := services.NewGalleryService(application.ApplicationConfig(), application.ModelLoader()) - err = galleryService.Start(application.ApplicationConfig().Context, application.BackendLoader(), application.ApplicationConfig().SystemState) - if err != nil { - return nil, err - } + requestExtractor := middleware.NewRequestExtractor(application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig()) - requestExtractor := middleware.NewRequestExtractor(application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig()) - - routes.RegisterElevenLabsRoutes(router, requestExtractor, application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig()) - routes.RegisterLocalAIRoutes(router, requestExtractor, application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig(), galleryService) + routes.RegisterElevenLabsRoutes(router, requestExtractor, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig()) + routes.RegisterLocalAIRoutes(router, requestExtractor, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), application.GalleryService()) routes.RegisterOpenAIRoutes(router, requestExtractor, application) if !application.ApplicationConfig().DisableWebUI { - routes.RegisterUIRoutes(router, application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig(), galleryService) + routes.RegisterUIRoutes(router, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), application.GalleryService()) } - routes.RegisterJINARoutes(router, requestExtractor, application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig()) + routes.RegisterJINARoutes(router, requestExtractor, application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig()) // Define a custom 404 handler // Note: keep this at the bottom! diff --git a/core/http/elements/p2p.go b/core/http/elements/p2p.go index 6c0a5a577..4f191772f 100644 --- a/core/http/elements/p2p.go +++ b/core/http/elements/p2p.go @@ -7,7 +7,7 @@ import ( "github.com/chasefleming/elem-go" "github.com/chasefleming/elem-go/attrs" "github.com/microcosm-cc/bluemonday" - "github.com/mudler/LocalAI/core/p2p" + "github.com/mudler/LocalAI/core/schema" ) func renderElements(n []elem.Node) string { @@ -18,7 +18,7 @@ func renderElements(n []elem.Node) string { return render } -func P2PNodeStats(nodes []p2p.NodeData) string { +func P2PNodeStats(nodes []schema.NodeData) string { online := 0 for _, n := range nodes { if n.IsOnline() { @@ -26,15 +26,17 @@ func P2PNodeStats(nodes []p2p.NodeData) string { } } - class := "text-blue-400" + class := "text-green-400" if online == 0 { class = "text-red-400" + } else if online < len(nodes) { + class = "text-yellow-400" } nodesElements := []elem.Node{ elem.Span( attrs.Props{ - "class": class + " font-bold text-xl", + "class": class + " font-bold text-2xl", }, elem.Text(fmt.Sprintf("%d", online)), ), @@ -49,9 +51,16 @@ func P2PNodeStats(nodes []p2p.NodeData) string { return renderElements(nodesElements) } -func P2PNodeBoxes(nodes []p2p.NodeData) string { - nodesElements := []elem.Node{} +func P2PNodeBoxes(nodes []schema.NodeData) string { + if len(nodes) == 0 { + return `
+ +

No nodes available

+

Start some workers to see them here

+
` + } + render := "" for _, n := range nodes { nodeID := bluemonday.StrictPolicy().Sanitize(n.ID) @@ -59,67 +68,89 @@ func P2PNodeBoxes(nodes []p2p.NodeData) string { statusIconClass := "text-green-400" statusText := "Online" statusTextClass := "text-green-400" + cardHoverClass := "hover:shadow-green-500/20 hover:border-green-400/50" if !n.IsOnline() { statusIconClass = "text-red-400" statusText = "Offline" statusTextClass = "text-red-400" + cardHoverClass = "hover:shadow-red-500/20 hover:border-red-400/50" } - nodesElements = append(nodesElements, + nodeCard := elem.Div( + attrs.Props{ + "class": "bg-gradient-to-br from-gray-800/90 to-gray-900/80 border border-gray-700/50 rounded-xl p-5 shadow-xl transition-all duration-300 " + cardHoverClass + " backdrop-blur-sm", + }, + // Header with node icon and status elem.Div( attrs.Props{ - "class": "bg-gray-800/80 border border-gray-700/50 rounded-xl p-4 shadow-lg transition-all duration-300 hover:shadow-blue-900/20 hover:border-blue-700/50", + "class": "flex items-center justify-between mb-4", }, - // Node ID and status indicator in top row + // Node info elem.Div( attrs.Props{ - "class": "flex items-center justify-between mb-3", + "class": "flex items-center", }, - // Node ID with icon elem.Div( attrs.Props{ - "class": "flex items-center", + "class": "w-10 h-10 bg-blue-500/20 rounded-lg flex items-center justify-center mr-3", }, elem.I( attrs.Props{ - "class": "fas fa-server text-blue-400 mr-2", + "class": "fas fa-server text-blue-400 text-lg", }, ), - elem.Span( + ), + elem.Div( + attrs.Props{}, + elem.H4( attrs.Props{ - "class": "text-white font-medium", + "class": "text-white font-semibold text-sm", + }, + elem.Text("Node"), + ), + elem.P( + attrs.Props{ + "class": "text-gray-400 text-xs font-mono break-all", }, elem.Text(nodeID), ), ), - // Status indicator - elem.Div( - attrs.Props{ - "class": "flex items-center", - }, - elem.I( - attrs.Props{ - "class": "fas fa-circle animate-pulse " + statusIconClass + " mr-1.5", - }, - ), - elem.Span( - attrs.Props{ - "class": statusTextClass, - }, - elem.Text(statusText), - ), - ), ), - // Bottom section with timestamp + // Status badge elem.Div( attrs.Props{ - "class": "text-xs text-gray-400 pt-1 border-t border-gray-700/30", + "class": "flex items-center bg-gray-900/50 rounded-full px-3 py-1.5 border border-gray-700/50", }, - elem.Text("Last updated: "+time.Now().UTC().Format("2006-01-02 15:04:05")), + elem.I( + attrs.Props{ + "class": "fas fa-circle animate-pulse " + statusIconClass + " mr-2 text-xs", + }, + ), + elem.Span( + attrs.Props{ + "class": statusTextClass + " text-xs font-medium", + }, + elem.Text(statusText), + ), ), - )) + ), + // Footer with timestamp + elem.Div( + attrs.Props{ + "class": "text-xs text-gray-500 pt-3 border-t border-gray-700/30 flex items-center", + }, + elem.I( + attrs.Props{ + "class": "fas fa-clock mr-2", + }, + ), + elem.Text("Updated: "+time.Now().UTC().Format("15:04:05")), + ), + ) + + render += nodeCard.Render() } - return renderElements(nodesElements) + return render } diff --git a/core/http/endpoints/openai/realtime.go b/core/http/endpoints/openai/realtime.go index 8f330e873..2e692b52a 100644 --- a/core/http/endpoints/openai/realtime.go +++ b/core/http/endpoints/openai/realtime.go @@ -239,7 +239,7 @@ func registerRealtime(application *application.Application) func(c *websocket.Co m, cfg, err := newTranscriptionOnlyModel( &pipeline, - application.BackendLoader(), + application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), ) @@ -313,7 +313,7 @@ func registerRealtime(application *application.Application) func(c *websocket.Co if err := updateTransSession( session, &sessionUpdate, - application.BackendLoader(), + application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), ); err != nil { @@ -342,7 +342,7 @@ func registerRealtime(application *application.Application) func(c *websocket.Co if err := updateSession( session, &sessionUpdate, - application.BackendLoader(), + application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig(), ); err != nil { diff --git a/core/http/routes/openai.go b/core/http/routes/openai.go index 8a2789407..e4e31f700 100644 --- a/core/http/routes/openai.go +++ b/core/http/routes/openai.go @@ -26,7 +26,7 @@ func RegisterOpenAIRoutes(app *fiber.App, re.BuildFilteredFirstAvailableDefaultModel(config.BuildUsecaseFilterFn(config.FLAG_CHAT)), re.SetModelAndConfig(func() schema.LocalAIRequest { return new(schema.OpenAIRequest) }), re.SetOpenAIRequest, - openai.ChatEndpoint(application.BackendLoader(), application.ModelLoader(), application.TemplatesEvaluator(), application.ApplicationConfig()), + openai.ChatEndpoint(application.ModelConfigLoader(), application.ModelLoader(), application.TemplatesEvaluator(), application.ApplicationConfig()), } app.Post("/v1/chat/completions", chatChain...) app.Post("/chat/completions", chatChain...) @@ -37,7 +37,7 @@ func RegisterOpenAIRoutes(app *fiber.App, re.BuildConstantDefaultModelNameMiddleware("gpt-4o"), re.SetModelAndConfig(func() schema.LocalAIRequest { return new(schema.OpenAIRequest) }), re.SetOpenAIRequest, - openai.EditEndpoint(application.BackendLoader(), application.ModelLoader(), application.TemplatesEvaluator(), application.ApplicationConfig()), + openai.EditEndpoint(application.ModelConfigLoader(), application.ModelLoader(), application.TemplatesEvaluator(), application.ApplicationConfig()), } app.Post("/v1/edits", editChain...) app.Post("/edits", editChain...) @@ -48,7 +48,7 @@ func RegisterOpenAIRoutes(app *fiber.App, re.BuildConstantDefaultModelNameMiddleware("gpt-4o"), re.SetModelAndConfig(func() schema.LocalAIRequest { return new(schema.OpenAIRequest) }), re.SetOpenAIRequest, - openai.CompletionEndpoint(application.BackendLoader(), application.ModelLoader(), application.TemplatesEvaluator(), application.ApplicationConfig()), + openai.CompletionEndpoint(application.ModelConfigLoader(), application.ModelLoader(), application.TemplatesEvaluator(), application.ApplicationConfig()), } app.Post("/v1/completions", completionChain...) app.Post("/completions", completionChain...) @@ -60,7 +60,7 @@ func RegisterOpenAIRoutes(app *fiber.App, re.BuildConstantDefaultModelNameMiddleware("gpt-4o"), re.SetModelAndConfig(func() schema.LocalAIRequest { return new(schema.OpenAIRequest) }), re.SetOpenAIRequest, - openai.EmbeddingsEndpoint(application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig()), + openai.EmbeddingsEndpoint(application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig()), } app.Post("/v1/embeddings", embeddingChain...) app.Post("/embeddings", embeddingChain...) @@ -71,22 +71,22 @@ func RegisterOpenAIRoutes(app *fiber.App, re.BuildFilteredFirstAvailableDefaultModel(config.BuildUsecaseFilterFn(config.FLAG_TRANSCRIPT)), re.SetModelAndConfig(func() schema.LocalAIRequest { return new(schema.OpenAIRequest) }), re.SetOpenAIRequest, - openai.TranscriptEndpoint(application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig()), + openai.TranscriptEndpoint(application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig()), ) app.Post("/v1/audio/speech", re.BuildFilteredFirstAvailableDefaultModel(config.BuildUsecaseFilterFn(config.FLAG_TTS)), re.SetModelAndConfig(func() schema.LocalAIRequest { return new(schema.TTSRequest) }), - localai.TTSEndpoint(application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig())) + localai.TTSEndpoint(application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig())) // images app.Post("/v1/images/generations", re.BuildConstantDefaultModelNameMiddleware("stablediffusion"), re.SetModelAndConfig(func() schema.LocalAIRequest { return new(schema.OpenAIRequest) }), re.SetOpenAIRequest, - openai.ImageEndpoint(application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig())) + openai.ImageEndpoint(application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig())) // List models - app.Get("/v1/models", openai.ListModelsEndpoint(application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig())) - app.Get("/models", openai.ListModelsEndpoint(application.BackendLoader(), application.ModelLoader(), application.ApplicationConfig())) + app.Get("/v1/models", openai.ListModelsEndpoint(application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig())) + app.Get("/models", openai.ListModelsEndpoint(application.ModelConfigLoader(), application.ModelLoader(), application.ApplicationConfig())) } diff --git a/core/http/views/p2p.html b/core/http/views/p2p.html index bd6324bf6..87fead0a9 100644 --- a/core/http/views/p2p.html +++ b/core/http/views/p2p.html @@ -15,11 +15,11 @@

- Distributed inference with P2P + Distributed AI Computing

- Distribute computation by sharing and loadbalancing instances or sharding model weights + Scale your AI workloads across multiple devices with peer-to-peer distribution @@ -27,10 +27,90 @@

-
-

- LocalAI uses P2P technologies to enable distribution of work between peers. It is possible to share an instance with Federation and/or split the weights of a model across peers (only available with llama.cpp models). You can now share computational resources between your devices or your friends! -

+ +
+ +
+
+
+
+ +
+
+

+ + How P2P Distribution Works + +

+

+ LocalAI leverages cutting-edge peer-to-peer technologies to distribute AI workloads intelligently across your network +

+
+ + +
+ +
+
+ +
+

Instance Federation

+

+ Share complete LocalAI instances across your network for load balancing and redundancy. Perfect for scaling across multiple devices. +

+
+ + +
+
+ +
+

Model Sharding

+

+ Split large model weights across multiple workers. Currently supported with llama.cpp backends for efficient memory usage. +

+
+ + +
+
+ +
+

Resource Sharing

+

+ Pool computational resources from multiple devices, including your friends' machines, to handle larger workloads collaboratively. +

+
+
+ + +
+
+
+ Faster +
+

Parallel processing

+
+
+
+ Scalable +
+

Add more nodes

+
+
+
+ Resilient +
+

Fault tolerant

+
+
+
+ Efficient +
+

Resource optimization

+
+
+
@@ -64,21 +144,106 @@
{{ else }} - -
-
-
- -

Federated Nodes: - -

+ +
+ +
+
+
+
+ +
+
+

Federation

+

Instance sharing

+
+
+
+
+

nodes

+
-

- You can start LocalAI in federated mode to share your instance, or start the federated server to balance requests between nodes of the federation. -

+
+ + Load balanced instances +
+
-
-
+ +
+
+
+
+ +
+
+

Workers

+

Model sharding

+
+
+
+
+

workers

+
+
+
+ + Distributed computation +
+
+ + +
+
+
+
+ +
+
+

Network

+

Connection token

+
+
+ +
+
+ + Ready to connect +
+
+
+ + +
+
+
+
+
+ +
+
+

Federation Network

+

Instance load balancing and sharing

+
+
+
+
Active Nodes
+
+
+
+ +
+

+ + Start LocalAI in federated mode to share your instance, or launch a federated server to distribute requests intelligently across multiple nodes in your network. +

+
+ + +
+
@@ -168,38 +333,52 @@ docker run -ti --net host -e TOKEN="{{.P2PToken}}" --
- -
-
-
- -

Workers (llama.cpp): - -

+ +
+
+
+
+
+ +
+
+

Worker Network

+

Distributed model computation (llama.cpp)

+
+
+
+
Active Workers
+
+
+
+ +
+

+ + Deploy llama.cpp workers to split model weights across multiple devices. This enables processing larger models by distributing computational load and memory requirements. +

-

- You can start llama.cpp workers to distribute weights between the workers and offload part of the computation. To start a new worker, you can use the CLI or Docker. -

-
-
+ +
+
-
+

- Start a new llama.cpp P2P worker + Start a new llama.cpp worker

    @@ -221,7 +400,7 @@ docker run -ti --net host -e TOKEN="{{.P2PToken}}" -- export TOKEN="{{.P2PToken}}"
    local-ai worker p2p-llama-cpp-rpc -

    For all the options available, please refer to the documentation.

    +

    For all the options available, please refer to the documentation.

@@ -256,23 +435,148 @@ docker run -ti --net host -e TOKEN="{{.P2PToken}}" -- .token { word-break: break-all; } - .workers .grid div { - display: flex; - flex-direction: column; - justify-content: space-between; + + /* Enhanced scrollbar styling */ + .scrollbar-thin::-webkit-scrollbar { + width: 6px; } + + .scrollbar-thin::-webkit-scrollbar-track { + background: rgba(31, 41, 55, 0.5); + border-radius: 6px; + } + + .scrollbar-thin::-webkit-scrollbar-thumb { + background: rgba(107, 114, 128, 0.5); + border-radius: 6px; + } + + .scrollbar-thin::-webkit-scrollbar-thumb:hover { + background: rgba(107, 114, 128, 0.8); + } + + /* Animation enhancements */ .fa-circle-nodes { animation: pulseGlow 2s ease-in-out infinite; } - @keyframes pulseGlow { - 0%, 100% { filter: drop-shadow(0 0 2px rgba(96, 165, 250, 0.3)); } - 50% { filter: drop-shadow(0 0 8px rgba(96, 165, 250, 0.7)); } + + .fa-puzzle-piece { + animation: rotateGlow 3s ease-in-out infinite; } + + @keyframes pulseGlow { + 0%, 100% { + filter: drop-shadow(0 0 2px rgba(96, 165, 250, 0.3)); + transform: scale(1); + } + 50% { + filter: drop-shadow(0 0 8px rgba(96, 165, 250, 0.7)); + transform: scale(1.05); + } + } + + @keyframes rotateGlow { + 0%, 100% { + filter: drop-shadow(0 0 2px rgba(147, 51, 234, 0.3)); + transform: rotate(0deg) scale(1); + } + 33% { + filter: drop-shadow(0 0 6px rgba(147, 51, 234, 0.6)); + transform: rotate(10deg) scale(1.05); + } + 66% { + filter: drop-shadow(0 0 4px rgba(147, 51, 234, 0.4)); + transform: rotate(-5deg) scale(1.02); + } + } + + /* Copy button enhancements */ .copy-icon:hover, button:hover .fa-copy { color: #60a5fa; transform: scale(1.1); transition: all 0.2s ease; } + + /* Node card hover effects */ + .workers .grid > div { + transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1); + } + + .workers .grid > div:hover { + transform: translateY(-4px); + box-shadow: 0 20px 40px rgba(0, 0, 0, 0.3); + } + + /* Status indicator animations */ + .animate-pulse { + animation: pulse 2s cubic-bezier(0.4, 0, 0.6, 1) infinite; + } + + @keyframes pulse { + 0%, 100% { + opacity: 1; + } + 50% { + opacity: .5; + } + } + + /* Enhanced tab styling */ + .tablink { + position: relative; + overflow: hidden; + } + + .tablink::before { + content: ''; + position: absolute; + top: 0; + left: -100%; + width: 100%; + height: 100%; + background: linear-gradient(90deg, transparent, rgba(255, 255, 255, 0.1), transparent); + transition: left 0.5s ease; + } + + .tablink:hover::before { + left: 100%; + } + + /* Loading spinner for HTMX */ + .htmx-indicator { + display: none; + } + + .htmx-request .htmx-indicator { + display: inline; + } + + /* Card gradient overlays */ + .card-overlay { + background: linear-gradient(135deg, rgba(59, 130, 246, 0.1) 0%, rgba(99, 102, 241, 0.1) 100%); + } + + /* Enhanced button styles */ + button[onclick*="copyClipboard"] { + transition: all 0.2s ease; + backdrop-filter: blur(8px); + } + + button[onclick*="copyClipboard"]:hover { + transform: scale(1.05); + box-shadow: 0 4px 12px rgba(59, 130, 246, 0.3); + } + + /* Code block enhancements */ + code { + position: relative; + transition: all 0.2s ease; + } + + code:hover { + box-shadow: 0 4px 12px rgba(234, 179, 8, 0.2); + border-color: rgba(234, 179, 8, 0.3) !important; + } diff --git a/core/p2p/federated_server.go b/core/p2p/federated_server.go index e382576ba..6f5cfb053 100644 --- a/core/p2p/federated_server.go +++ b/core/p2p/federated_server.go @@ -7,6 +7,7 @@ import ( "io" "net" + "github.com/mudler/LocalAI/core/schema" "github.com/mudler/edgevpn/pkg/node" "github.com/rs/zerolog/log" ) @@ -21,7 +22,7 @@ func (f *FederatedServer) Start(ctx context.Context) error { return fmt.Errorf("creating a new node: %w", err) } - if err := ServiceDiscoverer(ctx, n, f.p2ptoken, f.service, func(servicesID string, tunnel NodeData) { + if err := ServiceDiscoverer(ctx, n, f.p2ptoken, f.service, func(servicesID string, tunnel schema.NodeData) { log.Debug().Msgf("Discovered node: %s", tunnel.ID) }, false); err != nil { return err diff --git a/core/p2p/node.go b/core/p2p/node.go index 6c43dde00..78efb77ca 100644 --- a/core/p2p/node.go +++ b/core/p2p/node.go @@ -1,8 +1,11 @@ package p2p import ( + "slices" + "strings" "sync" - "time" + + "github.com/mudler/LocalAI/core/schema" ) const ( @@ -10,57 +13,48 @@ const ( WorkerID = "worker" ) -type NodeData struct { - Name string - ID string - TunnelAddress string - ServiceID string - LastSeen time.Time -} - -func (d NodeData) IsOnline() bool { - now := time.Now() - // if the node was seen in the last 40 seconds, it's online - return now.Sub(d.LastSeen) < 40*time.Second -} - var mu sync.Mutex -var nodes = map[string]map[string]NodeData{} +var nodes = map[string]map[string]schema.NodeData{} -func GetAvailableNodes(serviceID string) []NodeData { +func GetAvailableNodes(serviceID string) []schema.NodeData { if serviceID == "" { serviceID = defaultServicesID } mu.Lock() defer mu.Unlock() - var availableNodes = []NodeData{} + var availableNodes = []schema.NodeData{} for _, v := range nodes[serviceID] { availableNodes = append(availableNodes, v) } + + slices.SortFunc(availableNodes, func(a, b schema.NodeData) int { + return strings.Compare(a.ID, b.ID) + }) + return availableNodes } -func GetNode(serviceID, nodeID string) (NodeData, bool) { +func GetNode(serviceID, nodeID string) (schema.NodeData, bool) { if serviceID == "" { serviceID = defaultServicesID } mu.Lock() defer mu.Unlock() if _, ok := nodes[serviceID]; !ok { - return NodeData{}, false + return schema.NodeData{}, false } nd, exists := nodes[serviceID][nodeID] return nd, exists } -func AddNode(serviceID string, node NodeData) { +func AddNode(serviceID string, node schema.NodeData) { if serviceID == "" { serviceID = defaultServicesID } mu.Lock() defer mu.Unlock() if nodes[serviceID] == nil { - nodes[serviceID] = map[string]NodeData{} + nodes[serviceID] = map[string]schema.NodeData{} } nodes[serviceID][node.ID] = node } diff --git a/core/p2p/p2p.go b/core/p2p/p2p.go index e21c5e2f7..ec550eb1f 100644 --- a/core/p2p/p2p.go +++ b/core/p2p/p2p.go @@ -13,6 +13,7 @@ import ( "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p/core/peer" + "github.com/mudler/LocalAI/core/schema" "github.com/mudler/LocalAI/pkg/utils" "github.com/mudler/edgevpn/pkg/config" "github.com/mudler/edgevpn/pkg/node" @@ -169,7 +170,7 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv // This is the main of the server (which keeps the env variable updated) // This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services -func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node NodeData), allocate bool) error { +func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node schema.NodeData), allocate bool) error { if servicesID == "" { servicesID = defaultServicesID } @@ -200,8 +201,8 @@ func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID stri return nil } -func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID string, allocate bool) (chan NodeData, error) { - tunnels := make(chan NodeData) +func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID string, allocate bool) (chan schema.NodeData, error) { + tunnels := make(chan schema.NodeData) ledger, err := n.Ledger() if err != nil { @@ -234,7 +235,7 @@ func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID strin for k, v := range data { // New worker found in the ledger data as k (worker id) - nd := &NodeData{} + nd := &schema.NodeData{} if err := v.Unmarshal(nd); err != nil { zlog.Error().Msg("cannot unmarshal node data") continue @@ -254,14 +255,14 @@ func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID strin } type nodeServiceData struct { - NodeData NodeData + NodeData schema.NodeData CancelFunc context.CancelFunc } var service = map[string]nodeServiceData{} var muservice sync.Mutex -func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string, allocate bool) { +func ensureService(ctx context.Context, n *node.Node, nd *schema.NodeData, sserv string, allocate bool) { muservice.Lock() defer muservice.Unlock() nd.ServiceID = sserv @@ -346,7 +347,7 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) (* 20*time.Second, func() { updatedMap := map[string]interface{}{} - updatedMap[name] = &NodeData{ + updatedMap[name] = &schema.NodeData{ Name: name, LastSeen: time.Now(), ID: nodeID(name), diff --git a/core/p2p/sync.go b/core/p2p/sync.go new file mode 100644 index 000000000..f9be422a6 --- /dev/null +++ b/core/p2p/sync.go @@ -0,0 +1,102 @@ +package p2p + +import ( + "context" + "slices" + "time" + + "github.com/google/uuid" + "github.com/mudler/LocalAI/core/application" + "github.com/mudler/LocalAI/core/gallery" + "github.com/mudler/LocalAI/core/services" + + "github.com/mudler/edgevpn/pkg/node" + zlog "github.com/rs/zerolog/log" +) + +func syncState(ctx context.Context, n *node.Node, app *application.Application) error { + zlog.Debug().Msg("[p2p-sync] Syncing state") + + whatWeHave := []string{} + for _, model := range app.ModelConfigLoader().GetAllModelsConfigs() { + whatWeHave = append(whatWeHave, model.Name) + } + + ledger, _ := n.Ledger() + currentData := ledger.CurrentData() + zlog.Debug().Msgf("[p2p-sync] Current data: %v", currentData) + data, exists := ledger.GetKey("shared_state", "models") + if !exists { + ledger.AnnounceUpdate(ctx, time.Minute, "shared_state", "models", whatWeHave) + zlog.Debug().Msgf("No models found in the ledger, announced our models: %v", whatWeHave) + } + + models := []string{} + if err := data.Unmarshal(&models); err != nil { + zlog.Warn().Err(err).Msg("error unmarshalling models") + return nil + } + + zlog.Debug().Msgf("[p2p-sync] Models that are present in this instance: %v\nModels that are in the ledger: %v", whatWeHave, models) + + // Sync with our state + whatIsNotThere := []string{} + for _, model := range whatWeHave { + if !slices.Contains(models, model) { + whatIsNotThere = append(whatIsNotThere, model) + } + } + if len(whatIsNotThere) > 0 { + zlog.Debug().Msgf("[p2p-sync] Announcing our models: %v", append(models, whatIsNotThere...)) + ledger.AnnounceUpdate( + ctx, + 1*time.Minute, + "shared_state", + "models", + append(models, whatIsNotThere...), + ) + } + + // Check if we have a model that is not in our state, otherwise install it + for _, model := range models { + if slices.Contains(whatWeHave, model) { + zlog.Debug().Msgf("[p2p-sync] Model %s is already present in this instance", model) + continue + } + + // we install model + zlog.Info().Msgf("[p2p-sync] Installing model which is not present in this instance: %s", model) + + uuid, err := uuid.NewUUID() + if err != nil { + zlog.Error().Err(err).Msg("error generating UUID") + continue + } + + app.GalleryService().ModelGalleryChannel <- services.GalleryOp[gallery.GalleryModel]{ + ID: uuid.String(), + GalleryElementName: model, + Galleries: app.ApplicationConfig().Galleries, + BackendGalleries: app.ApplicationConfig().BackendGalleries, + } + } + + return nil +} + +func Sync(ctx context.Context, n *node.Node, app *application.Application) error { + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Minute): + if err := syncState(ctx, n, app); err != nil { + zlog.Error().Err(err).Msg("error syncing state") + } + } + + } + }() + return nil +} diff --git a/core/schema/localai.go b/core/schema/localai.go index d093faafe..5949e743d 100644 --- a/core/schema/localai.go +++ b/core/schema/localai.go @@ -1,7 +1,8 @@ package schema import ( - "github.com/mudler/LocalAI/core/p2p" + "time" + gopsutil "github.com/shirou/gopsutil/v3/process" ) @@ -107,9 +108,23 @@ type StoresFindResponse struct { Similarities []float32 `json:"similarities" yaml:"similarities"` } +type NodeData struct { + Name string + ID string + TunnelAddress string + ServiceID string + LastSeen time.Time +} + +func (d NodeData) IsOnline() bool { + now := time.Now() + // if the node was seen in the last 40 seconds, it's online + return now.Sub(d.LastSeen) < 40*time.Second +} + type P2PNodesResponse struct { - Nodes []p2p.NodeData `json:"nodes" yaml:"nodes"` - FederatedNodes []p2p.NodeData `json:"federated_nodes" yaml:"federated_nodes"` + Nodes []NodeData `json:"nodes" yaml:"nodes"` + FederatedNodes []NodeData `json:"federated_nodes" yaml:"federated_nodes"` } type SysInfoModel struct {