testsuite: add depth/cross-dir/daemon coverage helpers to rsyncfns.py

Add helpers for the option-coverage expansion (the path-handling restructure
changes parent-component resolution, so options must be exercised at depth and
across directory boundaries):

  * make_tree() builds a tree with a regular file at every level so a property
    can be checked at the tree root and >=3 levels deep;
  * walk_files()/walk_dirs() iterate entries for per-level assertions;
  * assert_same/assert_mode/assert_mtime_close/assert_is_symlink/
    assert_hardlinked/assert_not_hardlinked/assert_exists/assert_not_exists
    assert the concrete property an option controls (not just dest == src);
  * write_daemon_conf() writes an arbitrary rsyncd.conf (globals + modules)
    for daemon-parameter tests, beyond build_rsyncd_conf's fixed four modules;
  * forced_protocol() lets protocol-sensitive tests gate sub-cases.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Andrew Tridgell
2026-05-24 07:47:10 +10:00
parent 7bba25e675
commit 1d828f35ca

View File

@@ -19,6 +19,7 @@ from __future__ import annotations
import atexit
import fcntl
import filecmp
import os
import platform
import shlex
@@ -329,6 +330,17 @@ def rsync_argv(*args: str) -> list:
return shlex.split(RSYNC) + list(args)
def forced_protocol():
"""The protocol version pinned via --protocol=N in the RSYNC command, or
None when the run isn't pinning one (so the binary negotiates its newest).
Protocol-sensitive tests use this to gate sub-cases -- e.g. the split
between --append and --append-verify only exists at protocol >= 30; at
protocol 29 plain --append behaves like the old verifying append."""
import re
m = re.search(r'--protocol[ =](\d+)', RSYNC)
return int(m.group(1)) if m else None
def run_rsync(*args: str, check: bool = True,
capture_output: bool = False) -> subprocess.CompletedProcess:
"""Run rsync with the given arguments.
@@ -980,3 +992,191 @@ def check_perms(path, expected: str) -> 'None':
print(f"permissions: {perms} on {path}")
print(f"should be: {expected}")
test_fail(f"check_perms failed for {path}")
# --- depth / cross-dir coverage helpers ------------------------------------
# Added for the option-coverage expansion (see testsuite/COVERAGE.md).
# The path-handling restructure changes how parent components resolve, so its
# bugs surface only at DEPTH and across directory boundaries -- these helpers
# build trees with an entry at every level and assert the concrete property an
# option controls (not just dest == src).
def make_tree(root, depth: int = 3, *, data: bool = False,
content_lines: int = 20, data_size: int = 4096,
dirname: str = 'd', leaf: str = 'f'):
"""Create a layered directory tree with one regular file at every level.
For depth=3 under `root`:
root/f0
root/d1/f1
root/d1/d2/f2
root/d1/d2/d3/f3
so an option's effect can be checked at the tree root AND >=3 levels deep
(the parent-component resolution the path restructure rewrites).
Returns (dirs, files): `dirs` the created subdirectories outermost-first,
`files` the regular files shallow-first. Content is deterministic
(make_text_file) unless data=True (make_data_file, delta-friendly).
"""
root = Path(root)
root.mkdir(parents=True, exist_ok=True)
dirs = []
files = []
cur = root
for level in range(depth + 1):
f = cur / f'{leaf}{level}'
if data:
make_data_file(f, data_size)
else:
make_text_file(f, content_lines)
files.append(f)
if level < depth:
cur = cur / f'{dirname}{level + 1}'
cur.mkdir(exist_ok=True)
dirs.append(cur)
return dirs, files
def walk_files(root) -> list:
"""Every regular (non-symlink) file under `root`, sorted, recursively.
For asserting a per-entry property holds at every depth."""
root = Path(root)
return sorted(p for p in root.rglob('*')
if p.is_file() and not p.is_symlink())
def walk_dirs(root) -> list:
"""Every subdirectory under `root`, sorted, recursively."""
root = Path(root)
return sorted(p for p in root.rglob('*')
if p.is_dir() and not p.is_symlink())
def _tag(label: str) -> str:
return f"{label}: " if label else ""
def assert_same(a, b, label: str = '') -> 'None':
"""Fail unless files `a` and `b` have byte-identical content."""
if not filecmp.cmp(str(a), str(b), shallow=False):
test_fail(f"{_tag(label)}content differs between {a} and {b}")
def assert_mode(path, expected_octal: int, label: str = '') -> 'None':
"""Fail unless the permission bits (low 12) of `path` equal expected_octal
(pass an int like 0o644). Does not follow symlinks."""
mode = stat.S_IMODE(os.stat(path, follow_symlinks=False).st_mode)
if mode != expected_octal:
test_fail(f"{_tag(label)}mode {mode:04o} != expected "
f"{expected_octal:04o} on {path}")
def assert_mtime_close(a, b, tol: float = 1.0, label: str = '') -> 'None':
"""Fail unless the mtimes of `a` and `b` are within `tol` seconds.
`b` may be a number (an explicit epoch mtime) instead of a path."""
ma = os.stat(a, follow_symlinks=False).st_mtime
mb = b if isinstance(b, (int, float)) else os.stat(
b, follow_symlinks=False).st_mtime
if abs(ma - mb) > tol:
test_fail(f"{_tag(label)}mtime {ma} vs {mb} differ by > {tol}s "
f"(checking {a})")
def assert_is_symlink(path, target: str = None, label: str = '') -> 'None':
"""Fail unless `path` is a symlink (optionally pointing exactly at
`target`)."""
if not os.path.islink(path):
test_fail(f"{_tag(label)}{path} is not a symlink")
if target is not None:
actual = os.readlink(path)
if actual != target:
test_fail(f"{_tag(label)}{path} -> {actual!r}, "
f"expected {target!r}")
def assert_hardlinked(a, b, label: str = '') -> 'None':
"""Fail unless `a` and `b` are the same inode (a hard link / --link-dest
result)."""
sa = os.stat(a, follow_symlinks=False)
sb = os.stat(b, follow_symlinks=False)
if (sa.st_dev, sa.st_ino) != (sb.st_dev, sb.st_ino):
test_fail(f"{_tag(label)}{a} and {b} are not hard-linked "
f"(ino {sa.st_ino} vs {sb.st_ino})")
def assert_not_hardlinked(a, b, label: str = '') -> 'None':
"""Fail if `a` and `b` share an inode (e.g. --copy-dest must copy, not
link)."""
sa = os.stat(a, follow_symlinks=False)
sb = os.stat(b, follow_symlinks=False)
if (sa.st_dev, sa.st_ino) == (sb.st_dev, sb.st_ino):
test_fail(f"{_tag(label)}{a} and {b} unexpectedly share "
f"inode {sa.st_ino}")
def assert_exists(path, label: str = '') -> 'None':
"""Fail unless `path` exists (a symlink counts even if dangling)."""
if not os.path.lexists(path):
test_fail(f"{_tag(label)}{path} does not exist")
def assert_not_exists(path, label: str = '') -> 'None':
"""Fail if `path` exists (a dangling symlink counts as existing)."""
if os.path.lexists(path):
test_fail(f"{_tag(label)}{path} exists but should not")
def write_daemon_conf(modules, globals=None, *,
name: str = 'test-rsyncd.conf') -> 'Path':
"""Write a custom rsyncd.conf for daemon-parameter tests.
`modules` is a list of (module_name, {param: value}) pairs; `globals` an
optional dict of global parameters that override the minimal defaults
(pid file / use chroot=no / hosts allow / log file / max verbosity).
Mirrors build_rsyncd_conf()'s root-aware uid/gid handling (only emitted
when running as root) and writes the same `ignore23` wrapper, but lets a
test set arbitrary parameters/modules beyond the fixed four. Returns the
config path; pair with start_test_daemon().
"""
conf = SCRATCHDIR / name
pidfile = SCRATCHDIR / 'rsyncd.pid'
logfile = SCRATCHDIR / 'rsyncd.log'
g = {
'pid file': str(pidfile),
'use chroot': 'no',
'hosts allow': 'localhost 127.0.0.0/8',
'log file': str(logfile),
'max verbosity': '4',
}
if globals:
g.update(globals)
if get_testuid() == get_rootuid():
g.setdefault('uid', str(get_rootuid()))
g.setdefault('gid', str(get_rootgid()))
else:
# Non-root cannot set uid/gid in rsyncd.conf.
g.pop('uid', None)
g.pop('gid', None)
lines = ['# autogenerated by rsyncfns.write_daemon_conf', '']
lines += [f'{k} = {v}' for k, v in g.items()]
lines.append('')
for mod_name, params in modules:
lines.append(f'[{mod_name}]')
lines += [f'\t{k} = {v}' for k, v in params.items()]
lines.append('')
conf.write_text('\n'.join(lines) + '\n')
ignore23 = SCRATCHDIR / 'ignore23'
if not ignore23.exists():
ignore23.write_text(
'#!/bin/sh\n'
'if "${@}"; then exit; fi\n'
'ret=$?\n'
'if test $ret = 23; then exit; fi\n'
'exit $ret\n'
)
ignore23.chmod(0o755)
return conf