diff --git a/tests/dbz-twin/rac/archive-cleanup/Dockerfile b/tests/dbz-twin/rac/archive-cleanup/Dockerfile
new file mode 100644
index 00000000..5b519b2e
--- /dev/null
+++ b/tests/dbz-twin/rac/archive-cleanup/Dockerfile
@@ -0,0 +1,14 @@
+FROM alpine:3.20
+
+ARG SUPERCRONIC_VERSION=v0.2.44
+ARG SUPERCRONIC_SHA=6eb0a8e1e6673675dc67668c1a9b6409f79c37bc
+ARG SUPERCRONIC_URL=https://github.com/aptible/supercronic/releases/download/${SUPERCRONIC_VERSION}/supercronic-linux-amd64
+
+RUN apk add --no-cache openssh-client curl \
+    && curl -fsSL -o /usr/local/bin/supercronic "${SUPERCRONIC_URL}" \
+    && echo "${SUPERCRONIC_SHA}  /usr/local/bin/supercronic" | sha1sum -c - \
+    && chmod +x /usr/local/bin/supercronic \
+    && apk del curl
+
+ENTRYPOINT ["/usr/local/bin/supercronic", "-passthrough-logs"]
+CMD ["/etc/crontab"]
diff --git a/tests/dbz-twin/rac/archive-cleanup/crontab b/tests/dbz-twin/rac/archive-cleanup/crontab
new file mode 100644
index 00000000..b39fba30
--- /dev/null
+++ b/tests/dbz-twin/rac/archive-cleanup/crontab
@@ -0,0 +1,8 @@
+# Archive log cleanup — delete archive logs older than 1 day on the RAC VM.
+# Runs hourly. Each deleted file is printed to stdout (-> docker logs).
+#
+# -type f keeps find from removing directories under the archivelog tree.
+# -mmin +1440 is a real 24h cutoff; -mtime +1 only matches files that are
+# at least two full days old because find discards the fractional day.
+# Env vars (passed through by supercronic): VM_HOST, VM_USER
+0 * * * * ssh -i /vm-key -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $VM_USER@$VM_HOST "find /shared/redo/archivelog -type f -mmin +1440 -print -delete" | awk 'NF{print "deleted: "$0}'
diff --git a/tests/dbz-twin/rac/docker-compose-fuzz.yaml b/tests/dbz-twin/rac/docker-compose-fuzz.yaml
index 999fa9df..43026b9b 100644
--- a/tests/dbz-twin/rac/docker-compose-fuzz.yaml
+++ b/tests/dbz-twin/rac/docker-compose-fuzz.yaml
@@ -80,6 +80,7 @@ services:
       LM_TOPIC: lm-events
       OLR_TOPIC: olr-events
       OLR_LOB_TOPIC: olr-lob-events
+      PURGE_TTL_HOURS: "${PURGE_TTL_HOURS:-24}"
     volumes:
       - ./kafka-consumer.py:/app/kafka-consumer.py:ro
       - fuzz-data:/app/data
@@ -104,6 +105,18 @@ services:
       - ./validator.py:/app/validator.py:ro
       - fuzz-data:/app/data
 
+  archive-cleanup:
+    build:
+      context: ./archive-cleanup
+    image: fuzz-archive-cleanup:latest
+    container_name: fuzz-archive-cleanup
+    environment:
+      VM_HOST: ${VM_HOST:?VM_HOST is required}
+      VM_USER: ${VM_USER:-root}
+    volumes:
+      - ${VM_KEY:?VM_KEY is required}:/vm-key:ro
+      - ./archive-cleanup/crontab:/etc/crontab:ro
+
 volumes:
   dbz-logminer-data:
   dbz-olr-data:
diff --git a/tests/dbz-twin/rac/fuzz-test.sh b/tests/dbz-twin/rac/fuzz-test.sh index 8959bbe4..b54facc5 100755 --- a/tests/dbz-twin/rac/fuzz-test.sh +++ b/tests/dbz-twin/rac/fuzz-test.sh @@ -49,6 +49,7 @@ DB_CONN2="${DB_CONN2:-olr_test/olr_test@//racnodep2:1521/ORCLPDB}" OLR_CONTAINER="olr-debezium" COMPOSE_FILE="$SCRIPT_DIR/docker-compose-fuzz.yaml" +WORK_DIR="/tmp/fuzz_soak_state" # ---- SSH helpers ---- _vm_sqlplus() { @@ -203,7 +204,7 @@ action_up() { echo " Waiting for OLR..." for i in $(seq 1 90); do if ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ - "podman logs $OLR_CONTAINER 2>&1 | grep -q 'processing redo log'"; then + "podman logs $OLR_CONTAINER 2>&1 | grep 'processing redo log' > /dev/null"; then echo " OLR: ready" break fi @@ -215,9 +216,9 @@ action_up() { echo " Waiting for Debezium connectors..."
for i in $(seq 1 90); do LM_OK=false; OLR_OK=false; LOB_LM_OK=false - docker logs fuzz-dbz-logminer 2>&1 | grep -q "Starting streaming" && LM_OK=true - docker logs fuzz-dbz-olr 2>&1 | grep -q "streaming client started\|Starting streaming" && OLR_OK=true - docker logs fuzz-dbz-lob-logminer 2>&1 | grep -q "Starting streaming" && LOB_LM_OK=true + docker logs fuzz-dbz-logminer 2>&1 | grep "Starting streaming" > /dev/null && LM_OK=true + docker logs fuzz-dbz-olr 2>&1 | grep -E "streaming client started|Starting streaming" > /dev/null && OLR_OK=true + docker logs fuzz-dbz-lob-logminer 2>&1 | grep "Starting streaming" > /dev/null && LOB_LM_OK=true if $LM_OK && $OLR_OK && $LOB_LM_OK; then echo " Debezium: ready (3 connectors)" break @@ -226,6 +227,8 @@ action_up() { sleep 2 done + mkdir -p "$WORK_DIR" + echo "" echo " OLR memory: $(_olr_memory_mb) MB" echo "" @@ -305,12 +308,14 @@ SQL if [[ -n "$done1" ]]; then echo " Node 1: $done1" else - echo " Node 1: workload finished (no summary line)" + echo " Node 1: workload finished (no FUZZ_DONE line) — sqlplus output:" + sed 's/^/ | /' "$work_dir/fuzz_out1.log" fi if [[ -n "$done2" ]]; then echo " Node 2: $done2" else - echo " Node 2: workload finished (no summary line)" + echo " Node 2: workload finished (no FUZZ_DONE line) — sqlplus output:" + sed 's/^/ | /' "$work_dir/fuzz_out2.log" fi if [[ $rc1 -ne 0 || $rc2 -ne 0 ]]; then @@ -399,9 +404,12 @@ action_validate() { final_counts=$(docker logs --tail 1 fuzz-consumer 2>/dev/null | grep -o '\[consumer\].*' || echo "unknown") echo " Consumer idle for 30s. Last status: $final_counts" - # Start validator (uses 'validate' profile) + # Start validator (uses 'validate' profile). + # START_CURSOR env (optional) is forwarded to the container so soak loops + # can resume past already-validated events without re-scanning. local exit_code=0 - docker compose -f "$COMPOSE_FILE" run --rm validator || exit_code=$? 
+ docker compose -f "$COMPOSE_FILE" run --rm \ + -e "START_CURSOR=${START_CURSOR:-}" validator || exit_code=$? echo "" echo " OLR memory: $(_olr_memory_mb) MB" @@ -450,9 +458,11 @@ action_logs() { action_down() { echo "=== Stopping fuzz test infrastructure ===" + docker compose -f "$COMPOSE_FILE" down -v 2>/dev/null ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \ "podman stop -t5 $OLR_CONTAINER 2>/dev/null; podman rm $OLR_CONTAINER 2>/dev/null; true" + rm -rf "$WORK_DIR" echo " Done." } diff --git a/tests/dbz-twin/rac/kafka-consumer.py b/tests/dbz-twin/rac/kafka-consumer.py index 086cc514..7378b3da 100644 --- a/tests/dbz-twin/rac/kafka-consumer.py +++ b/tests/dbz-twin/rac/kafka-consumer.py @@ -20,6 +20,7 @@ KAFKA_BOOTSTRAP = os.environ.get('KAFKA_BOOTSTRAP', 'localhost:9092') SQLITE_DB = os.environ.get('SQLITE_DB', '/app/data/fuzz.db') +PURGE_TTL_HOURS = int(os.environ.get('PURGE_TTL_HOURS', '24')) OP_MAP = {'c': 'INSERT', 'u': 'UPDATE', 'd': 'DELETE'} @@ -30,9 +31,10 @@ def init_db(db_path): """Create SQLite database and tables.""" os.makedirs(os.path.dirname(db_path) or '.', exist_ok=True) - conn = sqlite3.connect(db_path) + conn = sqlite3.connect(db_path, timeout=30) conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA synchronous=NORMAL") + conn.execute("PRAGMA busy_timeout=30000") conn.execute(""" CREATE TABLE IF NOT EXISTS lm_events ( event_id TEXT NOT NULL, @@ -171,6 +173,7 @@ def main(): batch = [] batch_start = time.time() last_report = time.time() + last_seq_cleanup = time.time() try: while True: @@ -226,11 +229,29 @@ def main(): batch = [] batch_start = time.time() - # Seq maps grow with each unique event_id but entries are small - # (string key -> int value). For a 60-min run with ~30K events, - # this is ~2MB. Do not trim — trimming caused seq reset bugs - # where later events for the same event_id got seq=0 again, - # overwriting earlier events via INSERT OR REPLACE. + # Purge seq dict entries for events older than TTL. 
+            # Only ids whose newest row is past the TTL are pruned (HAVING MAX):
+            # resetting the seq of an id that still receives events would let
+            # INSERT OR REPLACE overwrite its earlier rows. NOTE(review): cleanup
+            # DELETEs arrive ~1 day after the INSERT and may race this — confirm.
+            now = time.time()
+            if now - last_seq_cleanup >= 600:  # every 10 minutes
+                cutoff = now - PURGE_TTL_HOURS * 3600
+                for tbl, seq_map in [('lm_events', lm_seq),
+                                     ('olr_events', olr_seq)]:
+                    rows = conn.execute(
+                        f"SELECT event_id FROM {tbl} GROUP BY event_id "
+                        "HAVING MAX(consumed_at) < ?", (cutoff,)).fetchall()
+                    pruned = 0
+                    for (eid,) in rows:
+                        if eid in seq_map:
+                            del seq_map[eid]
+                            pruned += 1
+                    if pruned:
+                        print(f"[consumer] Pruned {pruned} seq entries "
+                              f"from {tbl} (older than {PURGE_TTL_HOURS}h)",
+                              flush=True)
+                last_seq_cleanup = now
 
             # Report progress every 30 seconds
             now = time.time()
diff --git a/tests/dbz-twin/rac/perf/fuzz-workload.sql b/tests/dbz-twin/rac/perf/fuzz-workload.sql index f3089ca7..bd8dd3a1 100644 --- a/tests/dbz-twin/rac/perf/fuzz-workload.sql +++ b/tests/dbz-twin/rac/perf/fuzz-workload.sql @@ -40,7 +40,8 @@ CREATE TABLE olr_test.FUZZ_SCALAR ( col_date DATE, col_ts TIMESTAMP(6), col_raw RAW(200), - col_flag NUMBER(1) DEFAULT 0 + col_flag NUMBER(1) DEFAULT 0, + created_at DATE DEFAULT SYSDATE ); ALTER TABLE olr_test.FUZZ_SCALAR ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; @@ -57,7 +58,8 @@ CREATE TABLE olr_test.FUZZ_WIDE ( n06 NUMBER, n07 NUMBER, n08 NUMBER, n09 NUMBER, n10 NUMBER, d01 DATE, d02 DATE, d03 DATE, t01 TIMESTAMP, t02 TIMESTAMP, t03 TIMESTAMP, - r01 RAW(50), r02 RAW(50), r03 RAW(50) + r01 RAW(50), r02 RAW(50), r03 RAW(50), + created_at DATE DEFAULT SYSDATE ); ALTER TABLE olr_test.FUZZ_WIDE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; @@ -69,7 +71,8 @@ CREATE TABLE olr_test.FUZZ_LOB ( event_id VARCHAR2(30) NOT NULL, label VARCHAR2(50), content CLOB, - bin_data BLOB + bin_data BLOB, + created_at DATE DEFAULT SYSDATE ); ALTER TABLE olr_test.FUZZ_LOB ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; @@ -81,7 +84,8 @@ CREATE TABLE olr_test.FUZZ_PART ( event_id VARCHAR2(30)
NOT NULL, region VARCHAR2(20), val NUMBER, - payload VARCHAR2(500) + payload VARCHAR2(500), + created_at DATE DEFAULT SYSDATE ) PARTITION BY LIST (region) ( PARTITION p_east VALUES ('EAST'), PARTITION p_west VALUES ('WEST'), @@ -99,7 +103,8 @@ CREATE TABLE olr_test.FUZZ_NOPK ( name VARCHAR2(100), value NUMBER, status VARCHAR2(20), - ts TIMESTAMP DEFAULT SYSTIMESTAMP + ts TIMESTAMP DEFAULT SYSTIMESTAMP, + created_at DATE DEFAULT SYSDATE ); ALTER TABLE olr_test.FUZZ_NOPK ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; @@ -111,7 +116,8 @@ CREATE TABLE olr_test.FUZZ_MAXSTR ( event_id VARCHAR2(30) NOT NULL, col_long1 VARCHAR2(4000), col_long2 VARCHAR2(4000), - col_short VARCHAR2(10) + col_short VARCHAR2(10), + created_at DATE DEFAULT SYSDATE ); ALTER TABLE olr_test.FUZZ_MAXSTR ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; @@ -123,7 +129,8 @@ CREATE TABLE olr_test.FUZZ_INTERVAL ( event_id VARCHAR2(30) NOT NULL, col_ym INTERVAL YEAR(4) TO MONTH, col_ds INTERVAL DAY(4) TO SECOND(6), - col_num NUMBER + col_num NUMBER, + created_at DATE DEFAULT SYSDATE ); ALTER TABLE olr_test.FUZZ_INTERVAL ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; @@ -152,6 +159,7 @@ CREATE OR REPLACE PACKAGE olr_test.FUZZ_WKL AS p_node_id IN NUMBER DEFAULT 1, p_skip_lob IN NUMBER DEFAULT 0 -- 1 = skip LOB table operations ); + PROCEDURE cleanup; END FUZZ_WKL; / @@ -718,55 +726,131 @@ CREATE OR REPLACE PACKAGE BODY olr_test.FUZZ_WKL AS ) IS v_start TIMESTAMP := SYSTIMESTAMP; v_deadline TIMESTAMP := SYSTIMESTAMP + NUMTODSINTERVAL(p_duration_secs, 'SECOND'); + v_next_cleanup TIMESTAMP := SYSTIMESTAMP + INTERVAL '5' MINUTE; v_txn_dice PLS_INTEGER; v_batch PLS_INTEGER; v_seed_id PLS_INTEGER; v_seed_region VARCHAR2(20); + v_max_id PLS_INTEGER; + v_eid_prefix VARCHAR2(5); + v_max_seq PLS_INTEGER; BEGIN -- Initialize g_node_id := p_node_id; - g_next_id := p_node_id; -- 1 for node 1 (odd), 2 for node 2 (even) g_event_seq := 0; g_insert_cnt := 0; g_update_cnt := 0; g_delete_cnt := 0; g_rollback_cnt := 0; g_lob_cnt := 0; 
g_total_ops := 0; g_skip_lob := p_skip_lob; + g_scalar_id_cnt := 0; g_lob_id_cnt := 0; g_wide_id_cnt := 0; + g_part_id_cnt := 0; g_maxstr_id_cnt := 0; g_interval_id_cnt := 0; DBMS_RANDOM.SEED(p_seed); - -- Seed initial data (need rows before we can UPDATE/DELETE). - -- event_id='SEED' so the consumer skips these (they may arrive - -- before LogMiner starts streaming). - v_seed_id := 0; - g_scalar_id_cnt := 0; g_lob_id_cnt := 0; g_wide_id_cnt := 0; - g_part_id_cnt := 0; g_maxstr_id_cnt := 0; g_interval_id_cnt := 0; - FOR i IN 1..50 LOOP - v_seed_id := next_id; - INSERT INTO olr_test.FUZZ_SCALAR (id, event_id, col_varchar, col_flag) - VALUES (v_seed_id, 'SEED', DBMS_RANDOM.STRING('x', 20), 0); - track_id(g_scalar_ids, g_scalar_id_cnt, v_seed_id); + -- Repopulate per-node tracked-ID arrays from any carryover rows. + -- Soak mode reuses tables across cycles; session arrays do not + -- persist. Only IDs with this node's parity are eligible (same + -- invariant next_id maintains for new INSERTs). 
+ FOR r IN (SELECT id FROM olr_test.FUZZ_SCALAR + WHERE MOD(id, 2) = MOD(p_node_id, 2)) LOOP + track_id(g_scalar_ids, g_scalar_id_cnt, r.id); + END LOOP; + FOR r IN (SELECT id FROM olr_test.FUZZ_LOB + WHERE MOD(id, 2) = MOD(p_node_id, 2)) LOOP + track_id(g_lob_ids, g_lob_id_cnt, r.id); + END LOOP; + FOR r IN (SELECT id FROM olr_test.FUZZ_WIDE + WHERE MOD(id, 2) = MOD(p_node_id, 2)) LOOP + track_id(g_wide_ids, g_wide_id_cnt, r.id); + END LOOP; + FOR r IN (SELECT id FROM olr_test.FUZZ_PART + WHERE MOD(id, 2) = MOD(p_node_id, 2)) LOOP + track_id(g_part_ids, g_part_id_cnt, r.id); + END LOOP; + FOR r IN (SELECT id FROM olr_test.FUZZ_MAXSTR + WHERE MOD(id, 2) = MOD(p_node_id, 2)) LOOP + track_id(g_maxstr_ids, g_maxstr_id_cnt, r.id); END LOOP; - IF g_skip_lob = 0 THEN - FOR i IN 1..5 LOOP + FOR r IN (SELECT id FROM olr_test.FUZZ_INTERVAL + WHERE MOD(id, 2) = MOD(p_node_id, 2)) LOOP + track_id(g_interval_ids, g_interval_id_cnt, r.id); + END LOOP; + + -- Continue ID sequence from current global max, preserving parity. + -- Oracle MOD(-1,2)=-1 so a negative NVL default would break the + -- parity check; handle NULL (empty tables) separately. + SELECT MAX(id) INTO v_max_id FROM ( + SELECT id FROM olr_test.FUZZ_SCALAR + UNION ALL SELECT id FROM olr_test.FUZZ_LOB + UNION ALL SELECT id FROM olr_test.FUZZ_WIDE + UNION ALL SELECT id FROM olr_test.FUZZ_PART + UNION ALL SELECT id FROM olr_test.FUZZ_MAXSTR + UNION ALL SELECT id FROM olr_test.FUZZ_INTERVAL); + IF v_max_id IS NULL THEN + g_next_id := p_node_id; -- cold start: 1 or 2 + ELSIF MOD(v_max_id, 2) = MOD(p_node_id, 2) THEN + g_next_id := v_max_id + 2; + ELSE + g_next_id := v_max_id + 1; + END IF; + + -- Continue event_id sequence across soak cycles (session-local + -- g_event_seq would otherwise restart at 1 each cycle and collide + -- with prior cycles' event_ids on the same node). 
+ v_eid_prefix := 'N' || p_node_id || '_'; + SELECT NVL(MAX(TO_NUMBER(SUBSTR(event_id, LENGTH(v_eid_prefix) + 1))), 0) + INTO v_max_seq FROM ( + SELECT event_id FROM olr_test.FUZZ_SCALAR + WHERE event_id LIKE v_eid_prefix || '________' + UNION ALL SELECT event_id FROM olr_test.FUZZ_LOB + WHERE event_id LIKE v_eid_prefix || '________' + UNION ALL SELECT event_id FROM olr_test.FUZZ_WIDE + WHERE event_id LIKE v_eid_prefix || '________' + UNION ALL SELECT event_id FROM olr_test.FUZZ_PART + WHERE event_id LIKE v_eid_prefix || '________' + UNION ALL SELECT event_id FROM olr_test.FUZZ_NOPK + WHERE event_id LIKE v_eid_prefix || '________' + UNION ALL SELECT event_id FROM olr_test.FUZZ_MAXSTR + WHERE event_id LIKE v_eid_prefix || '________' + UNION ALL SELECT event_id FROM olr_test.FUZZ_INTERVAL + WHERE event_id LIKE v_eid_prefix || '________'); + g_event_seq := v_max_seq; + + -- Seed initial data only on cold start (empty tables). UPDATE/DELETE + -- need existing rows; event_id='SEED' so the consumer skips these + -- (they may arrive before LogMiner streaming starts). On soak + -- re-entry (non-empty tables) seeding is skipped. 
+ IF g_scalar_id_cnt = 0 THEN + v_seed_id := 0; + FOR i IN 1..50 LOOP v_seed_id := next_id; - INSERT INTO olr_test.FUZZ_LOB (id, event_id, label, content) - VALUES (v_seed_id, 'SEED', 'seed', 'seed'); - track_id(g_lob_ids, g_lob_id_cnt, v_seed_id); + INSERT INTO olr_test.FUZZ_SCALAR (id, event_id, col_varchar, col_flag) + VALUES (v_seed_id, 'SEED', DBMS_RANDOM.STRING('x', 20), 0); + track_id(g_scalar_ids, g_scalar_id_cnt, v_seed_id); END LOOP; + IF g_skip_lob = 0 THEN + FOR i IN 1..5 LOOP + v_seed_id := next_id; + INSERT INTO olr_test.FUZZ_LOB (id, event_id, label, content) + VALUES (v_seed_id, 'SEED', 'seed', 'seed'); + track_id(g_lob_ids, g_lob_id_cnt, v_seed_id); + END LOOP; + END IF; + FOR i IN 1..20 LOOP + v_seed_id := next_id; + v_seed_region := REGIONS(rand_int(1, 5)); + INSERT INTO olr_test.FUZZ_PART (id, event_id, region, val, payload) + VALUES (v_seed_id, 'SEED', v_seed_region, 0, 'seed'); + track_id(g_part_ids, g_part_id_cnt, v_seed_id); + END LOOP; + FOR i IN 1..10 LOOP + INSERT INTO olr_test.FUZZ_NOPK (event_id, name, value, status) + VALUES ('SEED', 'seed', 0, 'ACTIVE'); + END LOOP; + COMMIT; END IF; - FOR i IN 1..20 LOOP - v_seed_id := next_id; - v_seed_region := REGIONS(rand_int(1, 5)); - INSERT INTO olr_test.FUZZ_PART (id, event_id, region, val, payload) - VALUES (v_seed_id, 'SEED', v_seed_region, 0, 'seed'); - track_id(g_part_ids, g_part_id_cnt, v_seed_id); - END LOOP; - FOR i IN 1..10 LOOP - INSERT INTO olr_test.FUZZ_NOPK (event_id, name, value, status) - VALUES ('SEED', 'seed', 0, 'ACTIVE'); - END LOOP; - COMMIT; - -- Reset counters so tracked events start fresh - g_event_seq := 0; + -- Reset op counters so tracked events start fresh. Do NOT reset + -- g_event_seq here: seed INSERTs use literal 'SEED' (not + -- next_event_id), and in soak mode it was just restored from DB. 
g_insert_cnt := 0; g_total_ops := 0; -- Main loop
@@ -821,6 +905,15 @@ CREATE OR REPLACE PACKAGE BODY olr_test.FUZZ_WKL AS
             IF MOD(g_total_ops, 100) = 0 THEN
                 update_stats;
             END IF;
+
+            -- Periodic housekeeping: DELETE rows older than 24h every 5 min.
+            -- Cleanup DELETEs become ordinary DML — captured by both LM and OLR,
+            -- validated like any other event. In short runs the threshold is
+            -- never reached, so nothing is deleted.
+            IF SYSTIMESTAMP > v_next_cleanup THEN
+                cleanup;
+                v_next_cleanup := SYSTIMESTAMP + INTERVAL '5' MINUTE;
+            END IF;
         END LOOP;
 
         -- Final commit + stats
@@ -840,6 +933,33 @@ CREATE OR REPLACE PACKAGE BODY olr_test.FUZZ_WKL AS
             EXTRACT(HOUR FROM (SYSTIMESTAMP - v_start)) * 3600));
     END;
 
+    -- ---- Cleanup: purge rows older than 24h ----
+
+    -- Deletes, table by table, every FUZZ_ row whose created_at is more than
+    -- one day old, committing after each table and adding the row counts to
+    -- g_delete_cnt / g_total_ops.
+    -- NOTE(review): deleted ids are not removed from the g_*_ids tracking
+    -- arrays here — confirm later UPDATE/DELETE picks tolerate missing rows.
+    PROCEDURE cleanup IS
+        v_deleted PLS_INTEGER;
+    BEGIN
+        -- Drive the loop from user_tab_columns so only tables that really
+        -- have created_at are touched (a FUZZ_ table without it would raise
+        -- ORA-00904 and abort the run). ESCAPE makes '_' literal; unescaped
+        -- it is a single-character wildcard in LIKE.
+        FOR t IN (SELECT table_name FROM user_tab_columns
+                  WHERE table_name LIKE 'FUZZ\_%' ESCAPE '\'
+                    AND table_name != 'FUZZ_STATS'
+                    AND column_name = 'CREATED_AT') LOOP
+            EXECUTE IMMEDIATE 'DELETE FROM ' || t.table_name ||
+                ' WHERE created_at < SYSDATE - 1';
+            v_deleted := SQL%ROWCOUNT;
+            COMMIT;
+            g_delete_cnt := g_delete_cnt + v_deleted;
+            g_total_ops := g_total_ops + v_deleted;
+        END LOOP;
+    END;
+
 END FUZZ_WKL;
 /
 
diff --git a/tests/dbz-twin/rac/soak.sh b/tests/dbz-twin/rac/soak.sh
new file mode 100755
index 00000000..47ff52b1
--- /dev/null
+++ b/tests/dbz-twin/rac/soak.sh
@@ -0,0 +1,51 @@
+#!/bin/bash
+# Soak driver: back-to-back fuzz cycles until DURATION_SEC elapses or a cycle fails.
+# Assumes `fuzz-test.sh up` was already run.
+#
+# Env: DURATION_SEC (total budget in seconds), CYCLE_MIN (minutes passed to
+# `fuzz-test.sh run`), LOG_DIR (where per-cycle logs and summary.log go).
+set -e
+cd "$(dirname "$0")"
+source ../../environments/rac/vm-env.sh
+
+DURATION_SEC="${DURATION_SEC:-28800}" # 8h default
+CYCLE_MIN="${CYCLE_MIN:-10}"
+LOG_DIR="${LOG_DIR:-./soak-logs/$(date +%Y%m%d-%H%M%S)}"
+mkdir -p "$LOG_DIR"
+
+deadline=$(( $(date +%s) + DURATION_SEC ))
+cursor=""
+cycle=0
+
+echo "soak start: duration=${DURATION_SEC}s cycle=${CYCLE_MIN}min log_dir=$LOG_DIR" | tee "$LOG_DIR/summary.log"
+
+while [[ $(date +%s) -lt $deadline ]]; do
+    cycle=$((cycle + 1))
+    num=$(printf "%03d" "$cycle")
+    ts=$(date -Iseconds)
+    echo "[$ts] cycle $num: run ${CYCLE_MIN}min" | tee -a "$LOG_DIR/summary.log"
+
+    if ! ./fuzz-test.sh run "$CYCLE_MIN" > "$LOG_DIR/cycle-${num}-run.log" 2>&1; then
+        echo "[$(date -Iseconds)] cycle $num: RUN FAILED" | tee -a "$LOG_DIR/summary.log"
+        exit 1
+    fi
+
+    if ! START_CURSOR="$cursor" ./fuzz-test.sh validate > "$LOG_DIR/cycle-${num}-validate.log" 2>&1; then
+        echo "[$(date -Iseconds)] cycle $num: VALIDATE FAILED" | tee -a "$LOG_DIR/summary.log"
+        tail -40 "$LOG_DIR/cycle-${num}-validate.log" | tee -a "$LOG_DIR/summary.log"
+        exit 1
+    fi
+
+    new_cursor=$(grep 'final_cursor=' "$LOG_DIR/cycle-${num}-validate.log" | tail -1 | sed 's/.*final_cursor=//')
+    summary=$(grep -E "Total validated|Matched|Mismatches|RESULT" "$LOG_DIR/cycle-${num}-validate.log" | tr '\n' ' ')
+    echo "[$(date -Iseconds)] cycle $num: cursor=$new_cursor | $summary" | tee -a "$LOG_DIR/summary.log"
+    # Keep the previous cursor when the validate log carries no final_cursor;
+    # an empty START_CURSOR would make the next cycle start from the beginning.
+    if [[ -n "$new_cursor" ]]; then
+        cursor="$new_cursor"
+    else
+        echo "[$(date -Iseconds)] cycle $num: WARNING no final_cursor in validate log; keeping cursor=$cursor" | tee -a "$LOG_DIR/summary.log"
+    fi
+done
+
+echo "[$(date -Iseconds)] soak complete: $cycle cycles" | tee -a "$LOG_DIR/summary.log"
diff --git a/tests/dbz-twin/rac/validator.py b/tests/dbz-twin/rac/validator.py
index d17d85c0..0a34df9f 100644
--- a/tests/dbz-twin/rac/validator.py
+++ b/tests/dbz-twin/rac/validator.py
@@ -23,6 +23,23 @@
 SQLITE_DB = os.environ.get('SQLITE_DB', '/app/data/fuzz.db')
 POLL_INTERVAL = int(os.environ.get('POLL_INTERVAL', '10'))
 IDLE_TIMEOUT = int(os.environ.get('IDLE_TIMEOUT', '120'))
+PURGE_TTL_HOURS = int(os.environ.get('PURGE_TTL_HOURS', '24'))
+START_CURSOR = os.environ.get('START_CURSOR', '')
+
+
+def parse_cursor(s):
+    """Parse 'N1=event_id,N2=event_id' into dict. Empty input -> empty dict."""
+    out = {}
+    for part in s.split(','):
+        part = part.strip()
+        if '=' in part:
+            k, v = part.split('=', 1)
+            out[k.strip()] = v.strip()
+    return out
+
+
+def format_cursor(d):
+    return ','.join(f'{k}={v}' for k, v in sorted(d.items()))
 
 # LOB tables that use final-state replay for comparison.
 # With the hybrid setup (OLR for non-LOB + LogMiner for LOB), these tables
@@ -114,300 +131,381 @@ def compare_values(lm_cols, olr_cols, table, section='after'):
     return diffs
 
 
+def purge_old_events(conn, ttl_hours):
+    """Delete events older than ttl_hours from both tables.
+
+    Rows whose consumed_at is below now - ttl_hours * 3600 are removed from
+    lm_events and olr_events. Returns the total number of rows deleted.
+    """
+    cutoff = time.time() - ttl_hours * 3600
+    lm_del = conn.execute(
+        "DELETE FROM lm_events WHERE consumed_at < ?", (cutoff,)).rowcount
+    olr_del = conn.execute(
+        "DELETE FROM olr_events WHERE consumed_at < ?", (cutoff,)).rowcount
+    # Commit unconditionally: sqlite3 opens a write transaction for every
+    # DELETE, even one that matches no rows, and leaving it open would keep
+    # the write lock away from the consumer.
+    conn.commit()
+    return lm_del + olr_del
+
+
+def validate_cycle(conn, cursor_by_node, safe_frontier, widen=False):
+    """Run one validation cycle over events past the per-node cursors.
+
+    cursor_by_node maps node prefix ('N1'/'N2') to the last validated
+    event_id and is advanced in place. With widen=True the frontier is
+    raised from min(lm, olr) to max(lm, olr) per node, safe_frontier is
+    updated in place with the pre-widening frontier, and one-sided events
+    beyond that frontier are counted as tail lag instead of mismatches.
+
+    Returns a 10-tuple: (validated, matched, mismatches, missing_olr,
+    missing_lm, tail_olr, tail_lm, lm_count, olr_count, node_frontiers).
+    """
+    validated = matched = mismatches = 0
+    missing_olr = missing_lm = tail_olr = tail_lm = 0
+
+    lm_count = conn.execute("SELECT COUNT(*) FROM lm_events").fetchone()[0]
+    olr_count = conn.execute("SELECT COUNT(*) FROM olr_events").fetchone()[0]
+
+    # Find safe frontier per node: min(lm, olr) for each N{x} prefix.
+    node_frontiers = {}
+    for node_prefix in ('N1', 'N2'):
+        lm_node_max = conn.execute(
+            "SELECT MAX(event_id) FROM lm_events WHERE event_id LIKE ?",
+            (f'{node_prefix}_%',)).fetchone()[0]
+        olr_node_max = conn.execute(
+            "SELECT MAX(event_id) FROM olr_events WHERE event_id LIKE ?",
+            (f'{node_prefix}_%',)).fetchone()[0]
+        if lm_node_max and olr_node_max:
+            node_frontiers[node_prefix] = min(lm_node_max, olr_node_max)
+
+    # NOTE(review): this also returns early on a widened pass, so a side that
+    # delivered nothing at all yields zero validated events — confirm intended.
+    if not node_frontiers:
+        return (0, 0, 0, 0, 0, 0, 0, lm_count, olr_count, node_frontiers)
+
+    # Tail lag only exists on a widened pass, and only for nodes where both
+    # sides have delivered something (i.e. nodes with a pre-widening frontier).
+    tail_floor = {}
+    if widen:
+        # Save safe frontier before widening
+        tail_floor = dict(node_frontiers)
+        safe_frontier.update(node_frontiers)
+        for node_prefix in ('N1', 'N2'):
+            lm_n = conn.execute(
+                "SELECT MAX(event_id) FROM lm_events WHERE event_id LIKE ?",
+                (f'{node_prefix}_%',)).fetchone()[0]
+            olr_n = conn.execute(
+                "SELECT MAX(event_id) FROM olr_events WHERE event_id LIKE ?",
+                (f'{node_prefix}_%',)).fetchone()[0]
+            if lm_n or olr_n:
+                node_frontiers[node_prefix] = max(lm_n or '', olr_n or '')
+
+    # Check if any node has new events beyond its cursor
+    any_new = any(
+        nf > cursor_by_node.get(np, '')
+        for np, nf in node_frontiers.items()
+    )
+    if not any_new:
+        return (0, 0, 0, 0, 0, 0, 0, lm_count, olr_count, node_frontiers)
+
+    # Fetch event_ids within each node's safe frontier
+    lm_ids = set()
+    olr_ids = set()
+    for node_prefix, nf in node_frontiers.items():
+        node_cursor = cursor_by_node.get(node_prefix, '')
+        for r in conn.execute(
+                "SELECT DISTINCT event_id FROM lm_events "
+                "WHERE event_id > ? AND event_id <= ? AND event_id LIKE ?",
+                (node_cursor, nf, f'{node_prefix}_%')).fetchall():
+            lm_ids.add(r['event_id'])
+        for r in conn.execute(
+                "SELECT DISTINCT event_id FROM olr_events "
+                "WHERE event_id > ? AND event_id <= ? AND event_id LIKE ?",
+                (node_cursor, nf, f'{node_prefix}_%')).fetchall():
+            olr_ids.add(r['event_id'])
+
+    all_ids = sorted(lm_ids | olr_ids)
+
+    for eid in all_ids:
+        in_lm = eid in lm_ids
+        in_olr = eid in olr_ids
+
+        # Tail lag: beyond the pre-widening frontier captured for this pass
+        node_prefix = eid[:2]
+        is_tail = (node_prefix in tail_floor
+                   and eid > tail_floor[node_prefix])
+
+        # Determine table from whichever side has the event
+        if in_lm:
+            tbl_row = conn.execute(
+                "SELECT table_name FROM lm_events WHERE event_id = ? LIMIT 1",
+                (eid,)).fetchone()
+        else:
+            tbl_row = conn.execute(
+                "SELECT table_name FROM olr_events WHERE event_id = ? LIMIT 1",
+                (eid,)).fetchone()
+        event_table = tbl_row['table_name'] if tbl_row else '?'
+        is_lob = event_table.split('.')[-1].upper() in LOB_TABLES
+
+        if in_lm and not in_olr:
+            missing_olr += 1
+            if is_tail:
+                tail_lm += 1
+            else:
+                mismatches += 1
+                print(f"[MISSING_OLR] {eid} ({event_table})", flush=True)
+            validated += 1
+            continue
+
+        if in_olr and not in_lm:
+            missing_lm += 1
+            if is_tail:
+                tail_olr += 1
+            else:
+                mismatches += 1
+                print(f"[EXTRA_OLR] {eid} ({event_table})", flush=True)
+            validated += 1
+            continue
+
+        # Both sides have the event — compare.
+        lm_recs = conn.execute(
+            "SELECT * FROM lm_events WHERE event_id = ? ORDER BY seq",
+            (eid,)
+        ).fetchall()
+        olr_recs = conn.execute(
+            "SELECT * FROM olr_events WHERE event_id = ? ORDER BY seq",
+            (eid,)
+        ).fetchall()
+
+        if is_lob:
+            lm_state, lm_exists = replay_final_state(lm_recs)
+            olr_state, olr_exists = replay_final_state(olr_recs)
+
+            if lm_exists != olr_exists:
+                mismatches += 1
+                print(f"[LOB_EXISTENCE] {eid} ({event_table}): "
+                      f"LM exists={lm_exists} OLR exists={olr_exists}",
+                      flush=True)
+                validated += 1
+            else:
+                diffs = compare_values(lm_state, olr_state,
+                                       event_table, 'after')
+                if diffs:
+                    mismatches += 1
+                    print(f"[LOB_VALUE_DIFF] {eid} ({event_table}):",
+                          flush=True)
+                    for d in diffs[:5]:
+                        print(d, flush=True)
+                else:
+                    matched += 1
+                validated += 1
+            continue
+
+        # Non-LOB tables: compare per (event_id, seq) directly.
+        lm_by_seq = {r['seq']: r for r in lm_recs}
+        olr_by_seq = {r['seq']: r for r in olr_recs}
+        all_seqs = sorted(set(lm_by_seq.keys()) | set(olr_by_seq.keys()))
+
+        for seq in all_seqs:
+            lm_r = lm_by_seq.get(seq)
+            olr_r = olr_by_seq.get(seq)
+
+            if lm_r and not olr_r:
+                missing_olr += 1
+                mismatches += 1
+                print(f"[MISSING_OLR] {eid} seq={seq} "
+                      f"({lm_r['op']} {lm_r['table_name']})",
+                      flush=True)
+                validated += 1
+                continue
+
+            if olr_r and not lm_r:
+                missing_lm += 1
+                mismatches += 1
+                print(f"[EXTRA_OLR] {eid} seq={seq} "
+                      f"({olr_r['op']} {olr_r['table_name']})",
+                      flush=True)
+                validated += 1
+                continue
+
+            # Both have this seq — compare
+            if lm_r['table_name'] != olr_r['table_name'] or \
+               lm_r['op'] != olr_r['op']:
+                mismatches += 1
+                print(f"[MISMATCH] {eid} seq={seq}: "
+                      f"LM={lm_r['op']} {lm_r['table_name']}, "
+                      f"OLR={olr_r['op']} {olr_r['table_name']}",
+                      flush=True)
+                validated += 1
+                continue
+
+            lm_evt = json.loads(lm_r['raw_json'])
+            olr_evt = json.loads(olr_r['raw_json'])
+            lm_after = normalize_columns(lm_evt.get('after'))
+            olr_after = normalize_columns(olr_evt.get('after'))
+            lm_before = normalize_columns(lm_evt.get('before'))
+            olr_before = normalize_columns(olr_evt.get('before'))
+
+            diffs = compare_values(lm_after, olr_after,
+                                   lm_r['table_name'], 'after')
+            diffs.extend(compare_values(lm_before, olr_before,
+                                        lm_r['table_name'], 'before'))
+            if diffs:
+                mismatches += 1
+                print(f"[VALUE_DIFF] {eid} seq={seq} "
+                      f"({lm_r['op']} {lm_r['table_name']}):",
+                      flush=True)
+                for d in diffs[:5]:
+                    print(d, flush=True)
+            else:
+                matched += 1
+
+            validated += 1
+
+    # Advance per-node cursors
+    for node_prefix, nf in node_frontiers.items():
+        cursor_by_node[node_prefix] = nf
+
+    return (validated, matched, mismatches, missing_olr, missing_lm,
+            tail_olr, tail_lm, lm_count, olr_count, node_frontiers)
+
+
+def print_summary(total_validated, total_matched, total_mismatches,
+                  total_missing_olr, total_missing_lm,
+                  total_tail_olr, total_tail_lm):
+    """Print final validation summary and return exit code."""
+    print(f"\n{'='*60}", flush=True)
+    print(f" Fuzz Test Validation Summary", flush=True)
+    print(f"{'='*60}", flush=True)
+    print(f" Total validated: {total_validated}", flush=True)
+    print(f" Matched: {total_matched}", flush=True)
+    print(f" Mismatches: {total_mismatches}", flush=True)
+    print(f" Missing from OLR: {total_missing_olr}", flush=True)
+    print(f" Extra in OLR: {total_missing_lm}", flush=True)
+    if total_tail_olr or total_tail_lm:
+        print(f" Tail (OLR ahead): {total_tail_olr}", flush=True)
+        print(f" Tail (LM ahead): {total_tail_lm}", flush=True)
+
+    if total_mismatches > 0:
+        print(f"\n RESULT: FAIL ({total_mismatches} unexpected mismatches)",
+              flush=True)
+        return 1
+    else:
+        print("\n RESULT: PASS", flush=True)
+        if total_tail_olr + total_tail_lm > 0:
+            print(f" ({total_tail_olr + total_tail_lm} tail events)", flush=True)
+        return 0
+
+
 def main(): print(f"Validator starting", flush=True) print(f" SQLite DB: {SQLITE_DB}", flush=True) print(f" Poll interval: {POLL_INTERVAL}s", flush=True) print(f" Idle timeout: {IDLE_TIMEOUT}s", flush=True) + print(f" Purge TTL: {PURGE_TTL_HOURS}h", flush=True) # Wait for database to exist while not os.path.exists(SQLITE_DB): time.sleep(2) - conn = sqlite3.connect(SQLITE_DB) + conn =
sqlite3.connect(SQLITE_DB, timeout=30) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") - - cursor_by_node = {'N1': '', 'N2': ''} # Per-node watermark - safe_frontier = {} # Last frontier before idle-timeout widening + conn.execute("PRAGMA busy_timeout=30000") + + cursor_by_node = {'N1': '', 'N2': ''} + seeded = parse_cursor(START_CURSOR) + for k, v in seeded.items(): + if k in cursor_by_node: + cursor_by_node[k] = v + if seeded: + print(f"[validator] resuming from cursor: " + f"{format_cursor(cursor_by_node)}", flush=True) + safe_frontier = dict(cursor_by_node) total_validated = 0 total_matched = 0 total_mismatches = 0 total_missing_lm = 0 total_missing_olr = 0 - total_tail_olr = 0 # OLR ahead of LM at drain time (not a bug) - total_tail_lm = 0 # LM ahead of OLR at drain time (not a bug) + total_tail_olr = 0 + total_tail_lm = 0 last_new_events = time.time() prev_lm_count = 0 prev_olr_count = 0 + # Poll until idle, validate, exit try: while True: time.sleep(POLL_INTERVAL) - # Get current counts - lm_count = conn.execute("SELECT COUNT(*) FROM lm_events").fetchone()[0] - olr_count = conn.execute("SELECT COUNT(*) FROM olr_events").fetchone()[0] + lm_count = conn.execute( + "SELECT COUNT(*) FROM lm_events").fetchone()[0] + olr_count = conn.execute( + "SELECT COUNT(*) FROM olr_events").fetchone()[0] - # Check for new events (idle detection) if lm_count != prev_lm_count or olr_count != prev_olr_count: last_new_events = time.time() prev_lm_count = lm_count prev_olr_count = olr_count - # Find safe frontier per node: min(lm, olr) for each N{x} prefix. - # Event_ids from two RAC nodes interleave non-monotonically in - # commit order, so a global frontier would validate events before - # the other side has delivered them. 
- node_frontiers = {} - for node_prefix in ('N1', 'N2'): - lm_node_max = conn.execute( - "SELECT MAX(event_id) FROM lm_events WHERE event_id LIKE ?", - (f'{node_prefix}_%',)).fetchone()[0] - olr_node_max = conn.execute( - "SELECT MAX(event_id) FROM olr_events WHERE event_id LIKE ?", - (f'{node_prefix}_%',)).fetchone()[0] - if lm_node_max and olr_node_max: - node_frontiers[node_prefix] = min(lm_node_max, olr_node_max) - - if not node_frontiers: - continue - - # Check if any node has new events beyond its cursor - any_new = any( - nf > cursor_by_node.get(np, '') - for np, nf in node_frontiers.items() - ) - if not any_new: - if time.time() - last_new_events > IDLE_TIMEOUT: - print(f"[validator] Idle timeout ({IDLE_TIMEOUT}s). " - f"Final validation pass...", flush=True) - # Save safe frontier before widening — events beyond this - # are tail lag (OLR or LM ahead), not real mismatches. - safe_frontier = dict(node_frontiers) - # Widen frontier to max of both sides per node to catch - # truly missing events (one side never delivered them). - for node_prefix in ('N1', 'N2'): - lm_n = conn.execute( - "SELECT MAX(event_id) FROM lm_events WHERE event_id LIKE ?", - (f'{node_prefix}_%',)).fetchone()[0] - olr_n = conn.execute( - "SELECT MAX(event_id) FROM olr_events WHERE event_id LIKE ?", - (f'{node_prefix}_%',)).fetchone()[0] - if lm_n or olr_n: - node_frontiers[node_prefix] = max(lm_n or '', olr_n or '') - # Re-check if there's anything new with widened frontier - any_new = any( - nf > cursor_by_node.get(np, '') - for np, nf in node_frontiers.items() - ) - if not any_new: - break - # Fall through to validate the widened range - else: - continue - - # Fetch event_ids within each node's safe frontier - lm_ids = set() - olr_ids = set() - for node_prefix, nf in node_frontiers.items(): - node_cursor = cursor_by_node.get(node_prefix, '') - for r in conn.execute( - "SELECT DISTINCT event_id FROM lm_events " - "WHERE event_id > ? AND event_id <= ? 
AND event_id LIKE ?", - (node_cursor, nf, f'{node_prefix}_%')).fetchall(): - lm_ids.add(r['event_id']) - for r in conn.execute( - "SELECT DISTINCT event_id FROM olr_events " - "WHERE event_id > ? AND event_id <= ? AND event_id LIKE ?", - (node_cursor, nf, f'{node_prefix}_%')).fetchall(): - olr_ids.add(r['event_id']) - - all_ids = sorted(lm_ids | olr_ids) - - for eid in all_ids: - in_lm = eid in lm_ids - in_olr = eid in olr_ids - - # Check if this event is beyond the safe frontier (tail lag) - node_prefix = eid[:2] - is_tail = (safe_frontier - and node_prefix in safe_frontier - and eid > safe_frontier[node_prefix]) - - # Determine table from whichever side has the event - if in_lm: - tbl_row = conn.execute( - "SELECT table_name FROM lm_events WHERE event_id = ? LIMIT 1", - (eid,)).fetchone() - else: - tbl_row = conn.execute( - "SELECT table_name FROM olr_events WHERE event_id = ? LIMIT 1", - (eid,)).fetchone() - event_table = tbl_row['table_name'] if tbl_row else '?' - is_lob = event_table.split('.')[-1].upper() in LOB_TABLES - - if in_lm and not in_olr: - total_missing_olr += 1 - if is_tail: - total_tail_lm += 1 - else: - total_mismatches += 1 - print(f"[MISSING_OLR] {eid} ({event_table})", flush=True) - total_validated += 1 - continue - - if in_olr and not in_lm: - total_missing_lm += 1 - if is_tail: - total_tail_olr += 1 - else: - total_mismatches += 1 - print(f"[EXTRA_OLR] {eid} ({event_table})", flush=True) - total_validated += 1 - continue - - # Both sides have the event — compare. - lm_recs = conn.execute( - "SELECT * FROM lm_events WHERE event_id = ? ORDER BY seq", - (eid,) - ).fetchall() - olr_recs = conn.execute( - "SELECT * FROM olr_events WHERE event_id = ? ORDER BY seq", - (eid,) - ).fetchall() - - if is_lob: - # LOB tables: replay ops into final state, compare end result. - # Both sides use LogMiner (expected=full LM, actual=LOB-only LM), - # so they should produce identical final state. 
- lm_state, lm_exists = replay_final_state(lm_recs) - olr_state, olr_exists = replay_final_state(olr_recs) - - if lm_exists != olr_exists: - total_mismatches += 1 - print(f"[LOB_EXISTENCE] {eid} ({event_table}): " - f"LM exists={lm_exists} OLR exists={olr_exists}", - flush=True) - total_validated += 1 - else: - diffs = compare_values(lm_state, olr_state, - event_table, 'after') - if diffs: - total_mismatches += 1 - print(f"[LOB_VALUE_DIFF] {eid} ({event_table}):", - flush=True) - for d in diffs[:5]: - print(d, flush=True) - else: - total_matched += 1 - total_validated += 1 - continue - - # Non-LOB tables: compare per (event_id, seq) directly. - # Seq numbers are absolute for non-LOB (no merge/phantom issues). - lm_by_seq = {r['seq']: r for r in lm_recs} - olr_by_seq = {r['seq']: r for r in olr_recs} - all_seqs = sorted(set(lm_by_seq.keys()) | set(olr_by_seq.keys())) - - for seq in all_seqs: - lm_r = lm_by_seq.get(seq) - olr_r = olr_by_seq.get(seq) - - if lm_r and not olr_r: - total_missing_olr += 1 - total_mismatches += 1 - print(f"[MISSING_OLR] {eid} seq={seq} " - f"({lm_r['op']} {lm_r['table_name']})", - flush=True) - total_validated += 1 - continue - - if olr_r and not lm_r: - total_missing_lm += 1 - total_mismatches += 1 - print(f"[EXTRA_OLR] {eid} seq={seq} " - f"({olr_r['op']} {olr_r['table_name']})", - flush=True) - total_validated += 1 - continue - - # Both have this seq — compare - if lm_r['table_name'] != olr_r['table_name'] or \ - lm_r['op'] != olr_r['op']: - total_mismatches += 1 - print(f"[MISMATCH] {eid} seq={seq}: " - f"LM={lm_r['op']} {lm_r['table_name']}, " - f"OLR={olr_r['op']} {olr_r['table_name']}", - flush=True) - total_validated += 1 - continue - - lm_evt = json.loads(lm_r['raw_json']) - olr_evt = json.loads(olr_r['raw_json']) - lm_after = normalize_columns(lm_evt.get('after')) - olr_after = normalize_columns(olr_evt.get('after')) - lm_before = normalize_columns(lm_evt.get('before')) - olr_before = normalize_columns(olr_evt.get('before')) - - 
diffs = compare_values(lm_after, olr_after, - lm_r['table_name'], 'after') - diffs.extend(compare_values(lm_before, olr_before, - lm_r['table_name'], 'before')) - if diffs: - total_mismatches += 1 - print(f"[VALUE_DIFF] {eid} seq={seq} " - f"({lm_r['op']} {lm_r['table_name']}):", - flush=True) - for d in diffs[:5]: - print(d, flush=True) - else: - total_matched += 1 - - total_validated += 1 - - # Advance per-node cursors - for node_prefix, nf in node_frontiers.items(): - cursor_by_node[node_prefix] = nf - - # Progress report - frontier_str = ','.join(f'{k}={v}' for k, v in sorted(cursor_by_node.items())) - tail_str = (f" tail_olr={total_tail_olr} tail_lm={total_tail_lm}" - if total_tail_olr or total_tail_lm else "") - print(f"[validator] validated={total_validated} matched={total_matched} " - f"mismatches={total_mismatches} " - f"missing_olr={total_missing_olr} extra_olr={total_missing_lm}" - f"{tail_str} " - f"lm_total={lm_count} olr_total={olr_count} " - f"frontier={frontier_str}", flush=True) + # Try a validation cycle (safe frontier only) + result = validate_cycle(conn, cursor_by_node, safe_frontier) + (v, m, mm, mo, ml, to_, tl, lmc, oc, nf) = result + total_validated += v + total_matched += m + total_mismatches += mm + total_missing_olr += mo + total_missing_lm += ml + total_tail_olr += to_ + total_tail_lm += tl + + if v > 0: + # Purge old events after each validation cycle + purged = purge_old_events(conn, PURGE_TTL_HOURS) + frontier_str = ','.join( + f'{k}={v_}' for k, v_ in sorted(cursor_by_node.items())) + tail_str = (f" tail_olr={total_tail_olr} tail_lm={total_tail_lm}" + if total_tail_olr or total_tail_lm else "") + purge_str = f" purged={purged}" if purged else "" + print(f"[validator] validated={total_validated} " + f"matched={total_matched} " + f"mismatches={total_mismatches} " + f"missing_olr={total_missing_olr} " + f"extra_olr={total_missing_lm}" + f"{tail_str}{purge_str} " + f"lm_total={lmc} olr_total={oc} " + f"frontier={frontier_str}", 
flush=True) + elif time.time() - last_new_events > IDLE_TIMEOUT: + # Idle timeout — do final widened pass + print(f"[validator] Idle timeout ({IDLE_TIMEOUT}s). " + f"Final validation pass...", flush=True) + result = validate_cycle( + conn, cursor_by_node, safe_frontier, widen=True) + (v, m, mm, mo, ml, to_, tl, lmc, oc, _) = result + total_validated += v + total_matched += m + total_mismatches += mm + total_missing_olr += mo + total_missing_lm += ml + total_tail_olr += to_ + total_tail_lm += tl + break except KeyboardInterrupt: pass finally: conn.close() - # Final summary - print(f"\n{'='*60}", flush=True) - print(f" Fuzz Test Validation Summary", flush=True) - print(f"{'='*60}", flush=True) - print(f" Total validated: {total_validated}", flush=True) - print(f" Matched: {total_matched}", flush=True) - print(f" Mismatches: {total_mismatches}", flush=True) - print(f" Missing from OLR: {total_missing_olr}", flush=True) - print(f" Extra in OLR: {total_missing_lm}", flush=True) - if total_tail_olr or total_tail_lm: - print(f" Tail (OLR ahead): {total_tail_olr}", flush=True) - print(f" Tail (LM ahead): {total_tail_lm}", flush=True) + # Emit resumable cursor for soak loop (safe frontier only — never widened, + # so late-arriving lagging-side events are re-picked-up next cycle). + final = safe_frontier if any(safe_frontier.values()) else cursor_by_node + print(f"[validator] final_cursor={format_cursor(final)}", flush=True) - if total_mismatches > 0: - print(f"\n RESULT: FAIL ({total_mismatches} unexpected mismatches)", - flush=True) - sys.exit(1) - else: - print("\n RESULT: PASS", flush=True) - if total_tail_olr + total_tail_lm > 0: - print(f" ({total_tail_olr + total_tail_lm} tail events)", flush=True) - sys.exit(0) + rc = print_summary(total_validated, total_matched, total_mismatches, + total_missing_olr, total_missing_lm, + total_tail_olr, total_tail_lm) + sys.exit(rc) if __name__ == '__main__':