From ba97486a05e4927f4dcd8448346846dc3cb2a715 Mon Sep 17 00:00:00 2001 From: Rophy Tsai Date: Sat, 11 Apr 2026 15:41:33 +0000 Subject: [PATCH 1/6] feat: add continuous soak test mode to fuzz test framework Phase A - Storage cleanup (works with existing finite runs): - Add created_at column to all FUZZ_* tables for TTL-based purge - Add FUZZ_WKL.cleanup() procedure + DBMS_SCHEDULER job (every 30min) - Add archive log cleanup loop in fuzz-test.sh up (hourly) - Add seq dict pruning in kafka-consumer.py (every 10min, 24h TTL) - Add SQLite event purge in validator.py after each validation cycle Phase B - Continuous soak operation: - Add FUZZ_WKL.run_forever() with rate limiting via DBMS_SESSION.SLEEP - Add SOAK_MODE to validator.py: continuous validate-purge cycles - Add stall detection (exit if no new events for 5min) - Add fuzz-test.sh soak subcommand with health monitoring - Add SQLite busy_timeout to handle concurrent writer contention Tested: 2-min finite run (11,238 events, 0 mismatches) and 5-min soak run (2,407 events validated in cycle 1, 0 mismatches) on RAC VM. --- tests/dbz-twin/rac/SOAK-TEST.md | 222 +++++++ tests/dbz-twin/rac/docker-compose-fuzz.yaml | 5 + tests/dbz-twin/rac/fuzz-test.sh | 212 +++++++ tests/dbz-twin/rac/kafka-consumer.py | 33 +- tests/dbz-twin/rac/perf/fuzz-workload.sql | 160 ++++- tests/dbz-twin/rac/validator.py | 670 ++++++++++++-------- 6 files changed, 1018 insertions(+), 284 deletions(-) create mode 100644 tests/dbz-twin/rac/SOAK-TEST.md diff --git a/tests/dbz-twin/rac/SOAK-TEST.md b/tests/dbz-twin/rac/SOAK-TEST.md new file mode 100644 index 00000000..4bb1975d --- /dev/null +++ b/tests/dbz-twin/rac/SOAK-TEST.md @@ -0,0 +1,222 @@ +# Soak Test Implementation Guide + +Upgrade the existing fuzz test into a continuous soak test that runs indefinitely +with 24h TTL-based storage cleanup. + +## Current State + +The fuzz test runs a finite workload, waits for drain, validates once, then tears down. +Key components already exist: + +- `fuzz-test.sh` — orchestration (up/down/run/validate/db-check) +- `kafka-consumer.py` — Kafka → SQLite consumer (infinite loop) +- `validator.py` — per-event comparison with per-node watermarks +- `fuzz-workload.sql` — PL/SQL package generating random DML with event_id tracking +- `docker-compose-fuzz.yaml` — Kafka, Debezium connectors, consumer, validator + +## Goal + +Run the full pipeline indefinitely: +1. Load generator produces DML at constant rate, non-stop +2. Consumer writes to SQLite non-stop +3. Validator periodically checks data up to a watermark, reports pass/fail, purges old data +4. All storage cleaned up on a 24h TTL +5. Runs forever unless stopped or a failure is detected + +## Storage TTL: 24h Everywhere + +All transient data uses the same 24h retention window: + +| Storage | Current | Change | +|---------|---------|--------| +| Kafka topics | 24h retention | No change needed | +| SQLite events | Grows forever | Purge events older than 24h after validation | +| Archive logs | No cleanup | Cron: `find /shared/redo/archivelog -mtime +1 -delete` | +| Oracle tables | Grows forever | DELETE rows with `created_at < SYSDATE - 1` | + +If any component falls behind by more than ~1 hour, treat it as a test failure +and stop the whole setup for investigation. The 24h buffer is deliberately generous. + +## Changes Required + +### 1. Workload Generator — Infinite Loop + +**File**: `perf/fuzz-workload.sql` (FUZZ_WKL package) + +Current: `run(p_duration_min NUMBER)` exits after N minutes. 
+ +Change: Add `run_forever(p_rate_per_sec NUMBER)` procedure that: +- Runs DML in an infinite loop with pacing (e.g., 10 ops/sec) +- Calls `DBMS_SESSION.SLEEP()` between batches to maintain target rate +- Handles connection drops gracefully (reconnect and continue) +- Periodically logs throughput stats to `FUZZ_STATS` table + +**Invocation**: `fuzz-test.sh soak` starts the workload via SSH on both RAC nodes +as background processes. No fixed duration. + +### 2. Consumer — Add Timestamp Column + +**File**: `kafka-consumer.py` + +Current SQLite schema: +```sql +CREATE TABLE lm_events ( + event_id TEXT, seq INTEGER, table_name TEXT, op TEXT, + raw_json TEXT, consumed_at REAL, + PRIMARY KEY (event_id, seq) +); +``` + +`consumed_at` already exists (epoch float). No schema change needed — use it for TTL purge. + +**Memory concern**: The `lm_seq` / `olr_seq` dicts track event_id → next seq number +and grow unbounded. Add periodic cleanup: remove entries where the event_id's +`consumed_at` is older than 24h. The comment warns against trimming due to seq reset +bugs — the fix is to only trim entries outside the 24h window (no active events +will reference them). + +### 3. Validator — Periodic + Purge Mode + +**File**: `validator.py` + +Current: Polls until idle (120s no new events), then validates once and exits. + +Change to continuous mode: +- Run in an infinite loop with configurable interval (e.g., every 5 minutes) +- Each cycle: + 1. Compute per-node watermarks (same logic as today) + 2. Validate all events up to watermark + 3. Report results: `[SOAK] cycle=42 validated=12345 mismatches=0 elapsed=3.2s` + 4. **Purge**: DELETE from lm_events/olr_events WHERE consumed_at < (now - 24h) + 5. If any non-LOB mismatches found, write failure report and exit non-zero +- Never exit on idle — in soak mode, idle just means the workload is between batches + +**New env vars**: +- `SOAK_MODE=1` — enables continuous validation + purge (default: off, preserving current behavior) +- `PURGE_TTL_HOURS=24` — how old events must be before purge +- `VALIDATE_INTERVAL_SEC=300` — seconds between validation cycles + +### 4. Archive Log Cleanup (part of `fuzz-test.sh up`) + +**Where**: Started as a background job during `fuzz-test.sh up`, not soak-specific. +Useful for any fuzz run — even finite runs leave behind archive logs. + +Add to `fuzz-test.sh up` after all services are started: +```bash +# Background archive cleanup every hour (killed on `down`) +while true; do + ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ + "find /shared/redo/archivelog -mtime +1 -delete" + sleep 3600 +done & +echo $! > "$WORK_DIR/archive_cleanup.pid" +``` + +Kill in `fuzz-test.sh down`: +```bash +[ -f "$WORK_DIR/archive_cleanup.pid" ] && kill "$(cat "$WORK_DIR/archive_cleanup.pid")" 2>/dev/null +``` + +### 5. Oracle Table Cleanup (part of `fuzz-test.sh up`) + +**File**: `perf/fuzz-workload.sql` + +Add a `cleanup()` procedure to FUZZ_WKL package: +```sql +PROCEDURE cleanup IS +BEGIN + FOR t IN (SELECT table_name FROM user_tables + WHERE table_name LIKE 'FUZZ_%' + AND table_name != 'FUZZ_STATS') LOOP + EXECUTE IMMEDIATE 'DELETE FROM ' || t.table_name || + ' WHERE created_at < SYSDATE - 1'; + COMMIT; + END LOOP; +END; +``` + +**Prerequisite**: Add `created_at DATE DEFAULT SYSDATE` column to all FUZZ_* tables +in the table creation DDL (already in `fuzz-test.sh up`). 
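+
+As a sketch of the prerequisite (a hypothetical retrofit of one table; the patch
+itself bakes the column directly into the CREATE TABLE statements):
+
+```sql
+-- New rows pick up the default automatically.
+ALTER TABLE olr_test.FUZZ_SCALAR ADD (created_at DATE DEFAULT SYSDATE);
+-- Backfill existing rows: a NULL created_at never satisfies
+-- "created_at < SYSDATE - 1", so such rows would never be purged.
+UPDATE olr_test.FUZZ_SCALAR SET created_at = SYSDATE WHERE created_at IS NULL;
+COMMIT;
+```
+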
+ +Schedule via Oracle DBMS_SCHEDULER job created during `fuzz-test.sh up`: +```sql +BEGIN + DBMS_SCHEDULER.CREATE_JOB( + job_name => 'FUZZ_CLEANUP', + job_type => 'PLSQL_BLOCK', + job_action => 'BEGIN FUZZ_WKL.cleanup; END;', + repeat_interval => 'FREQ=MINUTELY;INTERVAL=30', + enabled => TRUE + ); +END; +``` + +Drop in `fuzz-test.sh down` table cleanup. + +### 6. New Subcommand: `fuzz-test.sh soak` + +Orchestration for the continuous run: + +``` +fuzz-test.sh soak [rate-per-sec] +``` + +Steps: +1. Verify all services running (reuse Stage 1 from `run`) +2. Start archive log cleanup loop (background) +3. Start workload on both RAC nodes (background, infinite) +4. Print status line every 60s: + `[SOAK] uptime=2h13m LM=45230 OLR=45228 validated=44000 mismatches=0` +5. Monitor for failures: + - Validator exits non-zero → stop everything, print report + - Consumer dies → stop everything + - OLR container exits → stop everything +6. On SIGINT/SIGTERM: graceful shutdown (stop workload, drain, final validate) + +### 7. Health Monitoring + +Add a stall detector — if no new events arrive for 5 minutes, something is broken: + +In the validator's continuous loop, track `last_event_time`. If +`now - last_event_time > 300s` while the workload is running, report: +``` +[STALL] No new events for 5m. Last LM event: N1_00045230 at 10:15:02. Last OLR event: N1_00045228 at 10:15:01. +``` +Then exit non-zero to trigger the soak test shutdown. + +## Implementation Order + +**Phase A — Cleanup (part of fuzz environment, not soak-specific):** +1. **Add `created_at` column** to FUZZ_* table DDL + Oracle cleanup job (DBMS_SCHEDULER) +2. **Archive log cleanup** — background loop started by `fuzz-test.sh up` +3. **Consumer seq dict cleanup** — prune entries older than 24h +4. **Validator purge** — DELETE validated events older than 24h (enabled by default) + +Phase A can be tested with the existing finite workload — just run longer and +verify storage stays bounded. + +**Phase B — Soak mode (continuous operation):** +5. **Validator soak mode** — continuous validate-purge loop instead of exit-on-idle +6. **Workload `run_forever`** — infinite loop with rate limiting +7. **`fuzz-test.sh soak`** subcommand — ties everything together +8. **Health monitoring** — stall detection + +## Testing the Upgrade + +1. Run `fuzz-test.sh soak 5` (5 ops/sec) for 1 hour +2. Verify validator reports pass every 5 minutes +3. Verify SQLite size stays bounded (check after 30min) +4. Verify archive log directory doesn't grow past ~2 GB +5. Verify Oracle tablespace stays bounded +6. Kill OLR mid-run → confirm stall detection fires within 5 minutes +7. 
Run for 24+ hours → confirm TTL purge works across the full window + +## Files to Modify + +| File | Change | +|------|--------| +| `fuzz-test.sh` | Add `soak` subcommand | +| `validator.py` | Add SOAK_MODE, continuous loop, purge | +| `kafka-consumer.py` | Add seq dict cleanup for old entries | +| `perf/fuzz-workload.sql` | Add `run_forever()`, `cleanup()`, `created_at` columns | +| `docker-compose-fuzz.yaml` | Add SOAK_MODE env var to validator service | diff --git a/tests/dbz-twin/rac/docker-compose-fuzz.yaml b/tests/dbz-twin/rac/docker-compose-fuzz.yaml index 999fa9df..4ce381d4 100644 --- a/tests/dbz-twin/rac/docker-compose-fuzz.yaml +++ b/tests/dbz-twin/rac/docker-compose-fuzz.yaml @@ -80,6 +80,7 @@ services: LM_TOPIC: lm-events OLR_TOPIC: olr-events OLR_LOB_TOPIC: olr-lob-events + PURGE_TTL_HOURS: "${PURGE_TTL_HOURS:-24}" volumes: - ./kafka-consumer.py:/app/kafka-consumer.py:ro - fuzz-data:/app/data @@ -100,6 +101,10 @@ services: SQLITE_DB: /app/data/fuzz.db POLL_INTERVAL: "10" IDLE_TIMEOUT: "120" + SOAK_MODE: "${SOAK_MODE:-0}" + PURGE_TTL_HOURS: "${PURGE_TTL_HOURS:-24}" + VALIDATE_INTERVAL_SEC: "${VALIDATE_INTERVAL_SEC:-300}" + STALL_TIMEOUT_SEC: "${STALL_TIMEOUT_SEC:-300}" volumes: - ./validator.py:/app/validator.py:ro - fuzz-data:/app/data diff --git a/tests/dbz-twin/rac/fuzz-test.sh b/tests/dbz-twin/rac/fuzz-test.sh index 8959bbe4..240c8c62 100755 --- a/tests/dbz-twin/rac/fuzz-test.sh +++ b/tests/dbz-twin/rac/fuzz-test.sh @@ -10,6 +10,7 @@ # Actions: # up Start infrastructure (Kafka, Debezium, consumer, validator, OLR) # run [duration-min] Deploy fuzz workload and run for N minutes (default: 30) +# soak [rate-per-sec] Start continuous soak test (default rate: 10 ops/sec) # status Show consumer/validator status and OLR memory # validate Run validator (wait for idle timeout, report results) # logs [component] Show logs (kafka, logminer, olr, lob-logminer, consumer, validator, olr-vm) @@ -49,6 +50,7 @@ DB_CONN2="${DB_CONN2:-olr_test/olr_test@//racnodep2:1521/ORCLPDB}" OLR_CONTAINER="olr-debezium" COMPOSE_FILE="$SCRIPT_DIR/docker-compose-fuzz.yaml" +WORK_DIR="/tmp/fuzz_soak_state" # ---- SSH helpers ---- _vm_sqlplus() { @@ -226,6 +228,37 @@ action_up() { sleep 2 done + # Start background archive log cleanup (hourly, killed on `down`) + mkdir -p "$WORK_DIR" + ( + while true; do + ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ + "find /shared/redo/archivelog -mtime +1 -delete" 2>/dev/null || true + sleep 3600 + done + ) & + echo $! 
> "$WORK_DIR/archive_cleanup.pid" + echo " Archive cleanup: started (PID $!, hourly)" + + # Create Oracle cleanup scheduler job (purges FUZZ_* rows older than 24h) + cat > "$WORK_DIR/create_cleanup_job.sql" <<'SQL' +SET FEEDBACK OFF +BEGIN + BEGIN DBMS_SCHEDULER.DROP_JOB('FUZZ_CLEANUP', TRUE); EXCEPTION WHEN OTHERS THEN NULL; END; + DBMS_SCHEDULER.CREATE_JOB( + job_name => 'FUZZ_CLEANUP', + job_type => 'PLSQL_BLOCK', + job_action => 'BEGIN FUZZ_WKL.cleanup; END;', + repeat_interval => 'FREQ=MINUTELY;INTERVAL=30', + enabled => TRUE + ); +END; +/ +EXIT +SQL + _exec_user "$WORK_DIR/create_cleanup_job.sql" > /dev/null 2>&1 || true + echo " Oracle cleanup job: created (every 30min)" + echo "" echo " OLR memory: $(_olr_memory_mb) MB" echo "" @@ -450,9 +483,39 @@ action_logs() { action_down() { echo "=== Stopping fuzz test infrastructure ===" + + # Kill background archive cleanup + if [[ -f "$WORK_DIR/archive_cleanup.pid" ]]; then + kill "$(cat "$WORK_DIR/archive_cleanup.pid")" 2>/dev/null || true + rm -f "$WORK_DIR/archive_cleanup.pid" + echo " Archive cleanup: stopped" + fi + + # Kill soak workload processes + for pidfile in "$WORK_DIR"/soak_node*.pid; do + [[ -f "$pidfile" ]] && kill "$(cat "$pidfile")" 2>/dev/null || true + done + rm -f "$WORK_DIR"/soak_node*.pid + + # Drop Oracle cleanup scheduler job + local drop_sql="$WORK_DIR/drop_cleanup_job.sql" + if [[ -d "$WORK_DIR" ]]; then + cat > "$drop_sql" <<'SQL' +SET FEEDBACK OFF +BEGIN DBMS_SCHEDULER.DROP_JOB('FUZZ_CLEANUP', TRUE); EXCEPTION WHEN OTHERS THEN NULL; END; +/ +EXIT +SQL + _exec_user "$drop_sql" > /dev/null 2>&1 || true + fi + + # Stop soak validator (named container, not part of compose up) + docker rm -f fuzz-validator 2>/dev/null || true + docker compose -f "$COMPOSE_FILE" down -v 2>/dev/null ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ "podman stop -t5 $OLR_CONTAINER 2>/dev/null; podman rm $OLR_CONTAINER 2>/dev/null; true" + rm -rf "$WORK_DIR" echo " Done." } @@ -477,9 +540,158 @@ action_db_check() { python3 "$SCRIPT_DIR/db-check.py" } +action_soak() { + local rate="${1:-10}" + local skip_lob="${SKIP_LOB:-0}" + + echo "=== Starting soak test (rate=${rate} ops/sec) ===" + + # Verify services are running + for svc in fuzz-kafka fuzz-dbz-logminer fuzz-dbz-olr fuzz-dbz-lob-logminer fuzz-consumer; do + if ! docker ps --format '{{.Names}}' | grep -q "^${svc}$"; then + echo "ERROR: $svc is not running. Run './fuzz-test.sh up' first." >&2 + exit 1 + fi + done + if ! ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" "podman ps --format '{{.Names}}' | grep -q $OLR_CONTAINER" 2>/dev/null; then + echo "ERROR: OLR container not running on VM." >&2 + exit 1 + fi + echo " All services: OK" + + mkdir -p "$WORK_DIR" + + # Start validator in soak mode (continuous validation + purge) + echo " Starting validator in soak mode..." 
+ docker rm -f fuzz-validator 2>/dev/null || true + docker compose -f "$COMPOSE_FILE" run -d \ + --name fuzz-validator \ + -e SOAK_MODE=1 \ + -e PURGE_TTL_HOURS="${PURGE_TTL_HOURS:-24}" \ + -e VALIDATE_INTERVAL_SEC="${VALIDATE_INTERVAL_SEC:-300}" \ + -e STALL_TIMEOUT_SEC="${STALL_TIMEOUT_SEC:-300}" \ + validator > /dev/null 2>&1 + echo " Validator: started (soak mode)" + + # Log switch to flush redo + local log_switch_sql="$WORK_DIR/log_switch.sql" + cat > "$log_switch_sql" <<'SQL' +SET FEEDBACK OFF +ALTER SYSTEM SWITCH ALL LOGFILE; +BEGIN DBMS_SESSION.SLEEP(2); END; +/ +EXIT +SQL + _exec_sysdba "$log_switch_sql" > /dev/null + + # Create runner scripts for infinite workload + cat > "$WORK_DIR/soak_node1.sql" < ${rate}, p_seed => 42, p_node_id => 1, p_skip_lob => ${skip_lob}); +EXIT; +SQL + cat > "$WORK_DIR/soak_node2.sql" < ${rate}, p_seed => 137, p_node_id => 2, p_skip_lob => ${skip_lob}); +EXIT; +SQL + + _vm_copy_in "$WORK_DIR/soak_node1.sql" "/tmp/soak_node1.sql" "$RAC_NODE1" + _vm_copy_in "$WORK_DIR/soak_node2.sql" "/tmp/soak_node2.sql" "$RAC_NODE2" + + # Start workload on both nodes (background) + ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ + "podman exec $RAC_NODE1 su - oracle -c 'export ORACLE_SID=$ORACLE_SID1; sqlplus -S $DB_CONN1 @/tmp/soak_node1.sql'" \ + > "$WORK_DIR/soak_out1.log" 2>&1 & + local pid1=$! + echo $pid1 > "$WORK_DIR/soak_node1.pid" + + ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ + "podman exec $RAC_NODE2 su - oracle -c 'export ORACLE_SID=$ORACLE_SID2; sqlplus -S $DB_CONN2 @/tmp/soak_node2.sql'" \ + > "$WORK_DIR/soak_out2.log" 2>&1 & + local pid2=$! + echo $pid2 > "$WORK_DIR/soak_node2.pid" + + echo " Workload: started on both nodes (PIDs: $pid1, $pid2)" + echo "" + echo "=== Soak test running. Monitor with: ./fuzz-test.sh status ===" + echo "=== Stop with: ./fuzz-test.sh down ===" + + # Graceful shutdown handler + _soak_shutdown() { + echo "" + echo "=== Shutting down soak test ===" + kill $pid1 $pid2 2>/dev/null || true + wait $pid1 $pid2 2>/dev/null || true + echo " Workload: stopped" + echo " Run './fuzz-test.sh down' to tear down infrastructure." + } + trap _soak_shutdown INT TERM + + # Monitor loop: print status + detect failures + local start_time=$SECONDS + while true; do + sleep 60 + + # Check workload processes + local n1_ok=true n2_ok=true + kill -0 $pid1 2>/dev/null || n1_ok=false + kill -0 $pid2 2>/dev/null || n2_ok=false + + # Check validator + local val_ok=true + docker ps --format '{{.Names}}' | grep -q "^fuzz-validator$" || val_ok=false + + # Check OLR + local olr_ok=true + ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ + "podman ps --format '{{.Names}}' | grep -q $OLR_CONTAINER" 2>/dev/null || olr_ok=false + + # Get status + local uptime_sec=$(( SECONDS - start_time )) + local uptime_h=$(( uptime_sec / 3600 )) + local uptime_m=$(( (uptime_sec % 3600) / 60 )) + local mem=$(_olr_memory_mb) + local consumer_line + consumer_line=$(docker logs --tail 1 fuzz-consumer 2>/dev/null | grep -o '\[consumer\].*' || echo "?") + local validator_line + validator_line=$(docker logs --tail 1 fuzz-validator 2>/dev/null | grep -o '\[SOAK\].*' || echo "?") + + printf "[SOAK] uptime=%dh%02dm OLR=%sMB %s %s\n" \ + "$uptime_h" "$uptime_m" "$mem" "$consumer_line" "$validator_line" + + # Detect failures + if ! $val_ok; then + echo "" + echo "=== FAIL: Validator exited ===" + docker logs --tail 20 fuzz-validator 2>/dev/null + _soak_shutdown + exit 1 + fi + if ! 
$olr_ok; then + echo "" + echo "=== FAIL: OLR container exited ===" + ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" "podman logs --tail 20 $OLR_CONTAINER" 2>/dev/null + _soak_shutdown + exit 1 + fi + if ! $n1_ok && ! $n2_ok; then + echo "" + echo "=== WARN: Both workload processes exited ===" + echo " Node 1 log tail:" + tail -5 "$WORK_DIR/soak_out1.log" 2>/dev/null | sed 's/^/ /' + echo " Node 2 log tail:" + tail -5 "$WORK_DIR/soak_out2.log" 2>/dev/null | sed 's/^/ /' + _soak_shutdown + exit 1 + fi + done +} + case "$ACTION" in up) action_up ;; run) action_run "$@" ;; + soak) action_soak "$@" ;; status) action_status ;; validate) action_validate ;; db-check) action_db_check ;; diff --git a/tests/dbz-twin/rac/kafka-consumer.py b/tests/dbz-twin/rac/kafka-consumer.py index 086cc514..7378b3da 100644 --- a/tests/dbz-twin/rac/kafka-consumer.py +++ b/tests/dbz-twin/rac/kafka-consumer.py @@ -20,6 +20,7 @@ KAFKA_BOOTSTRAP = os.environ.get('KAFKA_BOOTSTRAP', 'localhost:9092') SQLITE_DB = os.environ.get('SQLITE_DB', '/app/data/fuzz.db') +PURGE_TTL_HOURS = int(os.environ.get('PURGE_TTL_HOURS', '24')) OP_MAP = {'c': 'INSERT', 'u': 'UPDATE', 'd': 'DELETE'} @@ -30,9 +31,10 @@ def init_db(db_path): """Create SQLite database and tables.""" os.makedirs(os.path.dirname(db_path) or '.', exist_ok=True) - conn = sqlite3.connect(db_path) + conn = sqlite3.connect(db_path, timeout=30) conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA synchronous=NORMAL") + conn.execute("PRAGMA busy_timeout=30000") conn.execute(""" CREATE TABLE IF NOT EXISTS lm_events ( event_id TEXT NOT NULL, @@ -171,6 +173,7 @@ def main(): batch = [] batch_start = time.time() last_report = time.time() + last_seq_cleanup = time.time() try: while True: @@ -226,11 +229,29 @@ def main(): batch = [] batch_start = time.time() - # Seq maps grow with each unique event_id but entries are small - # (string key -> int value). For a 60-min run with ~30K events, - # this is ~2MB. Do not trim — trimming caused seq reset bugs - # where later events for the same event_id got seq=0 again, - # overwriting earlier events via INSERT OR REPLACE. + # Purge seq dict entries for events older than TTL. + # Safe because events outside the 24h window will never receive + # new Kafka messages — they're well past Kafka's retention. 
+ now = time.time() + if now - last_seq_cleanup >= 600: # every 10 minutes + cutoff = now - PURGE_TTL_HOURS * 3600 + for tbl, seq_map in [('lm_events', lm_seq), + ('olr_events', olr_seq)]: + old_eids = set() + rows = conn.execute( + f"SELECT DISTINCT event_id FROM {tbl} " + "WHERE consumed_at < ?", (cutoff,)).fetchall() + old_eids = {r[0] for r in rows} + pruned = 0 + for eid in old_eids: + if eid in seq_map: + del seq_map[eid] + pruned += 1 + if pruned: + print(f"[consumer] Pruned {pruned} seq entries " + f"from {tbl} (older than {PURGE_TTL_HOURS}h)", + flush=True) + last_seq_cleanup = now # Report progress every 30 seconds now = time.time() diff --git a/tests/dbz-twin/rac/perf/fuzz-workload.sql b/tests/dbz-twin/rac/perf/fuzz-workload.sql index f3089ca7..ea02237b 100644 --- a/tests/dbz-twin/rac/perf/fuzz-workload.sql +++ b/tests/dbz-twin/rac/perf/fuzz-workload.sql @@ -40,7 +40,8 @@ CREATE TABLE olr_test.FUZZ_SCALAR ( col_date DATE, col_ts TIMESTAMP(6), col_raw RAW(200), - col_flag NUMBER(1) DEFAULT 0 + col_flag NUMBER(1) DEFAULT 0, + created_at DATE DEFAULT SYSDATE ); ALTER TABLE olr_test.FUZZ_SCALAR ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; @@ -57,7 +58,8 @@ CREATE TABLE olr_test.FUZZ_WIDE ( n06 NUMBER, n07 NUMBER, n08 NUMBER, n09 NUMBER, n10 NUMBER, d01 DATE, d02 DATE, d03 DATE, t01 TIMESTAMP, t02 TIMESTAMP, t03 TIMESTAMP, - r01 RAW(50), r02 RAW(50), r03 RAW(50) + r01 RAW(50), r02 RAW(50), r03 RAW(50), + created_at DATE DEFAULT SYSDATE ); ALTER TABLE olr_test.FUZZ_WIDE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; @@ -69,7 +71,8 @@ CREATE TABLE olr_test.FUZZ_LOB ( event_id VARCHAR2(30) NOT NULL, label VARCHAR2(50), content CLOB, - bin_data BLOB + bin_data BLOB, + created_at DATE DEFAULT SYSDATE ); ALTER TABLE olr_test.FUZZ_LOB ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; @@ -81,7 +84,8 @@ CREATE TABLE olr_test.FUZZ_PART ( event_id VARCHAR2(30) NOT NULL, region VARCHAR2(20), val NUMBER, - payload VARCHAR2(500) + payload VARCHAR2(500), + created_at DATE DEFAULT SYSDATE ) PARTITION BY LIST (region) ( PARTITION p_east VALUES ('EAST'), PARTITION p_west VALUES ('WEST'), @@ -99,7 +103,8 @@ CREATE TABLE olr_test.FUZZ_NOPK ( name VARCHAR2(100), value NUMBER, status VARCHAR2(20), - ts TIMESTAMP DEFAULT SYSTIMESTAMP + ts TIMESTAMP DEFAULT SYSTIMESTAMP, + created_at DATE DEFAULT SYSDATE ); ALTER TABLE olr_test.FUZZ_NOPK ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; @@ -111,7 +116,8 @@ CREATE TABLE olr_test.FUZZ_MAXSTR ( event_id VARCHAR2(30) NOT NULL, col_long1 VARCHAR2(4000), col_long2 VARCHAR2(4000), - col_short VARCHAR2(10) + col_short VARCHAR2(10), + created_at DATE DEFAULT SYSDATE ); ALTER TABLE olr_test.FUZZ_MAXSTR ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; @@ -123,7 +129,8 @@ CREATE TABLE olr_test.FUZZ_INTERVAL ( event_id VARCHAR2(30) NOT NULL, col_ym INTERVAL YEAR(4) TO MONTH, col_ds INTERVAL DAY(4) TO SECOND(6), - col_num NUMBER + col_num NUMBER, + created_at DATE DEFAULT SYSDATE ); ALTER TABLE olr_test.FUZZ_INTERVAL ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; @@ -152,6 +159,13 @@ CREATE OR REPLACE PACKAGE olr_test.FUZZ_WKL AS p_node_id IN NUMBER DEFAULT 1, p_skip_lob IN NUMBER DEFAULT 0 -- 1 = skip LOB table operations ); + PROCEDURE run_forever( + p_rate_per_sec IN NUMBER DEFAULT 10, + p_seed IN NUMBER DEFAULT 1, + p_node_id IN NUMBER DEFAULT 1, + p_skip_lob IN NUMBER DEFAULT 0 + ); + PROCEDURE cleanup; END FUZZ_WKL; / @@ -840,6 +854,138 @@ CREATE OR REPLACE PACKAGE BODY olr_test.FUZZ_WKL AS EXTRACT(HOUR FROM (SYSTIMESTAMP - v_start)) * 3600)); END; + -- ---- Cleanup: purge rows older than 24h ---- + + 
PROCEDURE cleanup IS + BEGIN + FOR t IN (SELECT table_name FROM user_tables + WHERE table_name LIKE 'FUZZ_%' + AND table_name != 'FUZZ_STATS') LOOP + EXECUTE IMMEDIATE 'DELETE FROM ' || t.table_name || + ' WHERE created_at < SYSDATE - 1'; + COMMIT; + END LOOP; + END; + + -- ---- Continuous run for soak testing ---- + + PROCEDURE run_forever( + p_rate_per_sec IN NUMBER DEFAULT 10, + p_seed IN NUMBER DEFAULT 1, + p_node_id IN NUMBER DEFAULT 1, + p_skip_lob IN NUMBER DEFAULT 0 + ) IS + v_start TIMESTAMP := SYSTIMESTAMP; + v_txn_dice PLS_INTEGER; + v_batch PLS_INTEGER; + v_seed_id PLS_INTEGER; + v_seed_region VARCHAR2(20); + v_cycle_start TIMESTAMP; + v_cycle_ops PLS_INTEGER; + v_sleep_sec NUMBER; + v_ops_before PLS_INTEGER; + BEGIN + -- Initialize (same as run) + g_node_id := p_node_id; + g_next_id := p_node_id; + g_event_seq := 0; + g_insert_cnt := 0; g_update_cnt := 0; g_delete_cnt := 0; + g_rollback_cnt := 0; g_lob_cnt := 0; g_total_ops := 0; + g_skip_lob := p_skip_lob; + + DBMS_RANDOM.SEED(p_seed); + + -- Seed initial data + v_seed_id := 0; + g_scalar_id_cnt := 0; g_lob_id_cnt := 0; g_wide_id_cnt := 0; + g_part_id_cnt := 0; g_maxstr_id_cnt := 0; g_interval_id_cnt := 0; + FOR i IN 1..50 LOOP + v_seed_id := next_id; + INSERT INTO olr_test.FUZZ_SCALAR (id, event_id, col_varchar, col_flag) + VALUES (v_seed_id, 'SEED', DBMS_RANDOM.STRING('x', 20), 0); + track_id(g_scalar_ids, g_scalar_id_cnt, v_seed_id); + END LOOP; + IF g_skip_lob = 0 THEN + FOR i IN 1..5 LOOP + v_seed_id := next_id; + INSERT INTO olr_test.FUZZ_LOB (id, event_id, label, content) + VALUES (v_seed_id, 'SEED', 'seed', 'seed'); + track_id(g_lob_ids, g_lob_id_cnt, v_seed_id); + END LOOP; + END IF; + FOR i IN 1..20 LOOP + v_seed_id := next_id; + v_seed_region := REGIONS(rand_int(1, 5)); + INSERT INTO olr_test.FUZZ_PART (id, event_id, region, val, payload) + VALUES (v_seed_id, 'SEED', v_seed_region, 0, 'seed'); + track_id(g_part_ids, g_part_id_cnt, v_seed_id); + END LOOP; + FOR i IN 1..10 LOOP + INSERT INTO olr_test.FUZZ_NOPK (event_id, name, value, status) + VALUES ('SEED', 'seed', 0, 'ACTIVE'); + END LOOP; + COMMIT; + g_event_seq := 0; + g_insert_cnt := 0; g_total_ops := 0; + + -- Infinite loop with rate limiting + LOOP + v_cycle_start := SYSTIMESTAMP; + v_ops_before := g_total_ops; + + -- Do one transaction cycle + v_txn_dice := rand_int(1, 100); + + IF v_txn_dice <= 55 THEN + do_random_op; + COMMIT; + ELSIF v_txn_dice <= 70 THEN + v_batch := rand_int(2, 5); + FOR j IN 1..v_batch LOOP + do_random_op; + END LOOP; + COMMIT; + ELSIF v_txn_dice <= 80 THEN + do_random_op; + ROLLBACK; + g_rollback_cnt := g_rollback_cnt + 1; + ELSIF v_txn_dice <= 90 THEN + do_random_op; + SAVEPOINT sp_fuzz; + do_random_op; + ROLLBACK TO sp_fuzz; + g_rollback_cnt := g_rollback_cnt + 1; + do_random_op; + COMMIT; + ELSE + v_batch := rand_int(10, 30); + FOR j IN 1..v_batch LOOP + do_random_op; + END LOOP; + COMMIT; + END IF; + + -- Rate limiting: sleep to maintain target ops/sec + v_cycle_ops := g_total_ops - v_ops_before; + IF p_rate_per_sec > 0 AND v_cycle_ops > 0 THEN + v_sleep_sec := v_cycle_ops / p_rate_per_sec; + IF v_sleep_sec > 0.01 THEN + DBMS_SESSION.SLEEP(v_sleep_sec); + END IF; + ELSE + DBMS_SESSION.SLEEP(0.1); + END IF; + + -- Update stats every 100 ops + IF MOD(g_total_ops, 100) = 0 THEN + update_stats; + DBMS_OUTPUT.PUT_LINE('FUZZ_SOAK: node=' || g_node_id || + ' ops=' || g_total_ops || + ' last_event=N' || g_node_id || '_' || LPAD(g_event_seq, 8, '0')); + END IF; + END LOOP; + END; + END FUZZ_WKL; / diff --git 
a/tests/dbz-twin/rac/validator.py b/tests/dbz-twin/rac/validator.py
index d17d85c0..7ffd33be 100644
--- a/tests/dbz-twin/rac/validator.py
+++ b/tests/dbz-twin/rac/validator.py
@@ -23,6 +23,10 @@
 SQLITE_DB = os.environ.get('SQLITE_DB', '/app/data/fuzz.db')
 POLL_INTERVAL = int(os.environ.get('POLL_INTERVAL', '10'))
 IDLE_TIMEOUT = int(os.environ.get('IDLE_TIMEOUT', '120'))
+SOAK_MODE = os.environ.get('SOAK_MODE', '0') == '1'
+PURGE_TTL_HOURS = int(os.environ.get('PURGE_TTL_HOURS', '24'))
+VALIDATE_INTERVAL_SEC = int(os.environ.get('VALIDATE_INTERVAL_SEC', '300'))
+STALL_TIMEOUT_SEC = int(os.environ.get('STALL_TIMEOUT_SEC', '300'))
 
 # LOB tables that use final-state replay for comparison.
 # With the hybrid setup (OLR for non-LOB + LogMiner for LOB), these tables
@@ -114,279 +118,230 @@ def compare_values(lm_cols, olr_cols, table, section='after'):
     return diffs
 
 
-def main():
-    print(f"Validator starting", flush=True)
-    print(f"  SQLite DB: {SQLITE_DB}", flush=True)
-    print(f"  Poll interval: {POLL_INTERVAL}s", flush=True)
-    print(f"  Idle timeout: {IDLE_TIMEOUT}s", flush=True)
-
-    # Wait for database to exist
-    while not os.path.exists(SQLITE_DB):
-        time.sleep(2)
-
-    conn = sqlite3.connect(SQLITE_DB)
-    conn.row_factory = sqlite3.Row
-    conn.execute("PRAGMA journal_mode=WAL")
+def purge_old_events(conn, ttl_hours):
+    """Delete events older than ttl_hours from both tables. Returns count deleted."""
+    cutoff = time.time() - ttl_hours * 3600
+    lm_del = conn.execute(
+        "DELETE FROM lm_events WHERE consumed_at < ?", (cutoff,)).rowcount
+    olr_del = conn.execute(
+        "DELETE FROM olr_events WHERE consumed_at < ?", (cutoff,)).rowcount
+    if lm_del or olr_del:
+        conn.commit()
+    return lm_del + olr_del
+
+
+def validate_cycle(conn, cursor_by_node, safe_frontier, widen=False):
+    """Run one validation cycle. Returns (validated, matched, mismatches,
+    missing_olr, missing_lm, tail_olr, tail_lm, lm_count, olr_count,
+    node_frontiers)."""
+    validated = matched = mismatches = 0
+    missing_olr = missing_lm = tail_olr = tail_lm = 0
+
+    lm_count = conn.execute("SELECT COUNT(*) FROM lm_events").fetchone()[0]
+    olr_count = conn.execute("SELECT COUNT(*) FROM olr_events").fetchone()[0]
+
+    # Find safe frontier per node: min(lm, olr) for each N{x} prefix.
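+    # Event_ids from the two RAC nodes interleave non-monotonically in
+    # commit order, so a global frontier would validate events before the
+    # other side has delivered them.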
+ node_frontiers = {} + for node_prefix in ('N1', 'N2'): + lm_node_max = conn.execute( + "SELECT MAX(event_id) FROM lm_events WHERE event_id LIKE ?", + (f'{node_prefix}_%',)).fetchone()[0] + olr_node_max = conn.execute( + "SELECT MAX(event_id) FROM olr_events WHERE event_id LIKE ?", + (f'{node_prefix}_%',)).fetchone()[0] + if lm_node_max and olr_node_max: + node_frontiers[node_prefix] = min(lm_node_max, olr_node_max) + + if not node_frontiers: + return (0, 0, 0, 0, 0, 0, 0, lm_count, olr_count, node_frontiers) + + if widen: + # Save safe frontier before widening + safe_frontier.update(node_frontiers) + for node_prefix in ('N1', 'N2'): + lm_n = conn.execute( + "SELECT MAX(event_id) FROM lm_events WHERE event_id LIKE ?", + (f'{node_prefix}_%',)).fetchone()[0] + olr_n = conn.execute( + "SELECT MAX(event_id) FROM olr_events WHERE event_id LIKE ?", + (f'{node_prefix}_%',)).fetchone()[0] + if lm_n or olr_n: + node_frontiers[node_prefix] = max(lm_n or '', olr_n or '') + + # Check if any node has new events beyond its cursor + any_new = any( + nf > cursor_by_node.get(np, '') + for np, nf in node_frontiers.items() + ) + if not any_new: + return (0, 0, 0, 0, 0, 0, 0, lm_count, olr_count, node_frontiers) + + # Fetch event_ids within each node's safe frontier + lm_ids = set() + olr_ids = set() + for node_prefix, nf in node_frontiers.items(): + node_cursor = cursor_by_node.get(node_prefix, '') + for r in conn.execute( + "SELECT DISTINCT event_id FROM lm_events " + "WHERE event_id > ? AND event_id <= ? AND event_id LIKE ?", + (node_cursor, nf, f'{node_prefix}_%')).fetchall(): + lm_ids.add(r['event_id']) + for r in conn.execute( + "SELECT DISTINCT event_id FROM olr_events " + "WHERE event_id > ? AND event_id <= ? AND event_id LIKE ?", + (node_cursor, nf, f'{node_prefix}_%')).fetchall(): + olr_ids.add(r['event_id']) + + all_ids = sorted(lm_ids | olr_ids) + + for eid in all_ids: + in_lm = eid in lm_ids + in_olr = eid in olr_ids + + # Check if this event is beyond the safe frontier (tail lag) + node_prefix = eid[:2] + is_tail = (safe_frontier + and node_prefix in safe_frontier + and eid > safe_frontier[node_prefix]) + + # Determine table from whichever side has the event + if in_lm: + tbl_row = conn.execute( + "SELECT table_name FROM lm_events WHERE event_id = ? LIMIT 1", + (eid,)).fetchone() + else: + tbl_row = conn.execute( + "SELECT table_name FROM olr_events WHERE event_id = ? LIMIT 1", + (eid,)).fetchone() + event_table = tbl_row['table_name'] if tbl_row else '?' + is_lob = event_table.split('.')[-1].upper() in LOB_TABLES + + if in_lm and not in_olr: + missing_olr += 1 + if is_tail: + tail_lm += 1 + else: + mismatches += 1 + print(f"[MISSING_OLR] {eid} ({event_table})", flush=True) + validated += 1 + continue + + if in_olr and not in_lm: + missing_lm += 1 + if is_tail: + tail_olr += 1 + else: + mismatches += 1 + print(f"[EXTRA_OLR] {eid} ({event_table})", flush=True) + validated += 1 + continue + + # Both sides have the event — compare. + lm_recs = conn.execute( + "SELECT * FROM lm_events WHERE event_id = ? ORDER BY seq", + (eid,) + ).fetchall() + olr_recs = conn.execute( + "SELECT * FROM olr_events WHERE event_id = ? 
ORDER BY seq", + (eid,) + ).fetchall() + + if is_lob: + lm_state, lm_exists = replay_final_state(lm_recs) + olr_state, olr_exists = replay_final_state(olr_recs) + + if lm_exists != olr_exists: + mismatches += 1 + print(f"[LOB_EXISTENCE] {eid} ({event_table}): " + f"LM exists={lm_exists} OLR exists={olr_exists}", + flush=True) + validated += 1 + else: + diffs = compare_values(lm_state, olr_state, + event_table, 'after') + if diffs: + mismatches += 1 + print(f"[LOB_VALUE_DIFF] {eid} ({event_table}):", + flush=True) + for d in diffs[:5]: + print(d, flush=True) + else: + matched += 1 + validated += 1 + continue + + # Non-LOB tables: compare per (event_id, seq) directly. + lm_by_seq = {r['seq']: r for r in lm_recs} + olr_by_seq = {r['seq']: r for r in olr_recs} + all_seqs = sorted(set(lm_by_seq.keys()) | set(olr_by_seq.keys())) + + for seq in all_seqs: + lm_r = lm_by_seq.get(seq) + olr_r = olr_by_seq.get(seq) + + if lm_r and not olr_r: + missing_olr += 1 + mismatches += 1 + print(f"[MISSING_OLR] {eid} seq={seq} " + f"({lm_r['op']} {lm_r['table_name']})", + flush=True) + validated += 1 + continue - cursor_by_node = {'N1': '', 'N2': ''} # Per-node watermark - safe_frontier = {} # Last frontier before idle-timeout widening - total_validated = 0 - total_matched = 0 - total_mismatches = 0 - total_missing_lm = 0 - total_missing_olr = 0 - total_tail_olr = 0 # OLR ahead of LM at drain time (not a bug) - total_tail_lm = 0 # LM ahead of OLR at drain time (not a bug) - last_new_events = time.time() - prev_lm_count = 0 - prev_olr_count = 0 + if olr_r and not lm_r: + missing_lm += 1 + mismatches += 1 + print(f"[EXTRA_OLR] {eid} seq={seq} " + f"({olr_r['op']} {olr_r['table_name']})", + flush=True) + validated += 1 + continue - try: - while True: - time.sleep(POLL_INTERVAL) - - # Get current counts - lm_count = conn.execute("SELECT COUNT(*) FROM lm_events").fetchone()[0] - olr_count = conn.execute("SELECT COUNT(*) FROM olr_events").fetchone()[0] - - # Check for new events (idle detection) - if lm_count != prev_lm_count or olr_count != prev_olr_count: - last_new_events = time.time() - prev_lm_count = lm_count - prev_olr_count = olr_count - - # Find safe frontier per node: min(lm, olr) for each N{x} prefix. - # Event_ids from two RAC nodes interleave non-monotonically in - # commit order, so a global frontier would validate events before - # the other side has delivered them. - node_frontiers = {} - for node_prefix in ('N1', 'N2'): - lm_node_max = conn.execute( - "SELECT MAX(event_id) FROM lm_events WHERE event_id LIKE ?", - (f'{node_prefix}_%',)).fetchone()[0] - olr_node_max = conn.execute( - "SELECT MAX(event_id) FROM olr_events WHERE event_id LIKE ?", - (f'{node_prefix}_%',)).fetchone()[0] - if lm_node_max and olr_node_max: - node_frontiers[node_prefix] = min(lm_node_max, olr_node_max) - - if not node_frontiers: + # Both have this seq — compare + if lm_r['table_name'] != olr_r['table_name'] or \ + lm_r['op'] != olr_r['op']: + mismatches += 1 + print(f"[MISMATCH] {eid} seq={seq}: " + f"LM={lm_r['op']} {lm_r['table_name']}, " + f"OLR={olr_r['op']} {olr_r['table_name']}", + flush=True) + validated += 1 continue - # Check if any node has new events beyond its cursor - any_new = any( - nf > cursor_by_node.get(np, '') - for np, nf in node_frontiers.items() - ) - if not any_new: - if time.time() - last_new_events > IDLE_TIMEOUT: - print(f"[validator] Idle timeout ({IDLE_TIMEOUT}s). 
" - f"Final validation pass...", flush=True) - # Save safe frontier before widening — events beyond this - # are tail lag (OLR or LM ahead), not real mismatches. - safe_frontier = dict(node_frontiers) - # Widen frontier to max of both sides per node to catch - # truly missing events (one side never delivered them). - for node_prefix in ('N1', 'N2'): - lm_n = conn.execute( - "SELECT MAX(event_id) FROM lm_events WHERE event_id LIKE ?", - (f'{node_prefix}_%',)).fetchone()[0] - olr_n = conn.execute( - "SELECT MAX(event_id) FROM olr_events WHERE event_id LIKE ?", - (f'{node_prefix}_%',)).fetchone()[0] - if lm_n or olr_n: - node_frontiers[node_prefix] = max(lm_n or '', olr_n or '') - # Re-check if there's anything new with widened frontier - any_new = any( - nf > cursor_by_node.get(np, '') - for np, nf in node_frontiers.items() - ) - if not any_new: - break - # Fall through to validate the widened range - else: - continue - - # Fetch event_ids within each node's safe frontier - lm_ids = set() - olr_ids = set() - for node_prefix, nf in node_frontiers.items(): - node_cursor = cursor_by_node.get(node_prefix, '') - for r in conn.execute( - "SELECT DISTINCT event_id FROM lm_events " - "WHERE event_id > ? AND event_id <= ? AND event_id LIKE ?", - (node_cursor, nf, f'{node_prefix}_%')).fetchall(): - lm_ids.add(r['event_id']) - for r in conn.execute( - "SELECT DISTINCT event_id FROM olr_events " - "WHERE event_id > ? AND event_id <= ? AND event_id LIKE ?", - (node_cursor, nf, f'{node_prefix}_%')).fetchall(): - olr_ids.add(r['event_id']) - - all_ids = sorted(lm_ids | olr_ids) - - for eid in all_ids: - in_lm = eid in lm_ids - in_olr = eid in olr_ids - - # Check if this event is beyond the safe frontier (tail lag) - node_prefix = eid[:2] - is_tail = (safe_frontier - and node_prefix in safe_frontier - and eid > safe_frontier[node_prefix]) - - # Determine table from whichever side has the event - if in_lm: - tbl_row = conn.execute( - "SELECT table_name FROM lm_events WHERE event_id = ? LIMIT 1", - (eid,)).fetchone() - else: - tbl_row = conn.execute( - "SELECT table_name FROM olr_events WHERE event_id = ? LIMIT 1", - (eid,)).fetchone() - event_table = tbl_row['table_name'] if tbl_row else '?' - is_lob = event_table.split('.')[-1].upper() in LOB_TABLES - - if in_lm and not in_olr: - total_missing_olr += 1 - if is_tail: - total_tail_lm += 1 - else: - total_mismatches += 1 - print(f"[MISSING_OLR] {eid} ({event_table})", flush=True) - total_validated += 1 - continue - - if in_olr and not in_lm: - total_missing_lm += 1 - if is_tail: - total_tail_olr += 1 - else: - total_mismatches += 1 - print(f"[EXTRA_OLR] {eid} ({event_table})", flush=True) - total_validated += 1 - continue - - # Both sides have the event — compare. - lm_recs = conn.execute( - "SELECT * FROM lm_events WHERE event_id = ? ORDER BY seq", - (eid,) - ).fetchall() - olr_recs = conn.execute( - "SELECT * FROM olr_events WHERE event_id = ? ORDER BY seq", - (eid,) - ).fetchall() - - if is_lob: - # LOB tables: replay ops into final state, compare end result. - # Both sides use LogMiner (expected=full LM, actual=LOB-only LM), - # so they should produce identical final state. 
- lm_state, lm_exists = replay_final_state(lm_recs) - olr_state, olr_exists = replay_final_state(olr_recs) - - if lm_exists != olr_exists: - total_mismatches += 1 - print(f"[LOB_EXISTENCE] {eid} ({event_table}): " - f"LM exists={lm_exists} OLR exists={olr_exists}", - flush=True) - total_validated += 1 - else: - diffs = compare_values(lm_state, olr_state, - event_table, 'after') - if diffs: - total_mismatches += 1 - print(f"[LOB_VALUE_DIFF] {eid} ({event_table}):", - flush=True) - for d in diffs[:5]: - print(d, flush=True) - else: - total_matched += 1 - total_validated += 1 - continue - - # Non-LOB tables: compare per (event_id, seq) directly. - # Seq numbers are absolute for non-LOB (no merge/phantom issues). - lm_by_seq = {r['seq']: r for r in lm_recs} - olr_by_seq = {r['seq']: r for r in olr_recs} - all_seqs = sorted(set(lm_by_seq.keys()) | set(olr_by_seq.keys())) - - for seq in all_seqs: - lm_r = lm_by_seq.get(seq) - olr_r = olr_by_seq.get(seq) - - if lm_r and not olr_r: - total_missing_olr += 1 - total_mismatches += 1 - print(f"[MISSING_OLR] {eid} seq={seq} " - f"({lm_r['op']} {lm_r['table_name']})", - flush=True) - total_validated += 1 - continue - - if olr_r and not lm_r: - total_missing_lm += 1 - total_mismatches += 1 - print(f"[EXTRA_OLR] {eid} seq={seq} " - f"({olr_r['op']} {olr_r['table_name']})", - flush=True) - total_validated += 1 - continue - - # Both have this seq — compare - if lm_r['table_name'] != olr_r['table_name'] or \ - lm_r['op'] != olr_r['op']: - total_mismatches += 1 - print(f"[MISMATCH] {eid} seq={seq}: " - f"LM={lm_r['op']} {lm_r['table_name']}, " - f"OLR={olr_r['op']} {olr_r['table_name']}", - flush=True) - total_validated += 1 - continue - - lm_evt = json.loads(lm_r['raw_json']) - olr_evt = json.loads(olr_r['raw_json']) - lm_after = normalize_columns(lm_evt.get('after')) - olr_after = normalize_columns(olr_evt.get('after')) - lm_before = normalize_columns(lm_evt.get('before')) - olr_before = normalize_columns(olr_evt.get('before')) - - diffs = compare_values(lm_after, olr_after, - lm_r['table_name'], 'after') - diffs.extend(compare_values(lm_before, olr_before, - lm_r['table_name'], 'before')) - if diffs: - total_mismatches += 1 - print(f"[VALUE_DIFF] {eid} seq={seq} " - f"({lm_r['op']} {lm_r['table_name']}):", - flush=True) - for d in diffs[:5]: - print(d, flush=True) - else: - total_matched += 1 - - total_validated += 1 - - # Advance per-node cursors - for node_prefix, nf in node_frontiers.items(): - cursor_by_node[node_prefix] = nf - - # Progress report - frontier_str = ','.join(f'{k}={v}' for k, v in sorted(cursor_by_node.items())) - tail_str = (f" tail_olr={total_tail_olr} tail_lm={total_tail_lm}" - if total_tail_olr or total_tail_lm else "") - print(f"[validator] validated={total_validated} matched={total_matched} " - f"mismatches={total_mismatches} " - f"missing_olr={total_missing_olr} extra_olr={total_missing_lm}" - f"{tail_str} " - f"lm_total={lm_count} olr_total={olr_count} " - f"frontier={frontier_str}", flush=True) - - except KeyboardInterrupt: - pass - finally: - conn.close() - - # Final summary + lm_evt = json.loads(lm_r['raw_json']) + olr_evt = json.loads(olr_r['raw_json']) + lm_after = normalize_columns(lm_evt.get('after')) + olr_after = normalize_columns(olr_evt.get('after')) + lm_before = normalize_columns(lm_evt.get('before')) + olr_before = normalize_columns(olr_evt.get('before')) + + diffs = compare_values(lm_after, olr_after, + lm_r['table_name'], 'after') + diffs.extend(compare_values(lm_before, olr_before, + lm_r['table_name'], 
'before')) + if diffs: + mismatches += 1 + print(f"[VALUE_DIFF] {eid} seq={seq} " + f"({lm_r['op']} {lm_r['table_name']}):", + flush=True) + for d in diffs[:5]: + print(d, flush=True) + else: + matched += 1 + + validated += 1 + + # Advance per-node cursors + for node_prefix, nf in node_frontiers.items(): + cursor_by_node[node_prefix] = nf + + return (validated, matched, mismatches, missing_olr, missing_lm, + tail_olr, tail_lm, lm_count, olr_count, node_frontiers) + + +def print_summary(total_validated, total_matched, total_mismatches, + total_missing_olr, total_missing_lm, + total_tail_olr, total_tail_lm): + """Print final validation summary and return exit code.""" print(f"\n{'='*60}", flush=True) print(f" Fuzz Test Validation Summary", flush=True) print(f"{'='*60}", flush=True) @@ -402,12 +357,185 @@ def main(): if total_mismatches > 0: print(f"\n RESULT: FAIL ({total_mismatches} unexpected mismatches)", flush=True) - sys.exit(1) + return 1 else: print("\n RESULT: PASS", flush=True) if total_tail_olr + total_tail_lm > 0: print(f" ({total_tail_olr + total_tail_lm} tail events)", flush=True) - sys.exit(0) + return 0 + + +def main(): + print(f"Validator starting (soak_mode={SOAK_MODE})", flush=True) + print(f" SQLite DB: {SQLITE_DB}", flush=True) + print(f" Poll interval: {POLL_INTERVAL}s", flush=True) + if SOAK_MODE: + print(f" Validate interval: {VALIDATE_INTERVAL_SEC}s", flush=True) + print(f" Purge TTL: {PURGE_TTL_HOURS}h", flush=True) + print(f" Stall timeout: {STALL_TIMEOUT_SEC}s", flush=True) + else: + print(f" Idle timeout: {IDLE_TIMEOUT}s", flush=True) + + # Wait for database to exist + while not os.path.exists(SQLITE_DB): + time.sleep(2) + + conn = sqlite3.connect(SQLITE_DB, timeout=30) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA busy_timeout=30000") + + cursor_by_node = {'N1': '', 'N2': ''} + safe_frontier = {} + total_validated = 0 + total_matched = 0 + total_mismatches = 0 + total_missing_lm = 0 + total_missing_olr = 0 + total_tail_olr = 0 + total_tail_lm = 0 + last_new_events = time.time() + prev_lm_count = 0 + prev_olr_count = 0 + + if SOAK_MODE: + # Soak mode: continuous validate-purge cycles + cycle = 0 + try: + while True: + time.sleep(VALIDATE_INTERVAL_SEC) + cycle += 1 + cycle_start = time.monotonic() + + # Check for new events (stall detection) + lm_count = conn.execute( + "SELECT COUNT(*) FROM lm_events").fetchone()[0] + olr_count = conn.execute( + "SELECT COUNT(*) FROM olr_events").fetchone()[0] + + if lm_count != prev_lm_count or olr_count != prev_olr_count: + last_new_events = time.time() + prev_lm_count = lm_count + prev_olr_count = olr_count + elif time.time() - last_new_events > STALL_TIMEOUT_SEC: + # Stall detection + frontier_str = ','.join( + f'{k}={v}' for k, v in sorted(cursor_by_node.items())) + print(f"[STALL] No new events for {STALL_TIMEOUT_SEC}s. 
" + f"LM={lm_count} OLR={olr_count} " + f"frontier={frontier_str}", flush=True) + conn.close() + sys.exit(2) + + result = validate_cycle(conn, cursor_by_node, safe_frontier) + (v, m, mm, mo, ml, to_, tl, lmc, oc, _) = result + total_validated += v + total_matched += m + total_mismatches += mm + total_missing_olr += mo + total_missing_lm += ml + total_tail_olr += to_ + total_tail_lm += tl + + # Purge old events + purged = purge_old_events(conn, PURGE_TTL_HOURS) + + elapsed = time.monotonic() - cycle_start + frontier_str = ','.join( + f'{k}={v}' for k, v in sorted(cursor_by_node.items())) + purge_str = f" purged={purged}" if purged else "" + print(f"[SOAK] cycle={cycle} validated={v} " + f"mismatches={mm} total_validated={total_validated} " + f"total_mismatches={total_mismatches}{purge_str} " + f"lm={lmc} olr={oc} " + f"frontier={frontier_str} " + f"elapsed={elapsed:.1f}s", flush=True) + + if mm > 0: + print(f"[SOAK] FAIL: {mm} mismatches in cycle {cycle}", + flush=True) + print_summary(total_validated, total_matched, + total_mismatches, total_missing_olr, + total_missing_lm, total_tail_olr, + total_tail_lm) + conn.close() + sys.exit(1) + + except KeyboardInterrupt: + pass + finally: + conn.close() + + rc = print_summary(total_validated, total_matched, total_mismatches, + total_missing_olr, total_missing_lm, + total_tail_olr, total_tail_lm) + sys.exit(rc) + + else: + # Original one-shot mode: poll until idle, validate, exit + try: + while True: + time.sleep(POLL_INTERVAL) + + lm_count = conn.execute( + "SELECT COUNT(*) FROM lm_events").fetchone()[0] + olr_count = conn.execute( + "SELECT COUNT(*) FROM olr_events").fetchone()[0] + + if lm_count != prev_lm_count or olr_count != prev_olr_count: + last_new_events = time.time() + prev_lm_count = lm_count + prev_olr_count = olr_count + + # Try a validation cycle (safe frontier only) + result = validate_cycle(conn, cursor_by_node, safe_frontier) + (v, m, mm, mo, ml, to_, tl, lmc, oc, nf) = result + total_validated += v + total_matched += m + total_mismatches += mm + total_missing_olr += mo + total_missing_lm += ml + total_tail_olr += to_ + total_tail_lm += tl + + if v > 0: + frontier_str = ','.join( + f'{k}={v_}' for k, v_ in sorted(cursor_by_node.items())) + tail_str = (f" tail_olr={total_tail_olr} tail_lm={total_tail_lm}" + if total_tail_olr or total_tail_lm else "") + print(f"[validator] validated={total_validated} " + f"matched={total_matched} " + f"mismatches={total_mismatches} " + f"missing_olr={total_missing_olr} " + f"extra_olr={total_missing_lm}" + f"{tail_str} " + f"lm_total={lmc} olr_total={oc} " + f"frontier={frontier_str}", flush=True) + elif time.time() - last_new_events > IDLE_TIMEOUT: + # Idle timeout — do final widened pass + print(f"[validator] Idle timeout ({IDLE_TIMEOUT}s). 
" + f"Final validation pass...", flush=True) + result = validate_cycle( + conn, cursor_by_node, safe_frontier, widen=True) + (v, m, mm, mo, ml, to_, tl, lmc, oc, _) = result + total_validated += v + total_matched += m + total_mismatches += mm + total_missing_olr += mo + total_missing_lm += ml + total_tail_olr += to_ + total_tail_lm += tl + break + + except KeyboardInterrupt: + pass + finally: + conn.close() + + rc = print_summary(total_validated, total_matched, total_mismatches, + total_missing_olr, total_missing_lm, + total_tail_olr, total_tail_lm) + sys.exit(rc) if __name__ == '__main__': From 9afec13c4edd930d157a863f2f23622cb9cd3a9e Mon Sep 17 00:00:00 2001 From: Rophy Tsai Date: Sun, 12 Apr 2026 06:17:45 +0000 Subject: [PATCH 2/6] refactor: drop streaming soak mode, move archive cleanup to container Phase B (continuous soak) is being redesigned as periodic fuzz runs; remove the streaming implementation: - Drop run_forever() in FUZZ_WKL package - Drop SOAK_MODE path in validator.py (stall detection, continuous loop) - Drop soak subcommand and action_soak from fuzz-test.sh - Remove SOAK-TEST.md design doc Replace host-side archive cleanup loop with a docker-compose service: - New archive-cleanup/ directory (Dockerfile + crontab) - Alpine + openssh-client + supercronic v0.2.44 (SHA1 pinned) - Hourly find -mtime +1 -print -delete, deletions logged to stdout - Lifecycle tied to docker-compose up/down, visible via docker logs Phase A cleanup mechanisms retained: - created_at columns + FUZZ_WKL.cleanup() + DBMS_SCHEDULER job - Consumer seq dict pruning (10min interval, 24h TTL) - validator.py purge_old_events() called after each one-shot cycle - SQLite busy_timeout=30000 on both consumer and validator --- tests/dbz-twin/rac/SOAK-TEST.md | 222 ------------------ tests/dbz-twin/rac/archive-cleanup/Dockerfile | 14 ++ tests/dbz-twin/rac/archive-cleanup/crontab | 5 + tests/dbz-twin/rac/docker-compose-fuzz.yaml | 16 +- tests/dbz-twin/rac/fuzz-test.sh | 176 -------------- tests/dbz-twin/rac/perf/fuzz-workload.sql | 125 ---------- tests/dbz-twin/rac/validator.py | 196 +++++----------- 7 files changed, 90 insertions(+), 664 deletions(-) delete mode 100644 tests/dbz-twin/rac/SOAK-TEST.md create mode 100644 tests/dbz-twin/rac/archive-cleanup/Dockerfile create mode 100644 tests/dbz-twin/rac/archive-cleanup/crontab diff --git a/tests/dbz-twin/rac/SOAK-TEST.md b/tests/dbz-twin/rac/SOAK-TEST.md deleted file mode 100644 index 4bb1975d..00000000 --- a/tests/dbz-twin/rac/SOAK-TEST.md +++ /dev/null @@ -1,222 +0,0 @@ -# Soak Test Implementation Guide - -Upgrade the existing fuzz test into a continuous soak test that runs indefinitely -with 24h TTL-based storage cleanup. - -## Current State - -The fuzz test runs a finite workload, waits for drain, validates once, then tears down. -Key components already exist: - -- `fuzz-test.sh` — orchestration (up/down/run/validate/db-check) -- `kafka-consumer.py` — Kafka → SQLite consumer (infinite loop) -- `validator.py` — per-event comparison with per-node watermarks -- `fuzz-workload.sql` — PL/SQL package generating random DML with event_id tracking -- `docker-compose-fuzz.yaml` — Kafka, Debezium connectors, consumer, validator - -## Goal - -Run the full pipeline indefinitely: -1. Load generator produces DML at constant rate, non-stop -2. Consumer writes to SQLite non-stop -3. Validator periodically checks data up to a watermark, reports pass/fail, purges old data -4. All storage cleaned up on a 24h TTL -5. 
Runs forever unless stopped or a failure is detected - -## Storage TTL: 24h Everywhere - -All transient data uses the same 24h retention window: - -| Storage | Current | Change | -|---------|---------|--------| -| Kafka topics | 24h retention | No change needed | -| SQLite events | Grows forever | Purge events older than 24h after validation | -| Archive logs | No cleanup | Cron: `find /shared/redo/archivelog -mtime +1 -delete` | -| Oracle tables | Grows forever | DELETE rows with `created_at < SYSDATE - 1` | - -If any component falls behind by more than ~1 hour, treat it as a test failure -and stop the whole setup for investigation. The 24h buffer is deliberately generous. - -## Changes Required - -### 1. Workload Generator — Infinite Loop - -**File**: `perf/fuzz-workload.sql` (FUZZ_WKL package) - -Current: `run(p_duration_min NUMBER)` exits after N minutes. - -Change: Add `run_forever(p_rate_per_sec NUMBER)` procedure that: -- Runs DML in an infinite loop with pacing (e.g., 10 ops/sec) -- Calls `DBMS_SESSION.SLEEP()` between batches to maintain target rate -- Handles connection drops gracefully (reconnect and continue) -- Periodically logs throughput stats to `FUZZ_STATS` table - -**Invocation**: `fuzz-test.sh soak` starts the workload via SSH on both RAC nodes -as background processes. No fixed duration. - -### 2. Consumer — Add Timestamp Column - -**File**: `kafka-consumer.py` - -Current SQLite schema: -```sql -CREATE TABLE lm_events ( - event_id TEXT, seq INTEGER, table_name TEXT, op TEXT, - raw_json TEXT, consumed_at REAL, - PRIMARY KEY (event_id, seq) -); -``` - -`consumed_at` already exists (epoch float). No schema change needed — use it for TTL purge. - -**Memory concern**: The `lm_seq` / `olr_seq` dicts track event_id → next seq number -and grow unbounded. Add periodic cleanup: remove entries where the event_id's -`consumed_at` is older than 24h. The comment warns against trimming due to seq reset -bugs — the fix is to only trim entries outside the 24h window (no active events -will reference them). - -### 3. Validator — Periodic + Purge Mode - -**File**: `validator.py` - -Current: Polls until idle (120s no new events), then validates once and exits. - -Change to continuous mode: -- Run in an infinite loop with configurable interval (e.g., every 5 minutes) -- Each cycle: - 1. Compute per-node watermarks (same logic as today) - 2. Validate all events up to watermark - 3. Report results: `[SOAK] cycle=42 validated=12345 mismatches=0 elapsed=3.2s` - 4. **Purge**: DELETE from lm_events/olr_events WHERE consumed_at < (now - 24h) - 5. If any non-LOB mismatches found, write failure report and exit non-zero -- Never exit on idle — in soak mode, idle just means the workload is between batches - -**New env vars**: -- `SOAK_MODE=1` — enables continuous validation + purge (default: off, preserving current behavior) -- `PURGE_TTL_HOURS=24` — how old events must be before purge -- `VALIDATE_INTERVAL_SEC=300` — seconds between validation cycles - -### 4. Archive Log Cleanup (part of `fuzz-test.sh up`) - -**Where**: Started as a background job during `fuzz-test.sh up`, not soak-specific. -Useful for any fuzz run — even finite runs leave behind archive logs. - -Add to `fuzz-test.sh up` after all services are started: -```bash -# Background archive cleanup every hour (killed on `down`) -while true; do - ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ - "find /shared/redo/archivelog -mtime +1 -delete" - sleep 3600 -done & -echo $! 
> "$WORK_DIR/archive_cleanup.pid" -``` - -Kill in `fuzz-test.sh down`: -```bash -[ -f "$WORK_DIR/archive_cleanup.pid" ] && kill "$(cat "$WORK_DIR/archive_cleanup.pid")" 2>/dev/null -``` - -### 5. Oracle Table Cleanup (part of `fuzz-test.sh up`) - -**File**: `perf/fuzz-workload.sql` - -Add a `cleanup()` procedure to FUZZ_WKL package: -```sql -PROCEDURE cleanup IS -BEGIN - FOR t IN (SELECT table_name FROM user_tables - WHERE table_name LIKE 'FUZZ_%' - AND table_name != 'FUZZ_STATS') LOOP - EXECUTE IMMEDIATE 'DELETE FROM ' || t.table_name || - ' WHERE created_at < SYSDATE - 1'; - COMMIT; - END LOOP; -END; -``` - -**Prerequisite**: Add `created_at DATE DEFAULT SYSDATE` column to all FUZZ_* tables -in the table creation DDL (already in `fuzz-test.sh up`). - -Schedule via Oracle DBMS_SCHEDULER job created during `fuzz-test.sh up`: -```sql -BEGIN - DBMS_SCHEDULER.CREATE_JOB( - job_name => 'FUZZ_CLEANUP', - job_type => 'PLSQL_BLOCK', - job_action => 'BEGIN FUZZ_WKL.cleanup; END;', - repeat_interval => 'FREQ=MINUTELY;INTERVAL=30', - enabled => TRUE - ); -END; -``` - -Drop in `fuzz-test.sh down` table cleanup. - -### 6. New Subcommand: `fuzz-test.sh soak` - -Orchestration for the continuous run: - -``` -fuzz-test.sh soak [rate-per-sec] -``` - -Steps: -1. Verify all services running (reuse Stage 1 from `run`) -2. Start archive log cleanup loop (background) -3. Start workload on both RAC nodes (background, infinite) -4. Print status line every 60s: - `[SOAK] uptime=2h13m LM=45230 OLR=45228 validated=44000 mismatches=0` -5. Monitor for failures: - - Validator exits non-zero → stop everything, print report - - Consumer dies → stop everything - - OLR container exits → stop everything -6. On SIGINT/SIGTERM: graceful shutdown (stop workload, drain, final validate) - -### 7. Health Monitoring - -Add a stall detector — if no new events arrive for 5 minutes, something is broken: - -In the validator's continuous loop, track `last_event_time`. If -`now - last_event_time > 300s` while the workload is running, report: -``` -[STALL] No new events for 5m. Last LM event: N1_00045230 at 10:15:02. Last OLR event: N1_00045228 at 10:15:01. -``` -Then exit non-zero to trigger the soak test shutdown. - -## Implementation Order - -**Phase A — Cleanup (part of fuzz environment, not soak-specific):** -1. **Add `created_at` column** to FUZZ_* table DDL + Oracle cleanup job (DBMS_SCHEDULER) -2. **Archive log cleanup** — background loop started by `fuzz-test.sh up` -3. **Consumer seq dict cleanup** — prune entries older than 24h -4. **Validator purge** — DELETE validated events older than 24h (enabled by default) - -Phase A can be tested with the existing finite workload — just run longer and -verify storage stays bounded. - -**Phase B — Soak mode (continuous operation):** -5. **Validator soak mode** — continuous validate-purge loop instead of exit-on-idle -6. **Workload `run_forever`** — infinite loop with rate limiting -7. **`fuzz-test.sh soak`** subcommand — ties everything together -8. **Health monitoring** — stall detection - -## Testing the Upgrade - -1. Run `fuzz-test.sh soak 5` (5 ops/sec) for 1 hour -2. Verify validator reports pass every 5 minutes -3. Verify SQLite size stays bounded (check after 30min) -4. Verify archive log directory doesn't grow past ~2 GB -5. Verify Oracle tablespace stays bounded -6. Kill OLR mid-run → confirm stall detection fires within 5 minutes -7. 
Run for 24+ hours → confirm TTL purge works across the full window - -## Files to Modify - -| File | Change | -|------|--------| -| `fuzz-test.sh` | Add `soak` subcommand | -| `validator.py` | Add SOAK_MODE, continuous loop, purge | -| `kafka-consumer.py` | Add seq dict cleanup for old entries | -| `perf/fuzz-workload.sql` | Add `run_forever()`, `cleanup()`, `created_at` columns | -| `docker-compose-fuzz.yaml` | Add SOAK_MODE env var to validator service | diff --git a/tests/dbz-twin/rac/archive-cleanup/Dockerfile b/tests/dbz-twin/rac/archive-cleanup/Dockerfile new file mode 100644 index 00000000..5b519b2e --- /dev/null +++ b/tests/dbz-twin/rac/archive-cleanup/Dockerfile @@ -0,0 +1,14 @@ +FROM alpine:3.20 + +ARG SUPERCRONIC_VERSION=v0.2.44 +ARG SUPERCRONIC_SHA=6eb0a8e1e6673675dc67668c1a9b6409f79c37bc +ARG SUPERCRONIC_URL=https://github.com/aptible/supercronic/releases/download/${SUPERCRONIC_VERSION}/supercronic-linux-amd64 + +RUN apk add --no-cache openssh-client curl \ + && curl -fsSL -o /usr/local/bin/supercronic "${SUPERCRONIC_URL}" \ + && echo "${SUPERCRONIC_SHA} /usr/local/bin/supercronic" | sha1sum -c - \ + && chmod +x /usr/local/bin/supercronic \ + && apk del curl + +ENTRYPOINT ["/usr/local/bin/supercronic", "-passthrough-logs"] +CMD ["/etc/crontab"] diff --git a/tests/dbz-twin/rac/archive-cleanup/crontab b/tests/dbz-twin/rac/archive-cleanup/crontab new file mode 100644 index 00000000..b39fba30 --- /dev/null +++ b/tests/dbz-twin/rac/archive-cleanup/crontab @@ -0,0 +1,5 @@ +# Archive log cleanup — delete archive logs older than 1 day on the RAC VM. +# Runs hourly. Each deleted file is printed to stdout (-> docker logs). +# +# Env vars (passed through by supercronic): VM_HOST, VM_USER +0 * * * * ssh -i /vm-key -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $VM_USER@$VM_HOST "find /shared/redo/archivelog -mtime +1 -print -delete" | awk 'NF{print "deleted: "$0}' diff --git a/tests/dbz-twin/rac/docker-compose-fuzz.yaml b/tests/dbz-twin/rac/docker-compose-fuzz.yaml index 4ce381d4..43026b9b 100644 --- a/tests/dbz-twin/rac/docker-compose-fuzz.yaml +++ b/tests/dbz-twin/rac/docker-compose-fuzz.yaml @@ -101,14 +101,22 @@ services: SQLITE_DB: /app/data/fuzz.db POLL_INTERVAL: "10" IDLE_TIMEOUT: "120" - SOAK_MODE: "${SOAK_MODE:-0}" - PURGE_TTL_HOURS: "${PURGE_TTL_HOURS:-24}" - VALIDATE_INTERVAL_SEC: "${VALIDATE_INTERVAL_SEC:-300}" - STALL_TIMEOUT_SEC: "${STALL_TIMEOUT_SEC:-300}" volumes: - ./validator.py:/app/validator.py:ro - fuzz-data:/app/data + archive-cleanup: + build: + context: ./archive-cleanup + image: fuzz-archive-cleanup:latest + container_name: fuzz-archive-cleanup + environment: + VM_HOST: ${VM_HOST:?VM_HOST is required} + VM_USER: ${VM_USER:-root} + volumes: + - ${VM_KEY:?VM_KEY is required}:/vm-key:ro + - ./archive-cleanup/crontab:/etc/crontab:ro + volumes: dbz-logminer-data: dbz-olr-data: diff --git a/tests/dbz-twin/rac/fuzz-test.sh b/tests/dbz-twin/rac/fuzz-test.sh index 240c8c62..2ff3ae06 100755 --- a/tests/dbz-twin/rac/fuzz-test.sh +++ b/tests/dbz-twin/rac/fuzz-test.sh @@ -10,7 +10,6 @@ # Actions: # up Start infrastructure (Kafka, Debezium, consumer, validator, OLR) # run [duration-min] Deploy fuzz workload and run for N minutes (default: 30) -# soak [rate-per-sec] Start continuous soak test (default rate: 10 ops/sec) # status Show consumer/validator status and OLR memory # validate Run validator (wait for idle timeout, report results) # logs [component] Show logs (kafka, logminer, olr, lob-logminer, consumer, validator, olr-vm) @@ -228,17 +227,7 @@ action_up() { 
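+    # NOTE: hourly archive-log pruning on the VM is handled by the
+    # supercronic-based fuzz-archive-cleanup container (see
+    # archive-cleanup/crontab above), so this script needs no ssh loop of its own.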
sleep 2 done - # Start background archive log cleanup (hourly, killed on `down`) mkdir -p "$WORK_DIR" - ( - while true; do - ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ - "find /shared/redo/archivelog -mtime +1 -delete" 2>/dev/null || true - sleep 3600 - done - ) & - echo $! > "$WORK_DIR/archive_cleanup.pid" - echo " Archive cleanup: started (PID $!, hourly)" # Create Oracle cleanup scheduler job (purges FUZZ_* rows older than 24h) cat > "$WORK_DIR/create_cleanup_job.sql" <<'SQL' @@ -484,19 +473,6 @@ action_logs() { action_down() { echo "=== Stopping fuzz test infrastructure ===" - # Kill background archive cleanup - if [[ -f "$WORK_DIR/archive_cleanup.pid" ]]; then - kill "$(cat "$WORK_DIR/archive_cleanup.pid")" 2>/dev/null || true - rm -f "$WORK_DIR/archive_cleanup.pid" - echo " Archive cleanup: stopped" - fi - - # Kill soak workload processes - for pidfile in "$WORK_DIR"/soak_node*.pid; do - [[ -f "$pidfile" ]] && kill "$(cat "$pidfile")" 2>/dev/null || true - done - rm -f "$WORK_DIR"/soak_node*.pid - # Drop Oracle cleanup scheduler job local drop_sql="$WORK_DIR/drop_cleanup_job.sql" if [[ -d "$WORK_DIR" ]]; then @@ -509,9 +485,6 @@ SQL _exec_user "$drop_sql" > /dev/null 2>&1 || true fi - # Stop soak validator (named container, not part of compose up) - docker rm -f fuzz-validator 2>/dev/null || true - docker compose -f "$COMPOSE_FILE" down -v 2>/dev/null ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ "podman stop -t5 $OLR_CONTAINER 2>/dev/null; podman rm $OLR_CONTAINER 2>/dev/null; true" @@ -540,158 +513,9 @@ action_db_check() { python3 "$SCRIPT_DIR/db-check.py" } -action_soak() { - local rate="${1:-10}" - local skip_lob="${SKIP_LOB:-0}" - - echo "=== Starting soak test (rate=${rate} ops/sec) ===" - - # Verify services are running - for svc in fuzz-kafka fuzz-dbz-logminer fuzz-dbz-olr fuzz-dbz-lob-logminer fuzz-consumer; do - if ! docker ps --format '{{.Names}}' | grep -q "^${svc}$"; then - echo "ERROR: $svc is not running. Run './fuzz-test.sh up' first." >&2 - exit 1 - fi - done - if ! ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" "podman ps --format '{{.Names}}' | grep -q $OLR_CONTAINER" 2>/dev/null; then - echo "ERROR: OLR container not running on VM." >&2 - exit 1 - fi - echo " All services: OK" - - mkdir -p "$WORK_DIR" - - # Start validator in soak mode (continuous validation + purge) - echo " Starting validator in soak mode..." 
- docker rm -f fuzz-validator 2>/dev/null || true - docker compose -f "$COMPOSE_FILE" run -d \ - --name fuzz-validator \ - -e SOAK_MODE=1 \ - -e PURGE_TTL_HOURS="${PURGE_TTL_HOURS:-24}" \ - -e VALIDATE_INTERVAL_SEC="${VALIDATE_INTERVAL_SEC:-300}" \ - -e STALL_TIMEOUT_SEC="${STALL_TIMEOUT_SEC:-300}" \ - validator > /dev/null 2>&1 - echo " Validator: started (soak mode)" - - # Log switch to flush redo - local log_switch_sql="$WORK_DIR/log_switch.sql" - cat > "$log_switch_sql" <<'SQL' -SET FEEDBACK OFF -ALTER SYSTEM SWITCH ALL LOGFILE; -BEGIN DBMS_SESSION.SLEEP(2); END; -/ -EXIT -SQL - _exec_sysdba "$log_switch_sql" > /dev/null - - # Create runner scripts for infinite workload - cat > "$WORK_DIR/soak_node1.sql" < ${rate}, p_seed => 42, p_node_id => 1, p_skip_lob => ${skip_lob}); -EXIT; -SQL - cat > "$WORK_DIR/soak_node2.sql" < ${rate}, p_seed => 137, p_node_id => 2, p_skip_lob => ${skip_lob}); -EXIT; -SQL - - _vm_copy_in "$WORK_DIR/soak_node1.sql" "/tmp/soak_node1.sql" "$RAC_NODE1" - _vm_copy_in "$WORK_DIR/soak_node2.sql" "/tmp/soak_node2.sql" "$RAC_NODE2" - - # Start workload on both nodes (background) - ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ - "podman exec $RAC_NODE1 su - oracle -c 'export ORACLE_SID=$ORACLE_SID1; sqlplus -S $DB_CONN1 @/tmp/soak_node1.sql'" \ - > "$WORK_DIR/soak_out1.log" 2>&1 & - local pid1=$! - echo $pid1 > "$WORK_DIR/soak_node1.pid" - - ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ - "podman exec $RAC_NODE2 su - oracle -c 'export ORACLE_SID=$ORACLE_SID2; sqlplus -S $DB_CONN2 @/tmp/soak_node2.sql'" \ - > "$WORK_DIR/soak_out2.log" 2>&1 & - local pid2=$! - echo $pid2 > "$WORK_DIR/soak_node2.pid" - - echo " Workload: started on both nodes (PIDs: $pid1, $pid2)" - echo "" - echo "=== Soak test running. Monitor with: ./fuzz-test.sh status ===" - echo "=== Stop with: ./fuzz-test.sh down ===" - - # Graceful shutdown handler - _soak_shutdown() { - echo "" - echo "=== Shutting down soak test ===" - kill $pid1 $pid2 2>/dev/null || true - wait $pid1 $pid2 2>/dev/null || true - echo " Workload: stopped" - echo " Run './fuzz-test.sh down' to tear down infrastructure." - } - trap _soak_shutdown INT TERM - - # Monitor loop: print status + detect failures - local start_time=$SECONDS - while true; do - sleep 60 - - # Check workload processes - local n1_ok=true n2_ok=true - kill -0 $pid1 2>/dev/null || n1_ok=false - kill -0 $pid2 2>/dev/null || n2_ok=false - - # Check validator - local val_ok=true - docker ps --format '{{.Names}}' | grep -q "^fuzz-validator$" || val_ok=false - - # Check OLR - local olr_ok=true - ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ - "podman ps --format '{{.Names}}' | grep -q $OLR_CONTAINER" 2>/dev/null || olr_ok=false - - # Get status - local uptime_sec=$(( SECONDS - start_time )) - local uptime_h=$(( uptime_sec / 3600 )) - local uptime_m=$(( (uptime_sec % 3600) / 60 )) - local mem=$(_olr_memory_mb) - local consumer_line - consumer_line=$(docker logs --tail 1 fuzz-consumer 2>/dev/null | grep -o '\[consumer\].*' || echo "?") - local validator_line - validator_line=$(docker logs --tail 1 fuzz-validator 2>/dev/null | grep -o '\[SOAK\].*' || echo "?") - - printf "[SOAK] uptime=%dh%02dm OLR=%sMB %s %s\n" \ - "$uptime_h" "$uptime_m" "$mem" "$consumer_line" "$validator_line" - - # Detect failures - if ! $val_ok; then - echo "" - echo "=== FAIL: Validator exited ===" - docker logs --tail 20 fuzz-validator 2>/dev/null - _soak_shutdown - exit 1 - fi - if ! 
$olr_ok; then - echo "" - echo "=== FAIL: OLR container exited ===" - ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" "podman logs --tail 20 $OLR_CONTAINER" 2>/dev/null - _soak_shutdown - exit 1 - fi - if ! $n1_ok && ! $n2_ok; then - echo "" - echo "=== WARN: Both workload processes exited ===" - echo " Node 1 log tail:" - tail -5 "$WORK_DIR/soak_out1.log" 2>/dev/null | sed 's/^/ /' - echo " Node 2 log tail:" - tail -5 "$WORK_DIR/soak_out2.log" 2>/dev/null | sed 's/^/ /' - _soak_shutdown - exit 1 - fi - done -} - case "$ACTION" in up) action_up ;; run) action_run "$@" ;; - soak) action_soak "$@" ;; status) action_status ;; validate) action_validate ;; db-check) action_db_check ;; diff --git a/tests/dbz-twin/rac/perf/fuzz-workload.sql b/tests/dbz-twin/rac/perf/fuzz-workload.sql index ea02237b..9488c317 100644 --- a/tests/dbz-twin/rac/perf/fuzz-workload.sql +++ b/tests/dbz-twin/rac/perf/fuzz-workload.sql @@ -159,12 +159,6 @@ CREATE OR REPLACE PACKAGE olr_test.FUZZ_WKL AS p_node_id IN NUMBER DEFAULT 1, p_skip_lob IN NUMBER DEFAULT 0 -- 1 = skip LOB table operations ); - PROCEDURE run_forever( - p_rate_per_sec IN NUMBER DEFAULT 10, - p_seed IN NUMBER DEFAULT 1, - p_node_id IN NUMBER DEFAULT 1, - p_skip_lob IN NUMBER DEFAULT 0 - ); PROCEDURE cleanup; END FUZZ_WKL; / @@ -867,125 +861,6 @@ CREATE OR REPLACE PACKAGE BODY olr_test.FUZZ_WKL AS END LOOP; END; - -- ---- Continuous run for soak testing ---- - - PROCEDURE run_forever( - p_rate_per_sec IN NUMBER DEFAULT 10, - p_seed IN NUMBER DEFAULT 1, - p_node_id IN NUMBER DEFAULT 1, - p_skip_lob IN NUMBER DEFAULT 0 - ) IS - v_start TIMESTAMP := SYSTIMESTAMP; - v_txn_dice PLS_INTEGER; - v_batch PLS_INTEGER; - v_seed_id PLS_INTEGER; - v_seed_region VARCHAR2(20); - v_cycle_start TIMESTAMP; - v_cycle_ops PLS_INTEGER; - v_sleep_sec NUMBER; - v_ops_before PLS_INTEGER; - BEGIN - -- Initialize (same as run) - g_node_id := p_node_id; - g_next_id := p_node_id; - g_event_seq := 0; - g_insert_cnt := 0; g_update_cnt := 0; g_delete_cnt := 0; - g_rollback_cnt := 0; g_lob_cnt := 0; g_total_ops := 0; - g_skip_lob := p_skip_lob; - - DBMS_RANDOM.SEED(p_seed); - - -- Seed initial data - v_seed_id := 0; - g_scalar_id_cnt := 0; g_lob_id_cnt := 0; g_wide_id_cnt := 0; - g_part_id_cnt := 0; g_maxstr_id_cnt := 0; g_interval_id_cnt := 0; - FOR i IN 1..50 LOOP - v_seed_id := next_id; - INSERT INTO olr_test.FUZZ_SCALAR (id, event_id, col_varchar, col_flag) - VALUES (v_seed_id, 'SEED', DBMS_RANDOM.STRING('x', 20), 0); - track_id(g_scalar_ids, g_scalar_id_cnt, v_seed_id); - END LOOP; - IF g_skip_lob = 0 THEN - FOR i IN 1..5 LOOP - v_seed_id := next_id; - INSERT INTO olr_test.FUZZ_LOB (id, event_id, label, content) - VALUES (v_seed_id, 'SEED', 'seed', 'seed'); - track_id(g_lob_ids, g_lob_id_cnt, v_seed_id); - END LOOP; - END IF; - FOR i IN 1..20 LOOP - v_seed_id := next_id; - v_seed_region := REGIONS(rand_int(1, 5)); - INSERT INTO olr_test.FUZZ_PART (id, event_id, region, val, payload) - VALUES (v_seed_id, 'SEED', v_seed_region, 0, 'seed'); - track_id(g_part_ids, g_part_id_cnt, v_seed_id); - END LOOP; - FOR i IN 1..10 LOOP - INSERT INTO olr_test.FUZZ_NOPK (event_id, name, value, status) - VALUES ('SEED', 'seed', 0, 'ACTIVE'); - END LOOP; - COMMIT; - g_event_seq := 0; - g_insert_cnt := 0; g_total_ops := 0; - - -- Infinite loop with rate limiting - LOOP - v_cycle_start := SYSTIMESTAMP; - v_ops_before := g_total_ops; - - -- Do one transaction cycle - v_txn_dice := rand_int(1, 100); - - IF v_txn_dice <= 55 THEN - do_random_op; - COMMIT; - ELSIF v_txn_dice <= 70 THEN - v_batch := rand_int(2, 5); 
- FOR j IN 1..v_batch LOOP - do_random_op; - END LOOP; - COMMIT; - ELSIF v_txn_dice <= 80 THEN - do_random_op; - ROLLBACK; - g_rollback_cnt := g_rollback_cnt + 1; - ELSIF v_txn_dice <= 90 THEN - do_random_op; - SAVEPOINT sp_fuzz; - do_random_op; - ROLLBACK TO sp_fuzz; - g_rollback_cnt := g_rollback_cnt + 1; - do_random_op; - COMMIT; - ELSE - v_batch := rand_int(10, 30); - FOR j IN 1..v_batch LOOP - do_random_op; - END LOOP; - COMMIT; - END IF; - - -- Rate limiting: sleep to maintain target ops/sec - v_cycle_ops := g_total_ops - v_ops_before; - IF p_rate_per_sec > 0 AND v_cycle_ops > 0 THEN - v_sleep_sec := v_cycle_ops / p_rate_per_sec; - IF v_sleep_sec > 0.01 THEN - DBMS_SESSION.SLEEP(v_sleep_sec); - END IF; - ELSE - DBMS_SESSION.SLEEP(0.1); - END IF; - - -- Update stats every 100 ops - IF MOD(g_total_ops, 100) = 0 THEN - update_stats; - DBMS_OUTPUT.PUT_LINE('FUZZ_SOAK: node=' || g_node_id || - ' ops=' || g_total_ops || - ' last_event=N' || g_node_id || '_' || LPAD(g_event_seq, 8, '0')); - END IF; - END LOOP; - END; - END FUZZ_WKL; / diff --git a/tests/dbz-twin/rac/validator.py b/tests/dbz-twin/rac/validator.py index 7ffd33be..55ecb0c0 100644 --- a/tests/dbz-twin/rac/validator.py +++ b/tests/dbz-twin/rac/validator.py @@ -23,10 +23,7 @@ SQLITE_DB = os.environ.get('SQLITE_DB', '/app/data/fuzz.db') POLL_INTERVAL = int(os.environ.get('POLL_INTERVAL', '10')) IDLE_TIMEOUT = int(os.environ.get('IDLE_TIMEOUT', '120')) -SOAK_MODE = os.environ.get('SOAK_MODE', '0') == '1' PURGE_TTL_HOURS = int(os.environ.get('PURGE_TTL_HOURS', '24')) -VALIDATE_INTERVAL_SEC = int(os.environ.get('VALIDATE_INTERVAL_SEC', '300')) -STALL_TIMEOUT_SEC = int(os.environ.get('STALL_TIMEOUT_SEC', '300')) # LOB tables that use final-state replay for comparison. # With the hybrid setup (OLR for non-LOB + LogMiner for LOB), these tables @@ -366,15 +363,11 @@ def print_summary(total_validated, total_matched, total_mismatches, def main(): - print(f"Validator starting (soak_mode={SOAK_MODE})", flush=True) + print(f"Validator starting", flush=True) print(f" SQLite DB: {SQLITE_DB}", flush=True) print(f" Poll interval: {POLL_INTERVAL}s", flush=True) - if SOAK_MODE: - print(f" Validate interval: {VALIDATE_INTERVAL_SEC}s", flush=True) - print(f" Purge TTL: {PURGE_TTL_HOURS}h", flush=True) - print(f" Stall timeout: {STALL_TIMEOUT_SEC}s", flush=True) - else: - print(f" Idle timeout: {IDLE_TIMEOUT}s", flush=True) + print(f" Idle timeout: {IDLE_TIMEOUT}s", flush=True) + print(f" Purge TTL: {PURGE_TTL_HOURS}h", flush=True) # Wait for database to exist while not os.path.exists(SQLITE_DB): @@ -398,98 +391,55 @@ def main(): prev_lm_count = 0 prev_olr_count = 0 - if SOAK_MODE: - # Soak mode: continuous validate-purge cycles - cycle = 0 - try: - while True: - time.sleep(VALIDATE_INTERVAL_SEC) - cycle += 1 - cycle_start = time.monotonic() - - # Check for new events (stall detection) - lm_count = conn.execute( - "SELECT COUNT(*) FROM lm_events").fetchone()[0] - olr_count = conn.execute( - "SELECT COUNT(*) FROM olr_events").fetchone()[0] - - if lm_count != prev_lm_count or olr_count != prev_olr_count: - last_new_events = time.time() - prev_lm_count = lm_count - prev_olr_count = olr_count - elif time.time() - last_new_events > STALL_TIMEOUT_SEC: - # Stall detection - frontier_str = ','.join( - f'{k}={v}' for k, v in sorted(cursor_by_node.items())) - print(f"[STALL] No new events for {STALL_TIMEOUT_SEC}s. 
" - f"LM={lm_count} OLR={olr_count} " - f"frontier={frontier_str}", flush=True) - conn.close() - sys.exit(2) - - result = validate_cycle(conn, cursor_by_node, safe_frontier) - (v, m, mm, mo, ml, to_, tl, lmc, oc, _) = result - total_validated += v - total_matched += m - total_mismatches += mm - total_missing_olr += mo - total_missing_lm += ml - total_tail_olr += to_ - total_tail_lm += tl - - # Purge old events + # Poll until idle, validate, exit + try: + while True: + time.sleep(POLL_INTERVAL) + + lm_count = conn.execute( + "SELECT COUNT(*) FROM lm_events").fetchone()[0] + olr_count = conn.execute( + "SELECT COUNT(*) FROM olr_events").fetchone()[0] + + if lm_count != prev_lm_count or olr_count != prev_olr_count: + last_new_events = time.time() + prev_lm_count = lm_count + prev_olr_count = olr_count + + # Try a validation cycle (safe frontier only) + result = validate_cycle(conn, cursor_by_node, safe_frontier) + (v, m, mm, mo, ml, to_, tl, lmc, oc, nf) = result + total_validated += v + total_matched += m + total_mismatches += mm + total_missing_olr += mo + total_missing_lm += ml + total_tail_olr += to_ + total_tail_lm += tl + + if v > 0: + # Purge old events after each validation cycle purged = purge_old_events(conn, PURGE_TTL_HOURS) - - elapsed = time.monotonic() - cycle_start frontier_str = ','.join( - f'{k}={v}' for k, v in sorted(cursor_by_node.items())) + f'{k}={v_}' for k, v_ in sorted(cursor_by_node.items())) + tail_str = (f" tail_olr={total_tail_olr} tail_lm={total_tail_lm}" + if total_tail_olr or total_tail_lm else "") purge_str = f" purged={purged}" if purged else "" - print(f"[SOAK] cycle={cycle} validated={v} " - f"mismatches={mm} total_validated={total_validated} " - f"total_mismatches={total_mismatches}{purge_str} " - f"lm={lmc} olr={oc} " - f"frontier={frontier_str} " - f"elapsed={elapsed:.1f}s", flush=True) - - if mm > 0: - print(f"[SOAK] FAIL: {mm} mismatches in cycle {cycle}", - flush=True) - print_summary(total_validated, total_matched, - total_mismatches, total_missing_olr, - total_missing_lm, total_tail_olr, - total_tail_lm) - conn.close() - sys.exit(1) - - except KeyboardInterrupt: - pass - finally: - conn.close() - - rc = print_summary(total_validated, total_matched, total_mismatches, - total_missing_olr, total_missing_lm, - total_tail_olr, total_tail_lm) - sys.exit(rc) - - else: - # Original one-shot mode: poll until idle, validate, exit - try: - while True: - time.sleep(POLL_INTERVAL) - - lm_count = conn.execute( - "SELECT COUNT(*) FROM lm_events").fetchone()[0] - olr_count = conn.execute( - "SELECT COUNT(*) FROM olr_events").fetchone()[0] - - if lm_count != prev_lm_count or olr_count != prev_olr_count: - last_new_events = time.time() - prev_lm_count = lm_count - prev_olr_count = olr_count - - # Try a validation cycle (safe frontier only) - result = validate_cycle(conn, cursor_by_node, safe_frontier) - (v, m, mm, mo, ml, to_, tl, lmc, oc, nf) = result + print(f"[validator] validated={total_validated} " + f"matched={total_matched} " + f"mismatches={total_mismatches} " + f"missing_olr={total_missing_olr} " + f"extra_olr={total_missing_lm}" + f"{tail_str}{purge_str} " + f"lm_total={lmc} olr_total={oc} " + f"frontier={frontier_str}", flush=True) + elif time.time() - last_new_events > IDLE_TIMEOUT: + # Idle timeout — do final widened pass + print(f"[validator] Idle timeout ({IDLE_TIMEOUT}s). 
" + f"Final validation pass...", flush=True) + result = validate_cycle( + conn, cursor_by_node, safe_frontier, widen=True) + (v, m, mm, mo, ml, to_, tl, lmc, oc, _) = result total_validated += v total_matched += m total_mismatches += mm @@ -497,45 +447,17 @@ def main(): total_missing_lm += ml total_tail_olr += to_ total_tail_lm += tl + break + + except KeyboardInterrupt: + pass + finally: + conn.close() - if v > 0: - frontier_str = ','.join( - f'{k}={v_}' for k, v_ in sorted(cursor_by_node.items())) - tail_str = (f" tail_olr={total_tail_olr} tail_lm={total_tail_lm}" - if total_tail_olr or total_tail_lm else "") - print(f"[validator] validated={total_validated} " - f"matched={total_matched} " - f"mismatches={total_mismatches} " - f"missing_olr={total_missing_olr} " - f"extra_olr={total_missing_lm}" - f"{tail_str} " - f"lm_total={lmc} olr_total={oc} " - f"frontier={frontier_str}", flush=True) - elif time.time() - last_new_events > IDLE_TIMEOUT: - # Idle timeout — do final widened pass - print(f"[validator] Idle timeout ({IDLE_TIMEOUT}s). " - f"Final validation pass...", flush=True) - result = validate_cycle( - conn, cursor_by_node, safe_frontier, widen=True) - (v, m, mm, mo, ml, to_, tl, lmc, oc, _) = result - total_validated += v - total_matched += m - total_mismatches += mm - total_missing_olr += mo - total_missing_lm += ml - total_tail_olr += to_ - total_tail_lm += tl - break - - except KeyboardInterrupt: - pass - finally: - conn.close() - - rc = print_summary(total_validated, total_matched, total_mismatches, - total_missing_olr, total_missing_lm, - total_tail_olr, total_tail_lm) - sys.exit(rc) + rc = print_summary(total_validated, total_matched, total_mismatches, + total_missing_olr, total_missing_lm, + total_tail_olr, total_tail_lm) + sys.exit(rc) if __name__ == '__main__': From 159597e0d6c948a621c247306fb7c45647b8e820 Mon Sep 17 00:00:00 2001 From: Rophy Tsai Date: Sun, 12 Apr 2026 17:17:30 +0000 Subject: [PATCH 3/6] chore: in-workload fuzz cleanup + drop DBMS_SCHEDULER job - Move FUZZ_* table cleanup into workload PL/SQL (every 5min) so DELETEs flow through CDC and get validated like normal DML. - Remove DBMS_SCHEDULER FUZZ_CLEANUP job from up/down (replaced by in-workload cleanup). - Fix grep -q + pipefail SIGPIPE false-failure in OLR/Debezium readiness waits. --- tests/dbz-twin/rac/fuzz-test.sh | 39 +++-------------------- tests/dbz-twin/rac/perf/fuzz-workload.sql | 14 ++++++++ 2 files changed, 18 insertions(+), 35 deletions(-) diff --git a/tests/dbz-twin/rac/fuzz-test.sh b/tests/dbz-twin/rac/fuzz-test.sh index 2ff3ae06..80f5815e 100755 --- a/tests/dbz-twin/rac/fuzz-test.sh +++ b/tests/dbz-twin/rac/fuzz-test.sh @@ -204,7 +204,7 @@ action_up() { echo " Waiting for OLR..." for i in $(seq 1 90); do if ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ - "podman logs $OLR_CONTAINER 2>&1 | grep -q 'processing redo log'"; then + "podman logs $OLR_CONTAINER 2>&1 | grep 'processing redo log' > /dev/null"; then echo " OLR: ready" break fi @@ -216,9 +216,9 @@ action_up() { echo " Waiting for Debezium connectors..." 
for i in $(seq 1 90); do LM_OK=false; OLR_OK=false; LOB_LM_OK=false - docker logs fuzz-dbz-logminer 2>&1 | grep -q "Starting streaming" && LM_OK=true - docker logs fuzz-dbz-olr 2>&1 | grep -q "streaming client started\|Starting streaming" && OLR_OK=true - docker logs fuzz-dbz-lob-logminer 2>&1 | grep -q "Starting streaming" && LOB_LM_OK=true + docker logs fuzz-dbz-logminer 2>&1 | grep "Starting streaming" > /dev/null && LM_OK=true + docker logs fuzz-dbz-olr 2>&1 | grep -E "streaming client started|Starting streaming" > /dev/null && OLR_OK=true + docker logs fuzz-dbz-lob-logminer 2>&1 | grep "Starting streaming" > /dev/null && LOB_LM_OK=true if $LM_OK && $OLR_OK && $LOB_LM_OK; then echo " Debezium: ready (3 connectors)" break @@ -229,25 +229,6 @@ action_up() { mkdir -p "$WORK_DIR" - # Create Oracle cleanup scheduler job (purges FUZZ_* rows older than 24h) - cat > "$WORK_DIR/create_cleanup_job.sql" <<'SQL' -SET FEEDBACK OFF -BEGIN - BEGIN DBMS_SCHEDULER.DROP_JOB('FUZZ_CLEANUP', TRUE); EXCEPTION WHEN OTHERS THEN NULL; END; - DBMS_SCHEDULER.CREATE_JOB( - job_name => 'FUZZ_CLEANUP', - job_type => 'PLSQL_BLOCK', - job_action => 'BEGIN FUZZ_WKL.cleanup; END;', - repeat_interval => 'FREQ=MINUTELY;INTERVAL=30', - enabled => TRUE - ); -END; -/ -EXIT -SQL - _exec_user "$WORK_DIR/create_cleanup_job.sql" > /dev/null 2>&1 || true - echo " Oracle cleanup job: created (every 30min)" - echo "" echo " OLR memory: $(_olr_memory_mb) MB" echo "" @@ -473,18 +454,6 @@ action_logs() { action_down() { echo "=== Stopping fuzz test infrastructure ===" - # Drop Oracle cleanup scheduler job - local drop_sql="$WORK_DIR/drop_cleanup_job.sql" - if [[ -d "$WORK_DIR" ]]; then - cat > "$drop_sql" <<'SQL' -SET FEEDBACK OFF -BEGIN DBMS_SCHEDULER.DROP_JOB('FUZZ_CLEANUP', TRUE); EXCEPTION WHEN OTHERS THEN NULL; END; -/ -EXIT -SQL - _exec_user "$drop_sql" > /dev/null 2>&1 || true - fi - docker compose -f "$COMPOSE_FILE" down -v 2>/dev/null ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ "podman stop -t5 $OLR_CONTAINER 2>/dev/null; podman rm $OLR_CONTAINER 2>/dev/null; true" diff --git a/tests/dbz-twin/rac/perf/fuzz-workload.sql b/tests/dbz-twin/rac/perf/fuzz-workload.sql index 9488c317..2767fd08 100644 --- a/tests/dbz-twin/rac/perf/fuzz-workload.sql +++ b/tests/dbz-twin/rac/perf/fuzz-workload.sql @@ -726,6 +726,7 @@ CREATE OR REPLACE PACKAGE BODY olr_test.FUZZ_WKL AS ) IS v_start TIMESTAMP := SYSTIMESTAMP; v_deadline TIMESTAMP := SYSTIMESTAMP + NUMTODSINTERVAL(p_duration_secs, 'SECOND'); + v_next_cleanup TIMESTAMP := SYSTIMESTAMP + INTERVAL '5' MINUTE; v_txn_dice PLS_INTEGER; v_batch PLS_INTEGER; v_seed_id PLS_INTEGER; @@ -829,6 +830,15 @@ CREATE OR REPLACE PACKAGE BODY olr_test.FUZZ_WKL AS IF MOD(g_total_ops, 100) = 0 THEN update_stats; END IF; + + -- Periodic housekeeping: DELETE rows older than 24h every 5 min. + -- Cleanup DELETEs become ordinary DML — captured by both LM and OLR, + -- validated like any other event. In short runs the threshold is + -- never reached, so nothing is deleted. 
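+          -- (Worked example: created_at is DATE DEFAULT SYSDATE per the table
+          -- DDL, and SYSDATE arithmetic is in days, so 'created_at < SYSDATE - 1'
+          -- selects rows older than 24h; a row inserted at 09:00 today becomes
+          -- eligible at 09:00 tomorrow and is deleted by the next 5-minute pass.)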
+ IF SYSTIMESTAMP > v_next_cleanup THEN + cleanup; + v_next_cleanup := SYSTIMESTAMP + INTERVAL '5' MINUTE; + END IF; END LOOP; -- Final commit + stats @@ -851,13 +861,17 @@ CREATE OR REPLACE PACKAGE BODY olr_test.FUZZ_WKL AS -- ---- Cleanup: purge rows older than 24h ---- PROCEDURE cleanup IS + v_deleted PLS_INTEGER; BEGIN FOR t IN (SELECT table_name FROM user_tables WHERE table_name LIKE 'FUZZ_%' AND table_name != 'FUZZ_STATS') LOOP EXECUTE IMMEDIATE 'DELETE FROM ' || t.table_name || ' WHERE created_at < SYSDATE - 1'; + v_deleted := SQL%ROWCOUNT; COMMIT; + g_delete_cnt := g_delete_cnt + v_deleted; + g_total_ops := g_total_ops + v_deleted; END LOOP; END; From 433e01613d7575919280844d8aa691f0f53d2c57 Mon Sep 17 00:00:00 2001 From: Rophy Tsai Date: Mon, 13 Apr 2026 05:07:24 +0000 Subject: [PATCH 4/6] =?UTF-8?q?feat:=20soak-mode=20prerequisites=20?= =?UTF-8?q?=E2=80=94=20idempotent=20workload=20+=20resumable=20validator?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - fuzz-workload.sql: make FUZZ_WKL.run() idempotent across cycles. Repopulate per-node tracked-ID arrays from existing rows (parity- filtered), continue g_next_id from MAX(id) preserving parity, and continue g_event_seq from MAX numeric tail of existing event_ids so cycle N+1 event_ids never collide with cycle N. Seed INSERTs only run on cold start (empty tables). - validator.py: accept START_CURSOR env and emit '[validator] final_cursor=...' (safe frontier only). Lets a soak loop resume past already-validated events without re-scanning. - fuzz-test.sh: forward START_CURSOR to the validator container; dump sqlplus output when FUZZ_DONE summary is missing (diagnostic). Verified: 3 back-to-back cycles with no teardown, 32k events, 0 mismatches, cursor advances each cycle. --- tests/dbz-twin/rac/fuzz-test.sh | 13 +- tests/dbz-twin/rac/perf/fuzz-workload.sql | 137 +++++++++++++++++----- tests/dbz-twin/rac/validator.py | 30 ++++- 3 files changed, 144 insertions(+), 36 deletions(-) diff --git a/tests/dbz-twin/rac/fuzz-test.sh b/tests/dbz-twin/rac/fuzz-test.sh index 80f5815e..b54facc5 100755 --- a/tests/dbz-twin/rac/fuzz-test.sh +++ b/tests/dbz-twin/rac/fuzz-test.sh @@ -308,12 +308,14 @@ SQL if [[ -n "$done1" ]]; then echo " Node 1: $done1" else - echo " Node 1: workload finished (no summary line)" + echo " Node 1: workload finished (no FUZZ_DONE line) — sqlplus output:" + sed 's/^/ | /' "$work_dir/fuzz_out1.log" fi if [[ -n "$done2" ]]; then echo " Node 2: $done2" else - echo " Node 2: workload finished (no summary line)" + echo " Node 2: workload finished (no FUZZ_DONE line) — sqlplus output:" + sed 's/^/ | /' "$work_dir/fuzz_out2.log" fi if [[ $rc1 -ne 0 || $rc2 -ne 0 ]]; then @@ -402,9 +404,12 @@ action_validate() { final_counts=$(docker logs --tail 1 fuzz-consumer 2>/dev/null | grep -o '\[consumer\].*' || echo "unknown") echo " Consumer idle for 30s. Last status: $final_counts" - # Start validator (uses 'validate' profile) + # Start validator (uses 'validate' profile). + # START_CURSOR env (optional) is forwarded to the container so soak loops + # can resume past already-validated events without re-scanning. local exit_code=0 - docker compose -f "$COMPOSE_FILE" run --rm validator || exit_code=$? + docker compose -f "$COMPOSE_FILE" run --rm \ + -e "START_CURSOR=${START_CURSOR:-}" validator || exit_code=$? 
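+    # Hypothetical usage (cursor values illustrative), resuming a second cycle
+    # past everything the first one validated:
+    #   START_CURSOR="N1=N1_00012345,N2=N2_00012344" ./fuzz-test.sh validate
+    # The validator prints '[validator] final_cursor=...' on exit; a driver can
+    # capture that line and feed it back as START_CURSOR for the next cycle.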
echo "" echo " OLR memory: $(_olr_memory_mb) MB" diff --git a/tests/dbz-twin/rac/perf/fuzz-workload.sql b/tests/dbz-twin/rac/perf/fuzz-workload.sql index 2767fd08..bd8dd3a1 100644 --- a/tests/dbz-twin/rac/perf/fuzz-workload.sql +++ b/tests/dbz-twin/rac/perf/fuzz-workload.sql @@ -731,51 +731,126 @@ CREATE OR REPLACE PACKAGE BODY olr_test.FUZZ_WKL AS v_batch PLS_INTEGER; v_seed_id PLS_INTEGER; v_seed_region VARCHAR2(20); + v_max_id PLS_INTEGER; + v_eid_prefix VARCHAR2(5); + v_max_seq PLS_INTEGER; BEGIN -- Initialize g_node_id := p_node_id; - g_next_id := p_node_id; -- 1 for node 1 (odd), 2 for node 2 (even) g_event_seq := 0; g_insert_cnt := 0; g_update_cnt := 0; g_delete_cnt := 0; g_rollback_cnt := 0; g_lob_cnt := 0; g_total_ops := 0; g_skip_lob := p_skip_lob; + g_scalar_id_cnt := 0; g_lob_id_cnt := 0; g_wide_id_cnt := 0; + g_part_id_cnt := 0; g_maxstr_id_cnt := 0; g_interval_id_cnt := 0; DBMS_RANDOM.SEED(p_seed); - -- Seed initial data (need rows before we can UPDATE/DELETE). - -- event_id='SEED' so the consumer skips these (they may arrive - -- before LogMiner starts streaming). - v_seed_id := 0; - g_scalar_id_cnt := 0; g_lob_id_cnt := 0; g_wide_id_cnt := 0; - g_part_id_cnt := 0; g_maxstr_id_cnt := 0; g_interval_id_cnt := 0; - FOR i IN 1..50 LOOP - v_seed_id := next_id; - INSERT INTO olr_test.FUZZ_SCALAR (id, event_id, col_varchar, col_flag) - VALUES (v_seed_id, 'SEED', DBMS_RANDOM.STRING('x', 20), 0); - track_id(g_scalar_ids, g_scalar_id_cnt, v_seed_id); + -- Repopulate per-node tracked-ID arrays from any carryover rows. + -- Soak mode reuses tables across cycles; session arrays do not + -- persist. Only IDs with this node's parity are eligible (same + -- invariant next_id maintains for new INSERTs). + FOR r IN (SELECT id FROM olr_test.FUZZ_SCALAR + WHERE MOD(id, 2) = MOD(p_node_id, 2)) LOOP + track_id(g_scalar_ids, g_scalar_id_cnt, r.id); + END LOOP; + FOR r IN (SELECT id FROM olr_test.FUZZ_LOB + WHERE MOD(id, 2) = MOD(p_node_id, 2)) LOOP + track_id(g_lob_ids, g_lob_id_cnt, r.id); + END LOOP; + FOR r IN (SELECT id FROM olr_test.FUZZ_WIDE + WHERE MOD(id, 2) = MOD(p_node_id, 2)) LOOP + track_id(g_wide_ids, g_wide_id_cnt, r.id); + END LOOP; + FOR r IN (SELECT id FROM olr_test.FUZZ_PART + WHERE MOD(id, 2) = MOD(p_node_id, 2)) LOOP + track_id(g_part_ids, g_part_id_cnt, r.id); END LOOP; - IF g_skip_lob = 0 THEN - FOR i IN 1..5 LOOP + FOR r IN (SELECT id FROM olr_test.FUZZ_MAXSTR + WHERE MOD(id, 2) = MOD(p_node_id, 2)) LOOP + track_id(g_maxstr_ids, g_maxstr_id_cnt, r.id); + END LOOP; + FOR r IN (SELECT id FROM olr_test.FUZZ_INTERVAL + WHERE MOD(id, 2) = MOD(p_node_id, 2)) LOOP + track_id(g_interval_ids, g_interval_id_cnt, r.id); + END LOOP; + + -- Continue ID sequence from current global max, preserving parity. + -- Oracle MOD(-1,2)=-1 so a negative NVL default would break the + -- parity check; handle NULL (empty tables) separately. 
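+      -- Worked example: node 1 owns odd ids, node 2 even ids. If MAX(id) is
+      -- 1042 and p_node_id = 1, parities differ and the next id is 1043; for
+      -- p_node_id = 2 they match and the next id is 1044. With a default of
+      -- NVL(MAX(id), -1), Oracle's MOD(-1, 2) = -1 matches neither node's
+      -- parity (1 or 0), so both nodes would take the +1 branch and collide
+      -- at id 0; hence the explicit IS NULL branch below.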
+ SELECT MAX(id) INTO v_max_id FROM ( + SELECT id FROM olr_test.FUZZ_SCALAR + UNION ALL SELECT id FROM olr_test.FUZZ_LOB + UNION ALL SELECT id FROM olr_test.FUZZ_WIDE + UNION ALL SELECT id FROM olr_test.FUZZ_PART + UNION ALL SELECT id FROM olr_test.FUZZ_MAXSTR + UNION ALL SELECT id FROM olr_test.FUZZ_INTERVAL); + IF v_max_id IS NULL THEN + g_next_id := p_node_id; -- cold start: 1 or 2 + ELSIF MOD(v_max_id, 2) = MOD(p_node_id, 2) THEN + g_next_id := v_max_id + 2; + ELSE + g_next_id := v_max_id + 1; + END IF; + + -- Continue event_id sequence across soak cycles (session-local + -- g_event_seq would otherwise restart at 1 each cycle and collide + -- with prior cycles' event_ids on the same node). + v_eid_prefix := 'N' || p_node_id || '_'; + SELECT NVL(MAX(TO_NUMBER(SUBSTR(event_id, LENGTH(v_eid_prefix) + 1))), 0) + INTO v_max_seq FROM ( + SELECT event_id FROM olr_test.FUZZ_SCALAR + WHERE event_id LIKE v_eid_prefix || '________' + UNION ALL SELECT event_id FROM olr_test.FUZZ_LOB + WHERE event_id LIKE v_eid_prefix || '________' + UNION ALL SELECT event_id FROM olr_test.FUZZ_WIDE + WHERE event_id LIKE v_eid_prefix || '________' + UNION ALL SELECT event_id FROM olr_test.FUZZ_PART + WHERE event_id LIKE v_eid_prefix || '________' + UNION ALL SELECT event_id FROM olr_test.FUZZ_NOPK + WHERE event_id LIKE v_eid_prefix || '________' + UNION ALL SELECT event_id FROM olr_test.FUZZ_MAXSTR + WHERE event_id LIKE v_eid_prefix || '________' + UNION ALL SELECT event_id FROM olr_test.FUZZ_INTERVAL + WHERE event_id LIKE v_eid_prefix || '________'); + g_event_seq := v_max_seq; + + -- Seed initial data only on cold start (empty tables). UPDATE/DELETE + -- need existing rows; event_id='SEED' so the consumer skips these + -- (they may arrive before LogMiner streaming starts). On soak + -- re-entry (non-empty tables) seeding is skipped. + IF g_scalar_id_cnt = 0 THEN + v_seed_id := 0; + FOR i IN 1..50 LOOP v_seed_id := next_id; - INSERT INTO olr_test.FUZZ_LOB (id, event_id, label, content) - VALUES (v_seed_id, 'SEED', 'seed', 'seed'); - track_id(g_lob_ids, g_lob_id_cnt, v_seed_id); + INSERT INTO olr_test.FUZZ_SCALAR (id, event_id, col_varchar, col_flag) + VALUES (v_seed_id, 'SEED', DBMS_RANDOM.STRING('x', 20), 0); + track_id(g_scalar_ids, g_scalar_id_cnt, v_seed_id); END LOOP; + IF g_skip_lob = 0 THEN + FOR i IN 1..5 LOOP + v_seed_id := next_id; + INSERT INTO olr_test.FUZZ_LOB (id, event_id, label, content) + VALUES (v_seed_id, 'SEED', 'seed', 'seed'); + track_id(g_lob_ids, g_lob_id_cnt, v_seed_id); + END LOOP; + END IF; + FOR i IN 1..20 LOOP + v_seed_id := next_id; + v_seed_region := REGIONS(rand_int(1, 5)); + INSERT INTO olr_test.FUZZ_PART (id, event_id, region, val, payload) + VALUES (v_seed_id, 'SEED', v_seed_region, 0, 'seed'); + track_id(g_part_ids, g_part_id_cnt, v_seed_id); + END LOOP; + FOR i IN 1..10 LOOP + INSERT INTO olr_test.FUZZ_NOPK (event_id, name, value, status) + VALUES ('SEED', 'seed', 0, 'ACTIVE'); + END LOOP; + COMMIT; END IF; - FOR i IN 1..20 LOOP - v_seed_id := next_id; - v_seed_region := REGIONS(rand_int(1, 5)); - INSERT INTO olr_test.FUZZ_PART (id, event_id, region, val, payload) - VALUES (v_seed_id, 'SEED', v_seed_region, 0, 'seed'); - track_id(g_part_ids, g_part_id_cnt, v_seed_id); - END LOOP; - FOR i IN 1..10 LOOP - INSERT INTO olr_test.FUZZ_NOPK (event_id, name, value, status) - VALUES ('SEED', 'seed', 0, 'ACTIVE'); - END LOOP; - COMMIT; - -- Reset counters so tracked events start fresh - g_event_seq := 0; + -- Reset op counters so tracked events start fresh. 
Do NOT reset + -- g_event_seq here: seed INSERTs use literal 'SEED' (not + -- next_event_id), and in soak mode it was just restored from DB. g_insert_cnt := 0; g_total_ops := 0; -- Main loop diff --git a/tests/dbz-twin/rac/validator.py b/tests/dbz-twin/rac/validator.py index 55ecb0c0..0a34df9f 100644 --- a/tests/dbz-twin/rac/validator.py +++ b/tests/dbz-twin/rac/validator.py @@ -24,6 +24,22 @@ POLL_INTERVAL = int(os.environ.get('POLL_INTERVAL', '10')) IDLE_TIMEOUT = int(os.environ.get('IDLE_TIMEOUT', '120')) PURGE_TTL_HOURS = int(os.environ.get('PURGE_TTL_HOURS', '24')) +START_CURSOR = os.environ.get('START_CURSOR', '') + + +def parse_cursor(s): + """Parse 'N1=event_id,N2=event_id' into dict. Empty input -> empty dict.""" + out = {} + for part in s.split(','): + part = part.strip() + if '=' in part: + k, v = part.split('=', 1) + out[k.strip()] = v.strip() + return out + + +def format_cursor(d): + return ','.join(f'{k}={v}' for k, v in sorted(d.items())) # LOB tables that use final-state replay for comparison. # With the hybrid setup (OLR for non-LOB + LogMiner for LOB), these tables @@ -379,7 +395,14 @@ def main(): conn.execute("PRAGMA busy_timeout=30000") cursor_by_node = {'N1': '', 'N2': ''} - safe_frontier = {} + seeded = parse_cursor(START_CURSOR) + for k, v in seeded.items(): + if k in cursor_by_node: + cursor_by_node[k] = v + if seeded: + print(f"[validator] resuming from cursor: " + f"{format_cursor(cursor_by_node)}", flush=True) + safe_frontier = dict(cursor_by_node) total_validated = 0 total_matched = 0 total_mismatches = 0 @@ -454,6 +477,11 @@ def main(): finally: conn.close() + # Emit resumable cursor for soak loop (safe frontier only — never widened, + # so late-arriving lagging-side events are re-picked-up next cycle). + final = safe_frontier if any(safe_frontier.values()) else cursor_by_node + print(f"[validator] final_cursor={format_cursor(final)}", flush=True) + rc = print_summary(total_validated, total_matched, total_mismatches, total_missing_olr, total_missing_lm, total_tail_olr, total_tail_lm) From d938471d70aab4b2b3e277484d90c46c25d414d5 Mon Sep 17 00:00:00 2001 From: Rophy Tsai Date: Wed, 29 Apr 2026 20:54:53 +0000 Subject: [PATCH 5/6] feat: add soak.sh driver for back-to-back fuzz cycles --- tests/dbz-twin/rac/soak.sh | 42 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100755 tests/dbz-twin/rac/soak.sh diff --git a/tests/dbz-twin/rac/soak.sh b/tests/dbz-twin/rac/soak.sh new file mode 100755 index 00000000..610b9b45 --- /dev/null +++ b/tests/dbz-twin/rac/soak.sh @@ -0,0 +1,42 @@ +#!/bin/bash +# Soak driver: back-to-back fuzz cycles until DURATION_SEC elapses or a cycle fails. +# Assumes `fuzz-test.sh up` was already run. +set -e +cd "$(dirname "$0")" +source /home/rophy/projects/OpenLogReplicator/tests/environments/rac/vm-env.sh + +DURATION_SEC="${DURATION_SEC:-28800}" # 8h default +CYCLE_MIN="${CYCLE_MIN:-10}" +LOG_DIR="${LOG_DIR:-./soak-logs/$(date +%Y%m%d-%H%M%S)}" +mkdir -p "$LOG_DIR" + +deadline=$(( $(date +%s) + DURATION_SEC )) +cursor="" +cycle=0 + +echo "soak start: duration=${DURATION_SEC}s cycle=${CYCLE_MIN}min log_dir=$LOG_DIR" | tee "$LOG_DIR/summary.log" + +while [[ $(date +%s) -lt $deadline ]]; do + cycle=$((cycle + 1)) + num=$(printf "%03d" "$cycle") + ts=$(date -Iseconds) + echo "[$ts] cycle $num: run ${CYCLE_MIN}min" | tee -a "$LOG_DIR/summary.log" + + if ! 
./fuzz-test.sh run "$CYCLE_MIN" > "$LOG_DIR/cycle-${num}-run.log" 2>&1; then + echo "[$(date -Iseconds)] cycle $num: RUN FAILED" | tee -a "$LOG_DIR/summary.log" + exit 1 + fi + + if ! START_CURSOR="$cursor" ./fuzz-test.sh validate > "$LOG_DIR/cycle-${num}-validate.log" 2>&1; then + echo "[$(date -Iseconds)] cycle $num: VALIDATE FAILED" | tee -a "$LOG_DIR/summary.log" + tail -40 "$LOG_DIR/cycle-${num}-validate.log" | tee -a "$LOG_DIR/summary.log" + exit 1 + fi + + new_cursor=$(grep 'final_cursor=' "$LOG_DIR/cycle-${num}-validate.log" | tail -1 | sed 's/.*final_cursor=//') + summary=$(grep -E "Total validated|Matched|Mismatches|RESULT" "$LOG_DIR/cycle-${num}-validate.log" | tr '\n' ' ') + echo "[$(date -Iseconds)] cycle $num: cursor=$new_cursor | $summary" | tee -a "$LOG_DIR/summary.log" + cursor="$new_cursor" +done + +echo "[$(date -Iseconds)] soak complete: $cycle cycles" | tee -a "$LOG_DIR/summary.log" From c95225ce07dac4473bacf765b2d701d6a20491c9 Mon Sep 17 00:00:00 2001 From: Rophy Tsai Date: Wed, 29 Apr 2026 22:24:27 +0000 Subject: [PATCH 6/6] fix: use relative path for vm-env.sh in soak.sh --- tests/dbz-twin/rac/soak.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/dbz-twin/rac/soak.sh b/tests/dbz-twin/rac/soak.sh index 610b9b45..47ff52b1 100755 --- a/tests/dbz-twin/rac/soak.sh +++ b/tests/dbz-twin/rac/soak.sh @@ -3,7 +3,7 @@ # Assumes `fuzz-test.sh up` was already run. set -e cd "$(dirname "$0")" -source /home/rophy/projects/OpenLogReplicator/tests/environments/rac/vm-env.sh +source ../../environments/rac/vm-env.sh DURATION_SEC="${DURATION_SEC:-28800}" # 8h default CYCLE_MIN="${CYCLE_MIN:-10}"
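
The grep -q + pipefail false-failure fixed in the readiness waits above is easy
to reproduce outside the test rig. A minimal standalone sketch (not part of any
patch above; `seq` stands in for the `docker logs`/`podman logs` writer):

```bash
#!/bin/bash
set -o pipefail

# Spurious failure: grep -q exits at the first match, seq keeps writing into
# the now-closed pipe, dies with SIGPIPE, and pipefail surfaces its status
# (usually 141) even though the pattern was found.
seq 200000 | grep -q '^2$' || echo "grep -q pipeline failed: $?"

# The fix used in fuzz-test.sh: plain grep reads to EOF, so the writer never
# sees SIGPIPE and the pipeline status reflects the match.
seq 200000 | grep '^2$' > /dev/null && echo "plain grep pipeline succeeded"
```

The SIGPIPE race is timing-dependent in principle, but with 200k lines against
a 64 KiB pipe buffer the first pipeline fails essentially every time.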