Compare commits

...

5 Commits

Author SHA1 Message Date
Alex Cheema
46ee72dbb0 Merge branch 'main' into cancel-tasks-on-instance-delete 2026-02-19 05:35:31 -08:00
Mustafa Alp Yılmaz
2e29605194 fix: finalize cancel tasks (#1498)
# Cancel task finalization (main.py)

After forwarding the cancel to the runner supervisor, emit TaskStatusUpdated(Complete) for the cancel task itself. This ensures the cancel task is properly removed from state.tasks.
2026-02-19 13:27:34 +00:00
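For reference, a minimal sketch of the fix described above, assuming the names shown in the worker diff at the end of this compare (TaskStatusUpdated, TaskStatus.Complete, event_sender, runners); the standalone helper function here is hypothetical, not the project's actual method.

```python
# Hedged sketch of the cancel-finalization fix, not the real worker code:
# `runners` and `event_sender` stand in for the worker's own attributes.
from exo.shared.types.events import TaskStatusUpdated
from exo.shared.types.tasks import TaskStatus


async def finalize_cancel(task, cancelled_task_id, runner_id, runners, event_sender):
    # Forward the cancellation to the runner supervising the target task.
    await runners[runner_id].cancel_task(cancelled_task_id)
    # The fix: also mark the cancel task itself Complete so it is dropped
    # from state.tasks instead of lingering as an open task.
    await event_sender.send(
        TaskStatusUpdated(task_id=task.task_id, task_status=TaskStatus.Complete)
    )
```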
Evan Quiney
cacb456cb2 remove nightly (#1538)
We have no real need for Rust nightly (nor for the futures crate, for that matter)
2026-02-19 12:55:31 +00:00
Alex Cheema
7d42ded97f test: verify instance deletion cancels ongoing tasks (#1215)
The cancellation logic already exists in get_transition_events() but had
no test coverage. Add tests confirming that deleting an instance emits
TaskStatusUpdated(Cancelled) events for all Pending/Running tasks on
that instance while leaving tasks on other instances untouched.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 10:05:08 -08:00
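As a reading aid for the tests added near the end of this compare, a hedged sketch of the cancellation behaviour they exercise; the argument order follows the test calls to get_transition_events(current_instances, target_instances, tasks), while the function name and the event constructor keywords are assumptions rather than the project's implementation.

```python
# Hedged sketch of the behaviour the new tests pin down, not the real
# get_transition_events: cancel Pending/Running tasks on instances being
# deleted, then emit the deletion event. Constructor kwargs are assumed.
from exo.shared.types.events import InstanceDeleted, TaskStatusUpdated
from exo.shared.types.tasks import TaskStatus


def transition_events_sketch(current_instances, target_instances, tasks):
    events = []
    for instance_id in current_instances:
        if instance_id in target_instances:
            continue  # instance survives; its tasks are untouched
        for task in tasks.values():
            if task.instance_id == instance_id and task.task_status in (
                TaskStatus.Pending,
                TaskStatus.Running,
            ):
                # Cancellation is emitted before the deletion event,
                # matching the ordering the tests assert.
                events.append(
                    TaskStatusUpdated(
                        task_id=task.task_id, task_status=TaskStatus.Cancelled
                    )
                )
        events.append(InstanceDeleted(instance_id=instance_id))
    return events
```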
Alex Cheema
e84d570291 dashboard: show failure reasons for runner and download errors (#1350)
Surface error messages that the backend already captures but the
dashboard was discarding. Runner failures now extract errorMessage from
RunnerFailed status, and download failures display errorMessage from
DownloadFailed payloads on both the main page and downloads page.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 09:49:31 -08:00
22 changed files with 265 additions and 304 deletions

Cargo.lock (generated, 13 lines changed)

@@ -890,7 +890,7 @@ dependencies = [
"delegate",
"env_logger",
"extend",
"futures",
"futures-lite",
"libp2p",
"log",
"networking",
@@ -914,6 +914,12 @@ dependencies = [
"syn 2.0.111",
]
[[package]]
name = "fastrand"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "ff"
version = "0.13.1"
@@ -1022,7 +1028,10 @@ version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"parking",
"pin-project-lite",
]
@@ -2753,7 +2762,7 @@ dependencies = [
"delegate",
"either",
"extend",
"futures",
"futures-lite",
"futures-timer",
"keccak-const",
"libp2p",


@@ -29,14 +29,13 @@ util = { path = "rust/util" }
# Macro dependencies
extend = "1.2"
delegate = "0.13"
pin-project = "1"
# Utility dependencies
keccak-const = "0.2"
# Async dependencies
tokio = "1.46"
futures = "0.3"
futures-lite = "2.6.1"
futures-timer = "3.0"
# Data structures


@@ -1077,7 +1077,7 @@
return {
isDownloading: false,
isFailed: statusInfo.statusText === "FAILED",
errorMessage: null,
errorMessage: statusInfo.errorMessage,
progress: null,
statusText: statusInfo.statusText,
perNode: [],
@@ -1135,10 +1135,15 @@
function deriveInstanceStatus(instanceWrapped: unknown): {
statusText: string;
statusClass: string;
errorMessage: string | null;
} {
const [, instance] = getTagged(instanceWrapped);
if (!instance || typeof instance !== "object") {
return { statusText: "PREPARING", statusClass: "inactive" };
return {
statusText: "PREPARING",
statusClass: "inactive",
errorMessage: null,
};
}
const inst = instance as {
@@ -1146,50 +1151,106 @@
};
const runnerIds = Object.keys(inst.shardAssignments?.runnerToShard || {});
const statusMap: Record<string, string> = {
RunnerWaitingForInitialization: "WaitingForInitialization",
RunnerInitializingBackend: "InitializingBackend",
RunnerWaitingForModel: "WaitingForModel",
RunnerLoading: "Loading",
RunnerLoaded: "Loaded",
RunnerWarmingUp: "WarmingUp",
RunnerReady: "Ready",
RunnerRunning: "Running",
RunnerShutdown: "Shutdown",
RunnerFailed: "Failed",
};
const statuses = runnerIds
.map((rid) => {
const r = runnersData[rid];
if (!r) return null;
const [kind] = getTagged(r);
const statusMap: Record<string, string> = {
RunnerWaitingForInitialization: "WaitingForInitialization",
RunnerInitializingBackend: "InitializingBackend",
RunnerWaitingForModel: "WaitingForModel",
RunnerLoading: "Loading",
RunnerLoaded: "Loaded",
RunnerWarmingUp: "WarmingUp",
RunnerReady: "Ready",
RunnerRunning: "Running",
RunnerShutdown: "Shutdown",
RunnerFailed: "Failed",
};
return kind ? statusMap[kind] || null : null;
const [kind, payload] = getTagged(r);
if (!kind || !statusMap[kind]) return null;
const errorMessage =
kind === "RunnerFailed" && payload && typeof payload === "object"
? (((payload as Record<string, unknown>).errorMessage as string) ??
null)
: null;
return { status: statusMap[kind], errorMessage };
})
.filter((s): s is string => s !== null);
.filter(
(s): s is { status: string; errorMessage: string | null } => s !== null,
);
const has = (s: string) => statuses.includes(s);
const has = (s: string) => statuses.some((e) => e.status === s);
if (statuses.length === 0)
return { statusText: "PREPARING", statusClass: "inactive" };
if (has("Failed")) return { statusText: "FAILED", statusClass: "failed" };
return {
statusText: "PREPARING",
statusClass: "inactive",
errorMessage: null,
};
if (has("Failed")) {
const failedEntry = statuses.find(
(e) => e.status === "Failed" && e.errorMessage,
);
return {
statusText: "FAILED",
statusClass: "failed",
errorMessage: failedEntry?.errorMessage ?? null,
};
}
if (has("Shutdown"))
return { statusText: "SHUTDOWN", statusClass: "inactive" };
return {
statusText: "SHUTDOWN",
statusClass: "inactive",
errorMessage: null,
};
if (has("Loading"))
return { statusText: "LOADING", statusClass: "starting" };
return {
statusText: "LOADING",
statusClass: "starting",
errorMessage: null,
};
if (has("WarmingUp"))
return { statusText: "WARMING UP", statusClass: "starting" };
return {
statusText: "WARMING UP",
statusClass: "starting",
errorMessage: null,
};
if (has("Running"))
return { statusText: "RUNNING", statusClass: "running" };
if (has("Ready")) return { statusText: "READY", statusClass: "loaded" };
if (has("Loaded")) return { statusText: "LOADED", statusClass: "loaded" };
return {
statusText: "RUNNING",
statusClass: "running",
errorMessage: null,
};
if (has("Ready"))
return { statusText: "READY", statusClass: "loaded", errorMessage: null };
if (has("Loaded"))
return {
statusText: "LOADED",
statusClass: "loaded",
errorMessage: null,
};
if (has("WaitingForModel"))
return { statusText: "WAITING", statusClass: "starting" };
return {
statusText: "WAITING",
statusClass: "starting",
errorMessage: null,
};
if (has("InitializingBackend"))
return { statusText: "INITIALIZING", statusClass: "starting" };
return {
statusText: "INITIALIZING",
statusClass: "starting",
errorMessage: null,
};
if (has("WaitingForInitialization"))
return { statusText: "INITIALIZING", statusClass: "starting" };
return {
statusText: "INITIALIZING",
statusClass: "starting",
errorMessage: null,
};
return { statusText: "RUNNING", statusClass: "active" };
return { statusText: "RUNNING", statusClass: "active", errorMessage: null };
}
function getBytes(value: unknown): number {


@@ -30,7 +30,7 @@
modelDirectory?: string;
}
| { kind: "pending"; modelDirectory?: string }
| { kind: "failed"; modelDirectory?: string }
| { kind: "failed"; modelDirectory?: string; errorMessage?: string }
| { kind: "not_present" };
type ModelCardInfo = {
@@ -263,7 +263,10 @@
modelDirectory,
};
} else if (tag === "DownloadFailed") {
cell = { kind: "failed", modelDirectory };
const errorMsg =
((payload.errorMessage ?? payload.error_message) as string) ||
undefined;
cell = { kind: "failed", modelDirectory, errorMessage: errorMsg };
} else {
cell = { kind: "pending", modelDirectory };
}
@@ -499,7 +502,7 @@
{:else if cell.kind === "failed"}
<div
class="flex flex-col items-center gap-0.5"
title="Download failed"
title={cell.errorMessage ?? "Download failed"}
>
<svg
class="w-5 h-5 text-red-400"
@@ -512,6 +515,14 @@
clip-rule="evenodd"
></path>
</svg>
{#if cell.errorMessage}
<span
class="text-[9px] text-red-400/70 max-w-[120px] truncate"
title={cell.errorMessage}
>
{cell.errorMessage}
</span>
{/if}
{#if row.shardMetadata}
<button
type="button"
@@ -707,6 +718,11 @@
({clampPercent(cellStatus.percentage).toFixed(0)}%)
{/if}
</span>
{#if cellStatus.kind === "failed" && "errorMessage" in cellStatus && cellStatus.errorMessage}
<span class="text-[9px] text-red-400/70 break-all pl-1">
{cellStatus.errorMessage}
</span>
{/if}
{#if "modelDirectory" in cellStatus && cellStatus.modelDirectory}
<span
class="text-[9px] text-white/30 break-all pl-1"


@@ -74,7 +74,6 @@
perSystem =
{ config, self', inputs', pkgs, lib, system, ... }:
let
fenixToolchain = inputs'.fenix.packages.complete;
# Use pinned nixpkgs for swift-format (swift is broken on x86_64-linux in newer nixpkgs)
pkgsSwift = import inputs.nixpkgs-swift { inherit system; };
in


@@ -1,2 +0,0 @@
# we can manually exclude false-positive lint errors for dual packages (if in dependencies)
#allowed-duplicate-crates = ["hashbrown"]


@@ -27,7 +27,7 @@ networking = { workspace = true }
# interop
pyo3 = { version = "0.27.2", features = [
# "abi3-py313", # tells pyo3 (and maturin) to build using the stable ABI with minimum Python version 3.13
"nightly", # enables better-supported GIL integration
# "nightly", # enables better-supported GIL integration
"experimental-async", # async support in #[pyfunction] & #[pymethods]
#"experimental-inspect", # inspection of generated binary => easier to automate type-hint generation
#"py-clone", # adding Clone-ing of `Py<T>` without GIL (may cause panics - remove if panics happen)
@@ -45,11 +45,10 @@ pyo3-log = "0.13.2"
# macro dependencies
extend = { workspace = true }
delegate = { workspace = true }
pin-project = { workspace = true }
# async runtime
tokio = { workspace = true, features = ["full", "tracing"] }
futures = { workspace = true }
futures-lite = { workspace = true }
# utility dependencies
util = { workspace = true }
@@ -60,3 +59,4 @@ env_logger = "0.11"
# Networking
libp2p = { workspace = true, features = ["full"] }
pin-project = "1.1.10"


@@ -2,7 +2,6 @@
//!
use pin_project::pin_project;
use pyo3::marker::Ungil;
use pyo3::prelude::*;
use std::{
future::Future,
@@ -26,8 +25,8 @@ where
impl<F> Future for AllowThreads<F>
where
F: Future + Ungil,
F::Output: Ungil,
F: Future + Send,
F::Output: Send,
{
type Output = F::Output;


@@ -4,25 +4,12 @@
//!
//!
// enable Rust-unstable features for convenience
#![feature(trait_alias)]
#![feature(tuple_trait)]
#![feature(unboxed_closures)]
// #![feature(stmt_expr_attributes)]
// #![feature(assert_matches)]
// #![feature(async_fn_in_dyn_trait)]
// #![feature(async_for_loop)]
// #![feature(auto_traits)]
// #![feature(negative_impls)]
extern crate core;
mod allow_threading;
pub(crate) mod networking;
pub(crate) mod pylibp2p;
mod ident;
mod networking;
use crate::ident::ident_submodule;
use crate::networking::networking_submodule;
use crate::pylibp2p::ident::ident_submodule;
use crate::pylibp2p::multiaddr::multiaddr_submodule;
use pyo3::prelude::PyModule;
use pyo3::{Bound, PyResult, pyclass, pymodule};
use pyo3_stub_gen::define_stub_info_gatherer;
@@ -32,14 +19,6 @@ pub(crate) mod r#const {
pub const MPSC_CHANNEL_SIZE: usize = 1024;
}
/// Namespace for all the type/trait aliases used by this crate.
pub(crate) mod alias {
use std::marker::Tuple;
pub trait SendFn<Args: Tuple + Send + 'static, Output> =
Fn<Args, Output = Output> + Send + 'static;
}
/// Namespace for crate-wide extension traits/methods
pub(crate) mod ext {
use crate::allow_threading::AllowThreads;
@@ -180,7 +159,6 @@ fn main_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
// work with maturin, where the types generate correctly, in the right folder, without
// too many importing issues...
ident_submodule(m)?;
multiaddr_submodule(m)?;
networking_submodule(m)?;
// top-level constructs


@@ -8,8 +8,8 @@
use crate::r#const::MPSC_CHANNEL_SIZE;
use crate::ext::{ByteArrayExt as _, FutureExt, PyErrExt as _};
use crate::ext::{ResultExt as _, TokioMpscReceiverExt as _, TokioMpscSenderExt as _};
use crate::ident::{PyKeypair, PyPeerId};
use crate::pyclass;
use crate::pylibp2p::ident::{PyKeypair, PyPeerId};
use libp2p::futures::StreamExt as _;
use libp2p::gossipsub;
use libp2p::gossipsub::{IdentTopic, Message, MessageId, PublishError};


@@ -1,8 +0,0 @@
//! A module for exposing Rust's libp2p datatypes over Pyo3
//!
//! TODO: right now we are coupled to libp2p's identity, but eventually we want to create our own
//! independent identity type of some kind or another. This may require handshaking.
//!
pub mod ident;
pub mod multiaddr;


@@ -1,81 +0,0 @@
use crate::ext::ResultExt as _;
use libp2p::Multiaddr;
use pyo3::prelude::{PyBytesMethods as _, PyModule, PyModuleMethods as _};
use pyo3::types::PyBytes;
use pyo3::{Bound, PyResult, Python, pyclass, pymethods};
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
use std::str::FromStr as _;
/// Representation of a Multiaddr.
#[gen_stub_pyclass]
#[pyclass(name = "Multiaddr", frozen)]
#[derive(Debug, Clone)]
#[repr(transparent)]
pub struct PyMultiaddr(pub Multiaddr);
#[gen_stub_pymethods]
#[pymethods]
#[allow(clippy::needless_pass_by_value)]
impl PyMultiaddr {
/// Create a new, empty multiaddress.
#[staticmethod]
fn empty() -> Self {
Self(Multiaddr::empty())
}
/// Create a new, empty multiaddress with the given capacity.
#[staticmethod]
fn with_capacity(n: usize) -> Self {
Self(Multiaddr::with_capacity(n))
}
/// Parse a `Multiaddr` value from its byte slice representation.
#[staticmethod]
fn from_bytes(bytes: Bound<'_, PyBytes>) -> PyResult<Self> {
let bytes = Vec::from(bytes.as_bytes());
Ok(Self(Multiaddr::try_from(bytes).pyerr()?))
}
/// Parse a `Multiaddr` value from its string representation.
#[staticmethod]
fn from_string(string: String) -> PyResult<Self> {
Ok(Self(Multiaddr::from_str(&string).pyerr()?))
}
/// Return the length in bytes of this multiaddress.
fn len(&self) -> usize {
self.0.len()
}
/// Returns true if the length of this multiaddress is 0.
fn is_empty(&self) -> bool {
self.0.is_empty()
}
/// Return a copy of this [`Multiaddr`]'s byte representation.
fn to_bytes<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
let bytes = self.0.to_vec();
PyBytes::new(py, &bytes)
}
/// Convert a Multiaddr to a string.
fn to_string(&self) -> String {
self.0.to_string()
}
#[gen_stub(skip)]
fn __repr__(&self) -> String {
format!("Multiaddr({})", self.0)
}
#[gen_stub(skip)]
fn __str__(&self) -> String {
self.to_string()
}
}
pub fn multiaddr_submodule(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyMultiaddr>()?;
Ok(())
}


@@ -22,7 +22,7 @@ delegate = { workspace = true }
# async
tokio = { workspace = true, features = ["full"] }
futures = { workspace = true }
futures-lite = { workspace = true }
futures-timer = { workspace = true }
# utility dependencies


@@ -1,4 +1,4 @@
use futures::stream::StreamExt as _;
use futures_lite::StreamExt;
use libp2p::{gossipsub, identity, swarm::SwarmEvent};
use networking::{discovery, swarm};
use tokio::{io, io::AsyncBufReadExt as _, select};
@@ -38,19 +38,19 @@ async fn main() {
println!("Publish error: {e:?}");
}
}
event = swarm.select_next_some() => match event {
event = swarm.next() => match event {
// on gossipsub incoming
SwarmEvent::Behaviour(swarm::BehaviourEvent::Gossipsub(gossipsub::Event::Message {
Some(SwarmEvent::Behaviour(swarm::BehaviourEvent::Gossipsub(gossipsub::Event::Message {
propagation_source: peer_id,
message_id: id,
message,
})) => println!(
}))) => println!(
"\n\nGot message: '{}' with id: {id} from peer: {peer_id}\n\n",
String::from_utf8_lossy(&message.data),
),
// on discovery
SwarmEvent::Behaviour(swarm::BehaviourEvent::Discovery(e)) => match e {
Some(SwarmEvent::Behaviour(swarm::BehaviourEvent::Discovery(e)) )=> match e {
discovery::Event::ConnectionEstablished {
peer_id, connection_id, remote_ip, remote_tcp_port
} => {
@@ -64,7 +64,7 @@ async fn main() {
}
// ignore outgoing errors: those are normal
e@SwarmEvent::OutgoingConnectionError { .. } => { log::debug!("Outgoing connection error: {e:?}"); }
e@Some(SwarmEvent::OutgoingConnectionError { .. }) => { log::debug!("Outgoing connection error: {e:?}"); }
// otherwise log any other event
e => { log::info!("Other event {e:?}"); }


@@ -1,127 +0,0 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::stream::StreamExt;
use libp2p::{
gossipsub, mdns, noise,
swarm::{NetworkBehaviour, SwarmEvent},
tcp, yamux,
};
use std::error::Error;
use std::time::Duration;
use tokio::{io, io::AsyncBufReadExt, select};
use tracing_subscriber::EnvFilter;
// We create a custom network behaviour that combines Gossipsub and Mdns.
#[derive(NetworkBehaviour)]
struct MyBehaviour {
gossipsub: gossipsub::Behaviour,
mdns: mdns::tokio::Behaviour,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.try_init();
let mut swarm = libp2p::SwarmBuilder::with_new_identity()
.with_tokio()
.with_tcp(
tcp::Config::default(),
noise::Config::new,
yamux::Config::default,
)?
.with_behaviour(|key| {
// Set a custom gossipsub configuration
let gossipsub_config = gossipsub::ConfigBuilder::default()
.heartbeat_interval(Duration::from_secs(10))
.validation_mode(gossipsub::ValidationMode::Strict) // This sets the kind of message validation. The default is Strict (enforce message signing)
.build()
.map_err(io::Error::other)?; // Temporary hack because `build` does not return a proper `std::error::Error`.
// build a gossipsub network behaviour
let gossipsub = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(key.clone()),
gossipsub_config,
)?;
let mdns =
mdns::tokio::Behaviour::new(mdns::Config::default(), key.public().to_peer_id())?;
Ok(MyBehaviour { gossipsub, mdns })
})?
.build();
println!("Running swarm with identity {}", swarm.local_peer_id());
// Create a Gossipsub topic
let topic = gossipsub::IdentTopic::new("test-net");
// subscribes to our topic
swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
// Read full lines from stdin
let mut stdin = io::BufReader::new(io::stdin()).lines();
// Listen on all interfaces and whatever port the OS assigns
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
println!("Enter messages via STDIN and they will be sent to connected peers using Gossipsub");
// Kick it off
loop {
select! {
Ok(Some(line)) = stdin.next_line() => {
if let Err(e) = swarm
.behaviour_mut().gossipsub
.publish(topic.clone(), line.as_bytes()) {
println!("Publish error: {e:?}");
}
}
event = swarm.select_next_some() => match event {
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => {
for (peer_id, multiaddr) in list {
println!("mDNS discovered a new peer: {peer_id} on {multiaddr}");
swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
}
},
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Expired(list))) => {
for (peer_id, multiaddr) in list {
println!("mDNS discover peer has expired: {peer_id} on {multiaddr}");
swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id);
}
},
SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Message {
propagation_source: peer_id,
message_id: id,
message,
})) => println!(
"Got message: '{}' with id: {id} from peer: {peer_id}",
String::from_utf8_lossy(&message.data),
),
SwarmEvent::NewListenAddr { address, .. } => {
println!("Local node is listening on {address}");
}
e => {
println!("Other swarm event: {:?}", e);
}
}
}
}
}


@@ -1,7 +1,7 @@
use crate::ext::MultiaddrExt;
use delegate::delegate;
use either::Either;
use futures::FutureExt;
use futures_lite::FutureExt;
use futures_timer::Delay;
use libp2p::core::transport::PortUse;
use libp2p::core::{ConnectedPoint, Endpoint};
@@ -362,7 +362,7 @@ impl NetworkBehaviour for Behaviour {
}
// retry connecting to all mDNS peers periodically (fails safely if already connected)
if self.retry_delay.poll_unpin(cx).is_ready() {
if self.retry_delay.poll(cx).is_ready() {
for (p, mas) in self.mdns_discovered.clone() {
for ma in mas {
self.dial(p, ma)


@@ -31,7 +31,7 @@ pub fn create_swarm(keypair: identity::Keypair) -> alias::AnyResult<Swarm> {
mod transport {
use crate::alias;
use crate::swarm::{NETWORK_VERSION, OVERRIDE_VERSION_ENV_VAR};
use futures::{AsyncRead, AsyncWrite};
use futures_lite::{AsyncRead, AsyncWrite};
use keccak_const::Sha3_256;
use libp2p::core::muxing;
use libp2p::core::transport::Boxed;


@@ -1,11 +1,10 @@
{ inputs, ... }:
{
perSystem =
{ config, self', inputs', pkgs, lib, ... }:
{ inputs', pkgs, lib, ... }:
let
# Fenix nightly toolchain with all components
fenixPkgs = inputs'.fenix.packages;
rustToolchain = fenixPkgs.complete.withComponents [
rustToolchain = inputs'.fenix.packages.stable.withComponents [
"cargo"
"rustc"
"clippy"


@@ -1,2 +0,0 @@
[toolchain]
channel = "nightly"


@@ -14,10 +14,12 @@ from exo.shared.models.model_cards import ModelCard, ModelId, ModelTask
from exo.shared.topology import Topology
from exo.shared.types.commands import PlaceInstance
from exo.shared.types.common import CommandId, NodeId
from exo.shared.types.events import InstanceCreated, InstanceDeleted
from exo.shared.types.events import InstanceCreated, InstanceDeleted, TaskStatusUpdated
from exo.shared.types.memory import Memory
from exo.shared.types.multiaddr import Multiaddr
from exo.shared.types.profiling import NetworkInterfaceInfo, NodeNetworkInfo
from exo.shared.types.tasks import TaskId, TaskStatus, TextGeneration
from exo.shared.types.text_generation import InputMessage, TextGenerationTaskParams
from exo.shared.types.topology import Connection, SocketConnection
from exo.shared.types.worker.instances import (
Instance,
@@ -456,3 +458,117 @@ def test_tensor_rdma_backend_connectivity_matrix(
else:
ip_part = coordinator.split(":")[0]
assert len(ip_part.split(".")) == 4
def _make_task(
instance_id: InstanceId,
status: TaskStatus = TaskStatus.Running,
) -> TextGeneration:
return TextGeneration(
task_id=TaskId(),
task_status=status,
instance_id=instance_id,
command_id=CommandId(),
task_params=TextGenerationTaskParams(
model=ModelId("test-model"),
input=[InputMessage(role="user", content="hello")],
),
)
def test_get_transition_events_delete_instance_cancels_running_tasks(
instance: Instance,
):
# arrange
instance_id = InstanceId()
current_instances: dict[InstanceId, Instance] = {instance_id: instance}
target_instances: dict[InstanceId, Instance] = {}
task = _make_task(instance_id, TaskStatus.Running)
tasks = {task.task_id: task}
# act
events = get_transition_events(current_instances, target_instances, tasks)
# assert cancellation event should come before the deletion event
assert len(events) == 2
assert isinstance(events[0], TaskStatusUpdated)
assert events[0].task_id == task.task_id
assert events[0].task_status == TaskStatus.Cancelled
assert isinstance(events[1], InstanceDeleted)
assert events[1].instance_id == instance_id
def test_get_transition_events_delete_instance_cancels_pending_tasks(
instance: Instance,
):
# arrange
instance_id = InstanceId()
current_instances: dict[InstanceId, Instance] = {instance_id: instance}
target_instances: dict[InstanceId, Instance] = {}
task = _make_task(instance_id, TaskStatus.Pending)
tasks = {task.task_id: task}
# act
events = get_transition_events(current_instances, target_instances, tasks)
# assert
assert len(events) == 2
assert isinstance(events[0], TaskStatusUpdated)
assert events[0].task_id == task.task_id
assert events[0].task_status == TaskStatus.Cancelled
assert isinstance(events[1], InstanceDeleted)
def test_get_transition_events_delete_instance_ignores_completed_tasks(
instance: Instance,
):
# arrange
instance_id = InstanceId()
current_instances: dict[InstanceId, Instance] = {instance_id: instance}
target_instances: dict[InstanceId, Instance] = {}
tasks = {
t.task_id: t
for t in [
_make_task(instance_id, TaskStatus.Complete),
_make_task(instance_id, TaskStatus.Failed),
_make_task(instance_id, TaskStatus.TimedOut),
_make_task(instance_id, TaskStatus.Cancelled),
]
}
# act
events = get_transition_events(current_instances, target_instances, tasks)
# assert only the InstanceDeleted event, no cancellations
assert len(events) == 1
assert isinstance(events[0], InstanceDeleted)
def test_get_transition_events_delete_instance_cancels_only_matching_tasks(
instance: Instance,
):
# arrange
instance_id_a = InstanceId()
instance_id_b = InstanceId()
current_instances: dict[InstanceId, Instance] = {
instance_id_a: instance,
instance_id_b: instance,
}
# only delete instance A, keep instance B
target_instances: dict[InstanceId, Instance] = {instance_id_b: instance}
task_a = _make_task(instance_id_a, TaskStatus.Running)
task_b = _make_task(instance_id_b, TaskStatus.Running)
tasks = {task_a.task_id: task_a, task_b.task_id: task_b}
# act
events = get_transition_events(current_instances, target_instances, tasks)
# assert only task_a should be cancelled
cancel_events = [e for e in events if isinstance(e, TaskStatusUpdated)]
delete_events = [e for e in events if isinstance(e, InstanceDeleted)]
assert len(cancel_events) == 1
assert cancel_events[0].task_id == task_a.task_id
assert cancel_events[0].task_status == TaskStatus.Cancelled
assert len(delete_events) == 1
assert delete_events[0].instance_id == instance_id_a


@@ -241,6 +241,11 @@ class Worker:
cancelled_task_id=cancelled_task_id, runner_id=runner_id
):
await self.runners[runner_id].cancel_task(cancelled_task_id)
await self.event_sender.send(
TaskStatusUpdated(
task_id=task.task_id, task_status=TaskStatus.Complete
)
)
case ImageEdits() if task.task_params.total_input_chunks > 0:
# Assemble image from chunks and inject into task
cmd_id = task.command_id