mirror of
https://github.com/nicolargo/glances.git
synced 2026-06-02 19:05:00 -04:00
359 lines
11 KiB
Python
Executable File
359 lines
11 KiB
Python
Executable File
#!/usr/bin/env python
|
|
#
|
|
# Glances - An eye on your system
|
|
#
|
|
# SPDX-FileCopyrightText: 2026 Nicolas Hennion <nicolas@nicolargo.com>
|
|
#
|
|
# SPDX-License-Identifier: LGPL-3.0-only
|
|
#
|
|
|
|
"""Glances v5 — unit + smoke tests for AsyncScheduler.
|
|
|
|
Test stack: pytest + pytest-asyncio (auto mode). See architecture decisions §9.
|
|
|
|
Coverage:
|
|
- refresh_time precedence (explicit > plugin section > global > default)
|
|
- run_forever calls plugin.update() at least once per plugin
|
|
- stop() cancels loops cleanly
|
|
- A plugin raising in its loop does NOT kill the others
|
|
- register() rejects duplicate plugin and rejects calls during run
|
|
- Smoke: 2 plugins → both end up in the StatsStore
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from glances.config_v5 import GlancesConfigV5
|
|
from glances.plugins.plugin.base_v5 import GlancesPluginBase
|
|
from glances.scheduler_v5 import _DEFAULT_REFRESH_TIME, AsyncScheduler
|
|
from glances.stats_store_v5 import StatsStoreV5
|
|
|
|
# ---------------------------------------------------------- fake plugins
|
|
|
|
|
|
class FastPlugin(GlancesPluginBase[dict]):
|
|
plugin_name = "fast"
|
|
IS_COLLECTION = False
|
|
fields_description = {"value": {"description": "v", "unit": "number"}}
|
|
|
|
def __init__(self, store, config):
|
|
super().__init__(store, config)
|
|
self.calls = 0
|
|
|
|
async def _grab_stats(self) -> dict:
|
|
self.calls += 1
|
|
return {"value": self.calls}
|
|
|
|
|
|
class SlowPlugin(GlancesPluginBase[dict]):
|
|
plugin_name = "slow"
|
|
IS_COLLECTION = False
|
|
fields_description = {"value": {"description": "v", "unit": "number"}}
|
|
|
|
def __init__(self, store, config):
|
|
super().__init__(store, config)
|
|
self.calls = 0
|
|
|
|
async def _grab_stats(self) -> dict:
|
|
self.calls += 1
|
|
return {"value": self.calls * 10}
|
|
|
|
|
|
# A plugin whose update() bypasses the base-class swallow and re-raises,
|
|
# to exercise the scheduler's defensive try/except.
|
|
class RaisingPlugin(GlancesPluginBase[dict]):
|
|
plugin_name = "raising"
|
|
IS_COLLECTION = False
|
|
fields_description = {"value": {"description": "v", "unit": "number"}}
|
|
|
|
def __init__(self, store, config):
|
|
super().__init__(store, config)
|
|
self.calls = 0
|
|
|
|
async def _grab_stats(self) -> dict:
|
|
return {"value": 0}
|
|
|
|
async def update(self) -> None: # type: ignore[override]
|
|
self.calls += 1
|
|
raise RuntimeError("boom from update override")
|
|
|
|
|
|
# ---------------------------------------------------------- fixtures
|
|
|
|
|
|
@pytest.fixture
|
|
def store() -> StatsStoreV5:
|
|
return StatsStoreV5()
|
|
|
|
|
|
@pytest.fixture
|
|
def config(tmp_path: Path, monkeypatch) -> GlancesConfigV5:
|
|
monkeypatch.setattr(GlancesConfigV5, "SYSTEM_CONFIG_PATH", tmp_path / "etc" / "glances.conf")
|
|
monkeypatch.setenv("XDG_CONFIG_HOME", str(tmp_path / "xdg"))
|
|
return GlancesConfigV5()
|
|
|
|
|
|
def _config_with_ini(tmp_path: Path, monkeypatch, ini_body: str) -> GlancesConfigV5:
|
|
"""Helper: build a config with the given INI content as the user file."""
|
|
user_dir = tmp_path / "xdg" / "glances"
|
|
user_dir.mkdir(parents=True)
|
|
(user_dir / "glances.conf").write_text(ini_body)
|
|
monkeypatch.setattr(GlancesConfigV5, "SYSTEM_CONFIG_PATH", tmp_path / "etc" / "glances.conf")
|
|
monkeypatch.setenv("XDG_CONFIG_HOME", str(tmp_path / "xdg"))
|
|
return GlancesConfigV5()
|
|
|
|
|
|
# ---------------------------------------------------------- refresh_time
|
|
|
|
|
|
def test_register_uses_explicit_refresh_time(store, config):
|
|
scheduler = AsyncScheduler(store, config)
|
|
plugin = FastPlugin(store, config)
|
|
|
|
scheduler.register(plugin, refresh_time=7.5)
|
|
|
|
assert scheduler._entries[0].refresh_time == 7.5
|
|
|
|
|
|
def test_register_uses_plugin_section_refresh_time(store, tmp_path, monkeypatch):
|
|
config = _config_with_ini(
|
|
tmp_path,
|
|
monkeypatch,
|
|
"[fast]\nrefresh_time = 4.5\n",
|
|
)
|
|
scheduler = AsyncScheduler(store, config)
|
|
plugin = FastPlugin(store, config)
|
|
|
|
scheduler.register(plugin)
|
|
|
|
assert scheduler._entries[0].refresh_time == 4.5
|
|
|
|
|
|
def test_register_falls_back_to_global(store, tmp_path, monkeypatch):
|
|
config = _config_with_ini(
|
|
tmp_path,
|
|
monkeypatch,
|
|
"[global]\nrefresh_time = 6.0\n",
|
|
)
|
|
scheduler = AsyncScheduler(store, config)
|
|
plugin = FastPlugin(store, config)
|
|
|
|
scheduler.register(plugin)
|
|
|
|
assert scheduler._entries[0].refresh_time == 6.0
|
|
|
|
|
|
def test_register_falls_back_to_default(store, config):
|
|
scheduler = AsyncScheduler(store, config)
|
|
plugin = FastPlugin(store, config)
|
|
|
|
scheduler.register(plugin)
|
|
|
|
assert scheduler._entries[0].refresh_time == _DEFAULT_REFRESH_TIME
|
|
|
|
|
|
def test_register_rejects_non_positive_refresh_time(store, config):
|
|
scheduler = AsyncScheduler(store, config)
|
|
plugin = FastPlugin(store, config)
|
|
|
|
with pytest.raises(ValueError, match="must be > 0"):
|
|
scheduler.register(plugin, refresh_time=0)
|
|
|
|
|
|
# ---------------------------------------------------------- registration guards
|
|
|
|
|
|
def test_register_same_plugin_twice_raises(store, config):
|
|
scheduler = AsyncScheduler(store, config)
|
|
plugin = FastPlugin(store, config)
|
|
scheduler.register(plugin, refresh_time=1.0)
|
|
|
|
with pytest.raises(ValueError, match="already registered"):
|
|
scheduler.register(plugin, refresh_time=1.0)
|
|
|
|
|
|
async def test_register_after_run_raises(store, config):
|
|
scheduler = AsyncScheduler(store, config)
|
|
p1 = FastPlugin(store, config)
|
|
p2 = SlowPlugin(store, config)
|
|
scheduler.register(p1, refresh_time=0.05)
|
|
|
|
run_task = asyncio.create_task(scheduler.run_forever())
|
|
await asyncio.sleep(0.01) # let the loop start
|
|
|
|
try:
|
|
with pytest.raises(RuntimeError, match="while the scheduler is running"):
|
|
scheduler.register(p2, refresh_time=0.05)
|
|
finally:
|
|
await scheduler.stop()
|
|
await run_task
|
|
|
|
|
|
async def test_run_forever_with_no_plugins_raises(store, config):
|
|
scheduler = AsyncScheduler(store, config)
|
|
with pytest.raises(RuntimeError, match="no registered plugins"):
|
|
await scheduler.run_forever()
|
|
|
|
|
|
async def test_run_forever_when_already_running_raises(store, config):
|
|
scheduler = AsyncScheduler(store, config)
|
|
plugin = FastPlugin(store, config)
|
|
scheduler.register(plugin, refresh_time=0.05)
|
|
|
|
run_task = asyncio.create_task(scheduler.run_forever())
|
|
await asyncio.sleep(0.01)
|
|
|
|
try:
|
|
with pytest.raises(RuntimeError, match="already running"):
|
|
await scheduler.run_forever()
|
|
finally:
|
|
await scheduler.stop()
|
|
await run_task
|
|
|
|
|
|
# ---------------------------------------------------------- run / stop
|
|
|
|
|
|
async def test_run_forever_calls_plugin_update(store, config):
|
|
scheduler = AsyncScheduler(store, config)
|
|
plugin = FastPlugin(store, config)
|
|
scheduler.register(plugin, refresh_time=0.01)
|
|
|
|
run_task = asyncio.create_task(scheduler.run_forever())
|
|
await asyncio.sleep(0.05)
|
|
await scheduler.stop()
|
|
await run_task
|
|
|
|
assert plugin.calls >= 1
|
|
|
|
|
|
async def test_stop_cancels_loops_cleanly(store, config):
|
|
scheduler = AsyncScheduler(store, config)
|
|
plugin = FastPlugin(store, config)
|
|
scheduler.register(plugin, refresh_time=0.01)
|
|
|
|
run_task = asyncio.create_task(scheduler.run_forever())
|
|
await asyncio.sleep(0.03)
|
|
await scheduler.stop()
|
|
|
|
# run_forever must return cleanly (no unraised exception).
|
|
await run_task
|
|
assert scheduler._running is False
|
|
assert scheduler._tasks == []
|
|
|
|
|
|
async def test_one_plugin_crash_does_not_kill_others(store, config, caplog):
|
|
scheduler = AsyncScheduler(store, config)
|
|
raiser = RaisingPlugin(store, config)
|
|
healthy = FastPlugin(store, config)
|
|
scheduler.register(raiser, refresh_time=0.01)
|
|
scheduler.register(healthy, refresh_time=0.01)
|
|
|
|
with caplog.at_level(logging.WARNING):
|
|
run_task = asyncio.create_task(scheduler.run_forever())
|
|
await asyncio.sleep(0.05)
|
|
await scheduler.stop()
|
|
await run_task
|
|
|
|
assert raiser.calls >= 1
|
|
assert healthy.calls >= 1
|
|
assert "Scheduler caught exception from raising" in caplog.text
|
|
|
|
|
|
# ---------------------------------------------------------- smoke
|
|
|
|
|
|
async def test_smoke_two_plugins_write_to_store(store, config):
|
|
"""End-to-end: register 2 plugins, run, assert both reach the store."""
|
|
scheduler = AsyncScheduler(store, config)
|
|
fast = FastPlugin(store, config)
|
|
slow = SlowPlugin(store, config)
|
|
scheduler.register(fast, refresh_time=0.01)
|
|
scheduler.register(slow, refresh_time=0.01)
|
|
|
|
run_task = asyncio.create_task(scheduler.run_forever())
|
|
await asyncio.sleep(0.05)
|
|
await scheduler.stop()
|
|
await run_task
|
|
|
|
fast_payload = store.get("fast")
|
|
slow_payload = store.get("slow")
|
|
assert fast_payload is not None
|
|
assert slow_payload is not None
|
|
assert fast_payload["value"] >= 1
|
|
assert slow_payload["value"] >= 10
|
|
assert "time_since_update" in fast_payload
|
|
assert "time_since_update" in slow_payload
|
|
|
|
|
|
# ---------------------------------------------------------- alerts integration
|
|
|
|
|
|
class _RecordingAlerts:
|
|
"""Minimal alerts stand-in — records every ingest call."""
|
|
|
|
def __init__(self) -> None:
|
|
self.ingested: list[str] = []
|
|
|
|
async def ingest_plugin(self, plugin) -> None:
|
|
self.ingested.append(plugin.plugin_name)
|
|
|
|
|
|
class _BoomAlerts:
|
|
"""Alerts stand-in whose ingest always raises (resilience test)."""
|
|
|
|
async def ingest_plugin(self, plugin) -> None:
|
|
raise RuntimeError("alerts blew up")
|
|
|
|
|
|
async def test_alerts_ingest_is_called_after_each_plugin_update(store, config):
|
|
alerts = _RecordingAlerts()
|
|
scheduler = AsyncScheduler(store, config, alerts=alerts) # type: ignore[arg-type]
|
|
fast = FastPlugin(store, config)
|
|
scheduler.register(fast, refresh_time=0.01)
|
|
|
|
run_task = asyncio.create_task(scheduler.run_forever())
|
|
await asyncio.sleep(0.05)
|
|
await scheduler.stop()
|
|
await run_task
|
|
|
|
# At least one ingest per cycle. Multiple cycles run during the 50 ms window.
|
|
assert alerts.ingested.count("fast") >= 1
|
|
|
|
|
|
async def test_alerts_ingest_exception_does_not_crash_scheduler(store, config, caplog):
|
|
"""A failing alerts.ingest_plugin must never tear down the plugin loop."""
|
|
scheduler = AsyncScheduler(store, config, alerts=_BoomAlerts()) # type: ignore[arg-type]
|
|
fast = FastPlugin(store, config)
|
|
scheduler.register(fast, refresh_time=0.01)
|
|
|
|
run_task = asyncio.create_task(scheduler.run_forever())
|
|
with caplog.at_level(logging.WARNING):
|
|
await asyncio.sleep(0.05)
|
|
await scheduler.stop()
|
|
await run_task
|
|
|
|
# Plugin still produced stats despite alerts raising every cycle.
|
|
assert store.get("fast") is not None
|
|
assert "Alerts ingest failed" in caplog.text
|
|
|
|
|
|
async def test_scheduler_without_alerts_does_not_call_anything(store, config):
|
|
"""Back-compat: omitting `alerts=` behaves exactly like Phase 0.6."""
|
|
scheduler = AsyncScheduler(store, config)
|
|
assert scheduler.alerts is None
|
|
fast = FastPlugin(store, config)
|
|
scheduler.register(fast, refresh_time=0.01)
|
|
|
|
run_task = asyncio.create_task(scheduler.run_forever())
|
|
await asyncio.sleep(0.03)
|
|
await scheduler.stop()
|
|
await run_task
|
|
|
|
assert store.get("fast") is not None
|