Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,16 @@ jobs:
- name: Install just
uses: extractions/setup-just@v3
- name: E2E Tests
run: just test-e2e
run: |
mkdir -p .artifacts/e2e-server-logs
EV_E2E_LOG_DIR="${{ github.workspace }}/.artifacts/e2e-server-logs" just test-e2e
- name: Upload E2E server logs
if: failure()
uses: actions/upload-artifact@v7.0.0
with:
name: e2e-server-logs-${{ github.sha }}
path: ./.artifacts/e2e-server-logs
if-no-files-found: warn

evm-tests:
name: Run EVM Execution Tests
Expand Down
60 changes: 38 additions & 22 deletions execution/evm/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,24 +756,33 @@ func (c *EngineClient) reconcileExecutionAtHeight(ctx context.Context, height ui
// If we have a started execution with a payloadID, validate it still exists before resuming.
// After node restart, the EL's payload cache is ephemeral and the payloadID may be stale.
if execMeta.Stage == ExecStageStarted && len(execMeta.PayloadID) == 8 {
var pid engine.PayloadID
copy(pid[:], execMeta.PayloadID)
requestedTxHash := hashTxs(txs)
if execMeta.Timestamp != timestamp.Unix() || !bytes.Equal(execMeta.TxHash, requestedTxHash) {
c.logger.Warn().
Uint64("height", height).
Int64("execmeta_timestamp", execMeta.Timestamp).
Int64("requested_timestamp", timestamp.Unix()).
Msg("ExecuteTxs: ignoring stale in-progress execution for different block inputs")
} else {
var pid engine.PayloadID
copy(pid[:], execMeta.PayloadID)

// Validate payload still exists by attempting to retrieve it
if _, err = c.engineClient.GetPayload(ctx, pid); err == nil {
c.logger.Info().
// Validate payload still exists by attempting to retrieve it
if _, err = c.engineClient.GetPayload(ctx, pid); err == nil {
c.logger.Info().
Uint64("height", height).
Str("stage", execMeta.Stage).
Msg("ExecuteTxs: found in-progress execution with payloadID, returning payloadID for resume")
return nil, &pid, true, nil
}
// Payload is stale (expired or node restarted) - proceed with fresh execution
c.logger.Debug().
Uint64("height", height).
Str("stage", execMeta.Stage).
Msg("ExecuteTxs: found in-progress execution with payloadID, returning payloadID for resume")
return nil, &pid, true, nil
Str("payloadID", pid.String()).
Err(err).
Msg("ExecuteTxs: stale ExecMeta payloadID no longer valid in EL, will re-execute")
// Don't return - fall through to fresh execution
}
// Payload is stale (expired or node restarted) - proceed with fresh execution
c.logger.Debug().
Uint64("height", height).
Str("payloadID", pid.String()).
Err(err).
Msg("ExecuteTxs: stale ExecMeta payloadID no longer valid in EL, will re-execute")
// Don't return - fall through to fresh execution
}
}

Expand Down Expand Up @@ -1023,13 +1032,7 @@ func (c *EngineClient) saveExecMeta(ctx context.Context, height uint64, timestam
}

// Compute tx hash for sanity checks on retry
if len(txs) > 0 {
h := sha256.New()
for _, tx := range txs {
h.Write(tx)
}
execMeta.TxHash = h.Sum(nil)
}
execMeta.TxHash = hashTxs(txs)

if err := c.store.SaveExecMeta(ctx, execMeta); err != nil {
c.logger.Warn().Err(err).Uint64("height", height).Msg("saveExecMeta: failed to save exec meta")
Expand All @@ -1042,6 +1045,19 @@ func (c *EngineClient) saveExecMeta(ctx context.Context, height uint64, timestam
Msg("saveExecMeta: saved execution metadata")
}

func hashTxs(txs [][]byte) []byte {
if len(txs) == 0 {
return nil
}

h := sha256.New()
for _, tx := range txs {
h.Write(tx)
}

return h.Sum(nil)
}

// GetLatestHeight returns the current block height of the execution layer
func (c *EngineClient) GetLatestHeight(ctx context.Context) (uint64, error) {
header, err := c.ethClient.HeaderByNumber(ctx, nil) // nil = latest block
Expand Down
130 changes: 130 additions & 0 deletions execution/evm/execution_reconcile_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package evm

import (
"context"
"errors"
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/core/types"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"
)

func TestReconcileExecutionAtHeight_StartedExecMeta(t *testing.T) {
t.Parallel()

specs := map[string]struct {
execMetaTimestamp int64
execMetaTxs [][]byte
requestedTxs [][]byte
requestedTime time.Time
expectFound bool
expectPayloadID bool
expectGetPayloads int
}{
"resume_when_inputs_match": {
execMetaTimestamp: 1700000012,
execMetaTxs: [][]byte{[]byte("tx-1")},
requestedTxs: [][]byte{[]byte("tx-1")},
requestedTime: time.Unix(1700000012, 0),
expectFound: true,
expectPayloadID: true,
expectGetPayloads: 1,
},
"ignore_when_timestamp_differs": {
execMetaTimestamp: 1700000010,
execMetaTxs: [][]byte{[]byte("tx-1")},
requestedTxs: [][]byte{[]byte("tx-1")},
requestedTime: time.Unix(1700000012, 0),
expectFound: false,
expectPayloadID: false,
expectGetPayloads: 0,
},
"ignore_when_txs_differ": {
execMetaTimestamp: 1700000012,
execMetaTxs: [][]byte{[]byte("tx-old")},
requestedTxs: [][]byte{[]byte("tx-new")},
requestedTime: time.Unix(1700000012, 0),
expectFound: false,
expectPayloadID: false,
expectGetPayloads: 0,
},
}

for name, spec := range specs {
t.Run(name, func(t *testing.T) {
t.Parallel()

store := NewEVMStore(dssync.MutexWrap(ds.NewMapDatastore()))
payloadID := engine.PayloadID{1, 2, 3, 4, 5, 6, 7, 8}
require.NoError(t, store.SaveExecMeta(t.Context(), &ExecMeta{
Height: 12,
PayloadID: payloadID[:],
TxHash: hashTxs(spec.execMetaTxs),
Timestamp: spec.execMetaTimestamp,
Stage: ExecStageStarted,
}))

engineRPC := &mockReconcileEngineRPCClient{
payloads: map[engine.PayloadID]*engine.ExecutionPayloadEnvelope{
payloadID: {},
},
}
client := &EngineClient{
engineClient: engineRPC,
ethClient: mockReconcileEthRPCClient{},
store: store,
logger: zerolog.Nop(),
}

stateRoot, gotPayloadID, found, err := client.reconcileExecutionAtHeight(t.Context(), 12, spec.requestedTime, spec.requestedTxs)

require.NoError(t, err)
require.Nil(t, stateRoot)
require.Equal(t, spec.expectFound, found)
require.Equal(t, spec.expectPayloadID, gotPayloadID != nil)
if spec.expectPayloadID {
require.Equal(t, payloadID, *gotPayloadID)
}
require.Equal(t, spec.expectGetPayloads, engineRPC.getPayloadCalls)
})
}
}

type mockReconcileEngineRPCClient struct {
payloads map[engine.PayloadID]*engine.ExecutionPayloadEnvelope
getPayloadCalls int
}

func (m *mockReconcileEngineRPCClient) ForkchoiceUpdated(_ context.Context, _ engine.ForkchoiceStateV1, _ map[string]any) (*engine.ForkChoiceResponse, error) {
return nil, errors.New("unexpected ForkchoiceUpdated call")
}

func (m *mockReconcileEngineRPCClient) GetPayload(_ context.Context, payloadID engine.PayloadID) (*engine.ExecutionPayloadEnvelope, error) {
m.getPayloadCalls++
payload, ok := m.payloads[payloadID]
if !ok {
return nil, errors.New("payload not found")
}

return payload, nil
}

func (m *mockReconcileEngineRPCClient) NewPayload(_ context.Context, _ *engine.ExecutableData, _ []string, _ string, _ [][]byte) (*engine.PayloadStatusV1, error) {
return nil, errors.New("unexpected NewPayload call")
}

type mockReconcileEthRPCClient struct{}

func (mockReconcileEthRPCClient) HeaderByNumber(_ context.Context, _ *big.Int) (*types.Header, error) {
return nil, errors.New("header not found")
}

func (mockReconcileEthRPCClient) GetTxs(_ context.Context) ([]string, error) {
return nil, errors.New("unexpected GetTxs call")
}
40 changes: 38 additions & 2 deletions node/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type failoverState struct {
dataSyncService *evsync.DataSyncService
rpcServer *http.Server
bc *block.Components
raftNode *raft.Node
isAggregator bool

// catchup fields — used when the aggregator needs to sync before producing
catchupEnabled bool
Expand Down Expand Up @@ -172,13 +174,34 @@ func setupFailoverState(
dataSyncService: dataSyncService,
rpcServer: rpcServer,
bc: bc,
raftNode: raftNode,
isAggregator: isAggregator,
store: rktStore,
catchupEnabled: catchupEnabled,
catchupTimeout: nodeConfig.Node.CatchupTimeout.Duration,
daBlockTime: nodeConfig.DA.BlockTime.Duration,
}, nil
}

func (f *failoverState) shouldStartSyncInPublisherMode(ctx context.Context) bool {
if !f.isAggregator || f.raftNode == nil || !f.raftNode.IsLeader() {
return false
}

height, err := f.store.Height(ctx)
if err != nil {
f.logger.Warn().Err(err).Msg("cannot determine local height; keeping blocking sync startup")
return false
}
if height > 0 {
return false
}

f.logger.Info().
Msg("raft leader with empty store: starting sync services in publisher mode")
return true
}

func (f *failoverState) Run(pCtx context.Context) (multiErr error) {
stopService := func(stoppable func(context.Context) error, name string) { //nolint:contextcheck // shutdown uses context.Background intentionally
// parent context is cancelled already, so we need to create a new one
Expand Down Expand Up @@ -207,15 +230,28 @@ func (f *failoverState) Run(pCtx context.Context) (multiErr error) {
})

// start header and data sync services concurrently to avoid cumulative startup delay.
startSyncInPublisherMode := f.shouldStartSyncInPublisherMode(ctx)
syncWg, syncCtx := errgroup.WithContext(ctx)
syncWg.Go(func() error {
if err := f.headerSyncService.Start(syncCtx); err != nil {
var err error
if startSyncInPublisherMode {
err = f.headerSyncService.StartForPublishing(syncCtx)
} else {
err = f.headerSyncService.Start(syncCtx)
}
if err != nil {
return fmt.Errorf("header sync service: %w", err)
}
return nil
})
syncWg.Go(func() error {
if err := f.dataSyncService.Start(syncCtx); err != nil {
var err error
if startSyncInPublisherMode {
err = f.dataSyncService.StartForPublishing(syncCtx)
} else {
err = f.dataSyncService.Start(syncCtx)
}
if err != nil {
return fmt.Errorf("data sync service: %w", err)
}
return nil
Expand Down
Loading
Loading