diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml index dd929ed0cb098..7f97d650f2a69 100644 --- a/ci/nightly/pipeline.template.yml +++ b/ci/nightly/pipeline.template.yml @@ -264,7 +264,7 @@ steps: - common-ancestor - id: workload-replay - label: "Workload Replay (1% initial data)" + label: "Workload Replay (10% initial data)" depends_on: build-x86_64 timeout_in_minutes: 240 parallelism: 5 @@ -273,7 +273,7 @@ steps: composition: workload-replay run: benchmark args: - - --factor-initial-data=0.01 + - --factor-initial-data=0.1 - --compare-against - common-ancestor agents: diff --git a/ci/release-qualification/pipeline.template.yml b/ci/release-qualification/pipeline.template.yml index f703e3f536f3f..40915b622c92d 100644 --- a/ci/release-qualification/pipeline.template.yml +++ b/ci/release-qualification/pipeline.template.yml @@ -195,7 +195,7 @@ steps: - 1200 - id: long-workload-replay - label: "Long Workload Replay (10% initial data)" + label: "Long Workload Replay (100% initial data)" depends_on: build-x86_64 timeout_in_minutes: 1200 parallelism: 3 @@ -204,7 +204,7 @@ steps: composition: workload-replay run: benchmark args: - - --factor-initial-data=0.1 + - --factor-initial-data=1 - --runtime=3600 - --compare-against - common-ancestor diff --git a/misc/python/materialize/feature_benchmark/benchmark.py b/misc/python/materialize/feature_benchmark/benchmark.py index 39f5c78358de3..cbfa3c73907c9 100644 --- a/misc/python/materialize/feature_benchmark/benchmark.py +++ b/misc/python/materialize/feature_benchmark/benchmark.py @@ -65,6 +65,9 @@ def create_scenario_instance(self) -> Scenario: elif float(self._scale) > 0: scale = float(self._scale) + if self._scenario_cls.MAX_SCALE is not None: + scale = min(scale, self._scenario_cls.MAX_SCALE) + scenario_class = self._scenario_cls return scenario_class( scale=scale, diff --git a/misc/python/materialize/feature_benchmark/scenario.py b/misc/python/materialize/feature_benchmark/scenario.py index 2bbcf9e7819c1..7fb3b5bb82f08 100644 --- a/misc/python/materialize/feature_benchmark/scenario.py +++ b/misc/python/materialize/feature_benchmark/scenario.py @@ -21,6 +21,7 @@ class RootScenario: SCALE: float = 6 FIXED_SCALE: bool = False # Will --scale=N have effect on the scenario + MAX_SCALE: float | None = None # Cap scale to this value when set RELATIVE_THRESHOLD: dict[MeasurementType, float] = { MeasurementType.WALLCLOCK: 0.10, # Increased the other measurements since they are easy to regress now diff --git a/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py b/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py index 48760b7e62efd..e4cf263d58c71 100644 --- a/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py +++ b/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py @@ -2205,6 +2205,8 @@ class StartupLoaded(Scenario): """Measure the time it takes to restart a populated Mz instance and have all the dataflows be ready to return something""" SCALE = 1 # 10 objects of each kind + # Can not scale to 100s of objects + MAX_SCALE = 1.5 def shared(self) -> Action: return TdAction( diff --git a/misc/python/materialize/workload_replay/column.py b/misc/python/materialize/workload_replay/column.py index 262c7d4d71fca..5e12a96c9b9e8 100644 --- a/misc/python/materialize/workload_replay/column.py +++ b/misc/python/materialize/workload_replay/column.py @@ -22,9 +22,9 @@ from pg8000.native import literal from materialize.workload_replay.util import ( - long_tail_choice, long_tail_float, long_tail_int, + long_tail_rank, long_tail_text, ) @@ -41,8 +41,9 @@ def __init__( self.default = default self.chars = string.ascii_letters + string.digits self.data_shape = data_shape - if data_shape: - assert typ in ("text", "bytea"), f"Can't create text shape for type {typ}" + + self._years = list(range(2019, 2026)) + self._seq_counter = 0 self._hot_strings = [ f"{name}_a", @@ -56,6 +57,43 @@ def __init__( "NULL", ] + def _shaped_text(self, rng: random.Random) -> str | None: + """Generate text according to data_shape, or None if not applicable.""" + if self.data_shape == "datetime": + return self._random_datetime(rng) + elif self.data_shape == "random": + length = rng.randrange(5, 40) + return "".join(rng.choice(self.chars) for _ in range(length)) + elif self.data_shape == "uuid": + return str(uuid.UUID(int=rng.getrandbits(128), version=4)) + elif self.data_shape == "sequential": + self._seq_counter += 1 + return f"{self.name}_{self._seq_counter}" + elif self.data_shape == "zipfian": + rank = long_tail_rank(n=10000, a=1.3, rng=rng) + return f"{self.name}_{rank}" + elif self.data_shape is not None and self.data_shape != "duration": + raise ValueError(f"Unhandled data_shape {self.data_shape!r}") + return None + + def _shaped_float(self, rng: random.Random) -> float | None: + """Generate a float according to data_shape, or None if not applicable.""" + if self.data_shape == "duration": + return round(rng.uniform(10.0, 1800.0), 2) + return None + + def _random_date(self, rng: random.Random) -> str: + """Generate a uniformly random date string.""" + year = rng.choice(self._years) + return f"{year}-{rng.randrange(1, 13):02}-{rng.randrange(1, 29):02}" + + def _random_datetime(self, rng: random.Random) -> str: + """Generate a uniformly random datetime string.""" + return ( + f"{self._random_date(rng)}" + f"T{rng.randrange(0, 24):02}:{rng.randrange(0, 60):02}:{rng.randrange(0, 60):02}Z" + ) + def avro_type(self) -> str | list[str]: """Return the Avro type for this column.""" result = self.typ @@ -96,21 +134,21 @@ def kafka_value(self, rng: random.Random) -> Any: return long_tail_int(0, 18446744073709551615, rng=rng) elif self.typ in ("float", "double precision", "numeric"): + shaped = self._shaped_float(rng) + if shaped is not None: + return shaped return long_tail_float(-1_000_000_000.0, 1_000_000_000.0, rng=rng) elif self.typ in ("text", "bytea"): - if self.data_shape == "datetime": - year = long_tail_choice( - [2023, 2024, 2025, 2022, 2021, 2020, 2019], hot_prob=0.9, rng=rng - ) - return literal( - f"{year}-{rng.randrange(1, 13):02}-{rng.randrange(1, 29):02}T{rng.randrange(0, 23):02}:{rng.randrange(0, 59):02}:{rng.randrange(0, 59):02}Z" - ) - elif self.data_shape: - raise ValueError(f"Unhandled text shape {self.data_shape}") + shaped = self._shaped_text(rng) + if shaped is not None: + return literal(shaped) return literal(long_tail_text(self.chars, 100, self._hot_strings, rng=rng)) elif self.typ in ("character", "character varying"): + shaped = self._shaped_text(rng) + if shaped is not None: + return literal(shaped) return literal(long_tail_text(self.chars, 10, self._hot_strings, rng=rng)) elif self.typ == "uuid": @@ -123,23 +161,15 @@ def kafka_value(self, rng: random.Random) -> Any: return json.dumps(result) elif self.typ in ("timestamp with time zone", "timestamp without time zone"): - now = 1700000000000 # doesn't need to be exact - if rng.random() < 0.9: - return now + long_tail_int(-86_400_000, 86_400_000, rng=rng) - else: - return rng.randrange(0, 9223372036854775807) + # Epoch millis spread uniformly across 2019–2025 + # 2019-01-01 = 1546300800000, 2026-01-01 = 1767225600000 + return rng.randrange(1546300800000, 1767225600000) elif self.typ == "mz_timestamp": - year = long_tail_choice( - [2023, 2024, 2025, 2022, 2021, 2020, 2019], hot_prob=0.9, rng=rng - ) - return literal(f"{year}-{rng.randrange(1, 13)}-{rng.randrange(1, 29)}") + return literal(self._random_date(rng)) elif self.typ == "date": - year = long_tail_choice( - [2023, 2024, 2025, 2022, 2021, 2020, 2019], hot_prob=0.9, rng=rng - ) - return literal(f"{year}-{rng.randrange(1, 13)}-{rng.randrange(1, 29)}") + return literal(self._random_date(rng)) elif self.typ == "time": if rng.random() < 0.8: @@ -150,19 +180,22 @@ def kafka_value(self, rng: random.Random) -> Any: ) elif self.typ == "int2range": - a = str(long_tail_int(-32768, 32767, rng=rng)) - b = str(long_tail_int(-32768, 32767, rng=rng)) - return literal(f"[{a},{b})") + a = long_tail_int(-32768, 32767, rng=rng) + b = long_tail_int(-32768, 32767, rng=rng) + lo, hi = min(a, b), max(a, b) + return literal(f"[{lo},{hi})") elif self.typ == "int4range": - a = str(long_tail_int(-2147483648, 2147483647, rng=rng)) - b = str(long_tail_int(-2147483648, 2147483647, rng=rng)) - return literal(f"[{a},{b})") + a = long_tail_int(-2147483648, 2147483647, rng=rng) + b = long_tail_int(-2147483648, 2147483647, rng=rng) + lo, hi = min(a, b), max(a, b) + return literal(f"[{lo},{hi})") elif self.typ == "int8range": - a = str(long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)) - b = str(long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)) - return literal(f"[{a},{b})") + a = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng) + b = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng) + lo, hi = min(a, b), max(a, b) + return literal(f"[{lo},{hi})") elif self.typ == "map": return { @@ -216,27 +249,23 @@ def value(self, rng: random.Random, in_query: bool = True) -> Any: return str(val) if in_query else val elif self.typ in ("float", "double precision", "numeric"): + shaped = self._shaped_float(rng) + if shaped is not None: + return str(shaped) if in_query else shaped val = long_tail_float(-1_000_000_000.0, 1_000_000_000.0, rng=rng) return str(val) if in_query else val elif self.typ in ("text", "bytea"): - if self.data_shape == "datetime": - year = long_tail_choice( - [2023, 2024, 2025, 2022, 2021, 2020, 2019], hot_prob=0.9, rng=rng - ) - s = ( - f"{year}-{rng.randrange(1, 13):02}-{rng.randrange(1, 29):02}" - f"T{rng.randrange(0, 23):02}:{rng.randrange(0, 59):02}:{rng.randrange(0, 59):02}Z" - ) - return literal(s) if in_query else s - - elif self.data_shape: - raise ValueError(f"Unhandled text shape {self.data_shape}") - + shaped = self._shaped_text(rng) + if shaped is not None: + return literal(shaped) if in_query else shaped s = long_tail_text(self.chars, 100, self._hot_strings, rng=rng) return literal(s) if in_query else s elif self.typ in ("character", "character varying"): + shaped = self._shaped_text(rng) + if shaped is not None: + return literal(shaped) if in_query else shaped s = long_tail_text(self.chars, 10, self._hot_strings, rng=rng) return literal(s) if in_query else s @@ -254,24 +283,15 @@ def value(self, rng: random.Random, in_query: bool = True) -> Any: return json.dumps(obj) elif self.typ in ("timestamp with time zone", "timestamp without time zone"): - year = long_tail_choice( - [2023, 2024, 2025, 2022, 2021, 2020, 2019], hot_prob=0.9, rng=rng - ) - s = f"{year}-{rng.randrange(1, 13)}-{rng.randrange(1, 29)}" + s = self._random_date(rng) return literal(s) if in_query else s elif self.typ == "mz_timestamp": - year = long_tail_choice( - [2023, 2024, 2025, 2022, 2021, 2020, 2019], hot_prob=0.9, rng=rng - ) - s = f"{year}-{rng.randrange(1, 13)}-{rng.randrange(1, 29)}" + s = self._random_date(rng) return literal(s) if in_query else s elif self.typ == "date": - year = long_tail_choice( - [2023, 2024, 2025, 2022, 2021, 2020, 2019], hot_prob=0.9, rng=rng - ) - s = f"{year}-{rng.randrange(1, 13)}-{rng.randrange(1, 29)}" + s = self._random_date(rng) return literal(s) if in_query else s elif self.typ == "time": @@ -288,21 +308,24 @@ def value(self, rng: random.Random, in_query: bool = True) -> Any: return literal(s) if in_query else s elif self.typ == "int2range": - a = str(long_tail_int(-32768, 32767, rng=rng)) - b = str(long_tail_int(-32768, 32767, rng=rng)) - s = f"[{a},{b})" + a = long_tail_int(-32768, 32767, rng=rng) + b = long_tail_int(-32768, 32767, rng=rng) + lo, hi = min(a, b), max(a, b) + s = f"[{lo},{hi})" return literal(s) if in_query else s elif self.typ == "int4range": - a = str(long_tail_int(-2147483648, 2147483647, rng=rng)) - b = str(long_tail_int(-2147483648, 2147483647, rng=rng)) - s = f"[{a},{b})" + a = long_tail_int(-2147483648, 2147483647, rng=rng) + b = long_tail_int(-2147483648, 2147483647, rng=rng) + lo, hi = min(a, b), max(a, b) + s = f"[{lo},{hi})" return literal(s) if in_query else s elif self.typ == "int8range": - a = str(long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)) - b = str(long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)) - s = f"[{a},{b})" + a = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng) + b = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng) + lo, hi = min(a, b), max(a, b) + s = f"[{lo},{hi})" return literal(s) if in_query else s elif self.typ == "map": diff --git a/misc/python/materialize/workload_replay/data.py b/misc/python/materialize/workload_replay/data.py index 9173f78e5e5ac..140f0726ee037 100644 --- a/misc/python/materialize/workload_replay/data.py +++ b/misc/python/materialize/workload_replay/data.py @@ -14,8 +14,11 @@ from __future__ import annotations import asyncio +import os import random import threading +import time +from concurrent.futures import Future, ProcessPoolExecutor, as_completed from typing import Any import psycopg @@ -25,96 +28,272 @@ from materialize.util import PropagatingThread from materialize.workload_replay.column import Column from materialize.workload_replay.config import SEED_RANGE -from materialize.workload_replay.ingest import ingest, ingest_webhook +from materialize.workload_replay.ingest import ( + delivery_report, + get_kafka_objects, + ingest, + ingest_webhook, +) +from materialize.workload_replay.util import ( + get_kafka_topic, + get_mysql_reference_db_table, + get_postgres_reference_db_schema_table, +) + +_NUM_WORKERS = min(os.cpu_count() or 4, 16) +_CHUNK_ROWS = 100_000 + + +# Subprocess workers (run in forked children, must use only picklable args) +def _copy_chunk( + conn_params: dict[str, Any], + table_fqn: list[str], + column_dicts: list[dict[str, Any]], + num_rows: int, + rng_seed: int, +) -> int: + """Generate random data and COPY to Postgres or Materialize.""" + rng = random.Random(rng_seed) + columns = [ + Column(c["name"], c["type"], c["nullable"], c["default"], c.get("data_shape")) + for c in column_dicts + ] + col_names = [c.name for c in columns] + + conn = psycopg.connect(**conn_params) + conn.autocommit = True + table_ident = SQL(".").join(map(Identifier, table_fqn)) + copy_stmt = SQL("COPY {} ({}) FROM STDIN").format( + table_ident, + SQL(", ").join(map(Identifier, col_names)), + ) -def create_initial_data_requiring_mz( - c: Composition, - workload: dict[str, Any], - factor_initial_data: float, - rng: random.Random, -) -> bool: - """Create initial data that requires Materialize to be running (tables, webhooks).""" batch_size = 10000 - created_data = False - - conn = psycopg.connect( - host="127.0.0.1", - port=c.port("materialized", 6877), - user="mz_system", - password="materialize", - dbname="materialize", + with conn.cursor() as cur: + for start in range(0, num_rows, batch_size): + batch_rows = min(batch_size, num_rows - start) + with cur.copy(copy_stmt) as copy: + for _ in range(batch_rows): + row = [c.value(rng, in_query=False) for c in columns] + copy.write_row(row) + + conn.close() + return num_rows + + +def _kafka_chunk( + kafka_port: int, + sr_port: int, + topic: str, + debezium: bool, + column_dicts: list[dict[str, Any]], + num_rows: int, + rng_seed: int, +) -> int: + """Generate random data and produce to Kafka.""" + rng = random.Random(rng_seed) + columns = [ + Column(c["name"], c["type"], c["nullable"], c["default"], c.get("data_shape")) + for c in column_dicts + ] + + producer, serializer, key_serializer, sctx, ksctx, col_names = get_kafka_objects( + topic, + tuple(columns), + debezium, + sr_port, + kafka_port, ) - conn.autocommit = True - for db, schemas in workload["databases"].items(): - for schema, items in schemas.items(): - for name, table in items["tables"].items(): - num_rows = int(table["rows"] * factor_initial_data) - if not num_rows: - continue - - data_columns = [ - Column( - col["name"], - col["type"], - col["nullable"], - col["default"], - col.get("data_shape"), + now_ms = int(time.time() * 1000) + source_struct: dict[str, Any] | None = None + if debezium: + source_struct = { + "version": "0", + "connector": "mysql", + "name": "materialize-generator", + "ts_ms": now_ms, + "snapshot": None, + "db": "db", + "sequence": None, + "table": topic.split(".")[-1], + "server_id": 0, + "gtid": None, + "file": "binlog.000001", + "pos": 0, + "row": 0, + "thread": None, + "query": None, + } + + producer.poll(0) + for _ in range(num_rows): + row = [col.kafka_value(rng) for col in columns] + while True: + try: + if debezium: + after_value = dict(zip(col_names, row)) + envelope_value = { + "before": None, + "after": after_value, + "source": source_struct, + "op": "c", + "ts_ms": now_ms, + "transaction": None, + } + producer.produce( + topic=topic, + key=key_serializer(after_value, ksctx), + value=serializer(envelope_value, sctx), + on_delivery=delivery_report, + ) + else: + key_dict = {col_names[0]: row[0]} + value_dict = dict(zip(col_names[1:], row[1:])) + producer.produce( + topic=topic, + key=key_serializer(key_dict, ksctx), + value=serializer(value_dict, sctx), + on_delivery=delivery_report, ) - for col in table["columns"] - ] + break + except BufferError: + producer.poll(0.01) + producer.poll(0) - print(f"Creating {num_rows} rows for {db}.{schema}.{name}:") + producer.flush() + return num_rows - col_names = [col.name for col in data_columns] - with conn.cursor() as cur: - for start in range(0, num_rows, batch_size): - progress = min(start + batch_size, num_rows) - print( - f"{progress}/{num_rows} ({progress / num_rows:.1%})", - end="\r", - flush=True, - ) +def _mysql_chunk( + conn_params: dict[str, Any], + table: str, + column_dicts: list[dict[str, Any]], + num_rows: int, + rng_seed: int, +) -> int: + """Generate random data and INSERT into MySQL.""" + import pymysql - copy_stmt = SQL("COPY {}.{}.{} ({}) FROM STDIN").format( - Identifier(db), - Identifier(schema), - Identifier(name), - SQL(", ").join(map(Identifier, col_names)), - ) + rng = random.Random(rng_seed) + columns = [ + Column(c["name"], c["type"], c["nullable"], c["default"], c.get("data_shape")) + for c in column_dicts + ] - with cur.copy(copy_stmt) as copy: - batch_rows = min(batch_size, num_rows - start) - for _ in range(batch_rows): - row = [ - col.value(rng, in_query=False) - for col in data_columns - ] - copy.write_row(row) - created_data = True + conn = pymysql.connect(**conn_params) - for name, source in items["sources"].items(): - if source["type"] == "webhook": - num_rows = int(source["messages_total"] * factor_initial_data) + batch_size = 10000 + for start in range(0, num_rows, batch_size): + batch_rows = min(batch_size, num_rows - start) + rows_sql = [] + for _ in range(batch_rows): + row = [col.value(rng) for col in columns] + rows_sql.append("(" + ", ".join(row) + ")") + stmt = f"INSERT INTO {table} VALUES " + ", ".join(rows_sql) + with conn.cursor() as cur: + cur.execute(stmt) + + conn.close() + return num_rows + + +def _submit_chunks( + pool: ProcessPoolExecutor, + worker_fn: Any, + args: tuple[Any, ...], + column_dicts: list[dict[str, Any]], + num_rows: int, + pretty_name: str, + rng: random.Random, + futures: dict[Future[int], str], + totals: dict[str, int], +) -> None: + """Split *num_rows* into _CHUNK_ROWS-sized pieces and submit them.""" + totals[pretty_name] = totals.get(pretty_name, 0) + num_rows + remaining = num_rows + while remaining > 0: + n = min(_CHUNK_ROWS, remaining) + seed = rng.randrange(SEED_RANGE) + f = pool.submit(worker_fn, *args, column_dicts, n, seed) + futures[f] = pretty_name + remaining -= n + + +def _await_futures( + futures: dict[Future[int], str], + totals: dict[str, int], +) -> None: + completed: dict[str, int] = {} + last_pct: dict[str, int] = {} + for future in as_completed(futures): + name = futures[future] + completed[name] = completed.get(name, 0) + future.result() + done, total = completed[name], totals[name] + bucket = int(done * 100 / total) // 5 + if bucket > last_pct.get(name, -1): + last_pct[name] = bucket + print(f" {name}: {done:,}/{total:,} ({done / total:.1%})") + + +def create_initial_data_requiring_mz( + c: Composition, + workload: dict[str, Any], + factor_initial_data: float, + rng: random.Random, +) -> bool: + """Create initial data that requires Materialize to be running (tables, webhooks).""" + mz_conn = { + "host": "127.0.0.1", + "port": c.port("materialized", 6877), + "user": "mz_system", + "password": "materialize", + "dbname": "materialize", + } + + futures: dict[Future[int], str] = {} + totals: dict[str, int] = {} + webhook_items: list[tuple[str, str, str, dict[str, Any], int]] = [] + + with ProcessPoolExecutor(max_workers=_NUM_WORKERS) as pool: + for db, schemas in workload["databases"].items(): + for schema, items in schemas.items(): + for name, table in items["tables"].items(): + num_rows = int(table["rows"] * factor_initial_data) if not num_rows: continue - print(f"Creating {num_rows} rows for {db}.{schema}.{name}:") - asyncio.run( - ingest_webhook( - c, - db, - schema, - name, - source, - num_rows, - print_progress=True, - ) + _submit_chunks( + pool, + _copy_chunk, + (mz_conn, [db, schema, name]), + table["columns"], + num_rows, + f"{db}.{schema}.{name}", + rng, + futures, + totals, ) - created_data = True - conn.close() - return created_data + for name, source in items["sources"].items(): + if source["type"] == "webhook": + num_rows = int(source["messages_total"] * factor_initial_data) + if num_rows: + webhook_items.append((db, schema, name, source, num_rows)) + + if not futures and not webhook_items: + return False + + if futures: + print(f"Creating {sum(totals.values()):,} rows across {len(totals)} tables") + _await_futures(futures, totals) + + for db, schema, name, source, num_rows in webhook_items: + print(f"Creating {num_rows} rows for {db}.{schema}.{name}:") + asyncio.run( + ingest_webhook(c, db, schema, name, source, num_rows, print_progress=True) + ) + + return True def create_initial_data_external( @@ -125,78 +304,150 @@ def create_initial_data_external( ) -> bool: """Create initial data in external systems (Postgres, MySQL, Kafka, SQL Server).""" batch_size = 10000 - created_data = False - for db, schemas in workload["databases"].items(): - for schema, items in schemas.items(): - for name, source in items["sources"].items(): - if source["type"] != "webhook" and not source.get("children", {}): - num_rows = int(source["messages_total"] * factor_initial_data) - if not num_rows: - continue - data_columns = [ - Column( - col["name"], - col["type"], - col["nullable"], - col["default"], - col.get("data_shape"), - ) - for col in source["columns"] - ] - print(f"Creating {num_rows} rows for {db}.{schema}.{name}:") - for start in range(0, num_rows, batch_size): - progress = min(start + batch_size, num_rows) - print( - f"{progress}/{num_rows} ({progress / num_rows:.1%})", - end="\r", - flush=True, - ) - ingest( - c, - source, - source, - data_columns, - min(batch_size, num_rows - start), - rng, + + futures: dict[Future[int], str] = {} + totals: dict[str, int] = {} + # SQL Server uses c.testdrive() which isn't picklable, so stays sequential. + sequential: list[ + tuple[dict[str, Any], dict[str, Any], list[dict[str, Any]], int, str] + ] = [] + + # Lazily resolved ports — only looked up when a source of that type is seen. + _ports: dict[str, int] = {} + + def port(service: str) -> int: + if service not in _ports: + _ports[service] = c.default_port(service) + return _ports[service] + + with ProcessPoolExecutor(max_workers=_NUM_WORKERS) as pool: + for db, schemas in workload["databases"].items(): + for schema, items in schemas.items(): + for name, source in items["sources"].items(): + children: list[tuple[dict[str, Any], str]] = [] + if source["type"] != "webhook" and not source.get("children", {}): + children.append((source, f"{db}.{schema}.{name}")) + else: + for cn, child in source.get("children", {}).items(): + children.append((child, f"{db}.{schema}.{name}->{cn}")) + + for child, pretty_name in children: + num_rows = int( + (child.get("messages_total", child.get("rows", 0))) + * factor_initial_data ) - created_data = True - print() - else: - for child_name, child in source.get("children", {}).items(): - num_rows = int(child["messages_total"] * factor_initial_data) if not num_rows: continue - data_columns = [ - Column( - col["name"], - col["type"], - col["nullable"], - col["default"], - col.get("data_shape"), + + st = source["type"] + if st == "postgres": + ref_db, ref_s, ref_t = ( + get_postgres_reference_db_schema_table(child) ) - for col in child["columns"] - ] - print( - f"Creating {num_rows} rows for {db}.{schema}.{name}->{child_name}:" - ) - for start in range(0, num_rows, batch_size): - progress = min(start + batch_size, num_rows) - print( - f"{progress}/{num_rows} ({progress / num_rows:.1%})", - end="\r", - flush=True, + conn = { + "host": "127.0.0.1", + "port": port("postgres"), + "user": "postgres", + "password": "postgres", + "dbname": ref_db, + } + _submit_chunks( + pool, + _copy_chunk, + (conn, [ref_s, ref_t]), + child["columns"], + num_rows, + pretty_name, + rng, + futures, + totals, ) - ingest( - c, - child, - source, - data_columns, - min(batch_size, num_rows - start), + elif st == "kafka": + topic = get_kafka_topic(source) + debezium = "ENVELOPE DEBEZIUM" in child["create_sql"] + _submit_chunks( + pool, + _kafka_chunk, + ( + port("kafka"), + port("schema-registry"), + topic, + debezium, + ), + child["columns"], + num_rows, + pretty_name, rng, + futures, + totals, ) - created_data = True - print() - return created_data + elif st == "mysql": + from materialize.mzcompose.services.mysql import MySql + + ref_database, ref_table = get_mysql_reference_db_table( + child + ) + conn = { + "host": "127.0.0.1", + "user": "root", + "password": MySql.DEFAULT_ROOT_PASSWORD, + "database": ref_database, + "port": port("mysql"), + "autocommit": False, + } + _submit_chunks( + pool, + _mysql_chunk, + (conn, ref_table), + child["columns"], + num_rows, + pretty_name, + rng, + futures, + totals, + ) + elif st == "load-generator": + pass + else: + # sql-server etc. — sequential fallback + sequential.append( + (child, source, child["columns"], num_rows, pretty_name) + ) + + if not futures and not sequential: + return False + + if futures: + print( + f"Creating {sum(totals.values()):,} rows across {len(totals)} sources" + ) + _await_futures(futures, totals) + + for child, source, cols, num_rows, pretty_name in sequential: + data_columns = [ + Column( + col["name"], + col["type"], + col["nullable"], + col["default"], + col.get("data_shape"), + ) + for col in cols + ] + print(f"Creating {num_rows} rows for {pretty_name}:") + for start in range(0, num_rows, batch_size): + progress = min(start + batch_size, num_rows) + print( + f"{progress}/{num_rows} ({progress / num_rows:.1%})", + end="\r", + flush=True, + ) + ingest( + c, child, source, data_columns, min(batch_size, num_rows - start), rng + ) + print() + + return True def create_ingestions( @@ -337,14 +588,7 @@ def continuous_ingestion_source( ) stats["total"] += 1 - ingest( - c, - source, - source, - data_columns, - batch_size, - rng, - ) + ingest(c, source, source, data_columns, batch_size, rng) after = time.time() if after > next_time: @@ -432,12 +676,7 @@ def continuous_ingestion_child( stats["total"] += 1 ingest( - c, - child, - source, - data_columns, - batch_size, - rng, + c, child, source, data_columns, batch_size, rng ) after = time.time() diff --git a/misc/python/materialize/workload_replay/executor.py b/misc/python/materialize/workload_replay/executor.py index 15ddbc7a2e10e..6cbe384c6823f 100644 --- a/misc/python/materialize/workload_replay/executor.py +++ b/misc/python/materialize/workload_replay/executor.py @@ -53,6 +53,35 @@ from materialize.workload_replay.util import print_workload_stats, resolve_tag +def wait_for_freshness(c: Composition) -> None: + print("Waiting for freshness") + time.sleep(10) + prev_lagging: set[str] = set() + while True: + lagging: set[str] = { + entry[0] + for entry in c.sql_query( + """ + SELECT o.name + FROM mz_internal.mz_materialization_lag l + JOIN mz_objects o ON o.id = l.object_id + WHERE o.name NOT LIKE 'mz_%' + AND o.id NOT IN (SELECT id FROM mz_sinks) + AND (l.global_lag IS NULL OR l.global_lag > INTERVAL '10 seconds') + ORDER BY l.global_lag DESC NULLS FIRST + LIMIT 5;""" + ) + } + if lagging: + if lagging != prev_lagging: + print(f" Lagging: {', '.join(sorted(lagging))}") + prev_lagging = lagging + time.sleep(5) + else: + break + print("Freshness complete") + + def test( c: Composition, workload: dict[str, Any], @@ -116,17 +145,17 @@ def test( if not early_initial_data: run_create_objects_part_2(c, services, workload, verbose) stats["object_creation"] = time.time() - start_time - created_data = False - if initial_data: - print("Creating initial data") - stats["initial_data"] = {"docker": [], "time": 0.0} - stats_thread = PropagatingThread( - target=docker_stats, - name="docker-stats", - args=(stats["initial_data"]["docker"], stop_event), - ) - stats_thread.start() - try: + stats["initial_data"] = {"docker": [], "time": 0.0} + stats_thread = PropagatingThread( + target=docker_stats, + name="docker-stats", + args=(stats["initial_data"]["docker"], stop_event), + ) + stats_thread.start() + try: + created_data = False + if initial_data: + print("Creating initial data") start_time = time.time() created_data = create_initial_data_external( c, @@ -137,6 +166,7 @@ def test( if early_initial_data: obj_start = time.time() run_create_objects_part_2(c, services, workload, verbose) + stats["initial_data"]["sources_created_at"] = time.time() stats["object_creation"] += time.time() - obj_start created_data_requiring_mz = create_initial_data_requiring_mz( c, @@ -146,84 +176,60 @@ def test( ) created_data = created_data or created_data_requiring_mz stats["initial_data"]["time"] = time.time() - start_time - if not created_data: - del stats["initial_data"] - finally: - stop_event.set() - stats_thread.join() - stop_event.clear() - elif early_initial_data: - start_time = time.time() - run_create_objects_part_2(c, services, workload, verbose) - stats["object_creation"] += time.time() - start_time + elif early_initial_data: + start_time = time.time() + run_create_objects_part_2(c, services, workload, verbose) + stats["object_creation"] += time.time() - start_time - # Wait for all user objects to hydrate before starting queries. - print("Waiting for hydration") - prev_not_hydrated: list[str] = [] - while True: - not_hydrated: list[str] = [ - entry[0] - for entry in c.sql_query( - """ - SELECT DISTINCT name - FROM ( - SELECT o.name - FROM mz_objects o - JOIN mz_internal.mz_hydration_statuses h - ON o.id = h.object_id - WHERE NOT h.hydrated - AND o.name NOT LIKE 'mz_%' - AND o.id NOT IN (SELECT id FROM mz_sinks) + # Wait for all user objects to hydrate before starting queries. + print("Waiting for hydration") + prev_not_hydrated: list[str] = [] + while True: + not_hydrated: list[str] = [ + entry[0] + for entry in c.sql_query( + """ + SELECT DISTINCT name + FROM ( + SELECT o.name + FROM mz_objects o + JOIN mz_internal.mz_hydration_statuses h + ON o.id = h.object_id + WHERE NOT h.hydrated + AND o.name NOT LIKE 'mz_%' + AND o.id NOT IN (SELECT id FROM mz_sinks) - UNION ALL + UNION ALL - SELECT o.name - FROM mz_objects o - JOIN mz_internal.mz_compute_hydration_statuses h - ON o.id = h.object_id - WHERE NOT h.hydrated - AND o.name NOT LIKE 'mz_%' - AND o.id NOT IN (SELECT id FROM mz_sinks) - ) x - ORDER BY 1;""" - ) - ] - if not_hydrated: - if not_hydrated != prev_not_hydrated: - print(f" Not yet hydrated: {', '.join(not_hydrated)}") - prev_not_hydrated = not_hydrated - time.sleep(1) - else: - break - print("Hydration complete") + SELECT o.name + FROM mz_objects o + JOIN mz_internal.mz_compute_hydration_statuses h + ON o.id = h.object_id + WHERE NOT h.hydrated + AND o.name NOT LIKE 'mz_%' + AND o.id NOT IN (SELECT id FROM mz_sinks) + ) x + ORDER BY 1;""" + ) + ] + if not_hydrated: + if not_hydrated != prev_not_hydrated: + print(f" Not yet hydrated: {', '.join(not_hydrated)}") + prev_not_hydrated = not_hydrated + time.sleep(1) + else: + break + print("Hydration complete") + + wait_for_freshness(c) + finally: + stop_event.set() + stats_thread.join() + stop_event.clear() + + if not created_data: + del stats["initial_data"] - # Wait for all user materializations to be caught up (fresh). - # Sleep first so the system has time to start processing imported data; - # otherwise frontiers haven't advanced yet and everything looks fresh. - print("Waiting for freshness") - time.sleep(10) - while True: - lagging: list[tuple[str, str]] = [ - (entry[0], entry[1]) - for entry in c.sql_query( - """ - SELECT o.name, COALESCE(l.global_lag, INTERVAL '999 hours')::text - FROM mz_internal.mz_materialization_lag l - JOIN mz_objects o ON o.id = l.object_id - WHERE o.name NOT LIKE 'mz_%' - AND o.id NOT IN (SELECT id FROM mz_sinks) - AND (l.global_lag IS NULL OR l.global_lag > INTERVAL '10 seconds') - ORDER BY l.global_lag DESC NULLS FIRST - LIMIT 5;""" - ) - ] - if lagging: - summary = ", ".join(f"{name} ({lag})" for name, lag in lagging) - print(f" Lagging: {summary}") - time.sleep(5) - else: - break - print("Freshness complete") if run_ingestions: print("Starting continuous ingestions") threads.extend( diff --git a/misc/python/materialize/workload_replay/objects.py b/misc/python/materialize/workload_replay/objects.py index e77b02910c9f8..e539d565f3410 100644 --- a/misc/python/materialize/workload_replay/objects.py +++ b/misc/python/materialize/workload_replay/objects.py @@ -541,6 +541,8 @@ def run_create_objects_part_1( flags=re.DOTALL | re.IGNORECASE, ) + return + def run_create_objects_part_2( c: Composition, services: set[str], workload: dict[str, Any], verbose: bool diff --git a/misc/python/materialize/workload_replay/stats.py b/misc/python/materialize/workload_replay/stats.py index f965e1de06e76..c7ff2e7133785 100644 --- a/misc/python/materialize/workload_replay/stats.py +++ b/misc/python/materialize/workload_replay/stats.py @@ -169,14 +169,14 @@ def upload_plots( class DockerSeries: - """Container for time series data from Docker stats.""" + """Time series data for the materialized container.""" def __init__( self, *, t: list[int], - cpu_percent: dict[str, list[float]], - mem_percent: dict[str, list[float]], + cpu_percent: list[float], + mem_percent: list[float], ) -> None: self.t = t self.cpu_percent = cpu_percent @@ -186,34 +186,17 @@ def __init__( def extract_docker_series( docker_stats: list[tuple[int, dict[str, dict[str, Any]]]] ) -> DockerSeries: - """Extract time series from Docker stats snapshots.""" + """Extract materialized time series from Docker stats snapshots.""" t0 = docker_stats[0][0] - times: list[int] = [ts - t0 for (ts, _snapshot) in docker_stats] + times: list[int] = [] + cpu_percent: list[float] = [] + mem_percent: list[float] = [] - containers: set[str] = set() - for _ts, snapshot in docker_stats: - containers |= set(snapshot.keys()) - - def init_float() -> dict[str, list[float]]: - return {c: [] for c in sorted(containers)} - - cpu_percent = init_float() - mem_percent = init_float() - - for _ts, snapshot in docker_stats: - for c in sorted(containers): - m = snapshot.get(c) - - def last_or(lst: list[Any], default: Any): - return default if not lst else lst[-1] - - if m is None: - cpu_percent[c].append(last_or(cpu_percent[c], 0.0)) - mem_percent[c].append(last_or(mem_percent[c], 0.0)) - continue - - cpu_percent[c].append(float(m["cpu_percent"])) - mem_percent[c].append(float(m["mem_percent"])) + for ts, snapshot in docker_stats: + m = snapshot.get("materialized") + times.append(ts - t0) + cpu_percent.append(float(m["cpu_percent"]) if m else 0.0) + mem_percent.append(float(m["mem_percent"]) if m else 0.0) return DockerSeries( t=times, @@ -225,27 +208,37 @@ def last_or(lst: list[Any], default: Any): YScale = TypeLiteral["linear", "log", "symlog", "logit"] +_COLOR_OLD = "#1f77b4" # blue +_COLOR_NEW = "#ff7f0e" # orange + + def plot_timeseries_compare( *, t_old: list[int], - ys_old: dict[str, list[float]], + ys_old: list[float], t_new: list[int], - ys_new: dict[str, list[float]], + ys_new: list[float], title: str, ylabel: str, out_path: Path, yscale: YScale | None = None, + vlines_old: dict[str, int] | None = None, + vlines_new: dict[str, int] | None = None, ): """Plot a comparison of two time series.""" plt.figure(figsize=(10, 6)) - containers = sorted(set(ys_old.keys()) | set(ys_new.keys())) + plt.plot(t_old, ys_old, linestyle="--", color=_COLOR_OLD, label="old") + plt.plot(t_new, ys_new, linestyle="-", color=_COLOR_NEW, label="new") - for c in containers: - if c in ys_old: - plt.plot(t_old, ys_old[c], linestyle="--", label=f"{c} (old)") - if c in ys_new: - plt.plot(t_new, ys_new[c], linestyle="-", label=f"{c} (new)") + for label, x in (vlines_old or {}).items(): + plt.axvline( + x=x, linestyle="--", color=_COLOR_OLD, alpha=0.8, label=f"{label} (old)" + ) + for label, x in (vlines_new or {}).items(): + plt.axvline( + x=x, linestyle="-", color=_COLOR_NEW, alpha=0.8, label=f"{label} (new)" + ) plt.xlabel("time [s]") plt.ylabel(ylabel) @@ -277,6 +270,19 @@ def plot_docker_stats_compare( old = extract_docker_series(stats_old["initial_data"]["docker"]) new = extract_docker_series(stats_new["initial_data"]["docker"]) + def initial_vlines(stats: dict[str, Any]) -> dict[str, int]: + vlines: dict[str, int] = {} + docker = stats["initial_data"]["docker"] + if docker: + t0 = docker[0][0] + ts = stats["initial_data"].get("sources_created_at") + if ts is not None: + vlines["sources created"] = int(ts - t0) + return vlines + + vlines_old = initial_vlines(stats_old) + vlines_new = initial_vlines(stats_new) + plot_path = Path("plots") / f"{file}_initial_cpu.png" plot_timeseries_compare( t_old=old.t, @@ -286,6 +292,8 @@ def plot_docker_stats_compare( title=f"{file} - Initial Data Phase CPU\n{old_version} [old] vs {new_version} [new]", ylabel="CPU [%]", out_path=plot_path, + vlines_old=vlines_old, + vlines_new=vlines_new, ) plot_paths.append(plot_path) @@ -298,6 +306,8 @@ def plot_docker_stats_compare( title=f"{file} - Initial Data Phase Memory\n{old_version} [old] vs {new_version} [new]", ylabel="Memory [%]", out_path=plot_path, + vlines_old=vlines_old, + vlines_new=vlines_new, ) plot_paths.append(plot_path) diff --git a/misc/python/materialize/workload_replay/util.py b/misc/python/materialize/workload_replay/util.py index 5d9a0b87f3f73..ead132ae1286c 100644 --- a/misc/python/materialize/workload_replay/util.py +++ b/misc/python/materialize/workload_replay/util.py @@ -200,14 +200,9 @@ def update_captured_workloads_repo() -> None: """Clone or update the captured-workloads repository.""" path = pathlib.Path(MZ_ROOT / "test" / "workload-replay" / "captured-workloads") if (path / ".git").is_dir(): - if ui.env_is_truthy("CI"): - spawn.runv(["git", "-C", str(path), "pull"]) - else: - spawn.runv(["git", "-C", str(path), "fetch"]) - local = spawn.capture(["git", "-C", str(path), "rev-parse", "@"]) - remote = spawn.capture(["git", "-C", str(path), "rev-parse", "@{upstream}"]) - if local != remote: - spawn.runv(["git", "-C", str(path), "pull"]) + commit = "598060c6cf4c2d69730a1313a46704f8663e24fa" + spawn.runv(["git", "-C", str(path), "fetch"]) + spawn.runv(["git", "-C", str(path), "checkout", commit]) else: path.mkdir(exist_ok=True) if ui.env_is_truthy("CI"):