mirror of
https://github.com/stan-smith/FossFLOW.git
synced 2026-04-19 22:48:55 -04:00
310 lines
11 KiB
Python
310 lines
11 KiB
Python
"""E2E test: place multiple nodes, then undo/redo through them."""
|
|
import os
|
|
import time
|
|
import pytest
|
|
from selenium import webdriver
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.chrome.options import Options
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.webdriver.common.action_chains import ActionChains
|
|
|
|
|
|
SCREENSHOT_DIR = os.path.join(os.path.dirname(__file__), "..", "screenshots")
|
|
|
|
|
|
def get_base_url():
|
|
return os.getenv("FOSSFLOW_TEST_URL", "http://localhost:3000")
|
|
|
|
|
|
def get_webdriver_url():
|
|
return os.getenv("WEBDRIVER_URL", "http://localhost:4444")
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def driver():
|
|
chrome_options = Options()
|
|
chrome_options.add_argument("--headless=new")
|
|
chrome_options.add_argument("--no-sandbox")
|
|
chrome_options.add_argument("--disable-dev-shm-usage")
|
|
chrome_options.add_argument("--window-size=1920,1080")
|
|
d = webdriver.Remote(
|
|
command_executor=get_webdriver_url(),
|
|
options=chrome_options,
|
|
)
|
|
d.implicitly_wait(10)
|
|
yield d
|
|
d.quit()
|
|
|
|
|
|
def save_screenshot(driver, name):
|
|
os.makedirs(SCREENSHOT_DIR, exist_ok=True)
|
|
path = os.path.join(SCREENSHOT_DIR, f"{name}.png")
|
|
driver.save_screenshot(path)
|
|
return path
|
|
|
|
|
|
def dismiss_modals(driver):
|
|
try:
|
|
driver.execute_script("""
|
|
const dialogs = document.querySelectorAll('[role="dialog"], [class*="MuiDialog"]');
|
|
dialogs.forEach(d => {
|
|
const closeBtn = d.querySelector('button');
|
|
if (closeBtn) closeBtn.click();
|
|
});
|
|
""")
|
|
time.sleep(0.5)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def count_canvas_images(driver):
|
|
return driver.execute_script("""
|
|
const c = document.querySelector('.fossflow-container');
|
|
if (!c) return 0;
|
|
return c.querySelectorAll('img').length;
|
|
""")
|
|
|
|
|
|
def get_model_items_count(driver):
|
|
"""Get model items count via React fiber store discovery."""
|
|
return driver.execute_script("""
|
|
var root = document.getElementById("root");
|
|
var ck = Object.keys(root).find(function(k) { return k.startsWith("__reactContainer"); });
|
|
if (!ck) return -1;
|
|
var fiber = root[ck], queue = [fiber], v = 0;
|
|
while (queue.length > 0 && v < 3000) {
|
|
var n = queue.shift(); if (!n) continue; v++;
|
|
if (n.pendingProps && n.pendingProps.value &&
|
|
typeof n.pendingProps.value === "object" && n.pendingProps.value !== null &&
|
|
typeof n.pendingProps.value.getState === "function") {
|
|
try {
|
|
var st = n.pendingProps.value.getState();
|
|
if (st && st.views !== undefined && st.items !== undefined) {
|
|
return (st.items || []).length;
|
|
}
|
|
} catch(e) {}
|
|
}
|
|
if (n.child) queue.push(n.child);
|
|
if (n.sibling) queue.push(n.sibling);
|
|
}
|
|
return -1;
|
|
""")
|
|
|
|
|
|
def place_node_at(driver, x_offset, y_offset):
|
|
"""Select icon and place at a specific canvas offset."""
|
|
# Click "Add item (N)" button
|
|
add_btn = driver.find_element(By.CSS_SELECTOR, "button[aria-label*='Add item']")
|
|
add_btn.click()
|
|
time.sleep(0.8)
|
|
|
|
# Expand ISOFLOW icon collection
|
|
driver.execute_script("""
|
|
const buttons = document.querySelectorAll('button');
|
|
for (const btn of buttons) {
|
|
const text = btn.textContent.trim().toUpperCase();
|
|
if (text.includes('ISOFLOW') && !text.includes('IMPORT')) {
|
|
btn.click(); return;
|
|
}
|
|
}
|
|
""")
|
|
time.sleep(2)
|
|
|
|
# Select first icon
|
|
first_icon_btn = driver.execute_script("""
|
|
const buttons = document.querySelectorAll('button');
|
|
for (const btn of buttons) {
|
|
const img = btn.querySelector('img');
|
|
if (img && img.naturalWidth > 0 && img.naturalWidth <= 100) return btn;
|
|
}
|
|
for (const btn of buttons) {
|
|
const img = btn.querySelector('img');
|
|
if (img) return btn;
|
|
}
|
|
return null;
|
|
""")
|
|
if first_icon_btn is None:
|
|
return False
|
|
|
|
ActionChains(driver).click(first_icon_btn).perform()
|
|
time.sleep(0.5)
|
|
|
|
# Click on canvas at specific offset
|
|
canvas = driver.find_element(By.CLASS_NAME, "fossflow-container")
|
|
ActionChains(driver).move_to_element_with_offset(canvas, x_offset, y_offset).click().perform()
|
|
time.sleep(1)
|
|
return True
|
|
|
|
|
|
def click_undo(driver):
|
|
btn = driver.execute_script("""
|
|
return document.querySelector("button[aria-label*='Undo']");
|
|
""")
|
|
if btn:
|
|
btn.click()
|
|
time.sleep(1)
|
|
return True
|
|
return False
|
|
|
|
|
|
def click_redo(driver):
|
|
btn = driver.execute_script("""
|
|
return document.querySelector("button[aria-label*='Redo']");
|
|
""")
|
|
if btn:
|
|
btn.click()
|
|
time.sleep(1)
|
|
return True
|
|
return False
|
|
|
|
|
|
def test_multi_node_undo_redo(driver):
|
|
"""Place 3 nodes, undo all 3, redo all 3, then undo 2 and place a new one (forking history)."""
|
|
base_url = get_base_url()
|
|
|
|
print(f"\n1. Loading app at {base_url}")
|
|
driver.get(base_url)
|
|
WebDriverWait(driver, 15).until(
|
|
EC.presence_of_element_located((By.CLASS_NAME, "fossflow-container"))
|
|
)
|
|
time.sleep(2)
|
|
dismiss_modals(driver)
|
|
time.sleep(0.5)
|
|
|
|
baseline_imgs = count_canvas_images(driver)
|
|
print(f" Baseline images: {baseline_imgs}")
|
|
|
|
# --- Place 3 nodes at different positions ---
|
|
positions = [(300, 300), (500, 300), (700, 300)]
|
|
for i, (x, y) in enumerate(positions):
|
|
print(f"\n2.{i+1}. Placing node {i+1} at ({x}, {y})...")
|
|
assert place_node_at(driver, x, y), f"Failed to place node {i+1}"
|
|
imgs = count_canvas_images(driver)
|
|
items = get_model_items_count(driver)
|
|
print(f" Images: {imgs}, Model items: {items}")
|
|
assert items == i + 1, f"Expected {i+1} model items, got {items}"
|
|
|
|
save_screenshot(driver, "multi_01_three_nodes")
|
|
after_3 = count_canvas_images(driver)
|
|
items_3 = get_model_items_count(driver)
|
|
print(f"\n After 3 nodes: images={after_3}, items={items_3}")
|
|
assert items_3 == 3
|
|
|
|
# --- Undo all 3 nodes one by one ---
|
|
print("\n3. Undoing all 3 nodes...")
|
|
for i in range(3):
|
|
click_undo(driver)
|
|
imgs = count_canvas_images(driver)
|
|
items = get_model_items_count(driver)
|
|
expected = 2 - i
|
|
print(f" After undo {i+1}: images={imgs}, items={items} (expected {expected})")
|
|
assert items == expected, f"After undo {i+1}: expected {expected} items, got {items}"
|
|
|
|
save_screenshot(driver, "multi_02_all_undone")
|
|
assert get_model_items_count(driver) == 0, "Expected 0 items after undoing all 3"
|
|
print(" All 3 nodes undone.")
|
|
|
|
# --- Redo all 3 nodes one by one ---
|
|
print("\n4. Redoing all 3 nodes...")
|
|
for i in range(3):
|
|
click_redo(driver)
|
|
imgs = count_canvas_images(driver)
|
|
items = get_model_items_count(driver)
|
|
expected = i + 1
|
|
print(f" After redo {i+1}: images={imgs}, items={items} (expected {expected})")
|
|
assert items == expected, f"After redo {i+1}: expected {expected} items, got {items}"
|
|
|
|
save_screenshot(driver, "multi_03_all_redone")
|
|
assert get_model_items_count(driver) == 3, "Expected 3 items after redoing all"
|
|
print(" All 3 nodes redone.")
|
|
|
|
# --- Undo 2, then place a new node (fork history) ---
|
|
print("\n5. Undoing 2 nodes to fork history...")
|
|
click_undo(driver)
|
|
click_undo(driver)
|
|
items = get_model_items_count(driver)
|
|
print(f" After 2 undos: items={items} (expected 1)")
|
|
assert items == 1, f"Expected 1 item after 2 undos, got {items}"
|
|
|
|
# Check redo is available before fork
|
|
can_redo = driver.execute_script("""
|
|
var root = document.getElementById("root");
|
|
var ck = Object.keys(root).find(function(k) { return k.startsWith("__reactContainer"); });
|
|
if (!ck) return null;
|
|
var fiber = root[ck], queue = [fiber], v = 0;
|
|
while (queue.length > 0 && v < 3000) {
|
|
var n = queue.shift(); if (!n) continue; v++;
|
|
if (n.pendingProps && n.pendingProps.value &&
|
|
typeof n.pendingProps.value === "object" && n.pendingProps.value !== null &&
|
|
typeof n.pendingProps.value.getState === "function") {
|
|
try {
|
|
var st = n.pendingProps.value.getState();
|
|
if (st && st.views !== undefined && st.items !== undefined && st.history) {
|
|
return st.history.future.length > 0;
|
|
}
|
|
} catch(e) {}
|
|
}
|
|
if (n.child) queue.push(n.child);
|
|
if (n.sibling) queue.push(n.sibling);
|
|
}
|
|
return null;
|
|
""")
|
|
print(f" canRedo before fork: {can_redo}")
|
|
assert can_redo, "Should be able to redo before forking"
|
|
|
|
print(" Placing new node to fork history...")
|
|
assert place_node_at(driver, 400, 300), "Failed to place fork node"
|
|
items = get_model_items_count(driver)
|
|
print(f" After fork placement: items={items} (expected 2)")
|
|
assert items == 2, f"Expected 2 items after fork placement, got {items}"
|
|
|
|
# Redo should now be impossible (future was cleared by new action)
|
|
can_redo = driver.execute_script("""
|
|
var root = document.getElementById("root");
|
|
var ck = Object.keys(root).find(function(k) { return k.startsWith("__reactContainer"); });
|
|
if (!ck) return null;
|
|
var fiber = root[ck], queue = [fiber], v = 0;
|
|
while (queue.length > 0 && v < 3000) {
|
|
var n = queue.shift(); if (!n) continue; v++;
|
|
if (n.pendingProps && n.pendingProps.value &&
|
|
typeof n.pendingProps.value === "object" && n.pendingProps.value !== null &&
|
|
typeof n.pendingProps.value.getState === "function") {
|
|
try {
|
|
var st = n.pendingProps.value.getState();
|
|
if (st && st.views !== undefined && st.items !== undefined && st.history) {
|
|
return st.history.future.length > 0;
|
|
}
|
|
} catch(e) {}
|
|
}
|
|
if (n.child) queue.push(n.child);
|
|
if (n.sibling) queue.push(n.sibling);
|
|
}
|
|
return null;
|
|
""")
|
|
print(f" canRedo after fork: {can_redo}")
|
|
assert not can_redo, "Should NOT be able to redo after forking history"
|
|
|
|
save_screenshot(driver, "multi_04_forked")
|
|
|
|
# --- Undo the fork node ---
|
|
print("\n6. Undoing the fork node...")
|
|
click_undo(driver)
|
|
items = get_model_items_count(driver)
|
|
print(f" After undo fork: items={items} (expected 1)")
|
|
assert items == 1, f"Expected 1 item after undoing fork, got {items}"
|
|
|
|
# --- Redo the fork node ---
|
|
print(" Redoing the fork node...")
|
|
click_redo(driver)
|
|
items = get_model_items_count(driver)
|
|
print(f" After redo fork: items={items} (expected 2)")
|
|
assert items == 2, f"Expected 2 item after redoing fork, got {items}"
|
|
|
|
save_screenshot(driver, "multi_05_final")
|
|
print("\n SUCCESS: Multi-node undo/redo with history forking works!")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
pytest.main([__file__, "-v", "-s"])
|