Skip to content

p2p: support new msg broadcast features; #3043

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions consensus/parlia/parlia.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,11 @@ func (p *Parlia) Author(header *types.Header) (common.Address, error) {
return header.Coinbase, nil
}

// ConsensusAddress returns the consensus address of the validator
func (p *Parlia) ConsensusAddress() common.Address {
return p.val
}

// VerifyHeader checks whether a header conforms to the consensus rules.
func (p *Parlia) VerifyHeader(chain consensus.ChainHeaderReader, header *types.Header) error {
return p.verifyHeader(chain, header, nil)
Expand Down
27 changes: 15 additions & 12 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,18 +372,21 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
// Permit the downloader to use the trie cache allowance during fast sync
cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit
if eth.handler, err = newHandler(&handlerConfig{
NodeID: eth.p2pServer.Self().ID(),
Database: chainDb,
Chain: eth.blockchain,
TxPool: eth.txPool,
Network: networkID,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
RequiredBlocks: config.RequiredBlocks,
DirectBroadcast: config.DirectBroadcast,
DisablePeerTxBroadcast: config.DisablePeerTxBroadcast,
PeerSet: peers,
NodeID: eth.p2pServer.Self().ID(),
Database: chainDb,
Chain: eth.blockchain,
TxPool: eth.txPool,
Network: networkID,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
RequiredBlocks: config.RequiredBlocks,
DirectBroadcast: config.DirectBroadcast,
EnableBroadcastFeatures: stack.Config().EnableBroadcastFeatures,
DirectBroadcastList: stack.Config().P2P.DirectBroadcastList,
ProxyedValidatorList: stack.Config().P2P.ProxyedValidatorList,
DisablePeerTxBroadcast: config.DisablePeerTxBroadcast,
PeerSet: peers,
}); err != nil {
return nil, err
}
Expand Down
117 changes: 83 additions & 34 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/beacon"
"github.com/ethereum/go-ethereum/consensus/parlia"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/core/monitor"
Expand Down Expand Up @@ -113,26 +114,32 @@ type votePool interface {
// handlerConfig is the collection of initialization parameters to create a full
// node network handler.
type handlerConfig struct {
NodeID enode.ID // P2P node ID used for tx propagation topology
Database ethdb.Database // Database for direct sync insertions
Chain *core.BlockChain // Blockchain to serve data from
TxPool txPool // Transaction pool to propagate from
VotePool votePool
Network uint64 // Network identifier to adfvertise
Sync ethconfig.SyncMode // Whether to snap or full sync
BloomCache uint64 // Megabytes to alloc for snap sync bloom
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
RequiredBlocks map[uint64]common.Hash // Hard coded map of required block hashes for sync challenges
DirectBroadcast bool
DisablePeerTxBroadcast bool
PeerSet *peerSet
NodeID enode.ID // P2P node ID used for tx propagation topology
Database ethdb.Database // Database for direct sync insertions
Chain *core.BlockChain // Blockchain to serve data from
TxPool txPool // Transaction pool to propagate from
VotePool votePool
Network uint64 // Network identifier to adfvertise
Sync ethconfig.SyncMode // Whether to snap or full sync
BloomCache uint64 // Megabytes to alloc for snap sync bloom
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
RequiredBlocks map[uint64]common.Hash // Hard coded map of required block hashes for sync challenges
DirectBroadcast bool
DisablePeerTxBroadcast bool
PeerSet *peerSet
EnableBroadcastFeatures bool
DirectBroadcastList []string
ProxyedValidatorList []string
}

type handler struct {
nodeID enode.ID
networkID uint64
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node
disablePeerTxBroadcast bool
nodeID enode.ID
networkID uint64
forkFilter forkid.Filter // Fork ID filter, constant across the lifetime of the node
disablePeerTxBroadcast bool
enableBroadcastFeatures bool
directBroadcastList []string
proxyedValidatorList []string

snapSync atomic.Bool // Flag whether snap sync is enabled (gets disabled if we already have blocks)
synced atomic.Bool // Flag whether we're considered synchronised (enables transaction processing)
Expand Down Expand Up @@ -187,23 +194,26 @@ func newHandler(config *handlerConfig) (*handler, error) {
config.PeerSet = newPeerSet() // Nicety initialization for tests
}
h := &handler{
nodeID: config.NodeID,
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Chain),
disablePeerTxBroadcast: config.DisablePeerTxBroadcast,
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
votepool: config.VotePool,
chain: config.Chain,
peers: config.PeerSet,
peersPerIP: make(map[string]int),
requiredBlocks: config.RequiredBlocks,
directBroadcast: config.DirectBroadcast,
quitSync: make(chan struct{}),
handlerDoneCh: make(chan struct{}),
handlerStartCh: make(chan struct{}),
stopCh: make(chan struct{}),
nodeID: config.NodeID,
networkID: config.Network,
forkFilter: forkid.NewFilter(config.Chain),
disablePeerTxBroadcast: config.DisablePeerTxBroadcast,
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
votepool: config.VotePool,
chain: config.Chain,
peers: config.PeerSet,
peersPerIP: make(map[string]int),
requiredBlocks: config.RequiredBlocks,
directBroadcast: config.DirectBroadcast,
enableBroadcastFeatures: config.EnableBroadcastFeatures,
directBroadcastList: config.DirectBroadcastList,
proxyedValidatorList: config.ProxyedValidatorList,
quitSync: make(chan struct{}),
handlerDoneCh: make(chan struct{}),
handlerStartCh: make(chan struct{}),
stopCh: make(chan struct{}),
}
if config.Sync == ethconfig.FullSync {
// The database seems empty as the current block is the genesis. Yet the snap
Expand Down Expand Up @@ -328,13 +338,21 @@ func newHandler(config *handlerConfig) (*handler, error) {
// protoTracker tracks the number of active protocol handlers.
func (h *handler) protoTracker() {
defer h.wg.Done()
updateTicker := time.NewTicker(1 * time.Minute)
defer updateTicker.Stop()
var active int
for {
select {
case <-h.handlerStartCh:
active++
case <-h.handlerDoneCh:
active--
case <-updateTicker.C:
if h.enableBroadcastFeatures {
// TODO(galaio): add onchain validator p2p node list later, it will enable the direct broadcast + no tx broadcast feature
// here check & enable peer broadcast features periodically, and it's a simple way to handle the peer change and the list change scenarios.
h.peers.enablePeerFeatures(nil, h.directBroadcastList, nil, h.proxyedValidatorList)
}
case <-h.quitSync:
// Wait for all active handlers to finish.
for ; active > 0; active-- {
Expand Down Expand Up @@ -770,6 +788,19 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
peer.AsyncSendNewBlock(block, td)
}

if h.needMoreDirectBroadcastPeers(block) {
var morePeers []*ethPeer
for i := len(transfer); i < len(peers); i++ {
if peers[i].EnableDirectBroadcast.Load() {
log.Debug("add extra direct broadcast peer", "peer", peers[i].ID())
morePeers = append(morePeers, peers[i])
}
}
for _, peer := range morePeers {
peer.AsyncSendNewBlock(block, td)
}
}

log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
return
}
Expand All @@ -782,6 +813,24 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
}
}

// needMoreDirectBroadcastPeers checks if the block should be broadcast to all direct peers
// if the block is mined by self or received from proxyed validator, just broadcast to all direct peers
// if not, just gossip it.
func (h *handler) needMoreDirectBroadcastPeers(block *types.Block) bool {
if !h.enableBroadcastFeatures {
return false
}
parlia, ok := h.chain.Engine().(*parlia.Parlia)
if !ok {
return false
}
if parlia.ConsensusAddress() == block.Coinbase() {
return true
}

return h.peers.existProxyedValidator(block.Coinbase(), h.proxyedValidatorList)
}

// BroadcastTransactions will propagate a batch of transactions
// - To a square root of all peers for non-blob transactions
// - And, separately, as announcements to all peers which are not known to
Expand Down
55 changes: 55 additions & 0 deletions eth/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"math/big"
"slices"
"sync"
"time"

Expand All @@ -29,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
"github.com/ethereum/go-ethereum/eth/protocols/trust"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
)

Expand Down Expand Up @@ -74,6 +76,8 @@ type peerSet struct {
peers map[string]*ethPeer // Peers connected on the `eth` protocol
snapPeers int // Number of `snap` compatible peers for connection prioritization

consensusAddressMap map[common.Address][]string

snapWait map[string]chan *snap.Peer // Peers connected on `eth` waiting for their snap extension
snapPend map[string]*snap.Peer // Peers connected on the `snap` protocol, but not yet on `eth`

Expand Down Expand Up @@ -433,6 +437,53 @@ func (ps *peerSet) peer(id string) *ethPeer {
return ps.peers[id]
}

// enablePeerFeatures enables the given features for the given peers.
func (ps *peerSet) enablePeerFeatures(validatorMap map[common.Address][]string, directList []string, noTxList []string, proxyedList []string) {
ps.lock.Lock()
defer ps.lock.Unlock()

ps.consensusAddressMap = validatorMap
var valNodeIDs []string
for _, nodeIDs := range validatorMap {
valNodeIDs = append(valNodeIDs, nodeIDs...)
}
for _, peer := range ps.peers {
nodeID := peer.ID()
if slices.Contains(directList, nodeID) || slices.Contains(valNodeIDs, nodeID) {
log.Debug("enable direct broadcast feature for", "peer", nodeID)
peer.EnableDirectBroadcast.Store(true)
}
// if the peer is in the noTxList and not in the proxyedList, enable the no tx broadcast feature
// the node also need to forward tx to the proxyedList
if slices.Contains(noTxList, nodeID) && !slices.Contains(proxyedList, nodeID) {
log.Debug("enable no tx broadcast feature for", "peer", nodeID)
peer.EnableNoTxBroadcast.Store(true)
}
}
log.Info("enable peer features", "total", len(ps.peers), "directList", len(directList), "noTxList", len(noTxList), "proxyedList", len(proxyedList))
}

// existProxyedValidator checks if the given address is a connected proxyed validator.
func (ps *peerSet) existProxyedValidator(address common.Address, proxyedList []string) bool {
ps.lock.RLock()
defer ps.lock.RUnlock()

if ps.consensusAddressMap == nil {
return false
}

peerIDs := ps.consensusAddressMap[address]
for _, peerID := range peerIDs {
if ps.peers[peerID] == nil {
continue
}
if slices.Contains(proxyedList, peerID) {
return true
}
}
return false
}

// headPeers retrieves a specified number list of peers.
func (ps *peerSet) headPeers(num uint) []*ethPeer {
ps.lock.RLock()
Expand Down Expand Up @@ -475,6 +526,10 @@ func (ps *peerSet) peersWithoutTransaction(hash common.Hash) []*ethPeer {

list := make([]*ethPeer, 0, len(ps.peers))
for _, p := range ps.peers {
if p.EnableNoTxBroadcast.Load() {
log.Trace("skip peer with no tx broadcast feature", "peer", p.ID())
continue
}
if !p.KnownTransaction(hash) {
list = append(list, p)
}
Expand Down
Loading