Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ steps:
- common-ancestor

- id: workload-replay
label: "Workload Replay (1% initial data)"
label: "Workload Replay (10% initial data)"
depends_on: build-x86_64
timeout_in_minutes: 240
parallelism: 5
Expand All @@ -273,7 +273,7 @@ steps:
composition: workload-replay
run: benchmark
args:
- --factor-initial-data=0.01
- --factor-initial-data=0.1
- --compare-against
- common-ancestor
agents:
Expand Down
4 changes: 2 additions & 2 deletions ci/release-qualification/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ steps:
- 1200

- id: long-workload-replay
label: "Long Workload Replay (10% initial data)"
label: "Long Workload Replay (100% initial data)"
depends_on: build-x86_64
timeout_in_minutes: 1200
parallelism: 3
Expand All @@ -204,7 +204,7 @@ steps:
composition: workload-replay
run: benchmark
args:
- --factor-initial-data=0.1
- --factor-initial-data=1
- --runtime=3600
- --compare-against
- common-ancestor
Expand Down
3 changes: 3 additions & 0 deletions misc/python/materialize/feature_benchmark/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ def create_scenario_instance(self) -> Scenario:
elif float(self._scale) > 0:
scale = float(self._scale)

if self._scenario_cls.MAX_SCALE is not None:
scale = min(scale, self._scenario_cls.MAX_SCALE)

scenario_class = self._scenario_cls
return scenario_class(
scale=scale,
Expand Down
1 change: 1 addition & 0 deletions misc/python/materialize/feature_benchmark/scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
class RootScenario:
SCALE: float = 6
FIXED_SCALE: bool = False # Will --scale=N have effect on the scenario
MAX_SCALE: float | None = None # Cap scale to this value when set
RELATIVE_THRESHOLD: dict[MeasurementType, float] = {
MeasurementType.WALLCLOCK: 0.10,
# Increased the other measurements since they are easy to regress now
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2205,6 +2205,8 @@ class StartupLoaded(Scenario):
"""Measure the time it takes to restart a populated Mz instance and have all the dataflows be ready to return something"""

SCALE = 1 # 10 objects of each kind
# Cannot scale to hundreds of objects
MAX_SCALE = 1.5

def shared(self) -> Action:
return TdAction(
Expand Down
159 changes: 91 additions & 68 deletions misc/python/materialize/workload_replay/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
from pg8000.native import literal

from materialize.workload_replay.util import (
long_tail_choice,
long_tail_float,
long_tail_int,
long_tail_rank,
long_tail_text,
)

Expand All @@ -41,8 +41,9 @@ def __init__(
self.default = default
self.chars = string.ascii_letters + string.digits
self.data_shape = data_shape
if data_shape:
assert typ in ("text", "bytea"), f"Can't create text shape for type {typ}"

self._years = list(range(2019, 2026))
self._seq_counter = 0

self._hot_strings = [
f"{name}_a",
Expand All @@ -56,6 +57,43 @@ def __init__(
"NULL",
]

def _shaped_text(self, rng: random.Random) -> str | None:
"""Generate text according to data_shape, or None if not applicable."""
if self.data_shape == "datetime":
return self._random_datetime(rng)
elif self.data_shape == "random":
length = rng.randrange(5, 40)
return "".join(rng.choice(self.chars) for _ in range(length))
elif self.data_shape == "uuid":
return str(uuid.UUID(int=rng.getrandbits(128), version=4))
elif self.data_shape == "sequential":
self._seq_counter += 1
return f"{self.name}_{self._seq_counter}"
elif self.data_shape == "zipfian":
rank = long_tail_rank(n=10000, a=1.3, rng=rng)
return f"{self.name}_{rank}"
elif self.data_shape is not None and self.data_shape != "duration":
raise ValueError(f"Unhandled data_shape {self.data_shape!r}")
return None

def _shaped_float(self, rng: random.Random) -> float | None:
"""Generate a float according to data_shape, or None if not applicable."""
if self.data_shape == "duration":
return round(rng.uniform(10.0, 1800.0), 2)
return None

def _random_date(self, rng: random.Random) -> str:
    """Return a random ``YYYY-MM-DD`` date string.

    The year is picked from ``self._years``; month and day are zero-padded
    to two digits. Draw order is year, then month, then day.
    """
    year = rng.choice(self._years)
    month = rng.randrange(1, 13)
    # Day is limited to 1..28, which exists in every month.
    day = rng.randrange(1, 29)
    return f"{year}-{month:02}-{day:02}"

def _random_datetime(self, rng: random.Random) -> str:
    """Return a random ``YYYY-MM-DDTHH:MM:SSZ`` datetime string.

    The date part comes from ``self._random_date``; hour, minute and second
    are then drawn in that order and zero-padded to two digits.
    """
    date_part = self._random_date(rng)
    hour = rng.randrange(0, 24)
    minute = rng.randrange(0, 60)
    second = rng.randrange(0, 60)
    return f"{date_part}T{hour:02}:{minute:02}:{second:02}Z"

def avro_type(self) -> str | list[str]:
"""Return the Avro type for this column."""
result = self.typ
Expand Down Expand Up @@ -96,21 +134,21 @@ def kafka_value(self, rng: random.Random) -> Any:
return long_tail_int(0, 18446744073709551615, rng=rng)

elif self.typ in ("float", "double precision", "numeric"):
shaped = self._shaped_float(rng)
if shaped is not None:
return shaped
return long_tail_float(-1_000_000_000.0, 1_000_000_000.0, rng=rng)

elif self.typ in ("text", "bytea"):
if self.data_shape == "datetime":
year = long_tail_choice(
[2023, 2024, 2025, 2022, 2021, 2020, 2019], hot_prob=0.9, rng=rng
)
return literal(
f"{year}-{rng.randrange(1, 13):02}-{rng.randrange(1, 29):02}T{rng.randrange(0, 23):02}:{rng.randrange(0, 59):02}:{rng.randrange(0, 59):02}Z"
)
elif self.data_shape:
raise ValueError(f"Unhandled text shape {self.data_shape}")
shaped = self._shaped_text(rng)
if shaped is not None:
return literal(shaped)
return literal(long_tail_text(self.chars, 100, self._hot_strings, rng=rng))

elif self.typ in ("character", "character varying"):
shaped = self._shaped_text(rng)
if shaped is not None:
return literal(shaped)
return literal(long_tail_text(self.chars, 10, self._hot_strings, rng=rng))

elif self.typ == "uuid":
Expand All @@ -123,23 +161,15 @@ def kafka_value(self, rng: random.Random) -> Any:
return json.dumps(result)

elif self.typ in ("timestamp with time zone", "timestamp without time zone"):
now = 1700000000000 # doesn't need to be exact
if rng.random() < 0.9:
return now + long_tail_int(-86_400_000, 86_400_000, rng=rng)
else:
return rng.randrange(0, 9223372036854775807)
# Epoch millis spread uniformly across 2019–2025
# 2019-01-01 = 1546300800000, 2026-01-01 = 1767225600000
return rng.randrange(1546300800000, 1767225600000)

elif self.typ == "mz_timestamp":
year = long_tail_choice(
[2023, 2024, 2025, 2022, 2021, 2020, 2019], hot_prob=0.9, rng=rng
)
return literal(f"{year}-{rng.randrange(1, 13)}-{rng.randrange(1, 29)}")
return literal(self._random_date(rng))

elif self.typ == "date":
year = long_tail_choice(
[2023, 2024, 2025, 2022, 2021, 2020, 2019], hot_prob=0.9, rng=rng
)
return literal(f"{year}-{rng.randrange(1, 13)}-{rng.randrange(1, 29)}")
return literal(self._random_date(rng))

elif self.typ == "time":
if rng.random() < 0.8:
Expand All @@ -150,19 +180,22 @@ def kafka_value(self, rng: random.Random) -> Any:
)

elif self.typ == "int2range":
a = str(long_tail_int(-32768, 32767, rng=rng))
b = str(long_tail_int(-32768, 32767, rng=rng))
return literal(f"[{a},{b})")
a = long_tail_int(-32768, 32767, rng=rng)
b = long_tail_int(-32768, 32767, rng=rng)
lo, hi = min(a, b), max(a, b)
return literal(f"[{lo},{hi})")

elif self.typ == "int4range":
a = str(long_tail_int(-2147483648, 2147483647, rng=rng))
b = str(long_tail_int(-2147483648, 2147483647, rng=rng))
return literal(f"[{a},{b})")
a = long_tail_int(-2147483648, 2147483647, rng=rng)
b = long_tail_int(-2147483648, 2147483647, rng=rng)
lo, hi = min(a, b), max(a, b)
return literal(f"[{lo},{hi})")

elif self.typ == "int8range":
a = str(long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng))
b = str(long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng))
return literal(f"[{a},{b})")
a = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)
b = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)
lo, hi = min(a, b), max(a, b)
return literal(f"[{lo},{hi})")

elif self.typ == "map":
return {
Expand Down Expand Up @@ -216,27 +249,23 @@ def value(self, rng: random.Random, in_query: bool = True) -> Any:
return str(val) if in_query else val

elif self.typ in ("float", "double precision", "numeric"):
shaped = self._shaped_float(rng)
if shaped is not None:
return str(shaped) if in_query else shaped
val = long_tail_float(-1_000_000_000.0, 1_000_000_000.0, rng=rng)
return str(val) if in_query else val

elif self.typ in ("text", "bytea"):
if self.data_shape == "datetime":
year = long_tail_choice(
[2023, 2024, 2025, 2022, 2021, 2020, 2019], hot_prob=0.9, rng=rng
)
s = (
f"{year}-{rng.randrange(1, 13):02}-{rng.randrange(1, 29):02}"
f"T{rng.randrange(0, 23):02}:{rng.randrange(0, 59):02}:{rng.randrange(0, 59):02}Z"
)
return literal(s) if in_query else s

elif self.data_shape:
raise ValueError(f"Unhandled text shape {self.data_shape}")

shaped = self._shaped_text(rng)
if shaped is not None:
return literal(shaped) if in_query else shaped
s = long_tail_text(self.chars, 100, self._hot_strings, rng=rng)
return literal(s) if in_query else s

elif self.typ in ("character", "character varying"):
shaped = self._shaped_text(rng)
if shaped is not None:
return literal(shaped) if in_query else shaped
s = long_tail_text(self.chars, 10, self._hot_strings, rng=rng)
return literal(s) if in_query else s

Expand All @@ -254,24 +283,15 @@ def value(self, rng: random.Random, in_query: bool = True) -> Any:
return json.dumps(obj)

elif self.typ in ("timestamp with time zone", "timestamp without time zone"):
year = long_tail_choice(
[2023, 2024, 2025, 2022, 2021, 2020, 2019], hot_prob=0.9, rng=rng
)
s = f"{year}-{rng.randrange(1, 13)}-{rng.randrange(1, 29)}"
s = self._random_date(rng)
return literal(s) if in_query else s

elif self.typ == "mz_timestamp":
year = long_tail_choice(
[2023, 2024, 2025, 2022, 2021, 2020, 2019], hot_prob=0.9, rng=rng
)
s = f"{year}-{rng.randrange(1, 13)}-{rng.randrange(1, 29)}"
s = self._random_date(rng)
return literal(s) if in_query else s

elif self.typ == "date":
year = long_tail_choice(
[2023, 2024, 2025, 2022, 2021, 2020, 2019], hot_prob=0.9, rng=rng
)
s = f"{year}-{rng.randrange(1, 13)}-{rng.randrange(1, 29)}"
s = self._random_date(rng)
return literal(s) if in_query else s

elif self.typ == "time":
Expand All @@ -288,21 +308,24 @@ def value(self, rng: random.Random, in_query: bool = True) -> Any:
return literal(s) if in_query else s

elif self.typ == "int2range":
a = str(long_tail_int(-32768, 32767, rng=rng))
b = str(long_tail_int(-32768, 32767, rng=rng))
s = f"[{a},{b})"
a = long_tail_int(-32768, 32767, rng=rng)
b = long_tail_int(-32768, 32767, rng=rng)
lo, hi = min(a, b), max(a, b)
s = f"[{lo},{hi})"
return literal(s) if in_query else s

elif self.typ == "int4range":
a = str(long_tail_int(-2147483648, 2147483647, rng=rng))
b = str(long_tail_int(-2147483648, 2147483647, rng=rng))
s = f"[{a},{b})"
a = long_tail_int(-2147483648, 2147483647, rng=rng)
b = long_tail_int(-2147483648, 2147483647, rng=rng)
lo, hi = min(a, b), max(a, b)
s = f"[{lo},{hi})"
return literal(s) if in_query else s

elif self.typ == "int8range":
a = str(long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng))
b = str(long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng))
s = f"[{a},{b})"
a = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)
b = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)
lo, hi = min(a, b), max(a, b)
s = f"[{lo},{hi})"
return literal(s) if in_query else s

elif self.typ == "map":
Expand Down
Loading