diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index aea8dee0e2..919db59c1f 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -103,7 +103,16 @@ jobs:
       - name: Install just
         uses: extractions/setup-just@v3
       - name: E2E Tests
-        run: just test-e2e
+        run: |
+          mkdir -p .artifacts/e2e-server-logs
+          EV_E2E_LOG_DIR="${{ github.workspace }}/.artifacts/e2e-server-logs" just test-e2e
+      - name: Upload E2E server logs
+        if: failure()
+        uses: actions/upload-artifact@v4
+        with:
+          name: e2e-server-logs-${{ github.sha }}
+          path: ./.artifacts/e2e-server-logs
+          if-no-files-found: warn
 
   evm-tests:
     name: Run EVM Execution Tests
diff --git a/execution/evm/execution.go b/execution/evm/execution.go
index 15cddf6417..fe5196d5c9 100644
--- a/execution/evm/execution.go
+++ b/execution/evm/execution.go
@@ -756,24 +756,33 @@ func (c *EngineClient) reconcileExecutionAtHeight(ctx context.Context, height ui
 	// If we have a started execution with a payloadID, validate it still exists before resuming.
 	// After node restart, the EL's payload cache is ephemeral and the payloadID may be stale.
 	if execMeta.Stage == ExecStageStarted && len(execMeta.PayloadID) == 8 {
-		var pid engine.PayloadID
-		copy(pid[:], execMeta.PayloadID)
+		requestedTxHash := hashTxs(txs)
+		if execMeta.Timestamp != timestamp.Unix() || !bytes.Equal(execMeta.TxHash, requestedTxHash) {
+			c.logger.Warn().
+				Uint64("height", height).
+				Int64("execmeta_timestamp", execMeta.Timestamp).
+				Int64("requested_timestamp", timestamp.Unix()).
+				Msg("ExecuteTxs: ignoring stale in-progress execution for different block inputs")
+		} else {
+			var pid engine.PayloadID
+			copy(pid[:], execMeta.PayloadID)
 
-		// Validate payload still exists by attempting to retrieve it
-		if _, err = c.engineClient.GetPayload(ctx, pid); err == nil {
-			c.logger.Info().
+			// Validate payload still exists by attempting to retrieve it
+			if _, err = c.engineClient.GetPayload(ctx, pid); err == nil {
+				c.logger.Info().
+					Uint64("height", height).
+					Str("stage", execMeta.Stage).
+					Msg("ExecuteTxs: found in-progress execution with payloadID, returning payloadID for resume")
+				return nil, &pid, true, nil
+			}
+			// Payload is stale (expired or node restarted) - proceed with fresh execution
+			c.logger.Debug().
 				Uint64("height", height).
-				Str("stage", execMeta.Stage).
-				Msg("ExecuteTxs: found in-progress execution with payloadID, returning payloadID for resume")
-			return nil, &pid, true, nil
+				Str("payloadID", pid.String()).
+				Err(err).
+				Msg("ExecuteTxs: stale ExecMeta payloadID no longer valid in EL, will re-execute")
+			// Don't return - fall through to fresh execution
 		}
-		// Payload is stale (expired or node restarted) - proceed with fresh execution
-		c.logger.Debug().
-			Uint64("height", height).
-			Str("payloadID", pid.String()).
-			Err(err).
- Msg("ExecuteTxs: stale ExecMeta payloadID no longer valid in EL, will re-execute") - // Don't return - fall through to fresh execution } } @@ -1023,13 +1032,7 @@ func (c *EngineClient) saveExecMeta(ctx context.Context, height uint64, timestam } // Compute tx hash for sanity checks on retry - if len(txs) > 0 { - h := sha256.New() - for _, tx := range txs { - h.Write(tx) - } - execMeta.TxHash = h.Sum(nil) - } + execMeta.TxHash = hashTxs(txs) if err := c.store.SaveExecMeta(ctx, execMeta); err != nil { c.logger.Warn().Err(err).Uint64("height", height).Msg("saveExecMeta: failed to save exec meta") @@ -1042,6 +1045,19 @@ func (c *EngineClient) saveExecMeta(ctx context.Context, height uint64, timestam Msg("saveExecMeta: saved execution metadata") } +func hashTxs(txs [][]byte) []byte { + if len(txs) == 0 { + return nil + } + + h := sha256.New() + for _, tx := range txs { + h.Write(tx) + } + + return h.Sum(nil) +} + // GetLatestHeight returns the current block height of the execution layer func (c *EngineClient) GetLatestHeight(ctx context.Context) (uint64, error) { header, err := c.ethClient.HeaderByNumber(ctx, nil) // nil = latest block diff --git a/execution/evm/execution_reconcile_test.go b/execution/evm/execution_reconcile_test.go new file mode 100644 index 0000000000..5b25e6b4b8 --- /dev/null +++ b/execution/evm/execution_reconcile_test.go @@ -0,0 +1,130 @@ +package evm + +import ( + "context" + "errors" + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/beacon/engine" + "github.com/ethereum/go-ethereum/core/types" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" +) + +func TestReconcileExecutionAtHeight_StartedExecMeta(t *testing.T) { + t.Parallel() + + specs := map[string]struct { + execMetaTimestamp int64 + execMetaTxs [][]byte + requestedTxs [][]byte + requestedTime time.Time + expectFound bool + expectPayloadID bool + expectGetPayloads int + }{ + "resume_when_inputs_match": { + execMetaTimestamp: 1700000012, + execMetaTxs: [][]byte{[]byte("tx-1")}, + requestedTxs: [][]byte{[]byte("tx-1")}, + requestedTime: time.Unix(1700000012, 0), + expectFound: true, + expectPayloadID: true, + expectGetPayloads: 1, + }, + "ignore_when_timestamp_differs": { + execMetaTimestamp: 1700000010, + execMetaTxs: [][]byte{[]byte("tx-1")}, + requestedTxs: [][]byte{[]byte("tx-1")}, + requestedTime: time.Unix(1700000012, 0), + expectFound: false, + expectPayloadID: false, + expectGetPayloads: 0, + }, + "ignore_when_txs_differ": { + execMetaTimestamp: 1700000012, + execMetaTxs: [][]byte{[]byte("tx-old")}, + requestedTxs: [][]byte{[]byte("tx-new")}, + requestedTime: time.Unix(1700000012, 0), + expectFound: false, + expectPayloadID: false, + expectGetPayloads: 0, + }, + } + + for name, spec := range specs { + t.Run(name, func(t *testing.T) { + t.Parallel() + + store := NewEVMStore(dssync.MutexWrap(ds.NewMapDatastore())) + payloadID := engine.PayloadID{1, 2, 3, 4, 5, 6, 7, 8} + require.NoError(t, store.SaveExecMeta(t.Context(), &ExecMeta{ + Height: 12, + PayloadID: payloadID[:], + TxHash: hashTxs(spec.execMetaTxs), + Timestamp: spec.execMetaTimestamp, + Stage: ExecStageStarted, + })) + + engineRPC := &mockReconcileEngineRPCClient{ + payloads: map[engine.PayloadID]*engine.ExecutionPayloadEnvelope{ + payloadID: {}, + }, + } + client := &EngineClient{ + engineClient: engineRPC, + ethClient: mockReconcileEthRPCClient{}, + store: store, + logger: zerolog.Nop(), + } + + stateRoot, gotPayloadID, found, err 
diff --git a/execution/evm/execution_reconcile_test.go b/execution/evm/execution_reconcile_test.go
new file mode 100644
index 0000000000..5b25e6b4b8
--- /dev/null
+++ b/execution/evm/execution_reconcile_test.go
@@ -0,0 +1,130 @@
+package evm
+
+import (
+	"context"
+	"errors"
+	"math/big"
+	"testing"
+	"time"
+
+	"github.com/ethereum/go-ethereum/beacon/engine"
+	"github.com/ethereum/go-ethereum/core/types"
+	ds "github.com/ipfs/go-datastore"
+	dssync "github.com/ipfs/go-datastore/sync"
+	"github.com/rs/zerolog"
+	"github.com/stretchr/testify/require"
+)
+
+func TestReconcileExecutionAtHeight_StartedExecMeta(t *testing.T) {
+	t.Parallel()
+
+	specs := map[string]struct {
+		execMetaTimestamp int64
+		execMetaTxs       [][]byte
+		requestedTxs      [][]byte
+		requestedTime     time.Time
+		expectFound       bool
+		expectPayloadID   bool
+		expectGetPayloads int
+	}{
+		"resume_when_inputs_match": {
+			execMetaTimestamp: 1700000012,
+			execMetaTxs:       [][]byte{[]byte("tx-1")},
+			requestedTxs:      [][]byte{[]byte("tx-1")},
+			requestedTime:     time.Unix(1700000012, 0),
+			expectFound:       true,
+			expectPayloadID:   true,
+			expectGetPayloads: 1,
+		},
+		"ignore_when_timestamp_differs": {
+			execMetaTimestamp: 1700000010,
+			execMetaTxs:       [][]byte{[]byte("tx-1")},
+			requestedTxs:      [][]byte{[]byte("tx-1")},
+			requestedTime:     time.Unix(1700000012, 0),
+			expectFound:       false,
+			expectPayloadID:   false,
+			expectGetPayloads: 0,
+		},
+		"ignore_when_txs_differ": {
+			execMetaTimestamp: 1700000012,
+			execMetaTxs:       [][]byte{[]byte("tx-old")},
+			requestedTxs:      [][]byte{[]byte("tx-new")},
+			requestedTime:     time.Unix(1700000012, 0),
+			expectFound:       false,
+			expectPayloadID:   false,
+			expectGetPayloads: 0,
+		},
+	}
+
+	for name, spec := range specs {
+		t.Run(name, func(t *testing.T) {
+			t.Parallel()
+
+			store := NewEVMStore(dssync.MutexWrap(ds.NewMapDatastore()))
+			payloadID := engine.PayloadID{1, 2, 3, 4, 5, 6, 7, 8}
+			require.NoError(t, store.SaveExecMeta(t.Context(), &ExecMeta{
+				Height:    12,
+				PayloadID: payloadID[:],
+				TxHash:    hashTxs(spec.execMetaTxs),
+				Timestamp: spec.execMetaTimestamp,
+				Stage:     ExecStageStarted,
+			}))
+
+			engineRPC := &mockReconcileEngineRPCClient{
+				payloads: map[engine.PayloadID]*engine.ExecutionPayloadEnvelope{
+					payloadID: {},
+				},
+			}
+			client := &EngineClient{
+				engineClient: engineRPC,
+				ethClient:    mockReconcileEthRPCClient{},
+				store:        store,
+				logger:       zerolog.Nop(),
+			}
+
+			stateRoot, gotPayloadID, found, err := client.reconcileExecutionAtHeight(t.Context(), 12, spec.requestedTime, spec.requestedTxs)
+
+			require.NoError(t, err)
+			require.Nil(t, stateRoot)
+			require.Equal(t, spec.expectFound, found)
+			require.Equal(t, spec.expectPayloadID, gotPayloadID != nil)
+			if spec.expectPayloadID {
+				require.Equal(t, payloadID, *gotPayloadID)
+			}
+			require.Equal(t, spec.expectGetPayloads, engineRPC.getPayloadCalls)
+		})
+	}
+}
+
+type mockReconcileEngineRPCClient struct {
+	payloads        map[engine.PayloadID]*engine.ExecutionPayloadEnvelope
+	getPayloadCalls int
+}
+
+func (m *mockReconcileEngineRPCClient) ForkchoiceUpdated(_ context.Context, _ engine.ForkchoiceStateV1, _ map[string]any) (*engine.ForkChoiceResponse, error) {
+	return nil, errors.New("unexpected ForkchoiceUpdated call")
+}
+
+func (m *mockReconcileEngineRPCClient) GetPayload(_ context.Context, payloadID engine.PayloadID) (*engine.ExecutionPayloadEnvelope, error) {
+	m.getPayloadCalls++
+	payload, ok := m.payloads[payloadID]
+	if !ok {
+		return nil, errors.New("payload not found")
+	}
+
+	return payload, nil
+}
+
+func (m *mockReconcileEngineRPCClient) NewPayload(_ context.Context, _ *engine.ExecutableData, _ []string, _ string, _ [][]byte) (*engine.PayloadStatusV1, error) {
+	return nil, errors.New("unexpected NewPayload call")
+}
+
+type mockReconcileEthRPCClient struct{}
+
+func (mockReconcileEthRPCClient) HeaderByNumber(_ context.Context, _ *big.Int) (*types.Header, error) {
+	return nil, errors.New("header not found")
+}
+
+func (mockReconcileEthRPCClient) GetTxs(_ context.Context) ([]string, error) {
+	return nil, errors.New("unexpected GetTxs call")
+}
diff --git a/node/failover.go b/node/failover.go
index c60e27a5f4..613c9d37fb 100644
--- a/node/failover.go
+++ b/node/failover.go
@@ -33,6 +33,8 @@ type failoverState struct {
 	dataSyncService *evsync.DataSyncService
 	rpcServer       *http.Server
 	bc              *block.Components
+	raftNode        *raft.Node
+	isAggregator    bool
 
 	// catchup fields — used when the aggregator needs to sync before producing
 	catchupEnabled bool
@@ -172,6 +174,8 @@ func setupFailoverState(
 		dataSyncService:   dataSyncService,
 		rpcServer:         rpcServer,
 		bc:                bc,
+		raftNode:          raftNode,
+		isAggregator:      isAggregator,
 		store:             rktStore,
 		catchupEnabled:    catchupEnabled,
 		catchupTimeout:    nodeConfig.Node.CatchupTimeout.Duration,
@@ -179,6 +183,25 @@ func setupFailoverState(
 	}, nil
 }
 
+func (f *failoverState) shouldStartSyncInPublisherMode(ctx context.Context) bool {
+	if !f.isAggregator || f.raftNode == nil || !f.raftNode.IsLeader() {
+		return false
+	}
+
+	height, err := f.store.Height(ctx)
+	if err != nil {
+		f.logger.Warn().Err(err).Msg("cannot determine local height; keeping blocking sync startup")
+		return false
+	}
+	if height > 0 {
+		return false
+	}
+
+	f.logger.Info().
+		Msg("raft leader with empty store: starting sync services in publisher mode")
+	return true
+}
+
 func (f *failoverState) Run(pCtx context.Context) (multiErr error) {
 	stopService := func(stoppable func(context.Context) error, name string) { //nolint:contextcheck // shutdown uses context.Background intentionally
 		// parent context is cancelled already, so we need to create a new one
@@ -207,15 +230,28 @@ func (f *failoverState) Run(pCtx context.Context) (multiErr error) {
 	})
 
 	// start header and data sync services concurrently to avoid cumulative startup delay.
+	startSyncInPublisherMode := f.shouldStartSyncInPublisherMode(ctx)
 	syncWg, syncCtx := errgroup.WithContext(ctx)
 	syncWg.Go(func() error {
-		if err := f.headerSyncService.Start(syncCtx); err != nil {
+		var err error
+		if startSyncInPublisherMode {
+			err = f.headerSyncService.StartForPublishing(syncCtx)
+		} else {
+			err = f.headerSyncService.Start(syncCtx)
+		}
+		if err != nil {
 			return fmt.Errorf("header sync service: %w", err)
 		}
 		return nil
 	})
 	syncWg.Go(func() error {
-		if err := f.dataSyncService.Start(syncCtx); err != nil {
+		var err error
+		if startSyncInPublisherMode {
+			err = f.dataSyncService.StartForPublishing(syncCtx)
+		} else {
+			err = f.dataSyncService.Start(syncCtx)
+		}
+		if err != nil {
 			return fmt.Errorf("data sync service: %w", err)
 		}
 		return nil
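Note: the publisher-mode gate reduces to three checks — aggregator, current raft leader, empty local store — with any error falling back to the safe blocking startup. A hedged sketch of the decision in isolation (the interfaces are illustrative stand-ins, not the node's real types):

package main

import (
	"context"
	"fmt"
)

// leaderChecker and heighter stand in for the raft node and store that
// shouldStartSyncInPublisherMode consults in the diff.
type leaderChecker interface{ IsLeader() bool }

type heighter interface {
	Height(ctx context.Context) (uint64, error)
}

// shouldPublish mirrors the decision: only an aggregating raft leader with an
// empty local store starts sync in publisher mode; errors keep blocking startup.
func shouldPublish(ctx context.Context, isAggregator bool, raft leaderChecker, store heighter) bool {
	if !isAggregator || raft == nil || !raft.IsLeader() {
		return false
	}
	height, err := store.Height(ctx)
	if err != nil || height > 0 {
		return false
	}
	return true
}

type fakeRaft struct{ leader bool }

func (f fakeRaft) IsLeader() bool { return f.leader }

type fakeStore struct{ h uint64 }

func (f fakeStore) Height(context.Context) (uint64, error) { return f.h, nil }

func main() {
	ctx := context.Background()
	fmt.Println(shouldPublish(ctx, true, fakeRaft{leader: true}, fakeStore{h: 0})) // true: bootstrap as publisher
	fmt.Println(shouldPublish(ctx, true, fakeRaft{leader: true}, fakeStore{h: 7})) // false: store already has blocks
}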
diff --git a/pkg/store/cached_store.go b/pkg/store/cached_store.go
index 8a5cca5b38..f72ac342af 100644
--- a/pkg/store/cached_store.go
+++ b/pkg/store/cached_store.go
@@ -174,3 +174,8 @@ func (cs *CachedStore) Close() error {
 	cs.ClearCache()
 	return cs.Store.Close()
 }
+
+// Sync flushes the underlying store to durable storage.
+func (cs *CachedStore) Sync(ctx context.Context) error {
+	return cs.Store.Sync(ctx)
+}
diff --git a/pkg/store/store.go b/pkg/store/store.go
index 975db4e163..ef75f83ad3 100644
--- a/pkg/store/store.go
+++ b/pkg/store/store.go
@@ -6,6 +6,7 @@ import (
 	"encoding/binary"
 	"errors"
 	"fmt"
+	"time"
 
 	ds "github.com/ipfs/go-datastore"
 	"google.golang.org/protobuf/proto"
@@ -30,7 +31,21 @@ func New(ds ds.Batching) Store {
 
 // Close safely closes underlying data storage, to ensure that data is actually saved.
 func (s *DefaultStore) Close() error {
-	return s.db.Close()
+	done := make(chan error, 1)
+	go func() {
+		syncCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+		defer cancel()
+
+		_ = s.Sync(syncCtx)
+		done <- s.db.Close()
+	}()
+
+	select {
+	case err := <-done:
+		return err
+	case <-time.After(4 * time.Second):
+		return nil
+	}
 }
 
 // Height returns height of the highest block saved in the Store.
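Note: the new Close is a bounded, best-effort flush — Sync gets its own deadline, Close runs after it, and the whole sequence has an upper bound so shutdown cannot hang on a wedged datastore. The same pattern in isolation (the interface and durations are illustrative):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// syncCloser is an illustrative stand-in for the datastore being shut down.
type syncCloser interface {
	Sync(ctx context.Context) error
	Close() error
}

// closeWithDeadline flushes then closes s in the background and stops waiting
// after the overall timeout, mirroring DefaultStore.Close in the diff.
func closeWithDeadline(s syncCloser, syncTimeout, total time.Duration) error {
	done := make(chan error, 1)
	go func() {
		ctx, cancel := context.WithTimeout(context.Background(), syncTimeout)
		defer cancel()
		_ = s.Sync(ctx) // best effort: a failed sync must not block close
		done <- s.Close()
	}()

	select {
	case err := <-done:
		return err
	case <-time.After(total):
		return nil // shutdown proceeds; the close keeps running in the background
	}
}

type fastStore struct{}

func (fastStore) Sync(context.Context) error { return nil }
func (fastStore) Close() error               { return errors.New("close failed") }

func main() {
	fmt.Println(closeWithDeadline(fastStore{}, 2*time.Second, 4*time.Second)) // close failed
}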
diff --git a/pkg/store/store_test.go b/pkg/store/store_test.go
index 68903e1c7c..529297ff1d 100644
--- a/pkg/store/store_test.go
+++ b/pkg/store/store_test.go
@@ -35,6 +35,22 @@ type mockBatch struct {
 	commitError error
 }
 
+type syncingBatchingDatastore struct {
+	ds.Batching
+	syncCalled  bool
+	closeCalled bool
+}
+
+func (m *syncingBatchingDatastore) Sync(ctx context.Context, key ds.Key) error {
+	m.syncCalled = true
+	return m.Batching.Sync(ctx, key)
+}
+
+func (m *syncingBatchingDatastore) Close() error {
+	m.closeCalled = true
+	return m.Batching.Close()
+}
+
 func (m *mockBatchingDatastore) Put(ctx context.Context, key ds.Key, value []byte) error {
 	if m.putError != nil {
 		return m.putError
@@ -141,6 +157,20 @@ func TestStoreHeight(t *testing.T) {
 	}
 }
 
+func TestStoreCloseSyncsBeforeClose(t *testing.T) {
+	t.Parallel()
+
+	kv, err := NewTestInMemoryKVStore()
+	require.NoError(t, err)
+
+	mock := &syncingBatchingDatastore{Batching: kv}
+	s := New(mock)
+
+	require.NoError(t, s.Close())
+	require.True(t, mock.syncCalled)
+	require.True(t, mock.closeCalled)
+}
+
 func TestStoreLoad(t *testing.T) {
 	t.Parallel()
 	chainID := "TestStoreLoad"
diff --git a/pkg/store/tracing.go b/pkg/store/tracing.go
index 259c6cb600..292f830cd6 100644
--- a/pkg/store/tracing.go
+++ b/pkg/store/tracing.go
@@ -265,6 +265,19 @@ func (t *tracedStore) Close() error {
 	return t.inner.Close()
 }
 
+func (t *tracedStore) Sync(ctx context.Context) error {
+	ctx, span := t.tracer.Start(ctx, "Store.Sync")
+	defer span.End()
+
+	if err := t.inner.Sync(ctx); err != nil {
+		span.RecordError(err)
+		span.SetStatus(codes.Error, err.Error())
+		return err
+	}
+
+	return nil
+}
+
 func (t *tracedStore) NewBatch(ctx context.Context) (Batch, error) {
 	ctx, span := t.tracer.Start(ctx, "Store.NewBatch")
 	defer span.End()
diff --git a/pkg/store/tracing_test.go b/pkg/store/tracing_test.go
index 3ae8d8902e..132f4aa802 100644
--- a/pkg/store/tracing_test.go
+++ b/pkg/store/tracing_test.go
@@ -32,6 +32,7 @@ type tracingMockStore struct {
 	rollbackFn            func(ctx context.Context, height uint64, aggregator bool) error
 	pruneBlocksFn         func(ctx context.Context, height uint64) error
 	deleteStateAtHeightFn func(ctx context.Context, height uint64) error
+	syncFn                func(ctx context.Context) error
 	newBatchFn            func(ctx context.Context) (Batch, error)
 }
 
@@ -137,6 +138,13 @@ func (m *tracingMockStore) Close() error {
 	return nil
 }
 
+func (m *tracingMockStore) Sync(ctx context.Context) error {
+	if m.syncFn != nil {
+		return m.syncFn(ctx)
+	}
+	return nil
+}
+
 func (m *tracingMockStore) NewBatch(ctx context.Context) (Batch, error) {
 	if m.newBatchFn != nil {
 		return m.newBatchFn(ctx)
diff --git a/pkg/store/types.go b/pkg/store/types.go
index b1b1f2bd5e..b87287a076 100644
--- a/pkg/store/types.go
+++ b/pkg/store/types.go
@@ -46,6 +46,7 @@ type Store interface {
 	Metadata
 	Rollback
 	Pruner
+	Syncer
 
 	// Close safely closes underlying data storage, to ensure that data is actually saved.
 	Close() error
@@ -104,3 +105,8 @@ type Pruner interface {
 	// It does not affect the current state or any states at other heights, allowing for targeted pruning of historical state snapshots.
 	DeleteStateAtHeight(ctx context.Context, height uint64) error
 }
+
+// Syncer flushes buffered store state to durable storage.
+type Syncer interface {
+	Sync(ctx context.Context) error
+}
diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go
index b65b855a4b..947a74c70b 100644
--- a/pkg/sync/sync_service.go
+++ b/pkg/sync/sync_service.go
@@ -143,12 +143,9 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context,
 		}
 	}
 
-	firstStart := false
-	if !syncService.syncerStatus.started.Load() {
-		firstStart = true
-		if err := syncService.startSyncer(ctx); err != nil {
-			return fmt.Errorf("failed to start syncer after initializing the store: %w", err)
-		}
+	firstStart, err := syncService.startSyncer(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to start syncer after initializing the store: %w", err)
 	}
 
 	// Broadcast for subscribers
@@ -190,20 +187,9 @@ func (s *SyncService[H]) AppendDAHint(ctx context.Context, daHeight uint64, heig
 
 // Start is a part of Service interface.
 func (syncService *SyncService[H]) Start(ctx context.Context) error {
-	// setup P2P infrastructure, but don't start Subscriber yet.
-	peerIDs, err := syncService.setupP2PInfrastructure(ctx)
+	peerIDs, err := syncService.prepareStart(ctx)
 	if err != nil {
-		return fmt.Errorf("failed to setup syncer P2P infrastructure: %w", err)
-	}
-
-	// create syncer, must be before initFromP2PWithRetry which calls startSyncer.
-	if syncService.syncer, err = newSyncer(
-		syncService.ex,
-		syncService.store,
-		syncService.sub,
-		[]goheadersync.Option{goheadersync.WithBlockTime(syncService.conf.Node.BlockTime.Duration)},
-	); err != nil {
-		return fmt.Errorf("failed to create syncer: %w", err)
+		return err
 	}
 
 	// initialize stores from P2P (blocking until genesis is fetched for followers)
@@ -223,20 +209,61 @@ func (syncService *SyncService[H]) Start(ctx context.Context) error {
 	return nil
 }
 
-// startSyncer starts the SyncService's syncer
-func (syncService *SyncService[H]) startSyncer(ctx context.Context) error {
-	if syncService.syncerStatus.isStarted() {
-		return nil
+// StartForPublishing starts the sync service in publisher mode.
+//
+// This mode is used by a raft leader with an empty local store: no peer can serve
+// height 1 yet, so waiting for initFromP2PWithRetry would deadlock block production.
+// We still need the P2P exchange server and pubsub subscriber to be ready before the
+// first block is produced, because WriteToStoreAndBroadcast relies on them to gossip
+// the block that bootstraps the network.
+func (syncService *SyncService[H]) StartForPublishing(ctx context.Context) error {
+	if _, err := syncService.prepareStart(ctx); err != nil {
+		return err
 	}
 
-	if err := syncService.syncer.Start(ctx); err != nil {
-		return fmt.Errorf("failed to start syncer: %w", err)
+	if err := syncService.startSubscriber(ctx); err != nil {
+		return fmt.Errorf("failed to start subscriber: %w", err)
 	}
 
-	syncService.syncerStatus.started.Store(true)
 	return nil
 }
 
+func (syncService *SyncService[H]) prepareStart(ctx context.Context) ([]peer.ID, error) {
+	// setup P2P infrastructure, but don't start Subscriber yet.
+	peerIDs, err := syncService.setupP2PInfrastructure(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("failed to setup syncer P2P infrastructure: %w", err)
+	}
+
+	// create syncer, must be before initFromP2PWithRetry which calls startSyncer.
+	if syncService.syncer, err = newSyncer(
+		syncService.ex,
+		syncService.store,
+		syncService.sub,
+		[]goheadersync.Option{goheadersync.WithBlockTime(syncService.conf.Node.BlockTime.Duration)},
+	); err != nil {
+		return nil, fmt.Errorf("failed to create syncer: %w", err)
+	}
+
+	return peerIDs, nil
+}
+
+// startSyncer starts the SyncService's syncer.
+// It returns true when this call performed the actual start.
+func (syncService *SyncService[H]) startSyncer(ctx context.Context) (bool, error) {
+	startedNow, err := syncService.syncerStatus.startOnce(func() error {
+		if err := syncService.syncer.Start(ctx); err != nil {
+			return fmt.Errorf("failed to start syncer: %w", err)
+		}
+		return nil
+	})
+	if err != nil {
+		return false, err
+	}
+
+	return startedNow, nil
+}
+
 // initStore initializes the store with the given initial header.
 // it is a no-op if the store is already initialized.
 // Returns true when the store was initialized by this call.
@@ -371,7 +398,7 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee
 			return false, fmt.Errorf("failed to initialize the store: %w", err)
 		}
 	}
-	if err := syncService.startSyncer(ctx); err != nil {
+	if _, err := syncService.startSyncer(ctx); err != nil {
 		return false, err
 	}
 	return true, nil
@@ -386,6 +413,8 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee
 	p2pInitTimeout := 30 * time.Second
 	timeoutTimer := time.NewTimer(p2pInitTimeout)
 	defer timeoutTimer.Stop()
+	retryTimer := time.NewTimer(backoff)
+	defer retryTimer.Stop()
 
 	for {
 		ok, err := tryInit(ctx)
@@ -403,13 +432,14 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee
 				Dur("timeout", p2pInitTimeout).
 				Msg("P2P header sync initialization timed out, deferring to DA sync")
 			return nil
-		case <-time.After(backoff):
+		case <-retryTimer.C:
 		}
 
 		backoff *= 2
 		if backoff > maxBackoff {
 			backoff = maxBackoff
 		}
+		retryTimer.Reset(backoff)
 	}
 }
 
@@ -424,9 +453,9 @@ func (syncService *SyncService[H]) Stop(ctx context.Context) error {
 		syncService.ex.Stop(ctx),
 		syncService.sub.Stop(ctx),
 	)
-	if syncService.syncerStatus.isStarted() {
-		err = errors.Join(err, syncService.syncer.Stop(ctx))
-	}
+	err = errors.Join(err, syncService.syncerStatus.stopIfStarted(func() error {
+		return syncService.syncer.Stop(ctx)
+	}))
 	// Stop the store adapter if it has a Stop method
 	if stopper, ok := syncService.store.(interface{ Stop(context.Context) error }); ok {
 		err = errors.Join(err, stopper.Stop(ctx))
 	}
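Note: swapping time.After for a single reusable timer avoids allocating a fresh timer on every retry and lets the loop re-arm with the grown interval; the `backoff *= 2` before the clamp is kept above, since without it the maxBackoff cap is dead code. A compact, self-contained sketch of the capped exponential backoff loop (bounds are illustrative):

package main

import (
	"context"
	"fmt"
	"time"
)

// retryWithBackoff calls try until it succeeds, the context ends, or the
// overall deadline fires, doubling the wait up to maxBackoff between attempts.
func retryWithBackoff(ctx context.Context, try func() bool) bool {
	backoff := 50 * time.Millisecond
	const maxBackoff = 500 * time.Millisecond

	deadline := time.NewTimer(2 * time.Second)
	defer deadline.Stop()
	retry := time.NewTimer(backoff) // reused across iterations instead of time.After
	defer retry.Stop()

	for {
		if try() {
			return true
		}
		select {
		case <-ctx.Done():
			return false
		case <-deadline.C:
			return false
		case <-retry.C:
		}
		backoff = min(2*backoff, maxBackoff)
		retry.Reset(backoff)
	}
}

func main() {
	attempts := 0
	ok := retryWithBackoff(context.Background(), func() bool {
		attempts++
		return attempts == 4
	})
	fmt.Println(ok, attempts) // true 4
}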
diff --git a/pkg/sync/sync_service_test.go b/pkg/sync/sync_service_test.go
index 593b95199a..49b1ef6942 100644
--- a/pkg/sync/sync_service_test.go
+++ b/pkg/sync/sync_service_test.go
@@ -12,6 +12,7 @@ import (
 	"github.com/ipfs/go-datastore/sync"
 	"github.com/libp2p/go-libp2p/core/crypto"
 	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
+	"github.com/multiformats/go-multiaddr"
 	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/require"
 
@@ -25,6 +26,76 @@ import (
 	"github.com/evstack/ev-node/types"
 )
 
+func TestHeaderSyncServiceStartForPublishingWithPeers(t *testing.T) {
+	mainKV := sync.MutexWrap(datastore.NewMapDatastore())
+	pk, _, err := crypto.GenerateEd25519Key(cryptoRand.Reader)
+	require.NoError(t, err)
+	noopSigner, err := noop.NewNoopSigner(pk)
+	require.NoError(t, err)
+	rnd := rand.New(rand.NewSource(1)) // nolint:gosec // test code only
+	mn := mocknet.New()
+
+	chainID := "test-chain-id"
+	genesisDoc := genesispkg.Genesis{
+		ChainID:         chainID,
+		StartTime:       time.Now(),
+		InitialHeight:   1,
+		ProposerAddress: []byte("test"),
+	}
+
+	conf := config.DefaultConfig()
+	conf.RootDir = t.TempDir()
+	logger := zerolog.Nop()
+
+	nodeKey1, err := key.LoadOrGenNodeKey(filepath.Join(conf.RootDir, "node1_key.json"))
+	require.NoError(t, err)
+	host1, err := mn.AddPeer(nodeKey1.PrivKey, multiaddr.StringCast("/ip4/127.0.0.1/tcp/0"))
+	require.NoError(t, err)
+
+	nodeKey2, err := key.LoadOrGenNodeKey(filepath.Join(conf.RootDir, "node2_key.json"))
+	require.NoError(t, err)
+	host2, err := mn.AddPeer(nodeKey2.PrivKey, multiaddr.StringCast("/ip4/127.0.0.1/tcp/0"))
+	require.NoError(t, err)
+
+	require.NoError(t, mn.LinkAll())
+	require.NoError(t, mn.ConnectAllButSelf())
+
+	client1, err := p2p.NewClientWithHost(conf.P2P, nodeKey1.PrivKey, mainKV, chainID, logger, p2p.NopMetrics(), host1)
+	require.NoError(t, err)
+	client2, err := p2p.NewClientWithHost(conf.P2P, nodeKey2.PrivKey, mainKV, chainID, logger, p2p.NopMetrics(), host2)
+	require.NoError(t, err)
+
+	ctx, cancel := context.WithCancel(t.Context())
+	defer cancel()
+	require.NoError(t, client1.Start(ctx))
+	require.NoError(t, client2.Start(ctx))
+	t.Cleanup(func() { _ = client1.Close() })
+	t.Cleanup(func() { _ = client2.Close() })
+
+	require.Eventually(t, func() bool {
+		return len(client1.PeerIDs()) > 0
+	}, time.Second, 10*time.Millisecond)
+
+	evStore := store.New(mainKV)
+	svc, err := NewHeaderSyncService(evStore, conf, genesisDoc, client1, logger)
+	require.NoError(t, err)
+	require.NoError(t, svc.StartForPublishing(ctx))
+	t.Cleanup(func() { _ = svc.Stop(context.Background()) })
+
+	headerConfig := types.HeaderConfig{
+		Height:   genesisDoc.InitialHeight,
+		DataHash: bytesN(rnd, 32),
+		AppHash:  bytesN(rnd, 32),
+		Signer:   noopSigner,
+	}
+	signedHeader, err := types.GetRandomSignedHeaderCustom(&headerConfig, genesisDoc.ChainID)
+	require.NoError(t, err)
+	require.NoError(t, signedHeader.Validate())
+
+	require.NoError(t, svc.WriteToStoreAndBroadcast(ctx, &types.P2PSignedHeader{SignedHeader: signedHeader}))
+	require.True(t, svc.storeInitialized.Load())
+}
+
 func TestHeaderSyncServiceRestart(t *testing.T) {
 	mainKV := sync.MutexWrap(datastore.NewMapDatastore())
 	pk, _, err := crypto.GenerateEd25519Key(cryptoRand.Reader)
@@ -78,7 +149,7 @@ func TestHeaderSyncServiceRestart(t *testing.T) {
 	require.NoError(t, signedHeader.Validate())
 
 	require.NoError(t, svc.WriteToStoreAndBroadcast(ctx, &types.P2PSignedHeader{SignedHeader: signedHeader}))
-	for i := genesisDoc.InitialHeight + 1; i < 2; i++ {
+	for i := genesisDoc.InitialHeight + 1; i < 10; i++ {
 		signedHeader = nextHeader(t, signedHeader, genesisDoc.ChainID, noopSigner)
 		t.Logf("signed header: %d", i)
 		require.NoError(t, svc.WriteToStoreAndBroadcast(ctx, &types.P2PSignedHeader{SignedHeader: signedHeader}))
diff --git a/pkg/sync/syncer_status.go b/pkg/sync/syncer_status.go
index 1fe26008c0..70364f2762 100644
--- a/pkg/sync/syncer_status.go
+++ b/pkg/sync/syncer_status.go
@@ -1,13 +1,49 @@
 package sync
 
-import "sync/atomic"
+import "sync"
 
 // SyncerStatus is used by header and block exchange service for keeping track
 // of the status of the syncer in them.
 type SyncerStatus struct {
-	started atomic.Bool
+	mu      sync.Mutex
+	started bool
 }
 
 func (syncerStatus *SyncerStatus) isStarted() bool {
-	return syncerStatus.started.Load()
+	syncerStatus.mu.Lock()
+	defer syncerStatus.mu.Unlock()
+
+	return syncerStatus.started
+}
+
+func (syncerStatus *SyncerStatus) startOnce(startFn func() error) (bool, error) {
+	syncerStatus.mu.Lock()
+	defer syncerStatus.mu.Unlock()
+
+	if syncerStatus.started {
+		return false, nil
+	}
+
+	if err := startFn(); err != nil {
+		return false, err
+	}
+
+	syncerStatus.started = true
+	return true, nil
+}
+
+func (syncerStatus *SyncerStatus) stopIfStarted(stopFn func() error) error {
+	syncerStatus.mu.Lock()
+	defer syncerStatus.mu.Unlock()
+
+	if !syncerStatus.started {
+		return nil
+	}
+
+	if err := stopFn(); err != nil {
+		return err
+	}
+
+	syncerStatus.started = false
+	return nil
 }
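Note: the mutex-based startOnce closes the check-then-act gap the old atomic.Bool left open — the start callback now runs under the same lock as the started check. A self-contained sketch of just that contract (a trimmed copy for illustration, not the package's type):

package main

import (
	"fmt"
	"sync"
)

// once mirrors the semantics of SyncerStatus.startOnce in the diff: the
// callback runs while the lock is held, so concurrent callers cannot both
// observe "not started" and start twice.
type once struct {
	mu      sync.Mutex
	started bool
}

func (o *once) startOnce(start func() error) (bool, error) {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.started {
		return false, nil
	}
	if err := start(); err != nil {
		return false, err // stays restartable after a failed start
	}
	o.started = true
	return true, nil
}

func main() {
	var o once
	var starts int
	var wg sync.WaitGroup
	for range 8 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// starts is only mutated inside the callback, which runs under o.mu.
			_, _ = o.startOnce(func() error { starts++; return nil })
		}()
	}
	wg.Wait()
	fmt.Println(starts) // always 1: exactly one goroutine runs the callback
}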
diff --git a/pkg/sync/syncer_status_test.go b/pkg/sync/syncer_status_test.go
new file mode 100644
index 0000000000..01ecbdf490
--- /dev/null
+++ b/pkg/sync/syncer_status_test.go
@@ -0,0 +1,128 @@
+package sync
+
+import (
+	"errors"
+	"sync"
+	"sync/atomic"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestSyncerStatusStartOnce(t *testing.T) {
+	t.Parallel()
+
+	specs := map[string]struct {
+		run func(*testing.T, *SyncerStatus)
+	}{
+		"concurrent_start_only_runs_once": {
+			run: func(t *testing.T, status *SyncerStatus) {
+				t.Helper()
+
+				var calls atomic.Int32
+				started := make(chan struct{})
+				release := make(chan struct{})
+				var wg sync.WaitGroup
+
+				for range 8 {
+					wg.Add(1)
+					go func() {
+						defer wg.Done()
+						_, err := status.startOnce(func() error {
+							if calls.Add(1) == 1 {
+								close(started)
+							}
+							<-release
+							return nil
+						})
+						require.NoError(t, err)
+					}()
+				}
+
+				<-started
+				close(release)
+				wg.Wait()
+
+				require.Equal(t, int32(1), calls.Load())
+				require.True(t, status.isStarted())
+			},
+		},
+		"failed_start_can_retry": {
+			run: func(t *testing.T, status *SyncerStatus) {
+				t.Helper()
+
+				var calls atomic.Int32
+				errBoom := errors.New("boom")
+
+				startedNow, err := status.startOnce(func() error {
+					calls.Add(1)
+					return errBoom
+				})
+				require.ErrorIs(t, err, errBoom)
+				require.False(t, startedNow)
+				require.False(t, status.isStarted())
+
+				startedNow, err = status.startOnce(func() error {
+					calls.Add(1)
+					return nil
+				})
+				require.NoError(t, err)
+				require.True(t, startedNow)
+				require.True(t, status.isStarted())
+				require.Equal(t, int32(2), calls.Load())
+			},
+		},
+	}
+
+	for name, spec := range specs {
+		t.Run(name, func(t *testing.T) {
+			t.Parallel()
+			spec.run(t, &SyncerStatus{})
+		})
+	}
+}
+
+func TestSyncerStatusStopIfStarted(t *testing.T) {
+	t.Parallel()
+
+	specs := map[string]struct {
+		started bool
+		wantErr bool
+	}{
+		"no_op_when_not_started": {
+			started: false,
+			wantErr: false,
+		},
+		"stop_clears_started": {
+			started: true,
+			wantErr: false,
+		},
+	}
+
+	for name, spec := range specs {
+		t.Run(name, func(t *testing.T) {
+			t.Parallel()
+
+			status := &SyncerStatus{started: spec.started}
+			var stopCalls atomic.Int32
+
+			err := status.stopIfStarted(func() error {
+				stopCalls.Add(1)
+				return nil
+			})
+
+			if spec.wantErr {
+				require.Error(t, err)
+			} else {
+				require.NoError(t, err)
+			}
+
+			if spec.started {
+				require.Equal(t, int32(1), stopCalls.Load())
+			} else {
+				require.Zero(t, stopCalls.Load())
+			}
+			require.False(t, status.isStarted())
+		})
+	}
+}
diff --git a/test/e2e/failover_e2e_test.go b/test/e2e/failover_e2e_test.go
index 30d689d00a..71abb864d2 100644
--- a/test/e2e/failover_e2e_test.go
+++ b/test/e2e/failover_e2e_test.go
@@ -26,6 +26,7 @@ import (
 	ethtypes "github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/ethclient"
+	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/protobuf/proto"
@@ -34,6 +35,7 @@ import (
 	evmtest "github.com/evstack/ev-node/execution/evm/test"
 	blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc"
 	coreda "github.com/evstack/ev-node/pkg/da/types"
+	"github.com/evstack/ev-node/pkg/p2p/key"
 	"github.com/evstack/ev-node/pkg/rpc/client"
 	rpcclient "github.com/evstack/ev-node/pkg/rpc/client"
 	"github.com/evstack/ev-node/types"
@@ -82,17 +84,20 @@ func TestLeaseFailoverE2E(t *testing.T) {
 	clusterNodes := &raftClusterNodes{
 		nodes: make(map[string]*nodeDetails),
 	}
-	node1P2PAddr := env.Endpoints.GetRollkitP2PAddress()
-	node2P2PAddr := env.Endpoints.GetFullNodeP2PAddress()
-	node3P2PAddr := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", mustGetAvailablePort(t))
+	node1P2PListen := env.Endpoints.GetRollkitP2PAddress()
+	node2P2PListen := env.Endpoints.GetFullNodeP2PAddress()
+	node3P2PListen := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", mustGetAvailablePort(t))
+	node1P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node1", node1P2PListen)
+	node2P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node2", node2P2PListen)
+	node3P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node3", node3P2PListen)
 
 	// Start node1 (bootstrap mode)
 	go func() {
 		p2pPeers := node2P2PAddr + "," + node3P2PAddr
 		proc := setupRaftSequencerNode(t, sut, workDir, "node1", node1RaftAddr, env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(),
-			bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetRollkitRPCListen(), env.Endpoints.GetRollkitP2PAddress(),
+			bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetRollkitRPCListen(), node1P2PListen,
 			env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL(), true, passphraseFile)
-		clusterNodes.Set("node1", env.Endpoints.GetRollkitRPCAddress(), proc, env.Endpoints.GetSequencerEthURL(), node1RaftAddr, env.Endpoints.GetRollkitP2PAddress(), env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL())
+		clusterNodes.Set("node1", env.Endpoints.GetRollkitRPCAddress(), proc, env.Endpoints.GetSequencerEthURL(), node1RaftAddr, node1P2PListen, node1P2PAddr, env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL())
 		t.Log("Node1 is up")
 	}()
 
@@ -100,8 +105,8 @@ func TestLeaseFailoverE2E(t *testing.T) {
 	go func() {
 		t.Log("Starting Node2")
 		p2pPeers := node1P2PAddr + "," + node3P2PAddr
-		proc := setupRaftSequencerNode(t, sut, workDir, "node2", node2RaftAddr, env.FullNodeJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetFullNodeRPCListen(), env.Endpoints.GetFullNodeP2PAddress(), env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL(), true, passphraseFile)
+		proc := setupRaftSequencerNode(t, sut, workDir, "node2", node2RaftAddr, env.FullNodeJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetFullNodeRPCListen(), node2P2PListen, env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL(), true, passphraseFile)
clusterNodes.Set("node2", env.Endpoints.GetFullNodeRPCAddress(), proc, env.Endpoints.GetFullNodeEthURL(), node2RaftAddr, node2P2PListen, node2P2PAddr, env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL()) t.Log("Node2 is up") }() @@ -112,8 +117,8 @@ func TestLeaseFailoverE2E(t *testing.T) { p2pPeers := node1P2PAddr + "," + node2P2PAddr node3RPCListen := fmt.Sprintf("127.0.0.1:%d", mustGetAvailablePort(t)) ethEngineURL := fmt.Sprintf("http://127.0.0.1:%s", fullNode3EnginePort) - proc := setupRaftSequencerNode(t, sut, workDir, "node3", node3RaftAddr, jwtSecret3, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, node3RPCListen, node3P2PAddr, ethEngineURL, node3EthAddr, true, passphraseFile) - clusterNodes.Set("node3", "http://"+node3RPCListen, proc, node3EthAddr, node3RaftAddr, node3P2PAddr, ethEngineURL, node3EthAddr) + proc := setupRaftSequencerNode(t, sut, workDir, "node3", node3RaftAddr, jwtSecret3, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, node3RPCListen, node3P2PListen, ethEngineURL, node3EthAddr, true, passphraseFile) + clusterNodes.Set("node3", "http://"+node3RPCListen, proc, node3EthAddr, node3RaftAddr, node3P2PListen, node3P2PAddr, ethEngineURL, node3EthAddr) t.Log("Node3 is up") }() @@ -172,11 +177,11 @@ func TestLeaseFailoverE2E(t *testing.T) { } } oldDetails := clusterNodes.Details(oldLeader) - restartedNodeProcess := setupRaftSequencerNode(t, sut, workDir, oldLeader, oldDetails.raftAddr, env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), "", raftCluster, clusterNodes.Details(newLeader).p2pAddr, oldDetails.rpcAddr, oldDetails.p2pAddr, oldDetails.engineURL, oldDetails.ethAddr, false, passphraseFile) + restartedNodeProcess := setupRaftSequencerNode(t, sut, workDir, oldLeader, oldDetails.raftAddr, env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), "", raftCluster, clusterNodes.Details(newLeader).p2pPeerAddr, oldDetails.rpcAddr, oldDetails.p2pAddr, oldDetails.engineURL, oldDetails.ethAddr, false, passphraseFile) t.Log("Restarted old leader to sync with cluster: " + oldLeader) if IsNodeUp(t, oldDetails.rpcAddr, NodeStartupTimeout) { - clusterNodes.Set(oldLeader, oldDetails.rpcAddr, restartedNodeProcess, oldDetails.ethAddr, oldDetails.raftAddr, "", oldDetails.engineURL, oldDetails.ethAddr) + clusterNodes.Set(oldLeader, oldDetails.rpcAddr, restartedNodeProcess, oldDetails.ethAddr, oldDetails.raftAddr, "", oldDetails.p2pPeerAddr, oldDetails.engineURL, oldDetails.ethAddr) } else { t.Log("+++ old leader did not recover on restart. 
 	}
 
@@ -276,17 +281,20 @@ func TestHASequencerRollingRestartE2E(t *testing.T) {
 	clusterNodes := &raftClusterNodes{
 		nodes: make(map[string]*nodeDetails),
 	}
-	node1P2PAddr := env.Endpoints.GetRollkitP2PAddress()
-	node2P2PAddr := env.Endpoints.GetFullNodeP2PAddress()
-	node3P2PAddr := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", mustGetAvailablePort(t))
+	node1P2PListen := env.Endpoints.GetRollkitP2PAddress()
+	node2P2PListen := env.Endpoints.GetFullNodeP2PAddress()
+	node3P2PListen := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", mustGetAvailablePort(t))
+	node1P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node1", node1P2PListen)
+	node2P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node2", node2P2PListen)
+	node3P2PAddr := mustNodeP2PMultiAddr(t, workDir, "node3", node3P2PListen)
 
 	// Start node1 (bootstrap mode)
 	go func() {
 		p2pPeers := node2P2PAddr + "," + node3P2PAddr
 		proc := setupRaftSequencerNode(t, sut, workDir, "node1", node1RaftAddr, env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(),
-			bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetRollkitRPCListen(), env.Endpoints.GetRollkitP2PAddress(),
+			bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetRollkitRPCListen(), node1P2PListen,
 			env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL(), true, passphraseFile)
-		clusterNodes.Set("node1", env.Endpoints.GetRollkitRPCAddress(), proc, env.Endpoints.GetSequencerEthURL(), node1RaftAddr, env.Endpoints.GetRollkitP2PAddress(), env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL())
+		clusterNodes.Set("node1", env.Endpoints.GetRollkitRPCAddress(), proc, env.Endpoints.GetSequencerEthURL(), node1RaftAddr, node1P2PListen, node1P2PAddr, env.Endpoints.GetSequencerEngineURL(), env.Endpoints.GetSequencerEthURL())
 		t.Log("Node1 is up")
 	}()
 
@@ -294,8 +302,8 @@ func TestHASequencerRollingRestartE2E(t *testing.T) {
 	go func() {
 		t.Log("Starting Node2")
 		p2pPeers := node1P2PAddr + "," + node3P2PAddr
-		proc := setupRaftSequencerNode(t, sut, workDir, "node2", node2RaftAddr, env.FullNodeJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetFullNodeRPCListen(), env.Endpoints.GetFullNodeP2PAddress(), env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL(), true, passphraseFile)
-		clusterNodes.Set("node2", env.Endpoints.GetFullNodeRPCAddress(), proc, env.Endpoints.GetFullNodeEthURL(), node2RaftAddr, env.Endpoints.GetFullNodeP2PAddress(), env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL())
+		proc := setupRaftSequencerNode(t, sut, workDir, "node2", node2RaftAddr, env.FullNodeJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, env.Endpoints.GetFullNodeRPCListen(), node2P2PListen, env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL(), true, passphraseFile)
+		clusterNodes.Set("node2", env.Endpoints.GetFullNodeRPCAddress(), proc, env.Endpoints.GetFullNodeEthURL(), node2RaftAddr, node2P2PListen, node2P2PAddr, env.Endpoints.GetFullNodeEngineURL(), env.Endpoints.GetFullNodeEthURL())
 		t.Log("Node2 is up")
 	}()
 
@@ -306,8 +314,8 @@ func TestHASequencerRollingRestartE2E(t *testing.T) {
 		p2pPeers := node1P2PAddr + "," + node2P2PAddr
 		node3RPCListen := fmt.Sprintf("127.0.0.1:%d", mustGetAvailablePort(t))
 		ethEngineURL := fmt.Sprintf("http://127.0.0.1:%s", fullNode3EnginePort)
-		proc := setupRaftSequencerNode(t, sut, workDir, "node3", node3RaftAddr, jwtSecret3, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, node3RPCListen, node3P2PAddr, ethEngineURL, node3EthAddr, true, passphraseFile)
-		clusterNodes.Set("node3", "http://"+node3RPCListen, proc, node3EthAddr, node3RaftAddr, node3P2PAddr, ethEngineURL, node3EthAddr)
+		proc := setupRaftSequencerNode(t, sut, workDir, "node3", node3RaftAddr, jwtSecret3, env.GenesisHash, env.Endpoints.GetDAAddress(), bootstrapDir, raftCluster, p2pPeers, node3RPCListen, node3P2PListen, ethEngineURL, node3EthAddr, true, passphraseFile)
+		clusterNodes.Set("node3", "http://"+node3RPCListen, proc, node3EthAddr, node3RaftAddr, node3P2PListen, node3P2PAddr, ethEngineURL, node3EthAddr)
 		t.Log("Node3 is up")
 	}()
 
@@ -393,7 +401,7 @@ func TestHASequencerRollingRestartE2E(t *testing.T) {
 			nodeDetails.engineURL, nodeDetails.ethAddr, false, passphraseFile)
 
 		clusterNodes.Set(nodeName, nodeDetails.rpcAddr, restartedProc, nodeDetails.ethAddr,
-			nodeDetails.raftAddr, nodeDetails.p2pAddr, nodeDetails.engineURL, nodeDetails.ethAddr)
+			nodeDetails.raftAddr, nodeDetails.p2pAddr, nodeDetails.p2pPeerAddr, nodeDetails.engineURL, nodeDetails.ethAddr)
 	}
 
 	// Initial restart of all nodes
@@ -721,6 +729,16 @@ func initChain(t *testing.T, sut *SystemUnderTest, workDir string) string {
 	require.NoError(t, err, "failed to init node", output)
 	return passphraseFile
 }
+
+func mustNodeP2PMultiAddr(t *testing.T, workDir, nodeID, listenAddr string) string {
+	t.Helper()
+	nodeKey, err := key.LoadOrGenNodeKey(filepath.Join(workDir, nodeID, "config"))
+	require.NoError(t, err)
+	peerID, err := peer.IDFromPrivateKey(nodeKey.PrivKey)
+	require.NoError(t, err)
+	return fmt.Sprintf("%s/p2p/%s", listenAddr, peerID.String())
+}
+
 func setupRaftSequencerNode(
 	t *testing.T,
 	sut *SystemUnderTest,
@@ -843,6 +861,7 @@ type nodeDetails struct {
 	xRPCClient  atomic.Pointer[rpcclient.Client]
 	running     atomic.Bool
 	p2pAddr     string
+	p2pPeerAddr string
 	engineURL   string
 	ethURL      string
 }
@@ -895,10 +914,10 @@ type raftClusterNodes struct {
 	nodes map[string]*nodeDetails
 }
 
-func (c *raftClusterNodes) Set(node string, listen string, proc *os.Process, eth string, raftAddr string, p2pAddr string, engineURL string, ethURL string) {
+func (c *raftClusterNodes) Set(node string, listen string, proc *os.Process, eth string, raftAddr string, p2pAddr string, p2pPeerAddr string, engineURL string, ethURL string) {
 	c.mx.Lock()
 	defer c.mx.Unlock()
-	d := &nodeDetails{raftAddr: raftAddr, rpcAddr: listen, process: proc, ethAddr: eth, p2pAddr: p2pAddr, engineURL: engineURL, ethURL: ethURL}
+	d := &nodeDetails{raftAddr: raftAddr, rpcAddr: listen, process: proc, ethAddr: eth, p2pAddr: p2pAddr, p2pPeerAddr: p2pPeerAddr, engineURL: engineURL, ethURL: ethURL}
 	d.running.Store(true)
 	c.nodes[node] = d
 }
@@ -946,11 +965,9 @@ func leader(t require.TestingT, nodes map[string]*nodeDetails) (string, *nodeDet
 		}
 		resp, err := client.Get(details.rpcAddr + "/raft/node")
 		require.NoError(t, err)
-		defer resp.Body.Close()
-
 		var status nodeStatus
 		require.NoError(t, json.NewDecoder(resp.Body).Decode(&status))
-
+		_ = resp.Body.Close()
 		if status.IsLeader {
 			return node, details
 		}
@@ -973,16 +990,17 @@ func must[T any](r T, err error) T {
 func IsNodeUp(t *testing.T, rpcAddr string, timeout time.Duration) bool {
 	t.Helper()
 	t.Logf("Query node is up: %s", rpcAddr)
-	ctx, done := context.WithTimeout(context.Background(), timeout)
+	ctx, done := context.WithTimeout(t.Context(), timeout)
 	defer done()
-	ticker := time.Tick(min(timeout/10, 200*time.Millisecond))
+	ticker := time.NewTicker(min(timeout/10, 200*time.Millisecond))
+	defer ticker.Stop()
 	c := client.NewClient(rpcAddr)
 	require.NotNil(t, c)
 	var lastBlock uint64
 	for {
 		select {
-		case <-ticker:
+		case <-ticker.C:
 			switch s, err := c.GetState(ctx); {
 			case err != nil: // ignore
 			case lastBlock == 0:
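Note: the e2e fix separates a node's listen address from the address peers dial — libp2p peers need the /p2p/<peerID> suffix derived from the node key, which is what mustNodeP2PMultiAddr builds above. A small sketch of that composition (the key is generated inline so the example is self-contained; the test loads it from the node's config dir instead):

package main

import (
	"crypto/rand"
	"fmt"

	"github.com/libp2p/go-libp2p/core/crypto"
	"github.com/libp2p/go-libp2p/core/peer"
)

func main() {
	// Stand-in for the node key loaded via key.LoadOrGenNodeKey in the test.
	priv, _, err := crypto.GenerateEd25519Key(rand.Reader)
	if err != nil {
		panic(err)
	}

	peerID, err := peer.IDFromPrivateKey(priv)
	if err != nil {
		panic(err)
	}

	listenAddr := "/ip4/127.0.0.1/tcp/7676"
	// The dialable form appends the peer identity to the listen address.
	fmt.Printf("%s/p2p/%s\n", listenAddr, peerID)
}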
diff --git a/test/e2e/sut_helper.go b/test/e2e/sut_helper.go
index f5783da8bb..633d1efdbb 100644
--- a/test/e2e/sut_helper.go
+++ b/test/e2e/sut_helper.go
@@ -162,8 +162,12 @@ func (s *SystemUnderTest) awaitProcessCleanup(cmd *exec.Cmd) {
 	s.cmdToPids[cmdKey] = append(s.cmdToPids[cmdKey], pid)
 	s.pidsLock.Unlock()
 	go func() {
-		_ = cmd.Wait() // blocks until shutdown
-		s.logf("Process stopped, pid: %d\n", pid)
+		waitErr := cmd.Wait() // blocks until shutdown
+		if waitErr != nil {
+			s.logf("Process stopped, pid: %d, err: %v\n", pid, waitErr)
+		} else {
+			s.logf("Process stopped, pid: %d\n", pid)
+		}
 		s.pidsLock.Lock()
 		defer s.pidsLock.Unlock()
 		delete(s.pids, pid)
@@ -182,11 +186,9 @@ func (s *SystemUnderTest) watchLogs(prefix string, cmd *exec.Cmd) {
 	outReader, err := cmd.StdoutPipe()
 	require.NoError(s.t, err)
 
-	if s.debug {
-		logDir := filepath.Join(WorkDir, "testnet")
+	if logDir := s.processLogDir(); logDir != "" {
 		require.NoError(s.t, os.MkdirAll(logDir, 0o750))
-		testName := strings.ReplaceAll(s.t.Name(), "/", "-")
-		logfileName := filepath.Join(logDir, prefix+fmt.Sprintf("exec-%s-%s-%d.out", filepath.Base(cmd.Args[0]), testName, time.Now().UnixNano()))
+		logfileName := filepath.Join(logDir, prefix+fmt.Sprintf("exec-%s-%d.out", filepath.Base(cmd.Args[0]), time.Now().UnixNano()))
 		logfile, err := os.Create(logfileName)
 		require.NoError(s.t, err)
 		errReader = io.NopCloser(io.TeeReader(errReader, logfile))
@@ -202,6 +204,19 @@ func (s *SystemUnderTest) watchLogs(prefix string, cmd *exec.Cmd) {
 	})
 }
 
+func (s *SystemUnderTest) processLogDir() string {
+	logRoot := strings.TrimSpace(os.Getenv("EV_E2E_LOG_DIR"))
+	if logRoot == "" && s.debug {
+		logRoot = filepath.Join(WorkDir, "testnet")
+	}
+	if logRoot == "" {
+		return ""
+	}
+
+	testName := strings.ReplaceAll(s.t.Name(), "/", "-")
+	return filepath.Join(logRoot, testName)
+}
+
 // PrintBuffer outputs the contents of outBuff and errBuff to stdout, prefixing each entry with "out>" or "err>", respectively.
 func (s *SystemUnderTest) PrintBuffer() {
 	out := os.Stdout
diff --git a/test/mocks/store.go b/test/mocks/store.go
index 911832ebab..34542227e8 100644
--- a/test/mocks/store.go
+++ b/test/mocks/store.go
@@ -1051,3 +1051,54 @@ func (_c *MockStore_SetMetadata_Call) RunAndReturn(run func(ctx context.Context,
 	_c.Call.Return(run)
 	return _c
 }
+
+// Sync provides a mock function for the type MockStore
+func (_mock *MockStore) Sync(ctx context.Context) error {
+	ret := _mock.Called(ctx)
+
+	if len(ret) == 0 {
+		panic("no return value specified for Sync")
+	}
+
+	var r0 error
+	if returnFunc, ok := ret.Get(0).(func(context.Context) error); ok {
+		r0 = returnFunc(ctx)
+	} else {
+		r0 = ret.Error(0)
+	}
+	return r0
+}
+
+// MockStore_Sync_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Sync'
+type MockStore_Sync_Call struct {
+	*mock.Call
+}
+
+// Sync is a helper method to define mock.On call
+//   - ctx context.Context
+func (_e *MockStore_Expecter) Sync(ctx interface{}) *MockStore_Sync_Call {
+	return &MockStore_Sync_Call{Call: _e.mock.On("Sync", ctx)}
+}
+
+func (_c *MockStore_Sync_Call) Run(run func(ctx context.Context)) *MockStore_Sync_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		var arg0 context.Context
+		if args[0] != nil {
+			arg0 = args[0].(context.Context)
+		}
+		run(
+			arg0,
+		)
+	})
+	return _c
+}
+
+func (_c *MockStore_Sync_Call) Return(err error) *MockStore_Sync_Call {
+	_c.Call.Return(err)
+	return _c
+}
+
+func (_c *MockStore_Sync_Call) RunAndReturn(run func(ctx context.Context) error) *MockStore_Sync_Call {
+	_c.Call.Return(run)
+	return _c
+}
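Note: a sketch of wiring the generated Sync expectation in a test, assuming the standard mockery constructor NewMockStore and EXPECT() helper that accompany the expecter types above:

package mypkg_test // illustrative location

import (
	"context"
	"testing"

	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"

	"github.com/evstack/ev-node/test/mocks"
)

func TestSyncExpectation(t *testing.T) {
	store := mocks.NewMockStore(t) // assumed mockery-generated constructor

	// Expect exactly one Sync call with any context, succeeding.
	store.EXPECT().Sync(mock.Anything).Return(nil).Once()

	require.NoError(t, store.Sync(context.Background()))
}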