-
Notifications
You must be signed in to change notification settings - Fork 878
rpc sharding by tx sender for autobahn #3438
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
ffac0a1
4e7a82d
0f5af37
ed35c51
93cd15f
3086a71
1141f9f
c7ce3a0
956b0e5
eb2c2c7
b04b46f
7bba698
9df91b7
46faae0
ad576ab
149f9c8
3ec92c0
a92bad2
7744232
caf7a9a
a0d0a18
2c18b75
5034212
d5a44f6
24bded4
4f45db2
ad1280d
4d86336
f8d4d59
f455ccf
7e86151
661dada
2dd645f
c342b5c
51981c0
c136a19
655d393
d60cf36
8cac6bc
307c709
af313d9
5dd0813
641c0da
9beb38f
2b05133
0dbb903
eaa2ed9
b25b31c
bbee25c
e345b59
2a07fc2
2138fe1
46e1720
42cdb5a
6b5daef
2767591
16d45b4
8aec53a
5eb329d
d4d7902
6a6edaf
f99a6da
0feb1a1
bf85d26
2259bf4
904f8a0
646ecff
b795714
3fe7a82
93dd1cb
d7a97fd
0796293
46b74e1
9c3516e
51c11db
0b5649d
33794ff
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,8 +40,9 @@ var ( | |
| rpcTelemetryMeter = otel.Meter("evmrpc") | ||
|
|
||
| metrics = struct { | ||
| requestLatencySeconds metric.Float64Histogram | ||
| wsConnectionCount metric.Int64Counter | ||
| requestLatencySeconds metric.Float64Histogram | ||
| wsConnectionCount metric.Int64Counter | ||
| redirectedRequestCount metric.Int64Counter | ||
| }{ | ||
| requestLatencySeconds: must(rpcTelemetryMeter.Float64Histogram( | ||
| "evmrpc_request_latency_seconds", | ||
|
|
@@ -57,6 +58,11 @@ var ( | |
| metric.WithDescription("Number of new websocket connections"), | ||
| metric.WithUnit("{count}"), | ||
| )), | ||
| redirectedRequestCount: must(rpcTelemetryMeter.Int64Counter( | ||
| "evmrpc_redirected_requests_total", | ||
| metric.WithDescription("Number of EVM RPC requests redirected to another validator"), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: can you clarify whether this is request you forwarded to someone else or it is request you received from redirection? |
||
| metric.WithUnit("{count}"), | ||
| )), | ||
| } | ||
| ) | ||
|
|
||
|
|
@@ -125,3 +131,12 @@ func recordRPCLatency(ctx context.Context, endpoint, connection string, success | |
| func recordWebsocketConnect(ctx context.Context) { | ||
| metrics.wsConnectionCount.Add(ctx, 1) | ||
| } | ||
|
|
||
| func recordRedirectedRequest(ctx context.Context, endpoint, connection string) { | ||
| metrics.redirectedRequestCount.Add(ctx, 1, | ||
| metric.WithAttributes( | ||
| attribute.String(endpointKey, endpoint), | ||
| attribute.String(connectionKey, connection), | ||
| ), | ||
| ) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ import ( | |
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "math/big" | ||
| "sync" | ||
| "time" | ||
|
|
||
|
|
@@ -77,25 +78,46 @@ func (s *SendAPI) SendRawTransaction(ctx context.Context, input hexutil.Bytes) ( | |
| return | ||
| } | ||
| hash = tx.Hash() | ||
| var txData ethtx.TxData | ||
| txData, err = ethtx.NewTxDataFromTx(tx) | ||
| if err != nil { | ||
| return | ||
| // getSender fails for AccessListTx, in which case we are not able to proxy or simulate, | ||
| // but we still need to handle it. | ||
| sender, senderErr := getSender(tx, s.keeper.ChainID(s.ctxProvider(LatestCtxHeight))) | ||
| if senderErr == nil { | ||
| if url, ok := s.tmClient.EvmProxy(sender); ok { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's our plan for handling the case where one validator is down? |
||
| recordRedirectedRequest(ctx, "eth_sendRawTransaction", string(s.connectionType)) | ||
| // HTTP transport pooling already happens globally underneath net/http, so | ||
| // creating a fresh RPC client per proxied request is fine here. If we | ||
| // start proxying over WebSocket, we'll need explicit custom pooling since | ||
| // the underlying TCP connection lifecycle is strictly bound to Dial -> Close calls. | ||
| client, err := rpc.DialContext(ctx, url.String()) | ||
| if err != nil { | ||
| return hash, fmt.Errorf("rpc.DialContext(%q): %w", url.String(), err) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would we see one error log per tx when one validator is down? That would be too spammy |
||
| } | ||
| defer client.Close() | ||
|
|
||
| if err := client.CallContext(ctx, &hash, "eth_sendRawTransaction", input); err != nil { | ||
| return hash, fmt.Errorf("eth_sendRawTransaction(%q): %w", url.String(), err) | ||
| } | ||
| return hash, nil | ||
| } | ||
| } | ||
| var msg *types.MsgEVMTransaction | ||
| msg, err = types.NewMsgEVMTransaction(txData) | ||
|
|
||
| txData, err := ethtx.NewTxDataFromTx(tx) | ||
| if err != nil { | ||
| return | ||
| return hash, err | ||
| } | ||
| var gasUsedEstimate uint64 | ||
| gasUsedEstimate, err = s.simulateTx(ctx, tx) | ||
| msg, err := types.NewMsgEVMTransaction(txData) | ||
| if err != nil { | ||
| tx, _ = msg.AsTransaction() | ||
| gasUsedEstimate = tx.Gas() // if issue simulating, fallback to gas limit | ||
| return hash, err | ||
| } | ||
| gasUsedEstimate := tx.Gas() // if issue simulating, fallback to gas limit | ||
| if senderErr == nil { // simulation requires sender. | ||
| if gas, err := s.simulateTx(ctx, sender, tx); err == nil { | ||
| gasUsedEstimate = gas | ||
| } | ||
| } | ||
| txBuilder := s.txConfigProvider(LatestCtxHeight).NewTxBuilder() | ||
| if err = txBuilder.SetMsgs(msg); err != nil { | ||
| return | ||
| if err := txBuilder.SetMsgs(msg); err != nil { | ||
| return hash, err | ||
| } | ||
| txBuilder.SetGasEstimate(gasUsedEstimate) | ||
| txbz, encodeErr := s.txConfigProvider(LatestCtxHeight).TxEncoder()(txBuilder.GetTx()) | ||
|
|
@@ -125,30 +147,30 @@ func (s *SendAPI) SendRawTransaction(ctx context.Context, input hexutil.Bytes) ( | |
| return | ||
| } | ||
|
|
||
| func (s *SendAPI) simulateTx(ctx context.Context, tx *ethtypes.Transaction) (estimate uint64, err error) { | ||
| var from common.Address | ||
| if tx.Type() == ethtypes.DynamicFeeTxType { | ||
| signer := ethtypes.NewLondonSigner(s.keeper.ChainID(s.ctxProvider(LatestCtxHeight))) | ||
| from, err = signer.Sender(tx) | ||
| func getSender(tx *ethtypes.Transaction, chainID *big.Int) (common.Address, error) { | ||
| switch { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we just use |
||
| case tx.Type() == ethtypes.DynamicFeeTxType: | ||
| from, err := ethtypes.NewLondonSigner(chainID).Sender(tx) | ||
| if err != nil { | ||
| err = fmt.Errorf("failed to get sender for dynamic fee tx: %w", err) | ||
| return | ||
| return common.Address{}, fmt.Errorf("failed to get sender for dynamic fee tx: %w", err) | ||
| } | ||
| } else if tx.Protected() { | ||
| signer := ethtypes.NewEIP155Signer(s.keeper.ChainID(s.ctxProvider(LatestCtxHeight))) | ||
| from, err = signer.Sender(tx) | ||
| return from, nil | ||
| case tx.Protected(): | ||
| from, err := ethtypes.NewEIP155Signer(chainID).Sender(tx) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would we always have NewEIP155Signer in tx.Protected()? Would |
||
| if err != nil { | ||
| err = fmt.Errorf("failed to get sender for protected tx: %w", err) | ||
| return | ||
| return common.Address{}, fmt.Errorf("failed to get sender for protected tx: %w", err) | ||
| } | ||
| } else { | ||
| signer := ethtypes.HomesteadSigner{} | ||
| from, err = signer.Sender(tx) | ||
| return from, nil | ||
| default: | ||
| from, err := ethtypes.HomesteadSigner{}.Sender(tx) | ||
| if err != nil { | ||
| err = fmt.Errorf("failed to get sender for homestead tx: %w", err) | ||
| return | ||
| return common.Address{}, fmt.Errorf("failed to get sender for homestead tx: %w", err) | ||
| } | ||
| return from, nil | ||
| } | ||
| } | ||
|
|
||
| func (s *SendAPI) simulateTx(ctx context.Context, sender common.Address, tx *ethtypes.Transaction) (estimate uint64, err error) { | ||
| input_ := (hexutil.Bytes)(tx.Data()) | ||
| gas_ := hexutil.Uint64(tx.Gas()) | ||
| nonce_ := hexutil.Uint64(tx.Nonce()) | ||
|
|
@@ -165,7 +187,7 @@ func (s *SendAPI) simulateTx(ctx context.Context, tx *ethtypes.Transaction) (est | |
| gp = nil | ||
| } | ||
| txArgs := export.TransactionArgs{ | ||
| From: &from, | ||
| From: &sender, | ||
| To: tx.To(), | ||
| Gas: &gas_, | ||
| GasPrice: (*hexutil.Big)(gp), | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: would be nice if both ports come from a common config