From e04092ddbb58440d659421eb8506d61be6ca4b6a Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Mon, 11 May 2026 14:05:47 -0400 Subject: [PATCH 1/8] Add Scylla historical state offload prototype --- app/seidb.go | 16 + app/seidb_test.go | 38 +++ go.mod | 3 + go.sum | 8 + sei-db/config/ss_config.go | 21 ++ sei-db/config/toml.go | 11 + sei-db/config/toml_test.go | 4 + sei-db/state_db/ss/offload/consumer/README.md | 73 +++++ .../cmd/historical-scylla-consumer/main.go | 51 +++ sei-db/state_db/ss/offload/consumer/config.go | 54 ++++ .../consumer/config/example-scylla.json | 21 ++ .../state_db/ss/offload/consumer/consumer.go | 297 ++++++++++++++++++ sei-db/state_db/ss/offload/consumer/kafka.go | 115 +++++++ .../ss/offload/consumer/kafka_test.go | 46 +++ .../ss/offload/consumer/schema/scylla.cql | 46 +++ sei-db/state_db/ss/offload/consumer/scylla.go | 201 ++++++++++++ .../ss/offload/consumer/scylla_test.go | 68 ++++ sei-db/state_db/ss/offload/consumer/sink.go | 26 ++ .../state_db/ss/offload/historical/reader.go | 37 +++ .../state_db/ss/offload/historical/scylla.go | 239 ++++++++++++++ .../ss/offload/historical/scylla_test.go | 95 ++++++ .../state_db/ss/offload/historical/store.go | 96 ++++++ .../ss/offload/historical/store_test.go | 103 ++++++ sei-db/state_db/ss/offload/kafka.go | 5 +- sei-db/state_db/ss/store.go | 47 ++- 25 files changed, 1718 insertions(+), 3 deletions(-) create mode 100644 sei-db/state_db/ss/offload/consumer/README.md create mode 100644 sei-db/state_db/ss/offload/consumer/cmd/historical-scylla-consumer/main.go create mode 100644 sei-db/state_db/ss/offload/consumer/config.go create mode 100644 sei-db/state_db/ss/offload/consumer/config/example-scylla.json create mode 100644 sei-db/state_db/ss/offload/consumer/consumer.go create mode 100644 sei-db/state_db/ss/offload/consumer/kafka.go create mode 100644 sei-db/state_db/ss/offload/consumer/kafka_test.go create mode 100644 sei-db/state_db/ss/offload/consumer/schema/scylla.cql create mode 100644 sei-db/state_db/ss/offload/consumer/scylla.go create mode 100644 sei-db/state_db/ss/offload/consumer/scylla_test.go create mode 100644 sei-db/state_db/ss/offload/consumer/sink.go create mode 100644 sei-db/state_db/ss/offload/historical/reader.go create mode 100644 sei-db/state_db/ss/offload/historical/scylla.go create mode 100644 sei-db/state_db/ss/offload/historical/scylla_test.go create mode 100644 sei-db/state_db/ss/offload/historical/store.go create mode 100644 sei-db/state_db/ss/offload/historical/store_test.go diff --git a/app/seidb.go b/app/seidb.go index 4ed31b2d7e..6be013875d 100644 --- a/app/seidb.go +++ b/app/seidb.go @@ -45,6 +45,15 @@ const ( FlagEVMSSSplit = "state-store.evm-ss-split" FlagEVMSSSeparateDBs = "state-store.evm-ss-separate-dbs" + // Historical SS offload fallback. + FlagHistoricalOffloadScyllaHosts = "state-store.historical-offload-scylla-hosts" + FlagHistoricalOffloadScyllaKeyspace = "state-store.historical-offload-scylla-keyspace" + FlagHistoricalOffloadScyllaUsername = "state-store.historical-offload-scylla-username" + FlagHistoricalOffloadScyllaPassword = "state-store.historical-offload-scylla-password" + FlagHistoricalOffloadScyllaDatacenter = "state-store.historical-offload-scylla-datacenter" + FlagHistoricalOffloadScyllaConsistency = "state-store.historical-offload-scylla-consistency" + FlagHistoricalOffloadScyllaTimeoutMS = "state-store.historical-offload-scylla-timeout-ms" + // Other configs FlagSnapshotInterval = "state-sync.snapshot-interval" ) @@ -148,6 +157,13 @@ func parseSSConfigs(appOpts servertypes.AppOptions) config.StateStoreConfig { ssConfig.EVMDBDirectory = cast.ToString(appOpts.Get(FlagEVMSSDirectory)) ssConfig.SeparateEVMSubDBs = cast.ToBool(appOpts.Get(FlagEVMSSSeparateDBs)) ssConfig.EVMSplit = cast.ToBool(appOpts.Get(FlagEVMSSSplit)) + ssConfig.HistoricalOffloadScyllaHosts = cast.ToString(appOpts.Get(FlagHistoricalOffloadScyllaHosts)) + ssConfig.HistoricalOffloadScyllaKeyspace = cast.ToString(appOpts.Get(FlagHistoricalOffloadScyllaKeyspace)) + ssConfig.HistoricalOffloadScyllaUsername = cast.ToString(appOpts.Get(FlagHistoricalOffloadScyllaUsername)) + ssConfig.HistoricalOffloadScyllaPassword = cast.ToString(appOpts.Get(FlagHistoricalOffloadScyllaPassword)) + ssConfig.HistoricalOffloadScyllaDatacenter = cast.ToString(appOpts.Get(FlagHistoricalOffloadScyllaDatacenter)) + ssConfig.HistoricalOffloadScyllaConsistency = cast.ToString(appOpts.Get(FlagHistoricalOffloadScyllaConsistency)) + ssConfig.HistoricalOffloadScyllaTimeoutMS = cast.ToInt(appOpts.Get(FlagHistoricalOffloadScyllaTimeoutMS)) return ssConfig } diff --git a/app/seidb_test.go b/app/seidb_test.go index 54b3eb6027..d0a0c88dc1 100644 --- a/app/seidb_test.go +++ b/app/seidb_test.go @@ -61,6 +61,20 @@ func (t TestSeiDBAppOpts) Get(s string) interface{} { return defaultSSConfig.EVMSplit case FlagEVMSSSeparateDBs: return defaultSSConfig.SeparateEVMSubDBs + case FlagHistoricalOffloadScyllaHosts: + return defaultSSConfig.HistoricalOffloadScyllaHosts + case FlagHistoricalOffloadScyllaKeyspace: + return defaultSSConfig.HistoricalOffloadScyllaKeyspace + case FlagHistoricalOffloadScyllaUsername: + return defaultSSConfig.HistoricalOffloadScyllaUsername + case FlagHistoricalOffloadScyllaPassword: + return defaultSSConfig.HistoricalOffloadScyllaPassword + case FlagHistoricalOffloadScyllaDatacenter: + return defaultSSConfig.HistoricalOffloadScyllaDatacenter + case FlagHistoricalOffloadScyllaConsistency: + return defaultSSConfig.HistoricalOffloadScyllaConsistency + case FlagHistoricalOffloadScyllaTimeoutMS: + return defaultSSConfig.HistoricalOffloadScyllaTimeoutMS } return nil } @@ -114,6 +128,30 @@ func TestParseSSConfigs_EVMFlags(t *testing.T) { assert.True(t, ssConfig.SeparateEVMSubDBs) } +func TestParseSSConfigs_HistoricalScyllaFlags(t *testing.T) { + appOpts := mapAppOpts{ + FlagSSEnable: true, + FlagHistoricalOffloadScyllaHosts: "10.0.0.1:9042,10.0.0.2:9042", + FlagHistoricalOffloadScyllaKeyspace: "sei_history", + FlagHistoricalOffloadScyllaUsername: "sei", + FlagHistoricalOffloadScyllaPassword: "secret", + FlagHistoricalOffloadScyllaDatacenter: "use1", + FlagHistoricalOffloadScyllaConsistency: "local_quorum", + FlagHistoricalOffloadScyllaTimeoutMS: 1500, + FlagSSAsyncWriterBuffer: 0, + } + + ssConfig := parseSSConfigs(appOpts) + assert.True(t, ssConfig.Enable) + assert.Equal(t, "10.0.0.1:9042,10.0.0.2:9042", ssConfig.HistoricalOffloadScyllaHosts) + assert.Equal(t, "sei_history", ssConfig.HistoricalOffloadScyllaKeyspace) + assert.Equal(t, "sei", ssConfig.HistoricalOffloadScyllaUsername) + assert.Equal(t, "secret", ssConfig.HistoricalOffloadScyllaPassword) + assert.Equal(t, "use1", ssConfig.HistoricalOffloadScyllaDatacenter) + assert.Equal(t, "local_quorum", ssConfig.HistoricalOffloadScyllaConsistency) + assert.Equal(t, 1500, ssConfig.HistoricalOffloadScyllaTimeoutMS) +} + func TestParseReceiptConfigs_DefaultsToPebbleWhenUnset(t *testing.T) { receiptConfig, err := config.ReadReceiptConfig(mapAppOpts{}) assert.NoError(t, err) diff --git a/go.mod b/go.mod index db681f7bf4..dae2d00b1a 100644 --- a/go.mod +++ b/go.mod @@ -35,6 +35,7 @@ require ( github.com/golang-jwt/jwt/v4 v4.5.1 github.com/golang/mock v1.7.0-rc.1 github.com/golang/protobuf v1.5.4 + github.com/gocql/gocql v1.7.0 github.com/google/btree v1.1.3 github.com/google/go-cmp v0.7.0 github.com/google/gofuzz v1.2.0 @@ -119,6 +120,7 @@ require ( github.com/emirpasic/gods v1.18.1 // indirect github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect github.com/go-git/go-billy/v5 v5.8.0 // indirect + github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect @@ -128,6 +130,7 @@ require ( github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 // indirect github.com/skeema/knownhosts v1.3.1 // indirect github.com/xanzy/ssh-agent v0.3.3 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 8ab76b9ab4..b941802165 100644 --- a/go.sum +++ b/go.sum @@ -736,6 +736,7 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bgentry/speakeasy v0.2.0 h1:tgObeVOf8WAvtuAX6DhJ4xks4CFNwPDZiqzGqIHE51E= github.com/bgentry/speakeasy v0.2.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= github.com/bits-and-blooms/bitset v1.7.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= github.com/bits-and-blooms/bitset v1.14.2/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/bits-and-blooms/bitset v1.17.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= @@ -743,6 +744,7 @@ github.com/bits-and-blooms/bitset v1.24.3 h1:Bte86SlO3lwPQqww+7BE9ZuUCKIjfqnG5jt github.com/bits-and-blooms/bitset v1.24.3/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM= github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/btcsuite/btcd v0.23.2 h1:/YOgUp25sdCnP5ho6Hl3s0E438zlX+Kak7E6TgBgoT0= @@ -1122,6 +1124,8 @@ github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MG github.com/goccy/go-json v0.10.4/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= +github.com/gocql/gocql v1.7.0 h1:O+7U7/1gSN7QTEAaMEsJc1Oq2QHXvCWoF3DFK9HDHus= +github.com/gocql/gocql v1.7.0/go.mod h1:vnlvXyFZeLBF0Wy+RS8hrOdbn0UWsWtdg07XJnFxZ+4= github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0= github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= @@ -1328,6 +1332,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3/go.mod h1:o//XUCC/F+yRGJoPO/VU github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0= github.com/guptarohit/asciigraph v0.5.5/go.mod h1:dYl5wwK4gNsnFf9Zp+l06rFiDZ5YtXM6x7SRWZ3KGag= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q= github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE= github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= @@ -2911,6 +2917,8 @@ gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qS gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= diff --git a/sei-db/config/ss_config.go b/sei-db/config/ss_config.go index 3fe94e750d..09fd69117f 100644 --- a/sei-db/config/ss_config.go +++ b/sei-db/config/ss_config.go @@ -76,6 +76,27 @@ type StateStoreConfig struct { // When true, data is routed to separate DBs by EVM key family while // preserving the same logical store key and full key encoding inside each DB. SeparateEVMSubDBs bool `mapstructure:"evm-separate-dbs"` + + // HistoricalOffloadScyllaHosts enables ScyllaDB/Cassandra fallback reads + // for versions pruned from local SS when non-empty. Hosts are comma-separated + // host[:port] values. + HistoricalOffloadScyllaHosts string `mapstructure:"historical-offload-scylla-hosts"` + + // HistoricalOffloadScyllaKeyspace is the keyspace containing state_mutations. + HistoricalOffloadScyllaKeyspace string `mapstructure:"historical-offload-scylla-keyspace"` + + // HistoricalOffloadScyllaUsername and Password are optional. + HistoricalOffloadScyllaUsername string `mapstructure:"historical-offload-scylla-username"` + HistoricalOffloadScyllaPassword string `mapstructure:"historical-offload-scylla-password"` + + // HistoricalOffloadScyllaDatacenter enables DC-aware routing when set. + HistoricalOffloadScyllaDatacenter string `mapstructure:"historical-offload-scylla-datacenter"` + + // HistoricalOffloadScyllaConsistency defaults to local_quorum when empty. + HistoricalOffloadScyllaConsistency string `mapstructure:"historical-offload-scylla-consistency"` + + // HistoricalOffloadScyllaTimeoutMS defaults in the Scylla reader when zero. + HistoricalOffloadScyllaTimeoutMS int `mapstructure:"historical-offload-scylla-timeout-ms"` } // DefaultStateStoreConfig returns the default StateStoreConfig diff --git a/sei-db/config/toml.go b/sei-db/config/toml.go index eb387cb1d5..68e14fce7a 100644 --- a/sei-db/config/toml.go +++ b/sei-db/config/toml.go @@ -139,6 +139,17 @@ evm-ss-split = {{ .StateStore.EVMSplit }} # When false, all EVM data stays in one DB using the current unified layout. # When true, data is routed to separate DBs while preserving the same evm key prefix format. evm-ss-separate-dbs = {{ .StateStore.SeparateEVMSubDBs }} + +# Optional ScyllaDB/Cassandra historical-state fallback. When hosts are set, +# point reads for versions pruned from local SS fall back to state_mutations in +# the configured keyspace. Iterators still use local SS. +historical-offload-scylla-hosts = "{{ .StateStore.HistoricalOffloadScyllaHosts }}" +historical-offload-scylla-keyspace = "{{ .StateStore.HistoricalOffloadScyllaKeyspace }}" +historical-offload-scylla-username = "{{ .StateStore.HistoricalOffloadScyllaUsername }}" +historical-offload-scylla-password = "{{ .StateStore.HistoricalOffloadScyllaPassword }}" +historical-offload-scylla-datacenter = "{{ .StateStore.HistoricalOffloadScyllaDatacenter }}" +historical-offload-scylla-consistency = "{{ .StateStore.HistoricalOffloadScyllaConsistency }}" +historical-offload-scylla-timeout-ms = {{ .StateStore.HistoricalOffloadScyllaTimeoutMS }} ` // ReceiptStoreConfigTemplate defines the configuration template for receipt-store diff --git a/sei-db/config/toml_test.go b/sei-db/config/toml_test.go index fd0a51f932..b6bd267faf 100644 --- a/sei-db/config/toml_test.go +++ b/sei-db/config/toml_test.go @@ -88,6 +88,10 @@ func TestStateStoreConfigTemplate(t *testing.T) { require.Contains(t, output, `evm-ss-db-directory = ""`, "Missing evm-ss-db-directory") require.Contains(t, output, `evm-ss-split = false`, "Missing or incorrect evm-ss-split") require.Contains(t, output, "evm-ss-separate-dbs = false", "Missing or incorrect evm-ss-separate-dbs") + require.Contains(t, output, `historical-offload-scylla-hosts = ""`, "Missing historical Scylla hosts") + require.Contains(t, output, `historical-offload-scylla-keyspace = ""`, "Missing historical Scylla keyspace") + require.Contains(t, output, `historical-offload-scylla-consistency = ""`, "Missing historical Scylla consistency") + require.Contains(t, output, "historical-offload-scylla-timeout-ms = 0", "Missing historical Scylla timeout") } // TestReceiptStoreConfigTemplate verifies that all field paths in the receipt-store TOML template diff --git a/sei-db/state_db/ss/offload/consumer/README.md b/sei-db/state_db/ss/offload/consumer/README.md new file mode 100644 index 0000000000..9ccc28b29d --- /dev/null +++ b/sei-db/state_db/ss/offload/consumer/README.md @@ -0,0 +1,73 @@ +# Historical Scylla/Cassandra Offload + +This is a prototype historical-state backend for ScyllaDB or Cassandra. + +The intended shape is narrow: + +- local SS remains the hot store for recent state, writes, imports, pruning, and iterators +- Scylla/Cassandra stores immutable MVCC mutation rows for older history +- reads below local SS retention can fall back to Scylla/Cassandra for `Get` and `Has` + +The table layout is built for point reads by `(store_name, state_key, target_version)`: + +```sql +SELECT version, value, deleted +FROM state_mutations +WHERE store_name = ? AND state_key = ? AND version <= ? +ORDER BY version DESC +LIMIT 1; +``` + +Ordered prefix iteration is intentionally not served from Scylla/Cassandra in this prototype. + +## Schema + +Apply the schema once: + +```bash +cqlsh 127.0.0.1 9042 -f sei-db/state_db/ss/offload/consumer/schema/scylla.cql +``` + +For production, edit the keyspace replication in `schema/scylla.cql` to use +`NetworkTopologyStrategy` with the actual datacenter names and replication +factors before applying it. + +## Consumer + +The consumer reads historical offload changelog messages from Kafka and writes +them into Scylla/Cassandra. Kafka offsets are committed only after the sink +write succeeds. + +```bash +go run ./sei-db/state_db/ss/offload/consumer/cmd/historical-scylla-consumer \ + ./sei-db/state_db/ss/offload/consumer/config/example-scylla.json +``` + +The example config is local-dev only. Set real Kafka brokers, Scylla hosts, +keyspace, datacenter, and credentials in your own config. + +## Node Read Fallback + +Enable fallback reads in the node config: + +```toml +[state-store] +historical-offload-scylla-hosts = "10.0.0.1:9042,10.0.0.2:9042" +historical-offload-scylla-keyspace = "sei_history" +historical-offload-scylla-username = "" +historical-offload-scylla-password = "" +historical-offload-scylla-datacenter = "datacenter1" +historical-offload-scylla-consistency = "local_quorum" +historical-offload-scylla-timeout-ms = 2000 +``` + +Fallback activates only for point reads where the requested version is below the +local SS earliest version. Missing rows and tombstones return empty state, same +as local SS. + +## Current Limits + +- No Scylla/Cassandra iterator path. +- No cross-row transaction on ingest; mutation rows are written first and the + version marker is written last, so replay is idempotent after partial failure. +- No automatic schema creation from the binary. diff --git a/sei-db/state_db/ss/offload/consumer/cmd/historical-scylla-consumer/main.go b/sei-db/state_db/ss/offload/consumer/cmd/historical-scylla-consumer/main.go new file mode 100644 index 0000000000..65020ca354 --- /dev/null +++ b/sei-db/state_db/ss/offload/consumer/cmd/historical-scylla-consumer/main.go @@ -0,0 +1,51 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" + + "github.com/sei-protocol/sei-chain/sei-db/state_db/ss/offload/consumer" +) + +func main() { + if len(os.Args) != 2 { + fmt.Fprintf(os.Stderr, "usage: %s \n", os.Args[0]) + os.Exit(2) + } + + cfg, err := consumer.LoadConfig(os.Args[1]) + if err != nil { + log.Fatalf("load config: %v", err) + } + + sink, err := consumer.NewScyllaSink(cfg.Scylla) + if err != nil { + log.Fatalf("open scylla/cassandra sink: %v", err) + } + defer func() { _ = sink.Close() }() + + reader, err := consumer.NewKafkaReader(cfg.Kafka) + if err != nil { + log.Fatalf("open kafka reader: %v", err) + } + defer func() { _ = reader.Close() }() + + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + c := consumer.New(reader, sink, consumer.Options{ + Logf: func(format string, args ...interface{}) { log.Printf(format, args...) }, + Workers: cfg.Workers, + ShardBufferSize: cfg.ShardBufferSize, + MaxBatchRecords: cfg.MaxBatchRecords, + BatchMaxWait: time.Duration(cfg.BatchMaxWaitMS) * time.Millisecond, + }) + if err := c.Run(ctx); err != nil { + log.Fatalf("consumer: %v", err) + } +} diff --git a/sei-db/state_db/ss/offload/consumer/config.go b/sei-db/state_db/ss/offload/consumer/config.go new file mode 100644 index 0000000000..f381b2c11f --- /dev/null +++ b/sei-db/state_db/ss/offload/consumer/config.go @@ -0,0 +1,54 @@ +package consumer + +import ( + "encoding/json" + "fmt" + "os" +) + +type Config struct { + Kafka KafkaReaderConfig + Scylla ScyllaConfig + Workers int + ShardBufferSize int + MaxBatchRecords int + BatchMaxWaitMS int +} + +func (c *Config) Validate() error { + if err := c.Kafka.Validate(); err != nil { + return fmt.Errorf("kafka: %w", err) + } + if err := c.Scylla.Validate(); err != nil { + return fmt.Errorf("scylla: %w", err) + } + if c.Workers < 0 { + return fmt.Errorf("workers must be non-negative") + } + if c.ShardBufferSize < 0 { + return fmt.Errorf("shard buffer size must be non-negative") + } + if c.MaxBatchRecords < 0 { + return fmt.Errorf("max batch records must be non-negative") + } + if c.BatchMaxWaitMS < 0 { + return fmt.Errorf("batch max wait ms must be non-negative") + } + return nil +} + +func LoadConfig(path string) (*Config, error) { + // #nosec G304 -- config path is supplied by the operator on the command line. + raw, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("read config: %w", err) + } + cfg := &Config{} + if err := json.Unmarshal(raw, cfg); err != nil { + return nil, fmt.Errorf("parse config: %w", err) + } + if err := cfg.Validate(); err != nil { + return nil, err + } + return cfg, nil +} diff --git a/sei-db/state_db/ss/offload/consumer/config/example-scylla.json b/sei-db/state_db/ss/offload/consumer/config/example-scylla.json new file mode 100644 index 0000000000..94779c83e9 --- /dev/null +++ b/sei-db/state_db/ss/offload/consumer/config/example-scylla.json @@ -0,0 +1,21 @@ +{ + "Kafka": { + "Brokers": ["localhost:9092"], + "Topic": "historical-offload", + "GroupID": "historical-scylla", + "StartOffset": "first" + }, + "Scylla": { + "Hosts": ["127.0.0.1:9042"], + "Keyspace": "sei_history", + "Datacenter": "datacenter1", + "Consistency": "local_quorum", + "TimeoutMS": 2000, + "ConnectTimeoutMS": 2000, + "NumConns": 4 + }, + "Workers": 16, + "ShardBufferSize": 128, + "MaxBatchRecords": 16, + "BatchMaxWaitMS": 10 +} diff --git a/sei-db/state_db/ss/offload/consumer/consumer.go b/sei-db/state_db/ss/offload/consumer/consumer.go new file mode 100644 index 0000000000..a0adaaca46 --- /dev/null +++ b/sei-db/state_db/ss/offload/consumer/consumer.go @@ -0,0 +1,297 @@ +package consumer + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/segmentio/kafka-go" + "golang.org/x/sync/errgroup" +) + +// MessageSource is the subset of *kafka.Reader used by the loop. +type MessageSource interface { + FetchMessage(ctx context.Context) (kafka.Message, error) + CommitMessages(ctx context.Context, msgs ...kafka.Message) error +} + +// Messages are sharded by partition: cross-partition writes parallelize while +// ordering within a partition is preserved. +type Consumer struct { + reader MessageSource + sink Sink + logf func(format string, args ...interface{}) + workers int + shardBuf int + batchSize int + batchWait time.Duration + maxAttempts int + baseBackoff time.Duration + maxBackoff time.Duration +} + +const ( + defaultSinkMaxAttempts = 5 + defaultSinkBaseBackoff = 1 * time.Second + defaultSinkMaxBackoff = 30 * time.Second + defaultWorkers = 16 + defaultShardBuffer = 128 + defaultBatchSize = 16 + defaultBatchMaxWait = 10 * time.Millisecond +) + +// Backpressure: when the sink falls behind, ShardBufferSize fills, the fetcher +// blocks, and Kafka stops being polled. Zero values pick defaults. +type Options struct { + Logf func(format string, args ...interface{}) + SinkMaxAttempts int + SinkBaseBackoff time.Duration + SinkMaxBackoff time.Duration + Workers int + ShardBufferSize int + MaxBatchRecords int + BatchMaxWait time.Duration +} + +func New(reader MessageSource, sink Sink, opts Options) *Consumer { + logf := opts.Logf + if logf == nil { + logf = func(string, ...interface{}) {} + } + maxAttempts := opts.SinkMaxAttempts + if maxAttempts <= 0 { + maxAttempts = defaultSinkMaxAttempts + } + base := opts.SinkBaseBackoff + if base <= 0 { + base = defaultSinkBaseBackoff + } + maxBackoff := opts.SinkMaxBackoff + if maxBackoff <= 0 { + maxBackoff = defaultSinkMaxBackoff + } + workers := opts.Workers + if workers <= 0 { + workers = defaultWorkers + } + shardBuf := opts.ShardBufferSize + if shardBuf <= 0 { + shardBuf = defaultShardBuffer + } + batchSize := opts.MaxBatchRecords + if batchSize <= 0 { + batchSize = defaultBatchSize + } + batchWait := opts.BatchMaxWait + if batchWait <= 0 { + batchWait = defaultBatchMaxWait + } + return &Consumer{ + reader: reader, + sink: sink, + logf: logf, + workers: workers, + shardBuf: shardBuf, + batchSize: batchSize, + batchWait: batchWait, + maxAttempts: maxAttempts, + baseBackoff: base, + maxBackoff: maxBackoff, + } +} + +// Run commits offsets only after the sink persists each message. +func (c *Consumer) Run(ctx context.Context) error { + return c.runParallel(ctx) +} + +func (c *Consumer) runParallel(ctx context.Context) error { + g, gctx := errgroup.WithContext(ctx) + shards := make([]chan kafka.Message, c.workers) + for i := range shards { + shards[i] = make(chan kafka.Message, c.shardBuf) + ch := shards[i] + g.Go(func() error { return c.workerLoop(gctx, ch) }) + } + g.Go(func() error { + defer func() { + for _, ch := range shards { + close(ch) + } + }() + for { + msg, err := c.reader.FetchMessage(gctx) + if err != nil { + if isCancellation(err) { + return nil + } + return fmt.Errorf("fetch kafka message: %w", err) + } + shard := shardFor(msg.Partition, c.workers) + select { + case shards[shard] <- msg: + case <-gctx.Done(): + return nil + } + } + }) + if err := g.Wait(); err != nil && !isCancellation(err) { + return err + } + return nil +} + +func (c *Consumer) workerLoop(ctx context.Context, ch <-chan kafka.Message) error { + for { + select { + case <-ctx.Done(): + return nil + case msg, ok := <-ch: + if !ok { + return nil + } + msgs, ok := c.collectBatch(ctx, ch, msg) + if !ok { + return nil + } + if err := c.processBatch(ctx, msgs); err != nil { + if isCancellation(err) { + return nil + } + return err + } + } + } +} + +func (c *Consumer) collectBatch(ctx context.Context, ch <-chan kafka.Message, first kafka.Message) ([]kafka.Message, bool) { + msgs := make([]kafka.Message, 1, c.batchSize) + msgs[0] = first + if c.batchSize <= 1 { + return msgs, true + } + +drainBuffered: + for len(msgs) < c.batchSize { + select { + case <-ctx.Done(): + return nil, false + case msg, ok := <-ch: + if !ok { + return msgs, true + } + msgs = append(msgs, msg) + default: + break drainBuffered + } + } + if len(msgs) == c.batchSize { + return msgs, true + } + + timer := time.NewTimer(c.batchWait) + defer timer.Stop() + for len(msgs) < c.batchSize { + select { + case <-ctx.Done(): + return nil, false + case msg, ok := <-ch: + if !ok { + return msgs, true + } + msgs = append(msgs, msg) + case <-timer.C: + return msgs, true + } + } + return msgs, true +} + +func (c *Consumer) processBatch(ctx context.Context, msgs []kafka.Message) error { + records := make([]Record, 0, len(msgs)) + var firstVersion, lastVersion int64 + for i, msg := range msgs { + entry, err := DecodeEntry(msg.Value) + if err != nil { + return fmt.Errorf("decode message at offset %d: %w", msg.Offset, err) + } + if i == 0 { + firstVersion = entry.Version + } + lastVersion = entry.Version + records = append(records, Record{ + Topic: msg.Topic, + Partition: msg.Partition, + Offset: msg.Offset, + Entry: entry, + }) + } + start := time.Now() + if err := c.writeBatchWithRetry(ctx, records); err != nil { + return fmt.Errorf("sink write batch first_version=%d last_version=%d: %w", + firstVersion, lastVersion, err) + } + c.logf("wrote records=%d first_version=%d last_version=%d in %s", + len(records), firstVersion, lastVersion, time.Since(start)) + if err := c.reader.CommitMessages(ctx, msgs...); err != nil { + return fmt.Errorf("commit kafka offsets: %w", err) + } + return nil +} + +func (c *Consumer) writeBatchWithRetry(ctx context.Context, records []Record) error { + backoff := c.baseBackoff + var lastErr error + for attempt := 1; attempt <= c.maxAttempts; attempt++ { + err := c.writeRecords(ctx, records) + if err == nil { + return nil + } + lastErr = err + if isCancellation(err) { + return err + } + if attempt == c.maxAttempts { + break + } + c.logf("sink write attempt %d/%d failed: %v; retrying in %s", + attempt, c.maxAttempts, err, backoff) + select { + case <-time.After(backoff): + case <-ctx.Done(): + return ctx.Err() + } + backoff *= 2 + if backoff > c.maxBackoff { + backoff = c.maxBackoff + } + } + return fmt.Errorf("sink write failed after %d attempts: %w", c.maxAttempts, lastErr) +} + +func (c *Consumer) writeRecords(ctx context.Context, records []Record) error { + if len(records) == 0 { + return nil + } + if sink, ok := c.sink.(BatchSink); ok { + return sink.WriteBatch(ctx, records) + } + for _, rec := range records { + if err := c.sink.Write(ctx, rec); err != nil { + return err + } + } + return nil +} + +func shardFor(partition, workers int) int { + if partition < 0 { + partition = -partition + } + return partition % workers +} + +func isCancellation(err error) bool { + return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) +} diff --git a/sei-db/state_db/ss/offload/consumer/kafka.go b/sei-db/state_db/ss/offload/consumer/kafka.go new file mode 100644 index 0000000000..1785bb05f8 --- /dev/null +++ b/sei-db/state_db/ss/offload/consumer/kafka.go @@ -0,0 +1,115 @@ +package consumer + +import ( + "crypto/tls" + "fmt" + "strings" + "time" + + gogoproto "github.com/gogo/protobuf/proto" + "github.com/segmentio/kafka-go" + + dbproto "github.com/sei-protocol/sei-chain/sei-db/proto" + "github.com/sei-protocol/sei-chain/sei-db/state_db/ss/offload" +) + +// TLS/SASL must match the producer cluster. Commits are synchronous +// (kafka-go's zero CommitInterval) so offsets only advance after the sink +// persists each entry. +type KafkaReaderConfig struct { + Brokers []string + Topic string + GroupID string + ClientID string + Region string + StartOffset string // "first" or "last"; defaults to "first" + MinBytes int + MaxBytes int + MaxWait time.Duration + TLSEnabled bool + SASLMechanism string +} + +func (c *KafkaReaderConfig) ApplyDefaults() { + if c.ClientID == "" { + c.ClientID = "cryptosim-historical-scylla-consumer" + } + if c.StartOffset == "" { + c.StartOffset = "first" + } + if c.MinBytes == 0 { + c.MinBytes = 1 + } + if c.MaxBytes == 0 { + c.MaxBytes = 10 << 20 + } + if c.MaxWait == 0 { + c.MaxWait = 500 * time.Millisecond + } +} + +func (c *KafkaReaderConfig) Validate() error { + if len(c.Brokers) == 0 { + return fmt.Errorf("kafka brokers are required") + } + if c.Topic == "" { + return fmt.Errorf("kafka topic is required") + } + if c.GroupID == "" { + return fmt.Errorf("kafka group id is required") + } + switch strings.ToLower(c.StartOffset) { + case "", "first", "last": + default: + return fmt.Errorf("unsupported kafka start offset %q", c.StartOffset) + } + return nil +} + +func NewKafkaReader(cfg KafkaReaderConfig) (*kafka.Reader, error) { + cfg.ApplyDefaults() + if err := cfg.Validate(); err != nil { + return nil, err + } + + dialer := &kafka.Dialer{ + ClientID: cfg.ClientID, + Timeout: 10 * time.Second, + } + if cfg.TLSEnabled { + dialer.TLS = &tls.Config{MinVersion: tls.VersionTLS12} + } + mech, err := offload.NewSASLMechanism(offload.KafkaConfig{ + Region: cfg.Region, + TLSEnabled: cfg.TLSEnabled, + SASLMechanism: cfg.SASLMechanism, + }) + if err != nil { + return nil, err + } + dialer.SASLMechanism = mech + + start := kafka.FirstOffset + if strings.EqualFold(cfg.StartOffset, "last") { + start = kafka.LastOffset + } + + return kafka.NewReader(kafka.ReaderConfig{ + Brokers: cfg.Brokers, + Topic: cfg.Topic, + GroupID: cfg.GroupID, + Dialer: dialer, + MinBytes: cfg.MinBytes, + MaxBytes: cfg.MaxBytes, + MaxWait: cfg.MaxWait, + StartOffset: start, + }), nil +} + +func DecodeEntry(payload []byte) (*dbproto.ChangelogEntry, error) { + entry := &dbproto.ChangelogEntry{} + if err := gogoproto.Unmarshal(payload, entry); err != nil { + return nil, fmt.Errorf("decode changelog entry: %w", err) + } + return entry, nil +} diff --git a/sei-db/state_db/ss/offload/consumer/kafka_test.go b/sei-db/state_db/ss/offload/consumer/kafka_test.go new file mode 100644 index 0000000000..7fa6b320e4 --- /dev/null +++ b/sei-db/state_db/ss/offload/consumer/kafka_test.go @@ -0,0 +1,46 @@ +package consumer + +import ( + "testing" + + "github.com/sei-protocol/sei-chain/sei-db/proto" + "github.com/stretchr/testify/require" +) + +func TestKafkaReaderConfigApplyDefaults(t *testing.T) { + cfg := KafkaReaderConfig{ + Brokers: []string{"localhost:9092"}, + Topic: "historical-offload", + GroupID: "scylla", + } + cfg.ApplyDefaults() + require.Equal(t, "cryptosim-historical-scylla-consumer", cfg.ClientID) + require.Equal(t, "first", cfg.StartOffset) + require.Equal(t, 1, cfg.MinBytes) + require.Equal(t, 10<<20, cfg.MaxBytes) +} + +func TestKafkaReaderConfigValidate(t *testing.T) { + cfg := KafkaReaderConfig{} + require.ErrorContains(t, cfg.Validate(), "brokers") + cfg = KafkaReaderConfig{Brokers: []string{"x"}} + require.ErrorContains(t, cfg.Validate(), "topic") + cfg = KafkaReaderConfig{Brokers: []string{"x"}, Topic: "t"} + require.ErrorContains(t, cfg.Validate(), "group id") + cfg = KafkaReaderConfig{ + Brokers: []string{"x"}, + Topic: "t", + GroupID: "g", + StartOffset: "middle", + } + require.ErrorContains(t, cfg.Validate(), "start offset") +} + +func TestDecodeEntry(t *testing.T) { + entry := &proto.ChangelogEntry{Version: 7} + payload, err := entry.Marshal() + require.NoError(t, err) + got, err := DecodeEntry(payload) + require.NoError(t, err) + require.Equal(t, int64(7), got.Version) +} diff --git a/sei-db/state_db/ss/offload/consumer/schema/scylla.cql b/sei-db/state_db/ss/offload/consumer/schema/scylla.cql new file mode 100644 index 0000000000..ceb7d3cfc2 --- /dev/null +++ b/sei-db/state_db/ss/offload/consumer/schema/scylla.cql @@ -0,0 +1,46 @@ +-- ScyllaDB/Cassandra schema for Sei historical state offload. +-- Apply once before running historical-scylla-consumer. + +CREATE KEYSPACE IF NOT EXISTS sei_history +WITH replication = { + 'class': 'SimpleStrategy', + 'replication_factor': '1' +}; + +-- For production, replace the keyspace replication with +-- NetworkTopologyStrategy and the real datacenter replication factors. + +USE sei_history; + +-- Version markers are written after all mutation rows for a block version. +-- Buckets avoid a single hot partition while keeping LastVersion bounded to +-- 64 small point reads. +CREATE TABLE IF NOT EXISTS state_versions ( + bucket int, + version bigint, + kafka_topic text, + kafka_partition int, + kafka_offset bigint, + ingested_at timestamp, + PRIMARY KEY ((bucket), version) +) WITH CLUSTERING ORDER BY (version DESC); + +-- Historical point lookup: +-- WHERE store_name = ? AND state_key = ? AND version <= ? +-- ORDER BY version DESC LIMIT 1 +CREATE TABLE IF NOT EXISTS state_mutations ( + store_name text, + state_key blob, + version bigint, + value blob, + deleted boolean, + PRIMARY KEY ((store_name, state_key), version) +) WITH CLUSTERING ORDER BY (version DESC); + +CREATE TABLE IF NOT EXISTS state_tree_upgrades ( + version bigint, + name text, + rename_from text, + deleted boolean, + PRIMARY KEY ((version), name) +); diff --git a/sei-db/state_db/ss/offload/consumer/scylla.go b/sei-db/state_db/ss/offload/consumer/scylla.go new file mode 100644 index 0000000000..c16744662c --- /dev/null +++ b/sei-db/state_db/ss/offload/consumer/scylla.go @@ -0,0 +1,201 @@ +package consumer + +import ( + "context" + "fmt" + "time" + + "github.com/gocql/gocql" + + "github.com/sei-protocol/sei-chain/sei-db/proto" + "github.com/sei-protocol/sei-chain/sei-db/state_db/ss/offload/historical" +) + +type ScyllaConfig struct { + Hosts []string + Keyspace string + Username string + Password string + Datacenter string + Consistency string + TimeoutMS int + ConnectTimeoutMS int + NumConns int +} + +func (c *ScyllaConfig) ApplyDefaults() { + cfg := c.toHistorical() + cfg.ApplyDefaults() + c.Consistency = cfg.Consistency + c.TimeoutMS = int(cfg.Timeout / time.Millisecond) + c.ConnectTimeoutMS = int(cfg.ConnectTimeout / time.Millisecond) + c.NumConns = cfg.NumConns +} + +func (c *ScyllaConfig) Validate() error { + cfg := c.toHistorical() + cfg.ApplyDefaults() + return cfg.Validate() +} + +func (c ScyllaConfig) toHistorical() historical.ScyllaConfig { + return historical.ScyllaConfig{ + Hosts: c.Hosts, + Keyspace: c.Keyspace, + Username: c.Username, + Password: c.Password, + Datacenter: c.Datacenter, + Consistency: c.Consistency, + Timeout: time.Duration(c.TimeoutMS) * time.Millisecond, + ConnectTimeout: time.Duration(c.ConnectTimeoutMS) * time.Millisecond, + NumConns: c.NumConns, + } +} + +type scyllaSink struct { + session *gocql.Session +} + +var _ Sink = (*scyllaSink)(nil) +var _ BatchSink = (*scyllaSink)(nil) + +func NewScyllaSink(cfg ScyllaConfig) (Sink, error) { + session, err := historical.OpenScyllaSession(cfg.toHistorical()) + if err != nil { + return nil, err + } + return &scyllaSink{session: session}, nil +} + +func (s *scyllaSink) Close() error { + s.session.Close() + return nil +} + +func (s *scyllaSink) LastVersion(ctx context.Context) (int64, error) { + var maxVersion int64 + for bucket := 0; bucket < historical.VersionBucketCount; bucket++ { + var version int64 + err := s.session.Query(selectLatestVersionCQL, bucket).WithContext(ctx).Scan(&version) + if err != nil { + if err == gocql.ErrNotFound { + continue + } + return 0, fmt.Errorf("read latest scylla/cassandra version bucket %d: %w", bucket, err) + } + if version > maxVersion { + maxVersion = version + } + } + return maxVersion, nil +} + +func (s *scyllaSink) Write(ctx context.Context, rec Record) error { + return s.WriteBatch(ctx, []Record{rec}) +} + +func (s *scyllaSink) WriteBatch(ctx context.Context, records []Record) error { + for _, rec := range compactRecords(records) { + if err := s.writeRecord(ctx, rec); err != nil { + return err + } + } + return nil +} + +func compactRecords(records []Record) []Record { + for _, rec := range records { + if rec.Entry == nil { + out := make([]Record, 0, len(records)) + for _, rec := range records { + if rec.Entry != nil { + out = append(out, rec) + } + } + return out + } + } + return records +} + +func (s *scyllaSink) writeRecord(ctx context.Context, rec Record) error { + entry := rec.Entry + if entry == nil { + return nil + } + version := entry.Version + for _, ncs := range entry.Changesets { + for _, pair := range ncs.Changeset.Pairs { + if err := s.writeMutation(ctx, version, ncs.Name, pair); err != nil { + return err + } + } + } + for _, up := range entry.Upgrades { + if err := s.writeUpgrade(ctx, version, up); err != nil { + return err + } + } + if err := s.session.Query(insertVersionCQL, + historical.VersionBucket(version), + version, + rec.Topic, + rec.Partition, + rec.Offset, + time.Now(), + ).WithContext(ctx).Exec(); err != nil { + return fmt.Errorf("insert scylla/cassandra version %d: %w", version, err) + } + return nil +} + +func (s *scyllaSink) writeMutation(ctx context.Context, version int64, storeName string, pair *proto.KVPair) error { + deleted := pair.Delete || pair.Value == nil + value := pair.Value + if deleted { + value = nil + } + if err := s.session.Query(insertMutationCQL, + storeName, + pair.Key, + version, + value, + deleted, + ).WithContext(ctx).Exec(); err != nil { + return fmt.Errorf("insert scylla/cassandra mutation store=%s version=%d: %w", storeName, version, err) + } + return nil +} + +func (s *scyllaSink) writeUpgrade(ctx context.Context, version int64, up *proto.TreeNameUpgrade) error { + if err := s.session.Query(insertUpgradeCQL, + version, + up.Name, + up.RenameFrom, + up.Delete, + ).WithContext(ctx).Exec(); err != nil { + return fmt.Errorf("insert scylla/cassandra tree upgrade version=%d name=%s: %w", version, up.Name, err) + } + return nil +} + +const selectLatestVersionCQL = ` +SELECT version +FROM state_versions +WHERE bucket = ? +LIMIT 1` + +const insertVersionCQL = ` +INSERT INTO state_versions ( + bucket, version, kafka_topic, kafka_partition, kafka_offset, ingested_at +) VALUES (?, ?, ?, ?, ?, ?)` + +const insertMutationCQL = ` +INSERT INTO state_mutations ( + store_name, state_key, version, value, deleted +) VALUES (?, ?, ?, ?, ?)` + +const insertUpgradeCQL = ` +INSERT INTO state_tree_upgrades ( + version, name, rename_from, deleted +) VALUES (?, ?, ?, ?)` diff --git a/sei-db/state_db/ss/offload/consumer/scylla_test.go b/sei-db/state_db/ss/offload/consumer/scylla_test.go new file mode 100644 index 0000000000..b80b409aa8 --- /dev/null +++ b/sei-db/state_db/ss/offload/consumer/scylla_test.go @@ -0,0 +1,68 @@ +package consumer + +import ( + "strings" + "testing" + + "github.com/sei-protocol/sei-chain/sei-db/proto" + "github.com/stretchr/testify/require" +) + +func TestScyllaConfigValidate(t *testing.T) { + cfg := ScyllaConfig{ + Hosts: []string{"127.0.0.1"}, + Keyspace: "sei_history", + } + require.NoError(t, cfg.Validate()) + + cfg.TimeoutMS = -1 + require.ErrorContains(t, cfg.Validate(), "timeout") +} + +func TestScyllaConfigApplyDefaults(t *testing.T) { + cfg := ScyllaConfig{ + Hosts: []string{"127.0.0.1"}, + Keyspace: "sei_history", + } + cfg.ApplyDefaults() + require.Equal(t, "local_quorum", cfg.Consistency) + require.Equal(t, 2000, cfg.TimeoutMS) + require.Equal(t, 2000, cfg.ConnectTimeoutMS) + require.Equal(t, 4, cfg.NumConns) +} + +func TestCompactRecordsDropsNilEntries(t *testing.T) { + records := compactRecords([]Record{ + {Entry: &proto.ChangelogEntry{Version: 1}}, + {}, + {Entry: &proto.ChangelogEntry{Version: 2}}, + }) + require.Len(t, records, 2) + require.Equal(t, int64(1), records[0].Entry.Version) + require.Equal(t, int64(2), records[1].Entry.Version) +} + +func TestScyllaCQLShape(t *testing.T) { + for _, frag := range []string{ + "INSERT INTO state_mutations", + "store_name", + "state_key", + "version", + "value", + "deleted", + } { + require.Contains(t, insertMutationCQL, frag) + } + for _, frag := range []string{ + "INSERT INTO state_versions", + "bucket", + "version", + "kafka_topic", + "kafka_partition", + "kafka_offset", + "ingested_at", + } { + require.Contains(t, insertVersionCQL, frag) + } + require.True(t, strings.Contains(selectLatestVersionCQL, "LIMIT 1")) +} diff --git a/sei-db/state_db/ss/offload/consumer/sink.go b/sei-db/state_db/ss/offload/consumer/sink.go new file mode 100644 index 0000000000..79da24d0cd --- /dev/null +++ b/sei-db/state_db/ss/offload/consumer/sink.go @@ -0,0 +1,26 @@ +package consumer + +import ( + "context" + + dbproto "github.com/sei-protocol/sei-chain/sei-db/proto" +) + +// Topic/Partition/Offset are kept alongside Entry so sinks can be idempotent +// across replayed Kafka messages. +type Record struct { + Topic string + Partition int + Offset int64 + Entry *dbproto.ChangelogEntry +} + +type Sink interface { + Write(ctx context.Context, rec Record) error + LastVersion(ctx context.Context) (int64, error) + Close() error +} + +type BatchSink interface { + WriteBatch(ctx context.Context, records []Record) error +} diff --git a/sei-db/state_db/ss/offload/historical/reader.go b/sei-db/state_db/ss/offload/historical/reader.go new file mode 100644 index 0000000000..7dfdfee993 --- /dev/null +++ b/sei-db/state_db/ss/offload/historical/reader.go @@ -0,0 +1,37 @@ +// Package historical reads MVCC state from an external historical store. +package historical + +import ( + "context" + "errors" +) + +var ErrNotFound = errors.New("historical state not found") + +// Key is a string so Lookup is usable as a map key. +type Lookup struct { + StoreName string + Key string +} + +// Value is the actual MVCC value that satisfied the lookup. +// Version may be older than the requested target version. +type Value struct { + Bytes []byte + Version int64 +} + +type Reader interface { + // Get returns ErrNotFound if no row exists at or before targetVersion, + // or if the latest such row is a tombstone. + Get(ctx context.Context, storeName string, key []byte, targetVersion int64) (Value, error) + + // Has skips value transfer and returns false for missing or tombstoned keys. + Has(ctx context.Context, storeName string, key []byte, targetVersion int64) (bool, error) + + // BatchGet returns only found, non-tombstoned lookups. + BatchGet(ctx context.Context, targetVersion int64, lookups []Lookup) (map[Lookup]Value, error) + + LastVersion(ctx context.Context) (int64, error) + Close() error +} diff --git a/sei-db/state_db/ss/offload/historical/scylla.go b/sei-db/state_db/ss/offload/historical/scylla.go new file mode 100644 index 0000000000..5d48c8a778 --- /dev/null +++ b/sei-db/state_db/ss/offload/historical/scylla.go @@ -0,0 +1,239 @@ +package historical + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/gocql/gocql" +) + +const ( + defaultScyllaConsistency = "local_quorum" + defaultScyllaTimeout = 2 * time.Second + defaultScyllaNumConns = 4 + + // VersionBucketCount spreads monotonically increasing block-version markers + // across a bounded set of partitions while keeping LastVersion cheap. + VersionBucketCount = 64 +) + +type ScyllaConfig struct { + Hosts []string + Keyspace string + Username string + Password string + Datacenter string + Consistency string + Timeout time.Duration + ConnectTimeout time.Duration + NumConns int +} + +func (c *ScyllaConfig) ApplyDefaults() { + if c.Consistency == "" { + c.Consistency = defaultScyllaConsistency + } + if c.Timeout == 0 { + c.Timeout = defaultScyllaTimeout + } + if c.ConnectTimeout == 0 { + c.ConnectTimeout = defaultScyllaTimeout + } + if c.NumConns == 0 { + c.NumConns = defaultScyllaNumConns + } +} + +func (c *ScyllaConfig) Validate() error { + if len(c.Hosts) == 0 { + return fmt.Errorf("scylla/cassandra hosts are required") + } + for _, host := range c.Hosts { + if strings.TrimSpace(host) == "" { + return fmt.Errorf("scylla/cassandra hosts must not contain blanks") + } + } + if strings.TrimSpace(c.Keyspace) == "" { + return fmt.Errorf("scylla/cassandra keyspace is required") + } + if c.Password != "" && c.Username == "" { + return fmt.Errorf("scylla/cassandra username is required when password is set") + } + if _, err := parseConsistency(c.Consistency); err != nil { + return err + } + if c.Timeout < 0 { + return fmt.Errorf("scylla/cassandra timeout must be non-negative") + } + if c.ConnectTimeout < 0 { + return fmt.Errorf("scylla/cassandra connect timeout must be non-negative") + } + if c.NumConns < 0 { + return fmt.Errorf("scylla/cassandra num conns must be non-negative") + } + return nil +} + +func NewScyllaReader(cfg ScyllaConfig) (Reader, error) { + session, err := OpenScyllaSession(cfg) + if err != nil { + return nil, err + } + return &scyllaReader{session: session}, nil +} + +func OpenScyllaSession(cfg ScyllaConfig) (*gocql.Session, error) { + cfg.ApplyDefaults() + if err := cfg.Validate(); err != nil { + return nil, err + } + consistency, err := parseConsistency(cfg.Consistency) + if err != nil { + return nil, err + } + + cluster := gocql.NewCluster(cfg.Hosts...) + cluster.Keyspace = cfg.Keyspace + cluster.Consistency = consistency + cluster.Timeout = cfg.Timeout + cluster.ConnectTimeout = cfg.ConnectTimeout + cluster.NumConns = cfg.NumConns + if cfg.Username != "" { + cluster.Authenticator = gocql.PasswordAuthenticator{ + Username: cfg.Username, + Password: cfg.Password, + } + } + if cfg.Datacenter != "" { + cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy( + gocql.DCAwareRoundRobinPolicy(cfg.Datacenter), + ) + } + + session, err := cluster.CreateSession() + if err != nil { + return nil, fmt.Errorf("open scylla/cassandra session: %w", err) + } + return session, nil +} + +type scyllaReader struct { + session *gocql.Session +} + +var _ Reader = (*scyllaReader)(nil) + +func (r *scyllaReader) Close() error { + r.session.Close() + return nil +} + +func (r *scyllaReader) LastVersion(ctx context.Context) (int64, error) { + var maxVersion int64 + for bucket := 0; bucket < VersionBucketCount; bucket++ { + var version int64 + err := r.session.Query(selectLatestVersionCQL, bucket).WithContext(ctx).Scan(&version) + if err != nil { + if err == gocql.ErrNotFound { + continue + } + return 0, fmt.Errorf("read latest scylla/cassandra version bucket %d: %w", bucket, err) + } + if version > maxVersion { + maxVersion = version + } + } + return maxVersion, nil +} + +func (r *scyllaReader) Has(ctx context.Context, storeName string, key []byte, targetVersion int64) (bool, error) { + var deleted bool + err := r.session.Query(hasLookupCQL, storeName, key, targetVersion).WithContext(ctx).Scan(&deleted) + if err != nil { + if err == gocql.ErrNotFound { + return false, nil + } + return false, fmt.Errorf("scylla/cassandra has lookup: %w", err) + } + return !deleted, nil +} + +func (r *scyllaReader) Get(ctx context.Context, storeName string, key []byte, targetVersion int64) (Value, error) { + var ( + version int64 + bz []byte + deleted bool + ) + err := r.session.Query(getLookupCQL, storeName, key, targetVersion).WithContext(ctx).Scan(&version, &bz, &deleted) + if err != nil { + if err == gocql.ErrNotFound { + return Value{}, ErrNotFound + } + return Value{}, fmt.Errorf("scylla/cassandra get lookup: %w", err) + } + if deleted { + return Value{}, ErrNotFound + } + return Value{Bytes: bz, Version: version}, nil +} + +func (r *scyllaReader) BatchGet(ctx context.Context, targetVersion int64, lookups []Lookup) (map[Lookup]Value, error) { + out := make(map[Lookup]Value, len(lookups)) + for _, lookup := range lookups { + value, err := r.Get(ctx, lookup.StoreName, []byte(lookup.Key), targetVersion) + if err != nil { + if err == ErrNotFound { + continue + } + return nil, err + } + out[lookup] = value + } + return out, nil +} + +func VersionBucket(version int64) int { + if version < 0 { + version = -version + } + return int(version % VersionBucketCount) +} + +func parseConsistency(name string) (gocql.Consistency, error) { + switch strings.ToLower(strings.TrimSpace(name)) { + case "one": + return gocql.One, nil + case "local_one": + return gocql.LocalOne, nil + case "quorum": + return gocql.Quorum, nil + case "", "local_quorum": + return gocql.LocalQuorum, nil + case "all": + return gocql.All, nil + default: + return gocql.Any, fmt.Errorf("unsupported scylla/cassandra consistency %q", name) + } +} + +const selectLatestVersionCQL = ` +SELECT version +FROM state_versions +WHERE bucket = ? +LIMIT 1` + +const hasLookupCQL = ` +SELECT deleted +FROM state_mutations +WHERE store_name = ? AND state_key = ? AND version <= ? +ORDER BY version DESC +LIMIT 1` + +const getLookupCQL = ` +SELECT version, value, deleted +FROM state_mutations +WHERE store_name = ? AND state_key = ? AND version <= ? +ORDER BY version DESC +LIMIT 1` diff --git a/sei-db/state_db/ss/offload/historical/scylla_test.go b/sei-db/state_db/ss/offload/historical/scylla_test.go new file mode 100644 index 0000000000..8147b44bc4 --- /dev/null +++ b/sei-db/state_db/ss/offload/historical/scylla_test.go @@ -0,0 +1,95 @@ +package historical + +import ( + "strings" + "testing" + "time" + + "github.com/gocql/gocql" + "github.com/stretchr/testify/require" +) + +func TestScyllaConfigApplyDefaults(t *testing.T) { + cfg := ScyllaConfig{ + Hosts: []string{"127.0.0.1"}, + Keyspace: "sei_history", + } + cfg.ApplyDefaults() + require.Equal(t, defaultScyllaConsistency, cfg.Consistency) + require.Equal(t, defaultScyllaTimeout, cfg.Timeout) + require.Equal(t, defaultScyllaTimeout, cfg.ConnectTimeout) + require.Equal(t, defaultScyllaNumConns, cfg.NumConns) +} + +func TestScyllaConfigValidate(t *testing.T) { + tests := []struct { + name string + cfg ScyllaConfig + err string + }{ + {"missing hosts", ScyllaConfig{Keyspace: "ks"}, "hosts"}, + {"blank host", ScyllaConfig{Hosts: []string{" "}, Keyspace: "ks"}, "blanks"}, + {"missing keyspace", ScyllaConfig{Hosts: []string{"127.0.0.1"}}, "keyspace"}, + {"password without username", ScyllaConfig{Hosts: []string{"127.0.0.1"}, Keyspace: "ks", Password: "secret"}, "username"}, + {"bad consistency", ScyllaConfig{Hosts: []string{"127.0.0.1"}, Keyspace: "ks", Consistency: "bad"}, "consistency"}, + {"negative timeout", ScyllaConfig{Hosts: []string{"127.0.0.1"}, Keyspace: "ks", Timeout: -time.Second}, "timeout"}, + {"negative connect timeout", ScyllaConfig{Hosts: []string{"127.0.0.1"}, Keyspace: "ks", ConnectTimeout: -time.Second}, "connect"}, + {"negative conns", ScyllaConfig{Hosts: []string{"127.0.0.1"}, Keyspace: "ks", NumConns: -1}, "conns"}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := tc.cfg.Validate() + require.Error(t, err) + require.Contains(t, err.Error(), tc.err) + }) + } +} + +func TestParseConsistency(t *testing.T) { + tests := []struct { + in string + out gocql.Consistency + }{ + {"", gocql.LocalQuorum}, + {"local_quorum", gocql.LocalQuorum}, + {"LOCAL_ONE", gocql.LocalOne}, + {"one", gocql.One}, + {"quorum", gocql.Quorum}, + {"all", gocql.All}, + } + for _, tc := range tests { + t.Run(tc.in, func(t *testing.T) { + got, err := parseConsistency(tc.in) + require.NoError(t, err) + require.Equal(t, tc.out, got) + }) + } +} + +func TestVersionBucket(t *testing.T) { + require.Equal(t, 0, VersionBucket(0)) + require.Equal(t, 1, VersionBucket(1)) + require.Equal(t, 0, VersionBucket(VersionBucketCount)) + require.Equal(t, 1, VersionBucket(-1)) +} + +func TestPointLookupCQLShape(t *testing.T) { + for _, q := range []string{getLookupCQL, hasLookupCQL} { + for _, frag := range []string{ + "FROM state_mutations", + "store_name = ?", + "state_key = ?", + "version <= ?", + "ORDER BY version DESC", + "LIMIT 1", + } { + require.Contains(t, q, frag) + } + } +} + +func TestLatestVersionCQLShape(t *testing.T) { + require.Contains(t, selectLatestVersionCQL, "FROM state_versions") + require.Contains(t, selectLatestVersionCQL, "bucket = ?") + require.True(t, strings.Contains(selectLatestVersionCQL, "LIMIT 1")) +} diff --git a/sei-db/state_db/ss/offload/historical/store.go b/sei-db/state_db/ss/offload/historical/store.go new file mode 100644 index 0000000000..9c0dc975f6 --- /dev/null +++ b/sei-db/state_db/ss/offload/historical/store.go @@ -0,0 +1,96 @@ +package historical + +import ( + "context" + "errors" + + "github.com/sei-protocol/sei-chain/sei-db/db_engine/types" + "github.com/sei-protocol/sei-chain/sei-db/proto" +) + +// FallbackStateStore routes pruned point reads to the historical reader. +// Iteration and writes stay on the primary state store. +type FallbackStateStore struct { + primary types.StateStore + reader Reader +} + +var _ types.StateStore = (*FallbackStateStore)(nil) + +// NewFallbackStateStore takes ownership of primary and reader for Close. +func NewFallbackStateStore(primary types.StateStore, reader Reader) *FallbackStateStore { + return &FallbackStateStore{primary: primary, reader: reader} +} + +func (s *FallbackStateStore) shouldFallback(version int64) bool { + earliest := s.primary.GetEarliestVersion() + return earliest > 0 && version < earliest +} + +func (s *FallbackStateStore) Get(storeKey string, version int64, key []byte) ([]byte, error) { + if !s.shouldFallback(version) { + return s.primary.Get(storeKey, version, key) + } + v, err := s.reader.Get(context.Background(), storeKey, key, version) + if err != nil { + if errors.Is(err, ErrNotFound) { + return nil, nil + } + return nil, err + } + return v.Bytes, nil +} + +func (s *FallbackStateStore) Has(storeKey string, version int64, key []byte) (bool, error) { + if !s.shouldFallback(version) { + return s.primary.Has(storeKey, version, key) + } + return s.reader.Has(context.Background(), storeKey, key, version) +} + +func (s *FallbackStateStore) Iterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) { + return s.primary.Iterator(storeKey, version, start, end) +} + +func (s *FallbackStateStore) ReverseIterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) { + return s.primary.ReverseIterator(storeKey, version, start, end) +} + +func (s *FallbackStateStore) RawIterate(storeKey string, fn func([]byte, []byte, int64) bool) (bool, error) { + return s.primary.RawIterate(storeKey, fn) +} + +func (s *FallbackStateStore) GetLatestVersion() int64 { return s.primary.GetLatestVersion() } + +func (s *FallbackStateStore) SetLatestVersion(version int64) error { + return s.primary.SetLatestVersion(version) +} + +func (s *FallbackStateStore) GetEarliestVersion() int64 { return s.primary.GetEarliestVersion() } + +func (s *FallbackStateStore) SetEarliestVersion(version int64, ignoreVersion bool) error { + return s.primary.SetEarliestVersion(version, ignoreVersion) +} + +func (s *FallbackStateStore) ApplyChangesetSync(version int64, changesets []*proto.NamedChangeSet) error { + return s.primary.ApplyChangesetSync(version, changesets) +} + +func (s *FallbackStateStore) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error { + return s.primary.ApplyChangesetAsync(version, changesets) +} + +func (s *FallbackStateStore) Prune(version int64) error { return s.primary.Prune(version) } + +func (s *FallbackStateStore) Import(version int64, ch <-chan types.SnapshotNode) error { + return s.primary.Import(version, ch) +} + +func (s *FallbackStateStore) Close() error { + primaryErr := s.primary.Close() + readerErr := s.reader.Close() + if primaryErr != nil { + return primaryErr + } + return readerErr +} diff --git a/sei-db/state_db/ss/offload/historical/store_test.go b/sei-db/state_db/ss/offload/historical/store_test.go new file mode 100644 index 0000000000..fd7696aed1 --- /dev/null +++ b/sei-db/state_db/ss/offload/historical/store_test.go @@ -0,0 +1,103 @@ +package historical + +import ( + "context" + "testing" + + "github.com/sei-protocol/sei-chain/sei-db/db_engine/types" + "github.com/sei-protocol/sei-chain/sei-db/proto" + "github.com/stretchr/testify/require" +) + +type fakeStateStore struct { + earliest int64 + gets int + has int +} + +func (f *fakeStateStore) Get(_ string, _ int64, _ []byte) ([]byte, error) { + f.gets++ + return []byte("primary"), nil +} + +func (f *fakeStateStore) Has(_ string, _ int64, _ []byte) (bool, error) { + f.has++ + return true, nil +} + +func (f *fakeStateStore) Iterator(string, int64, []byte, []byte) (types.DBIterator, error) { + return nil, nil +} + +func (f *fakeStateStore) ReverseIterator(string, int64, []byte, []byte) (types.DBIterator, error) { + return nil, nil +} + +func (f *fakeStateStore) RawIterate(string, func([]byte, []byte, int64) bool) (bool, error) { + return false, nil +} + +func (f *fakeStateStore) GetLatestVersion() int64 { return 0 } +func (f *fakeStateStore) SetLatestVersion(int64) error { return nil } +func (f *fakeStateStore) GetEarliestVersion() int64 { return f.earliest } +func (f *fakeStateStore) SetEarliestVersion(version int64, _ bool) error { + f.earliest = version + return nil +} +func (f *fakeStateStore) ApplyChangesetSync(int64, []*proto.NamedChangeSet) error { return nil } +func (f *fakeStateStore) ApplyChangesetAsync(int64, []*proto.NamedChangeSet) error { return nil } +func (f *fakeStateStore) Prune(int64) error { return nil } +func (f *fakeStateStore) Import(int64, <-chan types.SnapshotNode) error { return nil } +func (f *fakeStateStore) Close() error { return nil } + +type fakeReader struct { + gets int + has int +} + +func (f *fakeReader) Get(context.Context, string, []byte, int64) (Value, error) { + f.gets++ + return Value{Bytes: []byte("historical"), Version: 7}, nil +} + +func (f *fakeReader) Has(context.Context, string, []byte, int64) (bool, error) { + f.has++ + return true, nil +} + +func (f *fakeReader) BatchGet(context.Context, int64, []Lookup) (map[Lookup]Value, error) { + return nil, nil +} + +func (f *fakeReader) LastVersion(context.Context) (int64, error) { return 0, nil } +func (f *fakeReader) Close() error { return nil } + +func TestFallbackStateStoreRoutesPrunedPointReads(t *testing.T) { + primary := &fakeStateStore{earliest: 10} + reader := &fakeReader{} + store := NewFallbackStateStore(primary, reader) + + value, err := store.Get("bank", 7, []byte("k")) + require.NoError(t, err) + require.Equal(t, []byte("historical"), value) + require.Equal(t, 0, primary.gets) + require.Equal(t, 1, reader.gets) + + ok, err := store.Has("bank", 7, []byte("k")) + require.NoError(t, err) + require.True(t, ok) + require.Equal(t, 0, primary.has) + require.Equal(t, 1, reader.has) +} + +func TestFallbackStateStoreKeepsRecentPointReadsOnPrimary(t *testing.T) { + primary := &fakeStateStore{earliest: 10} + reader := &fakeReader{} + store := NewFallbackStateStore(primary, reader) + + value, err := store.Get("bank", 10, []byte("k")) + require.NoError(t, err) + require.Equal(t, []byte("primary"), value) + require.Equal(t, 1, primary.gets) + require.Equal(t, 0, reader.gets) +} diff --git a/sei-db/state_db/ss/offload/kafka.go b/sei-db/state_db/ss/offload/kafka.go index edbe366818..981cb35e02 100644 --- a/sei-db/state_db/ss/offload/kafka.go +++ b/sei-db/state_db/ss/offload/kafka.go @@ -122,7 +122,7 @@ func NewKafkaStream(cfg KafkaConfig) (Stream, error) { } } - mechanism, err := kafkaSASLMechanism(cfg) + mechanism, err := NewSASLMechanism(cfg) if err != nil { return nil, err } @@ -211,7 +211,8 @@ func kafkaCompression(name string) compress.Compression { } } -func kafkaSASLMechanism(cfg KafkaConfig) (sasl.Mechanism, error) { +// NewSASLMechanism is exported so out-of-package consumers share the producer's auth path. +func NewSASLMechanism(cfg KafkaConfig) (sasl.Mechanism, error) { switch strings.ToLower(cfg.SASLMechanism) { case "", kafkaOptionNone: return nil, nil diff --git a/sei-db/state_db/ss/store.go b/sei-db/state_db/ss/store.go index 0fb4b5184e..dc1ce2edc2 100644 --- a/sei-db/state_db/ss/store.go +++ b/sei-db/state_db/ss/store.go @@ -1,9 +1,14 @@ package ss import ( + "fmt" + "strings" + "time" + "github.com/sei-protocol/sei-chain/sei-db/config" "github.com/sei-protocol/sei-chain/sei-db/db_engine/types" "github.com/sei-protocol/sei-chain/sei-db/state_db/ss/composite" + "github.com/sei-protocol/sei-chain/sei-db/state_db/ss/offload/historical" ) // NewStateStore creates a CompositeStateStore which handles both Cosmos and EVM data. @@ -11,5 +16,45 @@ import ( // files in the backend package. When WriteMode/ReadMode are both cosmos_only (the default), // the EVM stores are not opened and the composite store behaves identically to a plain cosmos state store. func NewStateStore(homeDir string, ssConfig config.StateStoreConfig) (types.StateStore, error) { - return composite.NewCompositeStateStore(ssConfig, homeDir) + primary, err := composite.NewCompositeStateStore(ssConfig, homeDir) + if err != nil { + return nil, err + } + if !scyllaHistoricalOffloadConfigured(ssConfig) { + return primary, nil + } + reader, err := historical.NewScyllaReader(historical.ScyllaConfig{ + Hosts: splitCSV(ssConfig.HistoricalOffloadScyllaHosts), + Keyspace: ssConfig.HistoricalOffloadScyllaKeyspace, + Username: ssConfig.HistoricalOffloadScyllaUsername, + Password: ssConfig.HistoricalOffloadScyllaPassword, + Datacenter: ssConfig.HistoricalOffloadScyllaDatacenter, + Consistency: ssConfig.HistoricalOffloadScyllaConsistency, + Timeout: time.Duration(ssConfig.HistoricalOffloadScyllaTimeoutMS) * time.Millisecond, + }) + if err != nil { + _ = primary.Close() + return nil, fmt.Errorf("open scylla/cassandra historical offload reader: %w", err) + } + return historical.NewFallbackStateStore(primary, reader), nil +} + +func scyllaHistoricalOffloadConfigured(cfg config.StateStoreConfig) bool { + return strings.TrimSpace(cfg.HistoricalOffloadScyllaHosts) != "" || + strings.TrimSpace(cfg.HistoricalOffloadScyllaKeyspace) != "" +} + +func splitCSV(value string) []string { + if strings.TrimSpace(value) == "" { + return nil + } + parts := strings.Split(value, ",") + out := make([]string, 0, len(parts)) + for _, part := range parts { + part = strings.TrimSpace(part) + if part != "" { + out = append(out, part) + } + } + return out } From 32b55d2b0c910d546f054af7c11fdba7fddfd2f3 Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Tue, 12 May 2026 12:02:48 -0400 Subject: [PATCH 2/8] Avoid retry timer leak in Scylla consumer --- sei-db/state_db/ss/offload/consumer/consumer.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/sei-db/state_db/ss/offload/consumer/consumer.go b/sei-db/state_db/ss/offload/consumer/consumer.go index a0adaaca46..a04c42e75e 100644 --- a/sei-db/state_db/ss/offload/consumer/consumer.go +++ b/sei-db/state_db/ss/offload/consumer/consumer.go @@ -257,10 +257,8 @@ func (c *Consumer) writeBatchWithRetry(ctx context.Context, records []Record) er } c.logf("sink write attempt %d/%d failed: %v; retrying in %s", attempt, c.maxAttempts, err, backoff) - select { - case <-time.After(backoff): - case <-ctx.Done(): - return ctx.Err() + if err := sleepWithContext(ctx, backoff); err != nil { + return err } backoff *= 2 if backoff > c.maxBackoff { @@ -270,6 +268,17 @@ func (c *Consumer) writeBatchWithRetry(ctx context.Context, records []Record) er return fmt.Errorf("sink write failed after %d attempts: %w", c.maxAttempts, lastErr) } +func sleepWithContext(ctx context.Context, d time.Duration) error { + timer := time.NewTimer(d) + defer timer.Stop() + select { + case <-timer.C: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + func (c *Consumer) writeRecords(ctx context.Context, records []Record) error { if len(records) == 0 { return nil From 6c6f5890114f4664fdd08e08dba51cfd2d7776ae Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Tue, 12 May 2026 12:20:53 -0400 Subject: [PATCH 3/8] Parallelize Scylla mutation ingest --- sei-db/state_db/ss/offload/consumer/README.md | 3 +- .../consumer/config/example-scylla.json | 3 +- sei-db/state_db/ss/offload/consumer/scylla.go | 94 ++++++++++++++----- .../ss/offload/consumer/scylla_test.go | 89 ++++++++++++++++++ 4 files changed, 166 insertions(+), 23 deletions(-) diff --git a/sei-db/state_db/ss/offload/consumer/README.md b/sei-db/state_db/ss/offload/consumer/README.md index 9ccc28b29d..185920d783 100644 --- a/sei-db/state_db/ss/offload/consumer/README.md +++ b/sei-db/state_db/ss/offload/consumer/README.md @@ -36,7 +36,8 @@ factors before applying it. The consumer reads historical offload changelog messages from Kafka and writes them into Scylla/Cassandra. Kafka offsets are committed only after the sink -write succeeds. +write succeeds. Within each block, mutation rows are written with bounded +concurrency and the version marker is written last. ```bash go run ./sei-db/state_db/ss/offload/consumer/cmd/historical-scylla-consumer \ diff --git a/sei-db/state_db/ss/offload/consumer/config/example-scylla.json b/sei-db/state_db/ss/offload/consumer/config/example-scylla.json index 94779c83e9..013217af75 100644 --- a/sei-db/state_db/ss/offload/consumer/config/example-scylla.json +++ b/sei-db/state_db/ss/offload/consumer/config/example-scylla.json @@ -12,7 +12,8 @@ "Consistency": "local_quorum", "TimeoutMS": 2000, "ConnectTimeoutMS": 2000, - "NumConns": 4 + "NumConns": 4, + "MutationWorkers": 16 }, "Workers": 16, "ShardBufferSize": 128, diff --git a/sei-db/state_db/ss/offload/consumer/scylla.go b/sei-db/state_db/ss/offload/consumer/scylla.go index c16744662c..ebe0b276eb 100644 --- a/sei-db/state_db/ss/offload/consumer/scylla.go +++ b/sei-db/state_db/ss/offload/consumer/scylla.go @@ -6,11 +6,14 @@ import ( "time" "github.com/gocql/gocql" + "golang.org/x/sync/errgroup" "github.com/sei-protocol/sei-chain/sei-db/proto" "github.com/sei-protocol/sei-chain/sei-db/state_db/ss/offload/historical" ) +const defaultScyllaMutationWorkers = 16 + type ScyllaConfig struct { Hosts []string Keyspace string @@ -21,6 +24,7 @@ type ScyllaConfig struct { TimeoutMS int ConnectTimeoutMS int NumConns int + MutationWorkers int } func (c *ScyllaConfig) ApplyDefaults() { @@ -30,12 +34,21 @@ func (c *ScyllaConfig) ApplyDefaults() { c.TimeoutMS = int(cfg.Timeout / time.Millisecond) c.ConnectTimeoutMS = int(cfg.ConnectTimeout / time.Millisecond) c.NumConns = cfg.NumConns + if c.MutationWorkers == 0 { + c.MutationWorkers = defaultScyllaMutationWorkers + } } func (c *ScyllaConfig) Validate() error { cfg := c.toHistorical() cfg.ApplyDefaults() - return cfg.Validate() + if err := cfg.Validate(); err != nil { + return err + } + if c.MutationWorkers < 0 { + return fmt.Errorf("scylla/cassandra mutation workers must be non-negative") + } + return nil } func (c ScyllaConfig) toHistorical() historical.ScyllaConfig { @@ -53,22 +66,34 @@ func (c ScyllaConfig) toHistorical() historical.ScyllaConfig { } type scyllaSink struct { - session *gocql.Session + session *gocql.Session + exec scyllaExecFunc + mutationWorkers int } var _ Sink = (*scyllaSink)(nil) var _ BatchSink = (*scyllaSink)(nil) func NewScyllaSink(cfg ScyllaConfig) (Sink, error) { + cfg.ApplyDefaults() + if err := cfg.Validate(); err != nil { + return nil, err + } session, err := historical.OpenScyllaSession(cfg.toHistorical()) if err != nil { return nil, err } - return &scyllaSink{session: session}, nil + return &scyllaSink{ + session: session, + exec: sessionExec(session), + mutationWorkers: cfg.MutationWorkers, + }, nil } func (s *scyllaSink) Close() error { - s.session.Close() + if s.session != nil { + s.session.Close() + } return nil } @@ -124,61 +149,88 @@ func (s *scyllaSink) writeRecord(ctx context.Context, rec Record) error { return nil } version := entry.Version - for _, ncs := range entry.Changesets { - for _, pair := range ncs.Changeset.Pairs { - if err := s.writeMutation(ctx, version, ncs.Name, pair); err != nil { - return err - } - } - } - for _, up := range entry.Upgrades { - if err := s.writeUpgrade(ctx, version, up); err != nil { - return err - } + if err := s.writeRecordRows(ctx, version, entry); err != nil { + return err } - if err := s.session.Query(insertVersionCQL, + if err := s.exec(ctx, insertVersionCQL, historical.VersionBucket(version), version, rec.Topic, rec.Partition, rec.Offset, time.Now(), - ).WithContext(ctx).Exec(); err != nil { + ); err != nil { return fmt.Errorf("insert scylla/cassandra version %d: %w", version, err) } return nil } +func (s *scyllaSink) writeRecordRows(ctx context.Context, version int64, entry *proto.ChangelogEntry) error { + g, gctx := errgroup.WithContext(ctx) + g.SetLimit(s.effectiveMutationWorkers()) + for _, ncs := range entry.Changesets { + storeName := ncs.Name + for _, pair := range ncs.Changeset.Pairs { + pair := pair + g.Go(func() error { + return s.writeMutation(gctx, version, storeName, pair) + }) + } + } + for _, up := range entry.Upgrades { + up := up + g.Go(func() error { + return s.writeUpgrade(gctx, version, up) + }) + } + return g.Wait() +} + +func (s *scyllaSink) effectiveMutationWorkers() int { + if s.mutationWorkers <= 0 { + return defaultScyllaMutationWorkers + } + return s.mutationWorkers +} + func (s *scyllaSink) writeMutation(ctx context.Context, version int64, storeName string, pair *proto.KVPair) error { deleted := pair.Delete || pair.Value == nil value := pair.Value if deleted { value = nil } - if err := s.session.Query(insertMutationCQL, + if err := s.exec(ctx, insertMutationCQL, storeName, pair.Key, version, value, deleted, - ).WithContext(ctx).Exec(); err != nil { + ); err != nil { return fmt.Errorf("insert scylla/cassandra mutation store=%s version=%d: %w", storeName, version, err) } return nil } func (s *scyllaSink) writeUpgrade(ctx context.Context, version int64, up *proto.TreeNameUpgrade) error { - if err := s.session.Query(insertUpgradeCQL, + if err := s.exec(ctx, insertUpgradeCQL, version, up.Name, up.RenameFrom, up.Delete, - ).WithContext(ctx).Exec(); err != nil { + ); err != nil { return fmt.Errorf("insert scylla/cassandra tree upgrade version=%d name=%s: %w", version, up.Name, err) } return nil } +type scyllaExecFunc func(ctx context.Context, stmt string, values ...interface{}) error + +func sessionExec(session *gocql.Session) scyllaExecFunc { + return func(ctx context.Context, stmt string, values ...interface{}) error { + return session.Query(stmt, values...).WithContext(ctx).Exec() + } +} + const selectLatestVersionCQL = ` SELECT version FROM state_versions diff --git a/sei-db/state_db/ss/offload/consumer/scylla_test.go b/sei-db/state_db/ss/offload/consumer/scylla_test.go index b80b409aa8..60db6acf9a 100644 --- a/sei-db/state_db/ss/offload/consumer/scylla_test.go +++ b/sei-db/state_db/ss/offload/consumer/scylla_test.go @@ -1,8 +1,11 @@ package consumer import ( + "context" "strings" + "sync/atomic" "testing" + "time" "github.com/sei-protocol/sei-chain/sei-db/proto" "github.com/stretchr/testify/require" @@ -17,6 +20,10 @@ func TestScyllaConfigValidate(t *testing.T) { cfg.TimeoutMS = -1 require.ErrorContains(t, cfg.Validate(), "timeout") + + cfg.TimeoutMS = 0 + cfg.MutationWorkers = -1 + require.ErrorContains(t, cfg.Validate(), "mutation workers") } func TestScyllaConfigApplyDefaults(t *testing.T) { @@ -29,6 +36,7 @@ func TestScyllaConfigApplyDefaults(t *testing.T) { require.Equal(t, 2000, cfg.TimeoutMS) require.Equal(t, 2000, cfg.ConnectTimeoutMS) require.Equal(t, 4, cfg.NumConns) + require.Equal(t, 16, cfg.MutationWorkers) } func TestCompactRecordsDropsNilEntries(t *testing.T) { @@ -66,3 +74,84 @@ func TestScyllaCQLShape(t *testing.T) { } require.True(t, strings.Contains(selectLatestVersionCQL, "LIMIT 1")) } + +func TestScyllaSinkWritesRowsConcurrentlyBeforeVersionMarker(t *testing.T) { + rowStarted := make(chan struct{}, 8) + releaseRows := make(chan struct{}) + var activeRows atomic.Int32 + var sawConcurrentRows atomic.Bool + var markerBeforeRowsDone atomic.Bool + var versionMarkers atomic.Int32 + + sink := &scyllaSink{ + mutationWorkers: 2, + exec: func(ctx context.Context, stmt string, _ ...interface{}) error { + if strings.Contains(stmt, "state_versions") { + if activeRows.Load() != 0 { + markerBeforeRowsDone.Store(true) + } + versionMarkers.Add(1) + return nil + } + if activeRows.Add(1) > 1 { + sawConcurrentRows.Store(true) + } + rowStarted <- struct{}{} + select { + case <-releaseRows: + case <-ctx.Done(): + activeRows.Add(-1) + return ctx.Err() + } + activeRows.Add(-1) + return nil + }, + } + entry := &proto.ChangelogEntry{ + Version: 7, + Changesets: []*proto.NamedChangeSet{{ + Name: "bank", + Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{ + {Key: []byte("k1"), Value: []byte("v1")}, + {Key: []byte("k2"), Value: []byte("v2")}, + {Key: []byte("k3"), Value: []byte("v3")}, + }}, + }}, + Upgrades: []*proto.TreeNameUpgrade{{Name: "new-store"}}, + } + + errCh := make(chan error, 1) + go func() { + errCh <- sink.writeRecord(context.Background(), Record{Topic: "t", Partition: 1, Offset: 2, Entry: entry}) + }() + + releaseClosed := false + closeRelease := func() { + if !releaseClosed { + close(releaseRows) + releaseClosed = true + } + } + defer closeRelease() + + for i := 0; i < 2; i++ { + select { + case <-rowStarted: + case <-time.After(time.Second): + closeRelease() + t.Fatal("timed out waiting for concurrent row writes") + } + } + require.True(t, sawConcurrentRows.Load()) + require.Equal(t, int32(0), versionMarkers.Load(), "version marker must wait for row writes") + + closeRelease() + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timed out waiting for record write") + } + require.False(t, markerBeforeRowsDone.Load()) + require.Equal(t, int32(1), versionMarkers.Load()) +} From 4fcd2421f4fd638195aea8c949d640509422ca1f Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Tue, 12 May 2026 12:56:20 -0400 Subject: [PATCH 4/8] Parallelize Scylla historical batch reads --- .../state_db/ss/offload/historical/scylla.go | 74 +++++++++++++------ .../ss/offload/historical/scylla_test.go | 73 ++++++++++++++++++ 2 files changed, 125 insertions(+), 22 deletions(-) diff --git a/sei-db/state_db/ss/offload/historical/scylla.go b/sei-db/state_db/ss/offload/historical/scylla.go index 5d48c8a778..bcb98cebba 100644 --- a/sei-db/state_db/ss/offload/historical/scylla.go +++ b/sei-db/state_db/ss/offload/historical/scylla.go @@ -2,17 +2,21 @@ package historical import ( "context" + "errors" "fmt" "strings" + "sync" "time" "github.com/gocql/gocql" + "golang.org/x/sync/errgroup" ) const ( defaultScyllaConsistency = "local_quorum" defaultScyllaTimeout = 2 * time.Second defaultScyllaNumConns = 4 + defaultScyllaReadWorkers = 16 // VersionBucketCount spreads monotonically increasing block-version markers // across a bounded set of partitions while keeping LastVersion cheap. @@ -81,7 +85,10 @@ func NewScyllaReader(cfg ScyllaConfig) (Reader, error) { if err != nil { return nil, err } - return &scyllaReader{session: session}, nil + return &scyllaReader{ + session: session, + get: sessionGet(session), + }, nil } func OpenScyllaSession(cfg ScyllaConfig) (*gocql.Session, error) { @@ -121,12 +128,15 @@ func OpenScyllaSession(cfg ScyllaConfig) (*gocql.Session, error) { type scyllaReader struct { session *gocql.Session + get scyllaGetFunc } var _ Reader = (*scyllaReader)(nil) func (r *scyllaReader) Close() error { - r.session.Close() + if r.session != nil { + r.session.Close() + } return nil } @@ -161,35 +171,55 @@ func (r *scyllaReader) Has(ctx context.Context, storeName string, key []byte, ta } func (r *scyllaReader) Get(ctx context.Context, storeName string, key []byte, targetVersion int64) (Value, error) { - var ( - version int64 - bz []byte - deleted bool - ) - err := r.session.Query(getLookupCQL, storeName, key, targetVersion).WithContext(ctx).Scan(&version, &bz, &deleted) - if err != nil { - if err == gocql.ErrNotFound { + return r.get(ctx, storeName, key, targetVersion) +} + +func sessionGet(session *gocql.Session) scyllaGetFunc { + return func(ctx context.Context, storeName string, key []byte, targetVersion int64) (Value, error) { + var ( + version int64 + bz []byte + deleted bool + ) + err := session.Query(getLookupCQL, storeName, key, targetVersion).WithContext(ctx).Scan(&version, &bz, &deleted) + if err != nil { + if err == gocql.ErrNotFound { + return Value{}, ErrNotFound + } + return Value{}, fmt.Errorf("scylla/cassandra get lookup: %w", err) + } + if deleted { return Value{}, ErrNotFound } - return Value{}, fmt.Errorf("scylla/cassandra get lookup: %w", err) - } - if deleted { - return Value{}, ErrNotFound + return Value{Bytes: bz, Version: version}, nil } - return Value{Bytes: bz, Version: version}, nil } +type scyllaGetFunc func(ctx context.Context, storeName string, key []byte, targetVersion int64) (Value, error) + func (r *scyllaReader) BatchGet(ctx context.Context, targetVersion int64, lookups []Lookup) (map[Lookup]Value, error) { out := make(map[Lookup]Value, len(lookups)) + g, gctx := errgroup.WithContext(ctx) + g.SetLimit(defaultScyllaReadWorkers) + var mu sync.Mutex for _, lookup := range lookups { - value, err := r.Get(ctx, lookup.StoreName, []byte(lookup.Key), targetVersion) - if err != nil { - if err == ErrNotFound { - continue + lookup := lookup + g.Go(func() error { + value, err := r.Get(gctx, lookup.StoreName, []byte(lookup.Key), targetVersion) + if err != nil { + if errors.Is(err, ErrNotFound) { + return nil + } + return err } - return nil, err - } - out[lookup] = value + mu.Lock() + out[lookup] = value + mu.Unlock() + return nil + }) + } + if err := g.Wait(); err != nil { + return nil, err } return out, nil } diff --git a/sei-db/state_db/ss/offload/historical/scylla_test.go b/sei-db/state_db/ss/offload/historical/scylla_test.go index 8147b44bc4..2c5f3a8d42 100644 --- a/sei-db/state_db/ss/offload/historical/scylla_test.go +++ b/sei-db/state_db/ss/offload/historical/scylla_test.go @@ -1,7 +1,9 @@ package historical import ( + "context" "strings" + "sync/atomic" "testing" "time" @@ -93,3 +95,74 @@ func TestLatestVersionCQLShape(t *testing.T) { require.Contains(t, selectLatestVersionCQL, "bucket = ?") require.True(t, strings.Contains(selectLatestVersionCQL, "LIMIT 1")) } + +func TestScyllaReaderBatchGetParallelizesLookups(t *testing.T) { + started := make(chan string, 4) + release := make(chan struct{}) + var active atomic.Int32 + var sawConcurrent atomic.Bool + + reader := &scyllaReader{ + get: func(ctx context.Context, _ string, key []byte, targetVersion int64) (Value, error) { + if active.Add(1) > 1 { + sawConcurrent.Store(true) + } + defer active.Add(-1) + keyString := string(key) + started <- keyString + select { + case <-release: + case <-ctx.Done(): + return Value{}, ctx.Err() + } + if keyString == "missing" { + return Value{}, ErrNotFound + } + return Value{Bytes: []byte("value-" + keyString), Version: targetVersion - 1}, nil + }, + } + + errCh := make(chan error, 1) + var got map[Lookup]Value + lookups := []Lookup{ + {StoreName: "bank", Key: "k1"}, + {StoreName: "bank", Key: "missing"}, + {StoreName: "evm", Key: "k2"}, + } + go func() { + var err error + got, err = reader.BatchGet(context.Background(), 10, lookups) + errCh <- err + }() + + releaseClosed := false + closeRelease := func() { + if !releaseClosed { + close(release) + releaseClosed = true + } + } + defer closeRelease() + + for i := 0; i < 2; i++ { + select { + case <-started: + case <-time.After(time.Second): + closeRelease() + t.Fatal("timed out waiting for concurrent lookups") + } + } + require.True(t, sawConcurrent.Load()) + + closeRelease() + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timed out waiting for batch get") + } + require.Len(t, got, 2) + require.Equal(t, []byte("value-k1"), got[lookups[0]].Bytes) + require.Equal(t, []byte("value-k2"), got[lookups[2]].Bytes) + require.NotContains(t, got, lookups[1]) +} From 0fa1aef5c03e79b41b23344e30d5a67a939bf004 Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Wed, 13 May 2026 10:08:22 -0400 Subject: [PATCH 5/8] Cache Scylla historical point reads --- .../state_db/ss/offload/historical/store.go | 43 ++++++++++++++++++- .../ss/offload/historical/store_test.go | 21 +++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/sei-db/state_db/ss/offload/historical/store.go b/sei-db/state_db/ss/offload/historical/store.go index 9c0dc975f6..90dc7abdac 100644 --- a/sei-db/state_db/ss/offload/historical/store.go +++ b/sei-db/state_db/ss/offload/historical/store.go @@ -1,25 +1,43 @@ package historical import ( + "bytes" "context" "errors" + lru "github.com/hashicorp/golang-lru/v2" "github.com/sei-protocol/sei-chain/sei-db/db_engine/types" "github.com/sei-protocol/sei-chain/sei-db/proto" ) +const ( + defaultHistoricalReadCacheEntries = 64 * 1024 + maxHistoricalReadCacheValueBytes = 64 * 1024 +) + +type historicalReadCacheKey struct { + storeKey string + version int64 + key string +} + // FallbackStateStore routes pruned point reads to the historical reader. // Iteration and writes stay on the primary state store. type FallbackStateStore struct { primary types.StateStore reader Reader + cache *lru.Cache[historicalReadCacheKey, []byte] } var _ types.StateStore = (*FallbackStateStore)(nil) // NewFallbackStateStore takes ownership of primary and reader for Close. func NewFallbackStateStore(primary types.StateStore, reader Reader) *FallbackStateStore { - return &FallbackStateStore{primary: primary, reader: reader} + cache, err := lru.New[historicalReadCacheKey, []byte](defaultHistoricalReadCacheEntries) + if err != nil { + panic(err) + } + return &FallbackStateStore{primary: primary, reader: reader, cache: cache} } func (s *FallbackStateStore) shouldFallback(version int64) bool { @@ -31,6 +49,10 @@ func (s *FallbackStateStore) Get(storeKey string, version int64, key []byte) ([] if !s.shouldFallback(version) { return s.primary.Get(storeKey, version, key) } + cacheKey := historicalReadCacheKey{storeKey: storeKey, version: version, key: string(key)} + if value, ok := s.getCached(cacheKey); ok { + return value, nil + } v, err := s.reader.Get(context.Background(), storeKey, key, version) if err != nil { if errors.Is(err, ErrNotFound) { @@ -38,9 +60,28 @@ func (s *FallbackStateStore) Get(storeKey string, version int64, key []byte) ([] } return nil, err } + s.cacheValue(cacheKey, v.Bytes) return v.Bytes, nil } +func (s *FallbackStateStore) getCached(key historicalReadCacheKey) ([]byte, bool) { + if s.cache == nil { + return nil, false + } + value, ok := s.cache.Get(key) + if !ok { + return nil, false + } + return bytes.Clone(value), true +} + +func (s *FallbackStateStore) cacheValue(key historicalReadCacheKey, value []byte) { + if s.cache == nil || value == nil || len(value) > maxHistoricalReadCacheValueBytes { + return + } + s.cache.Add(key, bytes.Clone(value)) +} + func (s *FallbackStateStore) Has(storeKey string, version int64, key []byte) (bool, error) { if !s.shouldFallback(version) { return s.primary.Has(storeKey, version, key) diff --git a/sei-db/state_db/ss/offload/historical/store_test.go b/sei-db/state_db/ss/offload/historical/store_test.go index fd7696aed1..1e3cbfea57 100644 --- a/sei-db/state_db/ss/offload/historical/store_test.go +++ b/sei-db/state_db/ss/offload/historical/store_test.go @@ -101,3 +101,24 @@ func TestFallbackStateStoreKeepsRecentPointReadsOnPrimary(t *testing.T) { require.Equal(t, 1, primary.gets) require.Equal(t, 0, reader.gets) } + +func TestFallbackStateStoreCachesHistoricalPointReads(t *testing.T) { + primary := &fakeStateStore{earliest: 10} + reader := &fakeReader{} + store := NewFallbackStateStore(primary, reader) + + value, err := store.Get("bank", 7, []byte("k")) + require.NoError(t, err) + require.Equal(t, []byte("historical"), value) + value[0] = 'H' + + value, err = store.Get("bank", 7, []byte("k")) + require.NoError(t, err) + require.Equal(t, []byte("historical"), value) + value[0] = 'H' + + value, err = store.Get("bank", 7, []byte("k")) + require.NoError(t, err) + require.Equal(t, []byte("historical"), value) + require.Equal(t, 1, reader.gets) +} From 5ef70bafd2bcc277ecedd37aef8e10082fd7faf2 Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Wed, 13 May 2026 10:31:25 -0400 Subject: [PATCH 6/8] Enable token-aware Scylla routing --- sei-db/state_db/ss/offload/historical/scylla.go | 14 +++++++++----- .../ss/offload/historical/scylla_test.go | 17 +++++++++++++++++ 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/sei-db/state_db/ss/offload/historical/scylla.go b/sei-db/state_db/ss/offload/historical/scylla.go index bcb98cebba..686908c343 100644 --- a/sei-db/state_db/ss/offload/historical/scylla.go +++ b/sei-db/state_db/ss/offload/historical/scylla.go @@ -113,11 +113,7 @@ func OpenScyllaSession(cfg ScyllaConfig) (*gocql.Session, error) { Password: cfg.Password, } } - if cfg.Datacenter != "" { - cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy( - gocql.DCAwareRoundRobinPolicy(cfg.Datacenter), - ) - } + cluster.PoolConfig.HostSelectionPolicy = scyllaHostSelectionPolicy(cfg.Datacenter) session, err := cluster.CreateSession() if err != nil { @@ -126,6 +122,14 @@ func OpenScyllaSession(cfg ScyllaConfig) (*gocql.Session, error) { return session, nil } +func scyllaHostSelectionPolicy(datacenter string) gocql.HostSelectionPolicy { + datacenter = strings.TrimSpace(datacenter) + if datacenter != "" { + return gocql.TokenAwareHostPolicy(gocql.DCAwareRoundRobinPolicy(datacenter)) + } + return gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy()) +} + type scyllaReader struct { session *gocql.Session get scyllaGetFunc diff --git a/sei-db/state_db/ss/offload/historical/scylla_test.go b/sei-db/state_db/ss/offload/historical/scylla_test.go index 2c5f3a8d42..2699cdf3ea 100644 --- a/sei-db/state_db/ss/offload/historical/scylla_test.go +++ b/sei-db/state_db/ss/offload/historical/scylla_test.go @@ -2,6 +2,7 @@ package historical import ( "context" + "reflect" "strings" "sync/atomic" "testing" @@ -68,6 +69,22 @@ func TestParseConsistency(t *testing.T) { } } +func TestScyllaHostSelectionPolicyIsTokenAware(t *testing.T) { + for _, tc := range []struct { + name string + datacenter string + }{ + {"no datacenter", ""}, + {"with datacenter", "dc1"}, + } { + t.Run(tc.name, func(t *testing.T) { + policy := scyllaHostSelectionPolicy(tc.datacenter) + require.NotNil(t, policy) + require.Contains(t, reflect.TypeOf(policy).String(), "tokenAware") + }) + } +} + func TestVersionBucket(t *testing.T) { require.Equal(t, 0, VersionBucket(0)) require.Equal(t, 1, VersionBucket(1)) From 37d0086494da7beef1475edfa1e4c79f30a6c604 Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Wed, 13 May 2026 15:57:40 -0400 Subject: [PATCH 7/8] Compact duplicate Scylla mutation writes --- sei-db/state_db/ss/offload/consumer/scylla.go | 41 +++++++++++++--- .../ss/offload/consumer/scylla_test.go | 49 +++++++++++++++++++ 2 files changed, 82 insertions(+), 8 deletions(-) diff --git a/sei-db/state_db/ss/offload/consumer/scylla.go b/sei-db/state_db/ss/offload/consumer/scylla.go index ebe0b276eb..fe030c6901 100644 --- a/sei-db/state_db/ss/offload/consumer/scylla.go +++ b/sei-db/state_db/ss/offload/consumer/scylla.go @@ -168,14 +168,11 @@ func (s *scyllaSink) writeRecord(ctx context.Context, rec Record) error { func (s *scyllaSink) writeRecordRows(ctx context.Context, version int64, entry *proto.ChangelogEntry) error { g, gctx := errgroup.WithContext(ctx) g.SetLimit(s.effectiveMutationWorkers()) - for _, ncs := range entry.Changesets { - storeName := ncs.Name - for _, pair := range ncs.Changeset.Pairs { - pair := pair - g.Go(func() error { - return s.writeMutation(gctx, version, storeName, pair) - }) - } + for _, mutation := range compactMutations(entry) { + mutation := mutation + g.Go(func() error { + return s.writeMutation(gctx, version, mutation.storeName, mutation.pair) + }) } for _, up := range entry.Upgrades { up := up @@ -186,6 +183,34 @@ func (s *scyllaSink) writeRecordRows(ctx context.Context, version int64, entry * return g.Wait() } +type scyllaMutation struct { + storeName string + pair *proto.KVPair +} + +type scyllaMutationKey struct { + storeName string + key string +} + +func compactMutations(entry *proto.ChangelogEntry) []scyllaMutation { + mutations := make([]scyllaMutation, 0) + indexByKey := make(map[scyllaMutationKey]int) + for _, ncs := range entry.Changesets { + storeName := ncs.Name + for _, pair := range ncs.Changeset.Pairs { + key := scyllaMutationKey{storeName: storeName, key: string(pair.Key)} + if idx, ok := indexByKey[key]; ok { + mutations[idx].pair = pair + continue + } + indexByKey[key] = len(mutations) + mutations = append(mutations, scyllaMutation{storeName: storeName, pair: pair}) + } + } + return mutations +} + func (s *scyllaSink) effectiveMutationWorkers() int { if s.mutationWorkers <= 0 { return defaultScyllaMutationWorkers diff --git a/sei-db/state_db/ss/offload/consumer/scylla_test.go b/sei-db/state_db/ss/offload/consumer/scylla_test.go index 60db6acf9a..1c2d3b1645 100644 --- a/sei-db/state_db/ss/offload/consumer/scylla_test.go +++ b/sei-db/state_db/ss/offload/consumer/scylla_test.go @@ -3,6 +3,7 @@ package consumer import ( "context" "strings" + "sync" "sync/atomic" "testing" "time" @@ -155,3 +156,51 @@ func TestScyllaSinkWritesRowsConcurrentlyBeforeVersionMarker(t *testing.T) { require.False(t, markerBeforeRowsDone.Load()) require.Equal(t, int32(1), versionMarkers.Load()) } + +func TestScyllaSinkCompactsDuplicateMutations(t *testing.T) { + type write struct { + value []byte + deleted bool + } + var mu sync.Mutex + writes := make(map[string]write) + sink := &scyllaSink{ + mutationWorkers: 1, + exec: func(_ context.Context, stmt string, values ...interface{}) error { + if !strings.Contains(stmt, "state_mutations") { + return nil + } + storeName := values[0].(string) + key := string(values[1].([]byte)) + value := values[3].([]byte) + deleted := values[4].(bool) + mu.Lock() + writes[storeName+"/"+key] = write{value: value, deleted: deleted} + mu.Unlock() + return nil + }, + } + entry := &proto.ChangelogEntry{ + Version: 9, + Changesets: []*proto.NamedChangeSet{{ + Name: "bank", + Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{ + {Key: []byte("k"), Value: []byte("old")}, + {Key: []byte("drop"), Value: []byte("present")}, + {Key: []byte("k"), Value: []byte("new")}, + {Key: []byte("drop"), Delete: true}, + }}, + }, { + Name: "evm", + Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{ + {Key: []byte("k"), Value: []byte("separate-store")}, + }}, + }}, + } + + require.NoError(t, sink.writeRecordRows(context.Background(), entry.Version, entry)) + require.Len(t, writes, 3) + require.Equal(t, write{value: []byte("new")}, writes["bank/k"]) + require.Equal(t, write{deleted: true}, writes["bank/drop"]) + require.Equal(t, write{value: []byte("separate-store")}, writes["evm/k"]) +} From 4a14131fc1d9b3e311819937293982e1a150fe48 Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Wed, 13 May 2026 16:07:31 -0400 Subject: [PATCH 8/8] Pipeline Scylla batch row writes --- sei-db/state_db/ss/offload/consumer/scylla.go | 50 +++++++- .../ss/offload/consumer/scylla_test.go | 121 ++++++++++++++++++ 2 files changed, 166 insertions(+), 5 deletions(-) diff --git a/sei-db/state_db/ss/offload/consumer/scylla.go b/sei-db/state_db/ss/offload/consumer/scylla.go index fe030c6901..bcf061c7b0 100644 --- a/sei-db/state_db/ss/offload/consumer/scylla.go +++ b/sei-db/state_db/ss/offload/consumer/scylla.go @@ -120,12 +120,14 @@ func (s *scyllaSink) Write(ctx context.Context, rec Record) error { } func (s *scyllaSink) WriteBatch(ctx context.Context, records []Record) error { - for _, rec := range compactRecords(records) { - if err := s.writeRecord(ctx, rec); err != nil { - return err - } + records = compactRecords(records) + if len(records) == 0 { + return nil } - return nil + if len(records) == 1 { + return s.writeRecord(ctx, records[0]) + } + return s.writeRecordsPipelined(ctx, records) } func compactRecords(records []Record) []Record { @@ -152,6 +154,11 @@ func (s *scyllaSink) writeRecord(ctx context.Context, rec Record) error { if err := s.writeRecordRows(ctx, version, entry); err != nil { return err } + return s.writeVersionMarker(ctx, rec) +} + +func (s *scyllaSink) writeVersionMarker(ctx context.Context, rec Record) error { + version := rec.Entry.Version if err := s.exec(ctx, insertVersionCQL, historical.VersionBucket(version), version, @@ -165,6 +172,39 @@ func (s *scyllaSink) writeRecord(ctx context.Context, rec Record) error { return nil } +func (s *scyllaSink) writeRecordsPipelined(ctx context.Context, records []Record) error { + rowCtx, cancel := context.WithCancel(ctx) + defer cancel() + g, gctx := errgroup.WithContext(rowCtx) + rowDone := make([]chan error, len(records)) + for i := range records { + rowDone[i] = make(chan error, 1) + i := i + rec := records[i] + g.Go(func() error { + err := s.writeRecordRows(gctx, rec.Entry.Version, rec.Entry) + if err != nil { + err = fmt.Errorf("write scylla/cassandra rows version %d: %w", rec.Entry.Version, err) + } + rowDone[i] <- err + return err + }) + } + for i, rec := range records { + if err := <-rowDone[i]; err != nil { + cancel() + _ = g.Wait() + return err + } + if err := s.writeVersionMarker(ctx, rec); err != nil { + cancel() + _ = g.Wait() + return err + } + } + return g.Wait() +} + func (s *scyllaSink) writeRecordRows(ctx context.Context, version int64, entry *proto.ChangelogEntry) error { g, gctx := errgroup.WithContext(ctx) g.SetLimit(s.effectiveMutationWorkers()) diff --git a/sei-db/state_db/ss/offload/consumer/scylla_test.go b/sei-db/state_db/ss/offload/consumer/scylla_test.go index 1c2d3b1645..8516ed6e74 100644 --- a/sei-db/state_db/ss/offload/consumer/scylla_test.go +++ b/sei-db/state_db/ss/offload/consumer/scylla_test.go @@ -204,3 +204,124 @@ func TestScyllaSinkCompactsDuplicateMutations(t *testing.T) { require.Equal(t, write{deleted: true}, writes["bank/drop"]) require.Equal(t, write{value: []byte("separate-store")}, writes["evm/k"]) } + +func TestScyllaSinkWriteBatchPipelinesRowsAndOrdersMarkers(t *testing.T) { + rowStarted := make(chan int64, 2) + markerWritten := make(chan int64, 2) + releaseRows := map[int64]chan struct{}{ + 1: make(chan struct{}), + 2: make(chan struct{}), + } + var activeRows atomic.Int32 + var sawConcurrentRows atomic.Bool + var mu sync.Mutex + rowsDone := make(map[int64]bool) + var markers []int64 + var markerBeforeRowsDone bool + + sink := &scyllaSink{ + mutationWorkers: 1, + exec: func(ctx context.Context, stmt string, values ...interface{}) error { + switch { + case strings.Contains(stmt, "state_mutations"): + version := values[2].(int64) + if activeRows.Add(1) > 1 { + sawConcurrentRows.Store(true) + } + rowStarted <- version + select { + case <-releaseRows[version]: + case <-ctx.Done(): + activeRows.Add(-1) + return ctx.Err() + } + activeRows.Add(-1) + mu.Lock() + rowsDone[version] = true + mu.Unlock() + return nil + case strings.Contains(stmt, "state_versions"): + version := values[1].(int64) + mu.Lock() + if !rowsDone[version] { + markerBeforeRowsDone = true + } + markers = append(markers, version) + mu.Unlock() + markerWritten <- version + return nil + default: + return nil + } + }, + } + records := []Record{ + { + Topic: "t", + Partition: 0, + Offset: 10, + Entry: &proto.ChangelogEntry{ + Version: 1, + Changesets: []*proto.NamedChangeSet{{ + Name: "bank", + Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{{Key: []byte("k1"), Value: []byte("v1")}}}, + }}, + }, + }, + { + Topic: "t", + Partition: 0, + Offset: 11, + Entry: &proto.ChangelogEntry{ + Version: 2, + Changesets: []*proto.NamedChangeSet{{ + Name: "bank", + Changeset: proto.ChangeSet{Pairs: []*proto.KVPair{{Key: []byte("k2"), Value: []byte("v2")}}}, + }}, + }, + }, + } + + errCh := make(chan error, 1) + go func() { + errCh <- sink.WriteBatch(context.Background(), records) + }() + + started := map[int64]bool{} + for len(started) < 2 { + select { + case version := <-rowStarted: + started[version] = true + case <-time.After(time.Second): + t.Fatal("timed out waiting for pipelined row writes") + } + } + require.True(t, sawConcurrentRows.Load()) + + close(releaseRows[2]) + select { + case version := <-markerWritten: + t.Fatalf("marker %d written before earlier record rows completed", version) + case <-time.After(100 * time.Millisecond): + } + + close(releaseRows[1]) + for _, want := range []int64{1, 2} { + select { + case got := <-markerWritten: + require.Equal(t, want, got) + case <-time.After(time.Second): + t.Fatalf("timed out waiting for marker %d", want) + } + } + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("timed out waiting for batch write") + } + mu.Lock() + defer mu.Unlock() + require.False(t, markerBeforeRowsDone) + require.Equal(t, []int64{1, 2}, markers) +}