mirror of
https://github.com/jokob-sk/NetAlertX.git
synced 2026-02-07 05:21:53 -05:00
Merge pull request #1480 from adamoutler/agentic-workflows
MCP Bridge Fixes & Dev Environment Automation
This commit is contained in:
@@ -31,4 +31,17 @@ cat "${DEVCONTAINER_DIR}/resources/devcontainer-Dockerfile"
|
||||
|
||||
echo "Generated $OUT_FILE using root dir $ROOT_DIR"
|
||||
|
||||
# Passive Gemini MCP config
|
||||
TOKEN=$(grep '^API_TOKEN=' /data/config/app.conf 2>/dev/null | cut -d"'" -f2)
|
||||
if [ -n "${TOKEN}" ]; then
|
||||
mkdir -p "${ROOT_DIR}/.gemini"
|
||||
[ -f "${ROOT_DIR}/.gemini/settings.json" ] || echo "{}" > "${ROOT_DIR}/.gemini/settings.json"
|
||||
jq --arg t "$TOKEN" '.mcpServers["netalertx-devcontainer"] = {url: "http://127.0.0.1:20212/mcp/sse", headers: {Authorization: ("Bearer " + $t)}}' "${ROOT_DIR}/.gemini/settings.json" > "${ROOT_DIR}/.gemini/settings.json.tmp" && mv "${ROOT_DIR}/.gemini/settings.json.tmp" "${ROOT_DIR}/.gemini/settings.json"
|
||||
|
||||
# VS Code MCP config
|
||||
mkdir -p "${ROOT_DIR}/.vscode"
|
||||
[ -f "${ROOT_DIR}/.vscode/mcp.json" ] || echo "{}" > "${ROOT_DIR}/.vscode/mcp.json"
|
||||
jq --arg t "$TOKEN" '.servers["netalertx-devcontainer"] = {type: "sse", url: "http://127.0.0.1:20212/mcp/sse", headers: {Authorization: ("Bearer " + $t)}}' "${ROOT_DIR}/.vscode/mcp.json" > "${ROOT_DIR}/.vscode/mcp.json.tmp" && mv "${ROOT_DIR}/.vscode/mcp.json.tmp" "${ROOT_DIR}/.vscode/mcp.json"
|
||||
fi
|
||||
|
||||
echo "Done."
|
||||
@@ -1,6 +1,6 @@
|
||||
---
|
||||
name: testing-workflow
|
||||
description: Read before running tests. Detailed instructions for single, astandard unit tests (fast), full suites (slow), and handling authentication. Tests must be run when a job is complete.
|
||||
description: Read before running tests. Detailed instructions for single, standard unit tests (fast), full suites (slow), handling authentication, and obtaining the API Token. Tests must be run when a job is complete.
|
||||
---
|
||||
|
||||
# Testing Workflow
|
||||
@@ -8,6 +8,19 @@ After code is developed, tests must be run to ensure the integrity of the final
|
||||
|
||||
**Crucial:** Tests MUST be run inside the container to access the correct runtime environment (DB, Config, Dependencies).
|
||||
|
||||
## 0. Pre-requisites: Environment Check
|
||||
|
||||
Before running any tests, verify you are inside the development container:
|
||||
|
||||
```bash
|
||||
ls -d /workspaces/NetAlertX
|
||||
```
|
||||
|
||||
**IF** this directory does not exist, you are likely on the host machine. You **MUST** immediately activate the `devcontainer-management` skill to enter the container or run commands inside it.
|
||||
|
||||
```text
|
||||
activate_skill("devcontainer-management")
|
||||
```
|
||||
|
||||
## 1. Full Test Suite (MANDATORY DEFAULT)
|
||||
|
||||
@@ -38,13 +51,24 @@ cd /workspaces/NetAlertX; pytest test/<path_to_test>
|
||||
cd /workspaces/NetAlertX; pytest test/api_endpoints/test_mcp_extended_endpoints.py
|
||||
```
|
||||
|
||||
## Authentication in Tests
|
||||
## Authentication & Environment Reset
|
||||
|
||||
The test environment uses `API_TOKEN`. The most reliable way to retrieve the current token from a running container is:
|
||||
Authentication tokens are required to perform certain operations such as manual testing or crafting expressions to work with the web APIs. After making code changes, you MUST reset the environment to ensure the new code is running and verify you have the latest `API_TOKEN`.
|
||||
|
||||
```bash
|
||||
python3 -c "from helper import get_setting_value; print(get_setting_value('API_TOKEN'))"
|
||||
```
|
||||
1. **Reset Environment:** Run the setup script inside the container.
|
||||
```bash
|
||||
bash /workspaces/NetAlertX/.devcontainer/scripts/setup.sh
|
||||
```
|
||||
2. **Wait for Stabilization:** Wait at least 5 seconds for services (nginx, python server, etc.) to start.
|
||||
```bash
|
||||
sleep 5
|
||||
```
|
||||
3. **Obtain Token:** Retrieve the current token from the container.
|
||||
```bash
|
||||
python3 -c "from helper import get_setting_value; print(get_setting_value('API_TOKEN'))"
|
||||
```
|
||||
|
||||
The retrieved token MUST be used in all subsequent API or test calls requiring authentication.
|
||||
|
||||
### Troubleshooting
|
||||
|
||||
|
||||
2
.gitignore
vendored
2
.gitignore
vendored
@@ -46,3 +46,5 @@ docker-compose.yml.ffsb42
|
||||
.env.omada.ffsb42
|
||||
.venv
|
||||
test_mounts/
|
||||
.gemini/settings.json
|
||||
.vscode/mcp.json
|
||||
|
||||
@@ -49,7 +49,7 @@ sequenceDiagram
|
||||
API-->>MCP: 5. Available tools spec
|
||||
MCP-->>AI: 6. Tool definitions
|
||||
AI->>MCP: 7. tools/call: search_devices
|
||||
MCP->>API: 8. POST /mcp/sse/devices/search
|
||||
MCP->>API: 8. POST /devices/search
|
||||
API->>DB: 9. Query devices
|
||||
DB-->>API: 10. Device data
|
||||
API-->>MCP: 11. JSON response
|
||||
@@ -72,9 +72,9 @@ graph LR
|
||||
end
|
||||
|
||||
subgraph "NetAlertX API Server (:20211)"
|
||||
F[Device APIs<br/>/mcp/sse/devices/*]
|
||||
G[Network Tools<br/>/mcp/sse/nettools/*]
|
||||
H[Events API<br/>/mcp/sse/events/*]
|
||||
F[Device APIs<br/>/devices/*]
|
||||
G[Network Tools<br/>/nettools/*]
|
||||
H[Events API<br/>/events/*]
|
||||
end
|
||||
|
||||
subgraph "Backend"
|
||||
@@ -182,27 +182,28 @@ eventSource.onmessage = function(event) {
|
||||
|
||||
| Tool | Endpoint | Description |
|
||||
|------|----------|-------------|
|
||||
| `list_devices` | `/mcp/sse/devices/by-status` | List devices by online status |
|
||||
| `get_device_info` | `/mcp/sse/device/<mac>` | Get detailed device information |
|
||||
| `search_devices` | `/mcp/sse/devices/search` | Search devices by MAC, name, or IP |
|
||||
| `get_latest_device` | `/mcp/sse/devices/latest` | Get most recently connected device |
|
||||
| `set_device_alias` | `/mcp/sse/device/<mac>/set-alias` | Set device friendly name |
|
||||
| `list_devices` | `/devices/by-status` | List devices by online status |
|
||||
| `get_device_info` | `/device/{mac}` | Get detailed device information |
|
||||
| `search_devices` | `/devices/search` | Search devices by MAC, name, or IP |
|
||||
| `get_latest_device` | `/devices/latest` | Get most recently connected device |
|
||||
| `set_device_alias` | `/device/{mac}/set-alias` | Set device friendly name |
|
||||
|
||||
### Network Tools
|
||||
|
||||
| Tool | Endpoint | Description |
|
||||
|------|----------|-------------|
|
||||
| `trigger_scan` | `/mcp/sse/nettools/trigger-scan` | Trigger network discovery scan |
|
||||
| `get_open_ports` | `/mcp/sse/device/open_ports` | Get stored NMAP open ports for device |
|
||||
| `wol_wake_device` | `/mcp/sse/nettools/wakeonlan` | Wake device using Wake-on-LAN |
|
||||
| `get_network_topology` | `/mcp/sse/devices/network/topology` | Get network topology map |
|
||||
| `trigger_scan` | `/nettools/trigger-scan` | Trigger network discovery scan to find new devices. |
|
||||
| `run_nmap_scan` | `/nettools/nmap` | Perform NMAP scan on a target to identify open ports. |
|
||||
| `get_open_ports` | `/device/open_ports` | Get stored NMAP open ports. Use `run_nmap_scan` first if empty. |
|
||||
| `wol_wake_device` | `/nettools/wakeonlan` | Wake device using Wake-on-LAN |
|
||||
| `get_network_topology` | `/devices/network/topology` | Get network topology map |
|
||||
|
||||
### Event & Monitoring Tools
|
||||
|
||||
| Tool | Endpoint | Description |
|
||||
|------|----------|-------------|
|
||||
| `get_recent_alerts` | `/mcp/sse/events/recent` | Get events from last 24 hours |
|
||||
| `get_last_events` | `/mcp/sse/events/last` | Get 10 most recent events |
|
||||
| `get_recent_alerts` | `/events/recent` | Get events from last 24 hours |
|
||||
| `get_last_events` | `/events/last` | Get 10 most recent events |
|
||||
|
||||
---
|
||||
|
||||
|
||||
@@ -72,6 +72,7 @@ from .openapi.schemas import ( # noqa: E402 [flake8 lint suppression]
|
||||
DeviceUpdateRequest,
|
||||
DeviceInfo,
|
||||
BaseResponse, DeviceTotalsResponse,
|
||||
DeviceTotalsNamedResponse,
|
||||
DeleteDevicesRequest, DeviceImportRequest,
|
||||
DeviceImportResponse, UpdateDeviceColumnRequest,
|
||||
LockDeviceFieldRequest, UnlockDeviceFieldsRequest,
|
||||
@@ -289,7 +290,6 @@ def api_get_setting(setKey):
|
||||
# --------------------------
|
||||
# Device Endpoints
|
||||
# --------------------------
|
||||
@app.route('/mcp/sse/device/<mac>', methods=['GET', 'POST'])
|
||||
@app.route("/device/<mac>", methods=["GET"])
|
||||
@validate_request(
|
||||
operation_id="get_device_info",
|
||||
@@ -432,7 +432,7 @@ def api_device_copy(payload=None):
|
||||
@validate_request(
|
||||
operation_id="update_device_column",
|
||||
summary="Update Device Column",
|
||||
description="Update a specific database column for a device.",
|
||||
description="Update a specific database column for a device. Use this to mark devices as favorites (columnName='devFavorite', columnValue=1). See `get_favorite_devices` to retrieve them.",
|
||||
path_params=[{
|
||||
"name": "mac",
|
||||
"description": "Device MAC address",
|
||||
@@ -554,7 +554,6 @@ def api_device_fields_unlock(payload=None):
|
||||
# Devices Collections
|
||||
# --------------------------
|
||||
|
||||
@app.route('/mcp/sse/device/<mac>/set-alias', methods=['POST'])
|
||||
@app.route('/device/<mac>/set-alias', methods=['POST'])
|
||||
@validate_request(
|
||||
operation_id="set_device_alias",
|
||||
@@ -582,16 +581,25 @@ def api_device_set_alias(mac, payload=None):
|
||||
return jsonify(result)
|
||||
|
||||
|
||||
@app.route('/mcp/sse/device/open_ports', methods=['POST'])
|
||||
@app.route('/device/open_ports', methods=['POST'])
|
||||
@validate_request(
|
||||
operation_id="get_open_ports",
|
||||
summary="Get Open Ports",
|
||||
description="Retrieve open ports for a target IP or MAC address. Returns cached NMAP scan results.",
|
||||
description="Retrieve open ports for a target IP or MAC address. Returns cached NMAP scan results. If no ports are found, run a scan first using `run_nmap_scan`.",
|
||||
request_model=OpenPortsRequest,
|
||||
response_model=OpenPortsResponse,
|
||||
tags=["nettools"],
|
||||
auth_callable=is_authorized
|
||||
auth_callable=is_authorized,
|
||||
links={
|
||||
"RunNmapScan": {
|
||||
"operationId": "run_nmap_scan",
|
||||
"parameters": {
|
||||
"scan": "$response.body#/target",
|
||||
"mode": "fast"
|
||||
},
|
||||
"description": "Refresh the open ports data by running a new NMAP scan on this target."
|
||||
}
|
||||
}
|
||||
)
|
||||
def api_device_open_ports(payload=None):
|
||||
"""Get stored NMAP open ports for a target IP or MAC."""
|
||||
@@ -606,7 +614,7 @@ def api_device_open_ports(payload=None):
|
||||
open_ports = device_handler.getOpenPorts(target)
|
||||
|
||||
if not open_ports:
|
||||
return jsonify({"success": False, "error": f"No stored open ports for {target}. Run a scan with `/nettools/trigger-scan`"}), 404
|
||||
return jsonify({"success": False, "error": f"No stored open ports for {target}. Run a scan with the 'run_nmap_scan' tool (or /nettools/nmap)."}), 404
|
||||
|
||||
return jsonify({"success": True, "target": target, "open_ports": open_ports})
|
||||
|
||||
@@ -674,7 +682,6 @@ def api_delete_unknown_devices(payload=None):
|
||||
return jsonify(device_handler.deleteUnknownDevices())
|
||||
|
||||
|
||||
@app.route('/mcp/sse/devices/export', methods=['GET'])
|
||||
@app.route("/devices/export", methods=["GET"])
|
||||
@app.route("/devices/export/<format>", methods=["GET"])
|
||||
@validate_request(
|
||||
@@ -716,7 +723,6 @@ def api_export_devices(format=None, payload=None):
|
||||
)
|
||||
|
||||
|
||||
@app.route('/mcp/sse/devices/import', methods=['POST'])
|
||||
@app.route("/devices/import", methods=["POST"])
|
||||
@validate_request(
|
||||
operation_id="import_devices",
|
||||
@@ -746,12 +752,11 @@ def api_import_csv(payload=None):
|
||||
return jsonify(result)
|
||||
|
||||
|
||||
@app.route('/mcp/sse/devices/totals', methods=['GET'])
|
||||
@app.route("/devices/totals", methods=["GET"])
|
||||
@validate_request(
|
||||
operation_id="get_device_totals",
|
||||
summary="Get Device Totals",
|
||||
description="Get device statistics including total count, online/offline counts, new devices, and archived devices.",
|
||||
summary="Get Device Totals (Deprecated)",
|
||||
description="Get device statistics including total count, online/offline counts, new devices, and archived devices. Deprecated: use /devices/totals/named instead.",
|
||||
response_model=DeviceTotalsResponse,
|
||||
tags=["devices"],
|
||||
auth_callable=is_authorized
|
||||
@@ -761,7 +766,30 @@ def api_devices_totals(payload=None):
|
||||
return jsonify(device_handler.getTotals())
|
||||
|
||||
|
||||
@app.route('/mcp/sse/devices/by-status', methods=['GET', 'POST'])
|
||||
@app.route("/devices/totals/named", methods=["GET"])
|
||||
@validate_request(
|
||||
operation_id="get_device_totals_named",
|
||||
summary="Get Named Device Totals",
|
||||
description="Get device statistics with named fields including total count, online/offline counts, new devices, and archived devices.",
|
||||
response_model=DeviceTotalsNamedResponse,
|
||||
tags=["devices"],
|
||||
auth_callable=is_authorized
|
||||
)
|
||||
def api_devices_totals_named(payload=None):
|
||||
device_handler = DeviceInstance()
|
||||
totals_list = device_handler.getTotals()
|
||||
# totals_list order: [devices, connected, favorites, new, down, archived]
|
||||
totals_dict = {
|
||||
"devices": totals_list[0] if len(totals_list) > 0 else 0,
|
||||
"connected": totals_list[1] if len(totals_list) > 1 else 0,
|
||||
"favorites": totals_list[2] if len(totals_list) > 2 else 0,
|
||||
"new": totals_list[3] if len(totals_list) > 3 else 0,
|
||||
"down": totals_list[4] if len(totals_list) > 4 else 0,
|
||||
"archived": totals_list[5] if len(totals_list) > 5 else 0
|
||||
}
|
||||
return jsonify({"success": True, "totals": totals_dict})
|
||||
|
||||
|
||||
@app.route("/devices/by-status", methods=["GET", "POST"])
|
||||
@validate_request(
|
||||
operation_id="list_devices_by_status_api",
|
||||
@@ -811,12 +839,11 @@ def api_devices_by_status(payload: DeviceListRequest = None):
|
||||
return jsonify(device_handler.getByStatus(status))
|
||||
|
||||
|
||||
@app.route('/mcp/sse/devices/search', methods=['POST'])
|
||||
@app.route('/devices/search', methods=['POST'])
|
||||
@validate_request(
|
||||
operation_id="search_devices_api",
|
||||
summary="Search Devices",
|
||||
description="Search for devices based on various criteria like name, IP, MAC, or vendor.",
|
||||
description="Search for devices based on various criteria like name, IP, MAC, or vendor. Use this to find MAC addresses for other tools.",
|
||||
request_model=DeviceSearchRequest,
|
||||
response_model=DeviceSearchResponse,
|
||||
tags=["devices"],
|
||||
@@ -878,7 +905,6 @@ def api_devices_search(payload=None):
|
||||
return jsonify({"success": True, "devices": matches})
|
||||
|
||||
|
||||
@app.route('/mcp/sse/devices/latest', methods=['GET'])
|
||||
@app.route('/devices/latest', methods=['GET'])
|
||||
@validate_request(
|
||||
operation_id="get_latest_device",
|
||||
@@ -899,12 +925,11 @@ def api_devices_latest(payload=None):
|
||||
return jsonify([latest])
|
||||
|
||||
|
||||
@app.route('/mcp/sse/devices/favorite', methods=['GET'])
|
||||
@app.route('/devices/favorite', methods=['GET'])
|
||||
@validate_request(
|
||||
operation_id="get_favorite_devices",
|
||||
summary="Get Favorite Devices",
|
||||
description="Get list of devices marked as favorites.",
|
||||
description="Get list of devices marked as favorites. Use `update_device_column` with 'devFavorite' to add devices.",
|
||||
response_model=DeviceListResponse,
|
||||
tags=["devices"],
|
||||
auth_callable=is_authorized
|
||||
@@ -916,11 +941,10 @@ def api_devices_favorite(payload=None):
|
||||
favorite = device_handler.getFavorite()
|
||||
|
||||
if not favorite:
|
||||
return jsonify({"success": False, "message": "No devices found", "error": "No devices found"}), 404
|
||||
return jsonify({"success": False, "message": "No devices found", "error": "No favorite devices found. Mark devices using `update_device_column`."}), 404
|
||||
return jsonify([favorite])
|
||||
|
||||
|
||||
@app.route('/mcp/sse/devices/network/topology', methods=['GET'])
|
||||
@app.route('/devices/network/topology', methods=['GET'])
|
||||
@validate_request(
|
||||
operation_id="get_network_topology",
|
||||
@@ -942,7 +966,6 @@ def api_devices_network_topology(payload=None):
|
||||
# --------------------------
|
||||
# Net tools
|
||||
# --------------------------
|
||||
@app.route('/mcp/sse/nettools/wakeonlan', methods=['POST'])
|
||||
@app.route("/nettools/wakeonlan", methods=["POST"])
|
||||
@validate_request(
|
||||
operation_id="wake_on_lan",
|
||||
@@ -979,7 +1002,6 @@ def api_wakeonlan(payload=None):
|
||||
return wakeonlan(mac)
|
||||
|
||||
|
||||
@app.route('/mcp/sse/nettools/traceroute', methods=['POST'])
|
||||
@app.route("/nettools/traceroute", methods=["POST"])
|
||||
@validate_request(
|
||||
operation_id="perform_traceroute",
|
||||
@@ -1036,11 +1058,20 @@ def api_nslookup(payload: NslookupRequest = None):
|
||||
@validate_request(
|
||||
operation_id="run_nmap_scan",
|
||||
summary="NMAP Scan",
|
||||
description="Perform an NMAP scan on a target IP.",
|
||||
description="Perform an NMAP scan on a target IP to identify open ports. This data is used by `get_open_ports`.",
|
||||
request_model=NmapScanRequest,
|
||||
response_model=NmapScanResponse,
|
||||
tags=["nettools"],
|
||||
auth_callable=is_authorized
|
||||
auth_callable=is_authorized,
|
||||
links={
|
||||
"GetOpenPorts": {
|
||||
"operationId": "get_open_ports",
|
||||
"parameters": {
|
||||
"target": "$response.body#/ip"
|
||||
},
|
||||
"description": "View the open ports discovered by this scan."
|
||||
}
|
||||
}
|
||||
)
|
||||
def api_nmap(payload: NmapScanRequest = None):
|
||||
"""
|
||||
@@ -1084,7 +1115,6 @@ def api_network_interfaces(payload=None):
|
||||
return network_interfaces()
|
||||
|
||||
|
||||
@app.route('/mcp/sse/nettools/trigger-scan', methods=['POST'])
|
||||
@app.route("/nettools/trigger-scan", methods=["GET", "POST"])
|
||||
@validate_request(
|
||||
operation_id="trigger_network_scan",
|
||||
@@ -1139,7 +1169,6 @@ def api_trigger_scan(payload=None):
|
||||
# MCP Server
|
||||
# --------------------------
|
||||
@app.route('/openapi.json', methods=['GET'])
|
||||
@app.route('/mcp/sse/openapi.json', methods=['GET'])
|
||||
def serve_openapi_spec():
|
||||
# Allow unauthenticated access to the spec itself so Swagger UI can load.
|
||||
# The actual API endpoints remain protected.
|
||||
@@ -1356,7 +1385,7 @@ def api_add_to_execution_queue(payload=None):
|
||||
path_params=[{
|
||||
"name": "mac",
|
||||
"description": "Device MAC address",
|
||||
"schema": {"type": "string"}
|
||||
"schema": {"type": "string", "pattern": "^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$"}
|
||||
}],
|
||||
request_model=CreateEventRequest,
|
||||
response_model=BaseResponse,
|
||||
@@ -1498,7 +1527,6 @@ def api_get_events_totals(payload=None):
|
||||
return jsonify(totals)
|
||||
|
||||
|
||||
@app.route('/mcp/sse/events/recent', methods=['GET', 'POST'])
|
||||
@app.route('/events/recent', methods=['GET', 'POST'])
|
||||
@validate_request(
|
||||
operation_id="get_recent_events",
|
||||
@@ -1518,7 +1546,6 @@ def api_events_default_24h(payload=None):
|
||||
return api_events_recent(hours)
|
||||
|
||||
|
||||
@app.route('/mcp/sse/events/last', methods=['GET', 'POST'])
|
||||
@app.route('/events/last', methods=['GET', 'POST'])
|
||||
@validate_request(
|
||||
operation_id="get_last_events",
|
||||
@@ -1707,7 +1734,7 @@ def api_get_session_events(payload=None):
|
||||
auth_callable=is_authorized
|
||||
)
|
||||
def metrics(payload=None):
|
||||
# Return Prometheus metrics as plain text
|
||||
# Return Prometheus metrics as plain text (not JSON)
|
||||
return Response(get_metric_stats(), mimetype="text/plain")
|
||||
|
||||
|
||||
|
||||
@@ -749,7 +749,8 @@ def _execute_tool(route: Dict[str, Any], args: Dict[str, Any]) -> Dict[str, Any]
|
||||
"type": "text",
|
||||
"text": json.dumps(json_content, indent=2)
|
||||
})
|
||||
except json.JSONDecodeError:
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
# Fallback for endpoints that return plain text instead of JSON (e.g., /metrics)
|
||||
content.append({
|
||||
"type": "text",
|
||||
"text": api_response.text
|
||||
|
||||
@@ -39,8 +39,7 @@ ALLOWED_DEVICE_COLUMNS = Literal[
|
||||
]
|
||||
|
||||
ALLOWED_NMAP_MODES = Literal[
|
||||
"quick", "intense", "ping", "comprehensive", "fast", "normal", "detail", "skipdiscovery",
|
||||
"-sS", "-sT", "-sU", "-sV", "-O"
|
||||
"fast", "normal", "detail", "skipdiscovery"
|
||||
]
|
||||
|
||||
NOTIFICATION_LEVELS = Literal["info", "warning", "error", "alert", "interrupt"]
|
||||
@@ -301,6 +300,24 @@ class DeviceTotalsResponse(RootModel):
|
||||
root: List[int] = Field(default_factory=list, description="List of counts: [all, online, favorites, new, offline, archived]")
|
||||
|
||||
|
||||
class DeviceTotalsNamedResponse(BaseResponse):
|
||||
"""Response with named device statistics."""
|
||||
totals: Dict[str, int] = Field(
|
||||
...,
|
||||
description="Dictionary of counts",
|
||||
json_schema_extra={
|
||||
"examples": [{
|
||||
"devices": 10,
|
||||
"connected": 5,
|
||||
"favorites": 2,
|
||||
"new": 1,
|
||||
"down": 0,
|
||||
"archived": 2
|
||||
}]
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
class DeviceExportRequest(BaseModel):
|
||||
"""Request for exporting devices."""
|
||||
format: Literal["csv", "json"] = Field(
|
||||
@@ -702,7 +719,7 @@ class SessionInfo(BaseModel):
|
||||
|
||||
class CreateSessionRequest(BaseModel):
|
||||
"""Request to create a session."""
|
||||
mac: str = Field(..., description="Device MAC")
|
||||
mac: str = Field(..., description="Device MAC", pattern=MAC_PATTERN)
|
||||
ip: str = Field(..., description="Device IP")
|
||||
start_time: str = Field(..., description="Start time")
|
||||
end_time: Optional[str] = Field(None, description="End time")
|
||||
|
||||
@@ -54,7 +54,7 @@ def test_trigger_scan_ARPSCAN(mock_queue_class, client, api_token):
|
||||
mock_queue_class.return_value = mock_queue
|
||||
|
||||
payload = {"type": "ARPSCAN"}
|
||||
response = client.post("/mcp/sse/nettools/trigger-scan", json=payload, headers=auth_headers(api_token))
|
||||
response = client.post("/nettools/trigger-scan", json=payload, headers=auth_headers(api_token))
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
@@ -71,7 +71,7 @@ def test_trigger_scan_invalid_type(mock_queue_class, client, api_token):
|
||||
mock_queue_class.return_value = mock_queue
|
||||
|
||||
payload = {"type": "invalid_type", "target": "192.168.1.0/24"}
|
||||
response = client.post("/mcp/sse/nettools/trigger-scan", json=payload, headers=auth_headers(api_token))
|
||||
response = client.post("/nettools/trigger-scan", json=payload, headers=auth_headers(api_token))
|
||||
|
||||
assert response.status_code == 400
|
||||
data = response.get_json()
|
||||
@@ -267,7 +267,7 @@ def test_get_latest_device(mock_db_conn, client, api_token):
|
||||
|
||||
def test_openapi_spec(client, api_token):
|
||||
"""Test openapi_spec endpoint contains MCP tool paths."""
|
||||
response = client.get("/mcp/sse/openapi.json", headers=auth_headers(api_token))
|
||||
response = client.get("/openapi.json", headers=auth_headers(api_token))
|
||||
assert response.status_code == 200
|
||||
spec = response.get_json()
|
||||
|
||||
@@ -297,7 +297,7 @@ def test_mcp_devices_export_csv(mock_db_conn, client, api_token):
|
||||
mock_conn.execute.return_value = mock_execute_result
|
||||
mock_db_conn.return_value = mock_conn
|
||||
|
||||
response = client.get("/mcp/sse/devices/export", headers=auth_headers(api_token))
|
||||
response = client.get("/devices/export", headers=auth_headers(api_token))
|
||||
|
||||
assert response.status_code == 200
|
||||
# CSV response should have content-type header
|
||||
@@ -314,7 +314,7 @@ def test_mcp_devices_export_json(mock_export, client, api_token):
|
||||
"columns": ["devMac", "devName", "devLastIP"],
|
||||
}
|
||||
|
||||
response = client.get("/mcp/sse/devices/export?format=json", headers=auth_headers(api_token))
|
||||
response = client.get("/devices/export?format=json", headers=auth_headers(api_token))
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
@@ -339,7 +339,7 @@ def test_mcp_devices_import_json(mock_db_conn, client, api_token):
|
||||
mock_import.return_value = {"success": True, "message": "Imported 2 devices"}
|
||||
|
||||
payload = {"content": "bW9ja2VkIGNvbnRlbnQ="} # base64 encoded content
|
||||
response = client.post("/mcp/sse/devices/import", json=payload, headers=auth_headers(api_token))
|
||||
response = client.post("/devices/import", json=payload, headers=auth_headers(api_token))
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
@@ -362,7 +362,7 @@ def test_mcp_devices_totals(mock_db_conn, client, api_token):
|
||||
mock_conn.cursor.return_value = mock_sql
|
||||
mock_db_conn.return_value = mock_conn
|
||||
|
||||
response = client.get("/mcp/sse/devices/totals", headers=auth_headers(api_token))
|
||||
response = client.get("/devices/totals", headers=auth_headers(api_token))
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
@@ -380,7 +380,7 @@ def test_mcp_traceroute(mock_traceroute, client, api_token):
|
||||
mock_traceroute.return_value = ({"success": True, "output": "traceroute output"}, 200)
|
||||
|
||||
payload = {"devLastIP": "8.8.8.8"}
|
||||
response = client.post("/mcp/sse/nettools/traceroute", json=payload, headers=auth_headers(api_token))
|
||||
response = client.post("/nettools/traceroute", json=payload, headers=auth_headers(api_token))
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.get_json()
|
||||
@@ -395,7 +395,7 @@ def test_mcp_traceroute_missing_ip(mock_traceroute, client, api_token):
|
||||
mock_traceroute.return_value = ({"success": False, "error": "Invalid IP: None"}, 400)
|
||||
|
||||
payload = {} # Missing devLastIP
|
||||
response = client.post("/mcp/sse/nettools/traceroute", json=payload, headers=auth_headers(api_token))
|
||||
response = client.post("/nettools/traceroute", json=payload, headers=auth_headers(api_token))
|
||||
|
||||
assert response.status_code == 422
|
||||
data = response.get_json()
|
||||
|
||||
Reference in New Issue
Block a user