Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions app/seidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ const (
FlagEVMSSSplit = "state-store.evm-ss-split"
FlagEVMSSSeparateDBs = "state-store.evm-ss-separate-dbs"

// Historical SS offload fallback.
FlagHistoricalOffloadScyllaHosts = "state-store.historical-offload-scylla-hosts"
FlagHistoricalOffloadScyllaKeyspace = "state-store.historical-offload-scylla-keyspace"
FlagHistoricalOffloadScyllaUsername = "state-store.historical-offload-scylla-username"
FlagHistoricalOffloadScyllaPassword = "state-store.historical-offload-scylla-password"
FlagHistoricalOffloadScyllaDatacenter = "state-store.historical-offload-scylla-datacenter"
FlagHistoricalOffloadScyllaConsistency = "state-store.historical-offload-scylla-consistency"
FlagHistoricalOffloadScyllaTimeoutMS = "state-store.historical-offload-scylla-timeout-ms"

// Other configs
FlagSnapshotInterval = "state-sync.snapshot-interval"
)
Expand Down Expand Up @@ -148,6 +157,13 @@ func parseSSConfigs(appOpts servertypes.AppOptions) config.StateStoreConfig {
ssConfig.EVMDBDirectory = cast.ToString(appOpts.Get(FlagEVMSSDirectory))
ssConfig.SeparateEVMSubDBs = cast.ToBool(appOpts.Get(FlagEVMSSSeparateDBs))
ssConfig.EVMSplit = cast.ToBool(appOpts.Get(FlagEVMSSSplit))
ssConfig.HistoricalOffloadScyllaHosts = cast.ToString(appOpts.Get(FlagHistoricalOffloadScyllaHosts))
ssConfig.HistoricalOffloadScyllaKeyspace = cast.ToString(appOpts.Get(FlagHistoricalOffloadScyllaKeyspace))
ssConfig.HistoricalOffloadScyllaUsername = cast.ToString(appOpts.Get(FlagHistoricalOffloadScyllaUsername))
ssConfig.HistoricalOffloadScyllaPassword = cast.ToString(appOpts.Get(FlagHistoricalOffloadScyllaPassword))
ssConfig.HistoricalOffloadScyllaDatacenter = cast.ToString(appOpts.Get(FlagHistoricalOffloadScyllaDatacenter))
ssConfig.HistoricalOffloadScyllaConsistency = cast.ToString(appOpts.Get(FlagHistoricalOffloadScyllaConsistency))
ssConfig.HistoricalOffloadScyllaTimeoutMS = cast.ToInt(appOpts.Get(FlagHistoricalOffloadScyllaTimeoutMS))
return ssConfig
}

Expand Down
38 changes: 38 additions & 0 deletions app/seidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ func (t TestSeiDBAppOpts) Get(s string) interface{} {
return defaultSSConfig.EVMSplit
case FlagEVMSSSeparateDBs:
return defaultSSConfig.SeparateEVMSubDBs
case FlagHistoricalOffloadScyllaHosts:
return defaultSSConfig.HistoricalOffloadScyllaHosts
case FlagHistoricalOffloadScyllaKeyspace:
return defaultSSConfig.HistoricalOffloadScyllaKeyspace
case FlagHistoricalOffloadScyllaUsername:
return defaultSSConfig.HistoricalOffloadScyllaUsername
case FlagHistoricalOffloadScyllaPassword:
return defaultSSConfig.HistoricalOffloadScyllaPassword
case FlagHistoricalOffloadScyllaDatacenter:
return defaultSSConfig.HistoricalOffloadScyllaDatacenter
case FlagHistoricalOffloadScyllaConsistency:
return defaultSSConfig.HistoricalOffloadScyllaConsistency
case FlagHistoricalOffloadScyllaTimeoutMS:
return defaultSSConfig.HistoricalOffloadScyllaTimeoutMS
}
return nil
}
Expand Down Expand Up @@ -114,6 +128,30 @@ func TestParseSSConfigs_EVMFlags(t *testing.T) {
assert.True(t, ssConfig.SeparateEVMSubDBs)
}

func TestParseSSConfigs_HistoricalScyllaFlags(t *testing.T) {
appOpts := mapAppOpts{
FlagSSEnable: true,
FlagHistoricalOffloadScyllaHosts: "10.0.0.1:9042,10.0.0.2:9042",
FlagHistoricalOffloadScyllaKeyspace: "sei_history",
FlagHistoricalOffloadScyllaUsername: "sei",
FlagHistoricalOffloadScyllaPassword: "secret",
FlagHistoricalOffloadScyllaDatacenter: "use1",
FlagHistoricalOffloadScyllaConsistency: "local_quorum",
FlagHistoricalOffloadScyllaTimeoutMS: 1500,
FlagSSAsyncWriterBuffer: 0,
}

ssConfig := parseSSConfigs(appOpts)
assert.True(t, ssConfig.Enable)
assert.Equal(t, "10.0.0.1:9042,10.0.0.2:9042", ssConfig.HistoricalOffloadScyllaHosts)
assert.Equal(t, "sei_history", ssConfig.HistoricalOffloadScyllaKeyspace)
assert.Equal(t, "sei", ssConfig.HistoricalOffloadScyllaUsername)
assert.Equal(t, "secret", ssConfig.HistoricalOffloadScyllaPassword)
assert.Equal(t, "use1", ssConfig.HistoricalOffloadScyllaDatacenter)
assert.Equal(t, "local_quorum", ssConfig.HistoricalOffloadScyllaConsistency)
assert.Equal(t, 1500, ssConfig.HistoricalOffloadScyllaTimeoutMS)
}

func TestParseReceiptConfigs_DefaultsToPebbleWhenUnset(t *testing.T) {
receiptConfig, err := config.ReadReceiptConfig(mapAppOpts{})
assert.NoError(t, err)
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
github.com/golang-jwt/jwt/v4 v4.5.1
github.com/golang/mock v1.7.0-rc.1
github.com/golang/protobuf v1.5.4
github.com/gocql/gocql v1.7.0
github.com/google/btree v1.1.3
github.com/google/go-cmp v0.7.0
github.com/google/gofuzz v1.2.0
Expand Down Expand Up @@ -119,6 +120,7 @@ require (
github.com/emirpasic/gods v1.18.1 // indirect
github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect
github.com/go-git/go-billy/v5 v5.8.0 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/kevinburke/ssh_config v1.2.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
Expand All @@ -128,6 +130,7 @@ require (
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 // indirect
github.com/skeema/knownhosts v1.3.1 // indirect
github.com/xanzy/ssh-agent v0.3.3 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -736,13 +736,15 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bgentry/speakeasy v0.2.0 h1:tgObeVOf8WAvtuAX6DhJ4xks4CFNwPDZiqzGqIHE51E=
github.com/bgentry/speakeasy v0.2.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
github.com/bits-and-blooms/bitset v1.7.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
github.com/bits-and-blooms/bitset v1.14.2/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bits-and-blooms/bitset v1.17.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bits-and-blooms/bitset v1.24.3 h1:Bte86SlO3lwPQqww+7BE9ZuUCKIjfqnG5jtEyqA9y9Y=
github.com/bits-and-blooms/bitset v1.24.3/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/btcsuite/btcd v0.23.2 h1:/YOgUp25sdCnP5ho6Hl3s0E438zlX+Kak7E6TgBgoT0=
Expand Down Expand Up @@ -1122,6 +1124,8 @@ github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MG
github.com/goccy/go-json v0.10.4/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4=
github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/gocql/gocql v1.7.0 h1:O+7U7/1gSN7QTEAaMEsJc1Oq2QHXvCWoF3DFK9HDHus=
github.com/gocql/gocql v1.7.0/go.mod h1:vnlvXyFZeLBF0Wy+RS8hrOdbn0UWsWtdg07XJnFxZ+4=
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand Down Expand Up @@ -1328,6 +1332,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3/go.mod h1:o//XUCC/F+yRGJoPO/VU
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
github.com/guptarohit/asciigraph v0.5.5/go.mod h1:dYl5wwK4gNsnFf9Zp+l06rFiDZ5YtXM6x7SRWZ3KGag=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE=
github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
Expand Down Expand Up @@ -2911,6 +2917,8 @@ gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qS
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
Expand Down
21 changes: 21 additions & 0 deletions sei-db/config/ss_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,27 @@ type StateStoreConfig struct {
// When true, data is routed to separate DBs by EVM key family while
// preserving the same logical store key and full key encoding inside each DB.
SeparateEVMSubDBs bool `mapstructure:"evm-separate-dbs"`

// HistoricalOffloadScyllaHosts enables ScyllaDB/Cassandra fallback reads
// for versions pruned from local SS when non-empty. Hosts are comma-separated
// host[:port] values.
HistoricalOffloadScyllaHosts string `mapstructure:"historical-offload-scylla-hosts"`

// HistoricalOffloadScyllaKeyspace is the keyspace containing state_mutations.
HistoricalOffloadScyllaKeyspace string `mapstructure:"historical-offload-scylla-keyspace"`

// HistoricalOffloadScyllaUsername and Password are optional.
HistoricalOffloadScyllaUsername string `mapstructure:"historical-offload-scylla-username"`
HistoricalOffloadScyllaPassword string `mapstructure:"historical-offload-scylla-password"`

// HistoricalOffloadScyllaDatacenter enables DC-aware routing when set.
HistoricalOffloadScyllaDatacenter string `mapstructure:"historical-offload-scylla-datacenter"`

// HistoricalOffloadScyllaConsistency defaults to local_quorum when empty.
HistoricalOffloadScyllaConsistency string `mapstructure:"historical-offload-scylla-consistency"`

// HistoricalOffloadScyllaTimeoutMS defaults in the Scylla reader when zero.
HistoricalOffloadScyllaTimeoutMS int `mapstructure:"historical-offload-scylla-timeout-ms"`
}

// DefaultStateStoreConfig returns the default StateStoreConfig
Expand Down
11 changes: 11 additions & 0 deletions sei-db/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,17 @@ evm-ss-split = {{ .StateStore.EVMSplit }}
# When false, all EVM data stays in one DB using the current unified layout.
# When true, data is routed to separate DBs while preserving the same evm key prefix format.
evm-ss-separate-dbs = {{ .StateStore.SeparateEVMSubDBs }}

# Optional ScyllaDB/Cassandra historical-state fallback. When hosts are set,
# point reads for versions pruned from local SS fall back to state_mutations in
# the configured keyspace. Iterators still use local SS.
historical-offload-scylla-hosts = "{{ .StateStore.HistoricalOffloadScyllaHosts }}"
historical-offload-scylla-keyspace = "{{ .StateStore.HistoricalOffloadScyllaKeyspace }}"
historical-offload-scylla-username = "{{ .StateStore.HistoricalOffloadScyllaUsername }}"
historical-offload-scylla-password = "{{ .StateStore.HistoricalOffloadScyllaPassword }}"
historical-offload-scylla-datacenter = "{{ .StateStore.HistoricalOffloadScyllaDatacenter }}"
historical-offload-scylla-consistency = "{{ .StateStore.HistoricalOffloadScyllaConsistency }}"
historical-offload-scylla-timeout-ms = {{ .StateStore.HistoricalOffloadScyllaTimeoutMS }}
`

// ReceiptStoreConfigTemplate defines the configuration template for receipt-store
Expand Down
4 changes: 4 additions & 0 deletions sei-db/config/toml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ func TestStateStoreConfigTemplate(t *testing.T) {
require.Contains(t, output, `evm-ss-db-directory = ""`, "Missing evm-ss-db-directory")
require.Contains(t, output, `evm-ss-split = false`, "Missing or incorrect evm-ss-split")
require.Contains(t, output, "evm-ss-separate-dbs = false", "Missing or incorrect evm-ss-separate-dbs")
require.Contains(t, output, `historical-offload-scylla-hosts = ""`, "Missing historical Scylla hosts")
require.Contains(t, output, `historical-offload-scylla-keyspace = ""`, "Missing historical Scylla keyspace")
require.Contains(t, output, `historical-offload-scylla-consistency = ""`, "Missing historical Scylla consistency")
require.Contains(t, output, "historical-offload-scylla-timeout-ms = 0", "Missing historical Scylla timeout")
}

// TestReceiptStoreConfigTemplate verifies that all field paths in the receipt-store TOML template
Expand Down
74 changes: 74 additions & 0 deletions sei-db/state_db/ss/offload/consumer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Historical Scylla/Cassandra Offload

This is a prototype historical-state backend for ScyllaDB or Cassandra.

The intended shape is narrow:

- local SS remains the hot store for recent state, writes, imports, pruning, and iterators
- Scylla/Cassandra stores immutable MVCC mutation rows for older history
- reads below local SS retention can fall back to Scylla/Cassandra for `Get` and `Has`

The table layout is built for point reads by `(store_name, state_key, target_version)`:

```sql
SELECT version, value, deleted
FROM state_mutations
WHERE store_name = ? AND state_key = ? AND version <= ?
ORDER BY version DESC
LIMIT 1;
```

Ordered prefix iteration is intentionally not served from Scylla/Cassandra in this prototype.

## Schema

Apply the schema once:

```bash
cqlsh 127.0.0.1 9042 -f sei-db/state_db/ss/offload/consumer/schema/scylla.cql
```

For production, edit the keyspace replication in `schema/scylla.cql` to use
`NetworkTopologyStrategy` with the actual datacenter names and replication
factors before applying it.

## Consumer

The consumer reads historical offload changelog messages from Kafka and writes
them into Scylla/Cassandra. Kafka offsets are committed only after the sink
write succeeds. Within each block, mutation rows are written with bounded
concurrency and the version marker is written last.

```bash
go run ./sei-db/state_db/ss/offload/consumer/cmd/historical-scylla-consumer \
./sei-db/state_db/ss/offload/consumer/config/example-scylla.json
```

The example config is local-dev only. Set real Kafka brokers, Scylla hosts,
keyspace, datacenter, and credentials in your own config.

## Node Read Fallback

Enable fallback reads in the node config:

```toml
[state-store]
historical-offload-scylla-hosts = "10.0.0.1:9042,10.0.0.2:9042"
historical-offload-scylla-keyspace = "sei_history"
historical-offload-scylla-username = ""
historical-offload-scylla-password = ""
historical-offload-scylla-datacenter = "datacenter1"
historical-offload-scylla-consistency = "local_quorum"
historical-offload-scylla-timeout-ms = 2000
```

Fallback activates only for point reads where the requested version is below the
local SS earliest version. Missing rows and tombstones return empty state, same
as local SS.

## Current Limits

- No Scylla/Cassandra iterator path.
- No cross-row transaction on ingest; mutation rows are written first and the
version marker is written last, so replay is idempotent after partial failure.
- No automatic schema creation from the binary.
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package main

import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/sei-protocol/sei-chain/sei-db/state_db/ss/offload/consumer"
)

func main() {
if len(os.Args) != 2 {
fmt.Fprintf(os.Stderr, "usage: %s <config.json>\n", os.Args[0])
os.Exit(2)
}

cfg, err := consumer.LoadConfig(os.Args[1])
if err != nil {
log.Fatalf("load config: %v", err)
}

sink, err := consumer.NewScyllaSink(cfg.Scylla)
if err != nil {
log.Fatalf("open scylla/cassandra sink: %v", err)
}
defer func() { _ = sink.Close() }()

reader, err := consumer.NewKafkaReader(cfg.Kafka)
if err != nil {
log.Fatalf("open kafka reader: %v", err)
}
defer func() { _ = reader.Close() }()

ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

c := consumer.New(reader, sink, consumer.Options{
Logf: func(format string, args ...interface{}) { log.Printf(format, args...) },
Workers: cfg.Workers,
ShardBufferSize: cfg.ShardBufferSize,
MaxBatchRecords: cfg.MaxBatchRecords,
BatchMaxWait: time.Duration(cfg.BatchMaxWaitMS) * time.Millisecond,
})
if err := c.Run(ctx); err != nil {
log.Fatalf("consumer: %v", err)
}
}
54 changes: 54 additions & 0 deletions sei-db/state_db/ss/offload/consumer/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package consumer

import (
"encoding/json"
"fmt"
"os"
)

type Config struct {
Kafka KafkaReaderConfig
Scylla ScyllaConfig
Workers int
ShardBufferSize int
MaxBatchRecords int
BatchMaxWaitMS int
}

func (c *Config) Validate() error {
if err := c.Kafka.Validate(); err != nil {
return fmt.Errorf("kafka: %w", err)
}
if err := c.Scylla.Validate(); err != nil {
return fmt.Errorf("scylla: %w", err)
}
if c.Workers < 0 {
return fmt.Errorf("workers must be non-negative")
}
if c.ShardBufferSize < 0 {
return fmt.Errorf("shard buffer size must be non-negative")
}
if c.MaxBatchRecords < 0 {
return fmt.Errorf("max batch records must be non-negative")
}
if c.BatchMaxWaitMS < 0 {
return fmt.Errorf("batch max wait ms must be non-negative")
}
return nil
}

func LoadConfig(path string) (*Config, error) {
// #nosec G304 -- config path is supplied by the operator on the command line.
raw, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("read config: %w", err)
}
cfg := &Config{}
if err := json.Unmarshal(raw, cfg); err != nil {
return nil, fmt.Errorf("parse config: %w", err)
}
if err := cfg.Validate(); err != nil {
return nil, err
}
return cfg, nil
}
Loading
Loading