Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions tests/dbz-twin/rac/archive-cleanup/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Minimal cron runner: supercronic executes /etc/crontab and passes container
# env vars through to jobs (used here for VM_HOST / VM_USER).
FROM alpine:3.20

ARG SUPERCRONIC_VERSION=v0.2.44
# SHA-1 of the release binary, as published alongside each supercronic
# release; bump this together with SUPERCRONIC_VERSION.
ARG SUPERCRONIC_SHA=6eb0a8e1e6673675dc67668c1a9b6409f79c37bc
ARG SUPERCRONIC_URL=https://github.com/aptible/supercronic/releases/download/${SUPERCRONIC_VERSION}/supercronic-linux-amd64

# openssh-client is the runtime dependency (the cron job ssh-es to the RAC
# VM); curl is needed only to fetch the binary, so it is installed, used, and
# removed in the same layer to keep the image small.
# The checksum line uses the canonical two-space "HASH  FILE" format: a single
# space is rejected as "no properly formatted checksum lines" by GNU coreutils
# sha1sum -c and by older busybox versions, which would fail the build.
RUN apk add --no-cache openssh-client curl \
    && curl -fsSL -o /usr/local/bin/supercronic "${SUPERCRONIC_URL}" \
    && echo "${SUPERCRONIC_SHA}  /usr/local/bin/supercronic" | sha1sum -c - \
    && chmod +x /usr/local/bin/supercronic \
    && apk del curl

# -passthrough-logs: job stdout/stderr go straight to the container log
# stream instead of being wrapped in supercronic's own log lines.
ENTRYPOINT ["/usr/local/bin/supercronic", "-passthrough-logs"]
CMD ["/etc/crontab"]
5 changes: 5 additions & 0 deletions tests/dbz-twin/rac/archive-cleanup/crontab
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Archive log cleanup — delete archive log files older than 1 day on the RAC VM.
# Runs hourly. Each deleted file is printed to stdout (-> docker logs).
#
# Env vars (passed through by supercronic): VM_HOST, VM_USER
#
# -type f restricts -delete to plain files: without it, find would also try to
# -delete directories (including /shared/redo/archivelog itself once it ages
# past 1 day), which fails on non-empty dirs and spams stderr every hour.
# Host key checking is disabled — acceptable only because this targets a
# throwaway test VM.
0 * * * * ssh -i /vm-key -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $VM_USER@$VM_HOST "find /shared/redo/archivelog -type f -mtime +1 -print -delete" | awk 'NF{print "deleted: "$0}'
13 changes: 13 additions & 0 deletions tests/dbz-twin/rac/docker-compose-fuzz.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ services:
LM_TOPIC: lm-events
OLR_TOPIC: olr-events
OLR_LOB_TOPIC: olr-lob-events
PURGE_TTL_HOURS: "${PURGE_TTL_HOURS:-24}"
volumes:
- ./kafka-consumer.py:/app/kafka-consumer.py:ro
- fuzz-data:/app/data
Expand All @@ -104,6 +105,18 @@ services:
- ./validator.py:/app/validator.py:ro
- fuzz-data:/app/data

archive-cleanup:
build:
context: ./archive-cleanup
image: fuzz-archive-cleanup:latest
container_name: fuzz-archive-cleanup
environment:
VM_HOST: ${VM_HOST:?VM_HOST is required}
VM_USER: ${VM_USER:-root}
volumes:
- ${VM_KEY:?VM_KEY is required}:/vm-key:ro
- ./archive-cleanup/crontab:/etc/crontab:ro

volumes:
dbz-logminer-data:
dbz-olr-data:
Expand Down
26 changes: 18 additions & 8 deletions tests/dbz-twin/rac/fuzz-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ DB_CONN2="${DB_CONN2:-olr_test/olr_test@//racnodep2:1521/ORCLPDB}"

OLR_CONTAINER="olr-debezium"
COMPOSE_FILE="$SCRIPT_DIR/docker-compose-fuzz.yaml"
WORK_DIR="/tmp/fuzz_soak_state"

# ---- SSH helpers ----
_vm_sqlplus() {
Expand Down Expand Up @@ -203,7 +204,7 @@ action_up() {
echo " Waiting for OLR..."
for i in $(seq 1 90); do
if ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" \
"podman logs $OLR_CONTAINER 2>&1 | grep -q 'processing redo log'"; then
"podman logs $OLR_CONTAINER 2>&1 | grep 'processing redo log' > /dev/null"; then
echo " OLR: ready"
break
fi
Expand All @@ -215,9 +216,9 @@ action_up() {
echo " Waiting for Debezium connectors..."
for i in $(seq 1 90); do
LM_OK=false; OLR_OK=false; LOB_LM_OK=false
docker logs fuzz-dbz-logminer 2>&1 | grep -q "Starting streaming" && LM_OK=true
docker logs fuzz-dbz-olr 2>&1 | grep -q "streaming client started\|Starting streaming" && OLR_OK=true
docker logs fuzz-dbz-lob-logminer 2>&1 | grep -q "Starting streaming" && LOB_LM_OK=true
docker logs fuzz-dbz-logminer 2>&1 | grep "Starting streaming" > /dev/null && LM_OK=true
docker logs fuzz-dbz-olr 2>&1 | grep -E "streaming client started|Starting streaming" > /dev/null && OLR_OK=true
docker logs fuzz-dbz-lob-logminer 2>&1 | grep "Starting streaming" > /dev/null && LOB_LM_OK=true
if $LM_OK && $OLR_OK && $LOB_LM_OK; then
echo " Debezium: ready (3 connectors)"
break
Expand All @@ -226,6 +227,8 @@ action_up() {
sleep 2
done

mkdir -p "$WORK_DIR"

echo ""
echo " OLR memory: $(_olr_memory_mb) MB"
echo ""
Expand Down Expand Up @@ -305,12 +308,14 @@ SQL
if [[ -n "$done1" ]]; then
echo " Node 1: $done1"
else
echo " Node 1: workload finished (no summary line)"
echo " Node 1: workload finished (no FUZZ_DONE line) — sqlplus output:"
sed 's/^/ | /' "$work_dir/fuzz_out1.log"
fi
if [[ -n "$done2" ]]; then
echo " Node 2: $done2"
else
echo " Node 2: workload finished (no summary line)"
echo " Node 2: workload finished (no FUZZ_DONE line) — sqlplus output:"
sed 's/^/ | /' "$work_dir/fuzz_out2.log"
fi

if [[ $rc1 -ne 0 || $rc2 -ne 0 ]]; then
Expand Down Expand Up @@ -399,9 +404,12 @@ action_validate() {
final_counts=$(docker logs --tail 1 fuzz-consumer 2>/dev/null | grep -o '\[consumer\].*' || echo "unknown")
echo " Consumer idle for 30s. Last status: $final_counts"

# Start validator (uses 'validate' profile)
# Start validator (uses 'validate' profile).
# START_CURSOR env (optional) is forwarded to the container so soak loops
# can resume past already-validated events without re-scanning.
local exit_code=0
docker compose -f "$COMPOSE_FILE" run --rm validator || exit_code=$?
docker compose -f "$COMPOSE_FILE" run --rm \
-e "START_CURSOR=${START_CURSOR:-}" validator || exit_code=$?
echo ""
echo " OLR memory: $(_olr_memory_mb) MB"

Expand Down Expand Up @@ -450,9 +458,11 @@ action_logs() {

action_down() {
  # Remote cleanup runs both stop and rm so a half-removed container does not
  # block the next 'up'; the trailing 'true' keeps ssh's exit status clean
  # when the container does not exist.
  local remote_cleanup="podman stop -t5 $OLR_CONTAINER 2>/dev/null; podman rm $OLR_CONTAINER 2>/dev/null; true"

  echo "=== Stopping fuzz test infrastructure ==="

  # Tear down the local compose stack and its volumes; errors are silenced so
  # 'down' is safe to call when nothing is running.
  docker compose -f "$COMPOSE_FILE" down -v 2>/dev/null
  ssh $_SSH_OPTS "${VM_USER}@${VM_HOST}" "$remote_cleanup"

  # Drop per-run soak state so the next 'up' starts from scratch.
  rm -rf "$WORK_DIR"
  echo " Done."
}

Expand Down
33 changes: 27 additions & 6 deletions tests/dbz-twin/rac/kafka-consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

KAFKA_BOOTSTRAP = os.environ.get('KAFKA_BOOTSTRAP', 'localhost:9092')
SQLITE_DB = os.environ.get('SQLITE_DB', '/app/data/fuzz.db')
PURGE_TTL_HOURS = int(os.environ.get('PURGE_TTL_HOURS', '24'))

OP_MAP = {'c': 'INSERT', 'u': 'UPDATE', 'd': 'DELETE'}

Expand All @@ -30,9 +31,10 @@
def init_db(db_path):
"""Create SQLite database and tables."""
os.makedirs(os.path.dirname(db_path) or '.', exist_ok=True)
conn = sqlite3.connect(db_path)
conn = sqlite3.connect(db_path, timeout=30)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
conn.execute("PRAGMA busy_timeout=30000")
conn.execute("""
CREATE TABLE IF NOT EXISTS lm_events (
event_id TEXT NOT NULL,
Expand Down Expand Up @@ -171,6 +173,7 @@ def main():
batch = []
batch_start = time.time()
last_report = time.time()
last_seq_cleanup = time.time()

try:
while True:
Expand Down Expand Up @@ -226,11 +229,29 @@ def main():
batch = []
batch_start = time.time()

# Seq maps grow with each unique event_id but entries are small
# (string key -> int value). For a 60-min run with ~30K events,
# this is ~2MB. Do not trim — trimming caused seq reset bugs
# where later events for the same event_id got seq=0 again,
# overwriting earlier events via INSERT OR REPLACE.
# Purge seq dict entries for events older than TTL.
# Safe because events outside the 24h window will never receive
# new Kafka messages — they're well past Kafka's retention.
now = time.time()
if now - last_seq_cleanup >= 600: # every 10 minutes
cutoff = now - PURGE_TTL_HOURS * 3600
for tbl, seq_map in [('lm_events', lm_seq),
('olr_events', olr_seq)]:
old_eids = set()
rows = conn.execute(
f"SELECT DISTINCT event_id FROM {tbl} "
"WHERE consumed_at < ?", (cutoff,)).fetchall()
old_eids = {r[0] for r in rows}
pruned = 0
for eid in old_eids:
if eid in seq_map:
del seq_map[eid]
pruned += 1
if pruned:
print(f"[consumer] Pruned {pruned} seq entries "
f"from {tbl} (older than {PURGE_TTL_HOURS}h)",
flush=True)
last_seq_cleanup = now

# Report progress every 30 seconds
now = time.time()
Expand Down
Loading
Loading