50 changes: 35 additions & 15 deletions sei-tendermint/internal/blocksync/pool.go
@@ -135,20 +135,33 @@ func NewBlockPool(
func (pool *BlockPool) OnStart(ctx context.Context) error {
pool.lastAdvance = time.Now()
pool.lastHundredBlockTimeStamp = pool.lastAdvance
go pool.makeRequestersRoutine(ctx)
pool.Spawn("makeRequestersRoutine", func(ctx context.Context) error {
pool.makeRequestersRoutine(ctx)
return nil
})

return nil
}

func (pool *BlockPool) OnStop() {
// Requester shutdown must not block behind a full requestsCh; Stop cancels ctx
// and waits for the Spawn-managed requester goroutine to exit.
pool.mtx.Lock()
defer pool.mtx.Unlock()
cancels := pool.cancels
pool.cancels = nil
requesters := make([]*bpRequester, 0, len(pool.requesters))
for _, requester := range pool.requesters {
requesters = append(requesters, requester)
}
pool.mtx.Unlock()

// cancel all running requesters if any
for _, cancel := range pool.cancels {
// Stop requesters outside pool.mtx; their shutdown path may observe pool state.
for _, cancel := range cancels {
cancel()
}
pool.cancels = []context.CancelFunc{}
for _, requester := range requesters {
requester.Stop()
}
}

// spawns requesters as needed
@@ -500,11 +513,16 @@ func (pool *BlockPool) requestersLen() int64 {
return int64(len(pool.requesters))
}

func (pool *BlockPool) sendRequest(height int64, peerID types.NodeID) {
func (pool *BlockPool) sendRequest(ctx context.Context, height int64, peerID types.NodeID) bool {
if !pool.IsRunning() {
return
return false
}
select {
case pool.requestsCh <- BlockRequest{height, peerID}:
return true
case <-ctx.Done():
return false
Unprotected sendError can deadlock shutdown under new wg tracking

Low Severity

sendRequest was correctly updated to select on ctx.Done() to avoid blocking during shutdown, but sendError still performs an unconditional channel send. Previously this was benign because makeRequestersRoutine wasn't tracked by the WaitGroup. Now that it's routed through Spawn, if makeRequestersRoutine calls sendError (via removeTimedoutPeers) while errorsCh is full, it blocks. Since pool.wg.Wait() inside pool.Stop() now waits for makeRequestersRoutine, and pool.Stop() is called from Reactor.OnStop (which runs before reactor.wg.Wait), the reactor's requestRoutine that drains errorsCh may have already exited via its own ctx.Done(), creating a deadlock. The 1000-entry buffer makes this extremely unlikely in practice.

Reviewed by Cursor Bugbot for commit 8c54806.

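To make the direction concrete, here is a minimal sketch of a ctx-aware sendError that mirrors the sendRequest change above. The added ctx parameter, the bool return, and the peerError literal are illustrative assumptions drawn from this diff, not the fix actually adopted in the PR.

```go
// Sketch only: select against ctx so a full errorsCh cannot wedge shutdown,
// mirroring sendRequest. Assumes errorsCh carries peerError values.
func (pool *BlockPool) sendError(ctx context.Context, err error, peerID types.NodeID) bool {
	if !pool.IsRunning() {
		return false
	}
	select {
	case pool.errorsCh <- peerError{err: err, peerID: peerID}:
		return true
	case <-ctx.Done():
		return false
	}
}
```

Callers in makeRequestersRoutine would then bail out on a false return, the same way requestRoutine now exits when sendRequest fails.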
}
pool.requestsCh <- BlockRequest{height, peerID}
}

func (pool *BlockPool) sendError(err error, peerID types.NodeID) {
@@ -622,7 +640,10 @@ func newBPRequester(pool *BlockPool, height int64) *bpRequester {
}

func (bpr *bpRequester) OnStart(ctx context.Context) error {
go bpr.requestRoutine(ctx)
bpr.Spawn("requestRoutine", func(ctx context.Context) error {
bpr.requestRoutine(ctx)
return nil
})
return nil
}

@@ -695,14 +716,15 @@ func (bpr *bpRequester) redo(peerID types.NodeID, retryReason RetryReason) {
// Responsible for making more requests as necessary
// Returns only when a block is found (e.g. AddBlock() is called)
func (bpr *bpRequester) requestRoutine(ctx context.Context) {
defer bpr.timeoutTicker.Stop()

OUTER_LOOP:
for {
// Pick a peer to send request to.
var peer *bpPeer
PICK_PEER_LOOP:
for {
if !bpr.IsRunning() || !bpr.pool.IsRunning() || ctx.Err() != nil {
bpr.timeoutTicker.Stop()
return
}
if ctx.Err() != nil {
@@ -724,13 +746,14 @@ OUTER_LOOP:
bpr.mtx.Unlock()

// Send request and wait.
bpr.pool.sendRequest(bpr.height, peer.id)
if !bpr.pool.sendRequest(ctx, bpr.height, peer.id) {
return
}
bpr.timeoutTicker.Reset(peerTimeout)
WAIT_LOOP:
for {
select {
case <-ctx.Done():
bpr.timeoutTicker.Stop()
return
case redoOp := <-bpr.redoCh:
// if we don't have an existing block or this is a bad block
Expand All @@ -747,9 +770,6 @@ OUTER_LOOP:
}
case <-bpr.gotBlockCh:
// We got a block!
// Stop the goroutine to avoid leak
bpr.timeoutTicker.Stop()
bpr.Stop()
return
}
}
85 changes: 67 additions & 18 deletions sei-tendermint/internal/blocksync/reactor.go
@@ -72,7 +72,7 @@ func GetChannelDescriptor() p2p.ChannelDescriptor[*pb.Message] {
type consensusReactor interface {
// For when we switch from block sync reactor to the consensus
// machine.
SwitchToConsensus(ctx context.Context, state sm.State, skipWAL bool)
SwitchToConsensus(state sm.State, skipWAL bool)
}

type peerError struct {
@@ -84,6 +84,8 @@ func (e peerError) Error() string {
return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error())
}

type blocksyncResult struct{ stateSynced bool }

// Reactor handles long-term catchup syncing.
type Reactor struct {
service.BaseService
@@ -100,6 +102,16 @@ type Reactor struct {
blockSync *atomicBool
previousMaxPeerHeight int64

// blocksyncReady fires when blocksync should start processing blocks —
// either at OnStart (if blockSync was initially set) or via
// SwitchToBlockSync. Pre-spawned requestRoutine and poolRoutine wait on
// it before doing any work.
blocksyncReady utils.AtomicSend[utils.Option[blocksyncResult]]
// consensusReady fires once the blocksync->consensus handoff has
// happened. The pre-spawned autoRestartIfBehind monitor gates on this
// signal.
consensusReady utils.AtomicSend[bool]

router *p2p.Router
channel *p2p.Channel[*pb.Message]

@@ -150,8 +162,9 @@ func NewReactor(
blocksBehindThreshold: selfRemediationConfig.BlocksBehindThreshold,
blocksBehindCheckInterval: time.Duration(selfRemediationConfig.BlocksBehindCheckIntervalSeconds) * time.Second, //nolint:gosec // validated in config.ValidateBasic against MaxInt64
restartCooldownSeconds: selfRemediationConfig.RestartCooldownSeconds,
blocksyncReady: utils.NewAtomicSend(utils.None[blocksyncResult]()),
consensusReady: utils.NewAtomicSend(false),
}

r.BaseService = *service.NewBaseService("BlockSync", r)
return r, nil
}
@@ -186,23 +199,60 @@ func (r *Reactor) OnStart(ctx context.Context) error {
r.requestsCh = requestsCh
r.errorsCh = errorsCh

// Pre-spawn all long-running routines so their lifetime is bound to the
// BaseService WaitGroup. Conditional routines gate on AtomicSend[bool]
// signals so SwitchToBlockSync (and the in-poolRoutine consensus handoff
// for autoRestartIfBehind) can wake them later without spawning fresh
// goroutines from outside OnStart.
r.Spawn("requestRoutine", func(ctx context.Context) error {
_, err := r.blocksyncReady.Wait(ctx, func(o utils.Option[blocksyncResult]) bool {
return o.IsPresent()
})
if err != nil {
return err
}
r.requestRoutine(ctx)
return nil
})
r.Spawn("poolRoutine", func(ctx context.Context) error {
result, err := r.blocksyncReady.Wait(ctx, func(o utils.Option[blocksyncResult]) bool {
return o.IsPresent()
})
if err != nil {
return err
}
res := result.OrPanic("no blocksync result")
r.poolRoutine(ctx, res.stateSynced)
return nil
})
r.SpawnCritical("processBlockSyncCh", func(ctx context.Context) error {
r.processBlockSyncCh(ctx)
return nil
})
r.SpawnCritical("processPeerUpdates", func(ctx context.Context) error {
r.processPeerUpdates(ctx)
return nil
})
r.SpawnCritical("autoRestartIfBehind", func(ctx context.Context) error {
if _, err := r.consensusReady.Wait(ctx, func(ready bool) bool { return ready }); err != nil {
return err
}
r.autoRestartIfBehind(ctx)
return nil
})

if r.blockSync.IsSet() {
if err := r.pool.Start(ctx); err != nil {
return err
}
go r.requestRoutine(ctx)

go r.poolRoutine(ctx, false)
r.blocksyncReady.Store(utils.Some(blocksyncResult{false}))
}

go r.processBlockSyncCh(ctx)
go r.processPeerUpdates(ctx)

return nil
}
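As a reading aid, the gate described at the top of OnStart reduces to the sketch below. It assumes only the utils.AtomicSend behavior already visible in this diff (Wait blocks until the predicate holds or the context is cancelled; Store publishes a value and wakes waiters); svc and runWorker are illustrative placeholders, not names from the codebase.

```go
// Illustrative only: pre-spawn the routine at OnStart, gate its work on a signal.
ready := utils.NewAtomicSend(false)

svc.Spawn("worker", func(ctx context.Context) error {
	// Parks here until ready.Store(true) below, or returns cleanly on shutdown,
	// so the BaseService WaitGroup always accounts for this goroutine.
	if _, err := ready.Wait(ctx, func(v bool) bool { return v }); err != nil {
		return err
	}
	runWorker(ctx)
	return nil
})

// Later, for example from SwitchToBlockSync:
ready.Store(true)
```

This is why SwitchToBlockSync no longer spawns anything itself: it only publishes the signal, and the already-registered goroutine picks up the work.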

// OnStop stops the reactor by signaling to all spawned goroutines to exit and
// blocking until they all exit.
// OnStop stops the BlockPool. The reactor's own long-running goroutines were
// registered with the BaseService WaitGroup via Spawn in OnStart, so the
// BaseService blocks Stop() on their exit before this method returns.
func (r *Reactor) OnStop() {
if r.blockSync.IsSet() {
r.pool.Stop()
@@ -380,9 +430,7 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error {
}

r.syncStartTime = time.Now()

go r.requestRoutine(ctx)
go r.poolRoutine(ctx, true)
r.blocksyncReady.Store(utils.Some(blocksyncResult{true}))

if err := r.PublishStatus(types.EventDataBlockSyncStatus{
Complete: false,
@@ -475,10 +523,11 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) {

if r.consReactor != nil {
logger.Info("switching to consensus reactor", "height", height, "blocks_synced", blocksSynced, "state_synced", stateSynced, "max_peer_height", r.pool.MaxPeerHeight())
r.consReactor.SwitchToConsensus(ctx, state, blocksSynced > 0 || stateSynced)

// Auto restart should only be checked after switching to consensus mode
go r.autoRestartIfBehind(ctx)
// Use the node-scoped context: SwitchToConsensus is a handoff
// to a peer reactor whose lifecycle is not tied to blocksync.
r.consReactor.SwitchToConsensus(state, blocksSynced > 0 || stateSynced)
// Wake the pre-spawned auto-restart monitor.
r.consensusReady.Store(true)
}

return
81 changes: 81 additions & 0 deletions sei-tendermint/internal/blocksync/reactor_test.go
@@ -2,6 +2,8 @@ package blocksync

import (
"context"
"runtime"
"strings"
"testing"
"time"

@@ -242,6 +244,85 @@ func TestReactor_AbruptDisconnect(t *testing.T) {
rts.network.Remove(t, rts.nodes[0])
}

// TestReactor_OnStopWaitsForGoroutines is a regression test for the
// "panic: leveldb/table: reader released" shutdown panic seen on v6.4.4
// sentry nodes. Before the fix, blocksync's long-running goroutines
// (Reactor.requestRoutine, Reactor.poolRoutine, Reactor.processBlockSyncCh,
// Reactor.processPeerUpdates, Reactor.autoRestartIfBehind, and
// BlockPool.makeRequestersRoutine) were started with raw `go fn(ctx)` using
// the outer ctx, instead of `Spawn(...)` which would register them with the
// BaseService WaitGroup and bind them to BaseService.inner.ctx. As a result,
// Reactor.Stop() / BlockPool.Stop() — which cancels only the inner ctx —
// did not signal these goroutines to exit, let alone wait for them. The
// node's OnStop then proceeded to n.blockStore.Close() while poolRoutine
// was still mid-SaveBlock -> Base() -> bs.db.Iterator, causing goleveldb to
// panic when the table reader was released underneath the live iterator.
//
// This test asserts the fix: after `reactor.Stop()` returns, the
// blocksync-package goroutines have exited. The outer ctx is still live at
// this point in the test, so the unfixed code keeps them running and the
// assertion fails deterministically. On failure the live goroutine stacks
// are dumped to make the leak obvious.
func TestReactor_OnStopWaitsForGoroutines(t *testing.T) {
ctx := t.Context()

cfg, err := config.ResetTestRoot(t.TempDir(), "block_sync_reactor_stop_test")
require.NoError(t, err)

valSet, privVals := factory.ValidatorSet(ctx, 1, 30)
genDoc := factory.GenesisDoc(cfg, time.Now(), valSet.Validators, factory.ConsensusParams())

rts := setup(ctx, t, genDoc, privVals[0], []int64{0})

reactor := rts.reactors[rts.nodes[0]]
require.True(t, reactor.IsRunning())

dumpBlocksyncGoroutines := func() (string, int) {
buf := make([]byte, 1<<20)
n := runtime.Stack(buf, true)
var out strings.Builder
count := 0
for _, g := range strings.Split(string(buf[:n]), "\n\n") {
if !strings.Contains(g, "/internal/blocksync.") {
continue
}
// The test functions themselves live in the blocksync package, so
// runtime.Stack reports them as matches. Only count background
// routines spawned by Reactor.OnStart and BlockPool.OnStart,
// which are created by libs/service.Spawn, not testing.tRunner.
if strings.Contains(g, "testing.tRunner") {
continue
}
out.WriteString(g)
out.WriteString("\n\n")
count++
}
return out.String(), count
}

// OnStart Spawns 5 reactor routines and BlockPool.OnStart Spawns 1.
require.Eventually(t, func() bool {
_, c := dumpBlocksyncGoroutines()
return c >= 6
}, 5*time.Second, 10*time.Millisecond, "blocksync goroutines did not start")

reactor.Stop()
require.False(t, reactor.IsRunning())

deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
if _, c := dumpBlocksyncGoroutines(); c == 0 {
return
}
time.Sleep(time.Millisecond)
}
dump, c := dumpBlocksyncGoroutines()
t.Fatalf("%d blocksync goroutine(s) still alive after Reactor.Stop() returned. "+
"This means at least one routine was not registered with the "+
"BaseService WaitGroup via Spawn(), so Stop did not wait for it. "+
"Live stacks:\n\n%s", c, dump)
}

func TestReactor_SyncTime(t *testing.T) {
ctx := t.Context()

2 changes: 1 addition & 1 deletion sei-tendermint/internal/consensus/byzantine_test.go
@@ -239,7 +239,7 @@ package consensus
//
// for _, reactor := range rts.reactors {
// reactor.StopWaitSync()
// reactor.SwitchToConsensus(ctx, reactor.state.GetState(), false)
// reactor.SwitchToConsensus(reactor.state.GetState(), false)
// }
//
// // Evidence should be submitted and committed at the third height but
4 changes: 2 additions & 2 deletions sei-tendermint/internal/consensus/invalid_test.go
@@ -46,7 +46,7 @@ func TestGossipVotesForHeightPoisonedProposalPOL(t *testing.T) {
reactor := rts.reactors[nodeIDs[0]]
peerID := nodeIDs[1]
state := reactor.state.GetState()
reactor.SwitchToConsensus(ctx, state, false)
reactor.SwitchToConsensus(state, false)

require.Eventually(t, func() bool {
_, ok := reactor.GetPeerState(peerID)
@@ -176,7 +176,7 @@ func TestReactorInvalidPrecommit(t *testing.T) {

for _, reactor := range rts.reactors {
state := reactor.state.GetState()
reactor.SwitchToConsensus(ctx, state, false)
reactor.SwitchToConsensus(state, false)
}

// this val sends a random precommit at each height
2 changes: 1 addition & 1 deletion sei-tendermint/internal/consensus/reactor.go
@@ -204,7 +204,7 @@ func (r *Reactor) WaitSync() bool {

// SwitchToConsensus switches from block-sync mode to consensus mode. It resets
// the state, turns off block-sync, and starts the consensus state-machine.
func (r *Reactor) SwitchToConsensus(ctx context.Context, state sm.State, skipWAL bool) {
func (r *Reactor) SwitchToConsensus(state sm.State, skipWAL bool) {
logger.Info("switching to consensus")

d := types.EventDataBlockSyncStatus{Complete: true, Height: state.LastBlockHeight}