Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 70 additions & 2 deletions cmd/apex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/evstack/apex/pkg/fetch"
"github.com/evstack/apex/pkg/metrics"
"github.com/evstack/apex/pkg/profile"
"github.com/evstack/apex/pkg/s3"
"github.com/evstack/apex/pkg/store"
syncer "github.com/evstack/apex/pkg/sync"
"github.com/evstack/apex/pkg/types"
Expand Down Expand Up @@ -207,6 +208,53 @@ func setStoreMetrics(db store.Store, rec metrics.Recorder) {
}
}

func setupS3Server(cfg *config.Config, db store.Store, submitter s3.BlobSubmitter, log zerolog.Logger) (*http.Server, error) {
if !cfg.S3.Enabled {
return nil, nil
}

var ns types.Namespace
if cfg.S3.Namespace != "" {
var err error
ns, err = types.NamespaceFromHex(cfg.S3.Namespace)
if err != nil {
return nil, fmt.Errorf("parse S3 namespace: %w", err)
}
}

var objStore s3.ObjectStore
switch d := db.(type) {
case *store.SQLiteStore:
objStore = store.NewObjectStore(d, ns)
case *store.S3Store:
client := d.Client()
if client == nil {
return nil, errors.New("S3Store client is not *s3.Client")
}
objStore = store.NewS3ObjectStore(client)
default:
return nil, fmt.Errorf("unsupported store type for S3 API: %T", db)
}

s3Svc := s3.NewService(objStore, submitter, ns)
s3Srv := s3.NewServer(s3Svc, cfg.S3.Region, log)

httpSrv := &http.Server{
Addr: cfg.S3.ListenAddr,
Handler: s3Srv,
ReadHeaderTimeout: 10 * time.Second,
}

go func() {
log.Info().Str("addr", cfg.S3.ListenAddr).Msg("S3 API server listening")
if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Error().Err(err).Msg("S3 API server error")
}
}()

return httpSrv, nil
}

func persistNamespaces(ctx context.Context, db store.Store, namespaces []types.Namespace) error {
for _, ns := range namespaces {
if err := db.PutNamespace(ctx, ns); err != nil {
Expand Down Expand Up @@ -240,6 +288,7 @@ func maybeBackfillSourceOption(cfg *config.Config, logger zerolog.Logger) (synce
return syncer.WithBackfillSource(dbSrc), func() { _ = dbSrc.Close() }, nil
}

//nolint:gocyclo
func runIndexer(ctx context.Context, cfg *config.Config) error {
// Parse namespaces from config.
namespaces, err := cfg.ParsedNamespaces()
Expand Down Expand Up @@ -275,6 +324,19 @@ func runIndexer(ctx context.Context, cfg *config.Config) error {
}
defer dataFetcher.Close() //nolint:errcheck

// Setup S3 API server if enabled.
var s3Srv *http.Server
if cfg.S3.Enabled {
var submitter s3.BlobSubmitter
if s, ok := dataFetcher.(s3.BlobSubmitter); ok {
submitter = s
}
s3Srv, err = setupS3Server(cfg, db, submitter, log.Logger)
if err != nil {
return fmt.Errorf("setup S3 server: %w", err)
}
}

// Set up API layer.
notifier := api.NewNotifier(cfg.Subscription.BufferSize, cfg.Subscription.MaxSubscribers, log.Logger)
notifier.SetMetrics(rec)
Expand Down Expand Up @@ -350,7 +412,7 @@ func runIndexer(ctx context.Context, cfg *config.Config) error {

err = coord.Run(ctx)

gracefulShutdown(httpSrv, grpcSrv, metricsSrv, profileSrv)
gracefulShutdown(httpSrv, grpcSrv, metricsSrv, profileSrv, s3Srv)

if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("coordinator: %w", err)
Expand All @@ -360,7 +422,7 @@ func runIndexer(ctx context.Context, cfg *config.Config) error {
return nil
}

func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server, profileSrv *profile.Server) {
func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server, profileSrv *profile.Server, s3Srv *http.Server) {
stopped := make(chan struct{})
go func() {
grpcSrv.GracefulStop()
Expand All @@ -381,6 +443,12 @@ func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *me
log.Error().Err(err).Msg("JSON-RPC server shutdown error")
}

if s3Srv != nil {
if err := s3Srv.Shutdown(shutdownCtx); err != nil {
log.Error().Err(err).Msg("S3 API server shutdown error")
}
}

if metricsSrv != nil {
if err := metricsSrv.Shutdown(shutdownCtx); err != nil {
log.Error().Err(err).Msg("metrics server shutdown error")
Expand Down
14 changes: 14 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Config struct {
Metrics MetricsConfig `yaml:"metrics"`
Profiling ProfilingConfig `yaml:"profiling"`
Log LogConfig `yaml:"log"`
S3 S3APIConfig `yaml:"s3"`
}

// DataSourceConfig configures the Celestia data source.
Expand Down Expand Up @@ -92,6 +93,14 @@ type LogConfig struct {
Format string `yaml:"format"`
}

// S3APIConfig configures the S3-compatible API server.
type S3APIConfig struct {
Enabled bool `yaml:"enabled"`
ListenAddr string `yaml:"listen_addr"`
Region string `yaml:"region"`
Namespace string `yaml:"namespace"` // Celestia namespace for S3 objects
}

// DefaultConfig returns a Config with sensible defaults.
func DefaultConfig() Config {
return Config{
Expand Down Expand Up @@ -132,6 +141,11 @@ func DefaultConfig() Config {
Level: "info",
Format: "json",
},
S3: S3APIConfig{
Enabled: false,
ListenAddr: ":8333",
Region: "us-east-1",
},
}
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/cockroachdb/pebble v1.1.5
github.com/filecoin-project/go-jsonrpc v0.10.1
github.com/google/orderedcode v0.0.1
github.com/gorilla/websocket v1.4.2
github.com/prometheus/client_golang v1.23.2
github.com/rs/zerolog v1.34.0
github.com/spf13/cobra v1.10.2
Expand Down Expand Up @@ -50,7 +51,6 @@ require (
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/ipfs/go-log/v2 v2.0.8 // indirect
github.com/klauspost/compress v1.18.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestHealthEndpoint(t *testing.T) {
mux := http.NewServeMux()
h.Register(mux)

req := httptest.NewRequest(http.MethodGet, "/health", nil)
req := httptest.NewRequest(http.MethodGet, "/health", nil) //nolint:noctx
rec := httptest.NewRecorder()
mux.ServeHTTP(rec, req)

Expand Down Expand Up @@ -90,7 +90,7 @@ func TestReadyEndpoint(t *testing.T) {
mux := http.NewServeMux()
h.Register(mux)

req := httptest.NewRequest(http.MethodGet, "/health/ready", nil)
req := httptest.NewRequest(http.MethodGet, "/health/ready", nil) //nolint:noctx
rec := httptest.NewRecorder()
mux.ServeHTTP(rec, req)

Expand Down
2 changes: 1 addition & 1 deletion pkg/api/jsonrpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func doRPC(t *testing.T, srv http.Handler, method string, params ...any) jsonRPC
t.Fatalf("marshal request: %v", err)
}

httpReq := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body))
httpReq := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body)) //nolint:noctx
httpReq.Header.Set("Content-Type", "application/json")
w := httptest.NewRecorder()
srv.ServeHTTP(w, httpReq)
Expand Down
35 changes: 35 additions & 0 deletions pkg/fetch/celestia_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type blobAPI struct {
GetAll func(ctx context.Context, height uint64, namespaces [][]byte) (json.RawMessage, error)
GetProof func(ctx context.Context, height uint64, namespace []byte, commitment []byte) (json.RawMessage, error)
Included func(ctx context.Context, height uint64, namespace []byte, proof json.RawMessage, commitment []byte) (bool, error)
Submit func(ctx context.Context, namespace []byte, data []byte, shareVersion int) (json.RawMessage, error)
}

// CelestiaNodeFetcher implements DataFetcher using a Celestia node's JSON-RPC API.
Expand Down Expand Up @@ -340,6 +341,17 @@ func (f *CelestiaNodeFetcher) Included(ctx context.Context, height uint64, names
return ok, nil
}

// SubmitBlob submits a blob to Celestia and returns the resulting blob with height and commitment.
func (f *CelestiaNodeFetcher) SubmitBlob(ctx context.Context, namespace types.Namespace, data []byte) (*types.Blob, error) {
raw, err := f.callRawWithRetry(ctx, "blob.Submit", func(callCtx context.Context) (json.RawMessage, error) {
return f.blob.Submit(callCtx, namespace[:], data, 0)
})
if err != nil {
return nil, fmt.Errorf("blob.Submit: %w", err)
}
return mapSubmitResult(raw, namespace, data)
}

func (f *CelestiaNodeFetcher) Close() error {
f.mu.Lock()
defer f.mu.Unlock()
Expand Down Expand Up @@ -475,6 +487,29 @@ func mapBlobs(raw json.RawMessage, height uint64) ([]types.Blob, error) {
return blobs, nil
}

// rpcSubmitResult is the JSON response from blob.Submit.
type rpcSubmitResult struct {
Height uint64 `json:"height"`
Commitment []byte `json:"commitment"`
ShareVersion uint32 `json:"share_version"`
Index int `json:"index"`
}

func mapSubmitResult(raw json.RawMessage, namespace types.Namespace, data []byte) (*types.Blob, error) {
var result rpcSubmitResult
if err := json.Unmarshal(raw, &result); err != nil {
return nil, fmt.Errorf("unmarshal submit result: %w", err)
}
return &types.Blob{
Height: result.Height,
Namespace: namespace,
Data: data,
Commitment: result.Commitment,
ShareVersion: result.ShareVersion,
Index: result.Index,
}, nil
}

func namespacesToBytes(nss []types.Namespace) [][]byte {
out := make([][]byte, len(nss))
for i := range nss {
Expand Down
Loading