这篇文章主要讲解了“BlockKeeper的逻辑是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“BlockKeeper的逻辑是什么”吧!
首先我们先要在代码中定位到,比原到底是在什么时候来向对方节点发送请求的。
在前一篇讲的是如何建立连接并验证身份,那么发出数据请求的操作,一定在上次的代码之后。按照这个思路,我们在SyncManager
类中Switch
启动之后,找到了一个叫BlockKeeper
的类,相关的操作是在它里面完成的。
下面是老规矩,还是从启动开始,但是会更简化一些:
cmd/bytomd/main.go#L54
func main() { cmd := cli.PrepareBaseCmd(commands.RootCmd, "TM", os.ExpandEnv(config.DefaultDataDir())) cmd.Execute() }
cmd/bytomd/commands/run_node.go#L41
func runNode(cmd *cobra.Command, args []string) error { n := node.NewNode(config) if _, err := n.Start(); err != nil { // ... }
node/node.go#L169
func (n *Node) OnStart() error { // ... n.syncManager.Start() // ... }
netsync/handle.go#L141
func (sm *SyncManager) Start() { go sm.netStart() // ... go sm.syncer() }
注意sm.netStart()
,我们在一篇中建立连接并验证身份的操作,就是在它里面完成的。而这次的这个问题,是在下面的sm.syncer()
中完成的。
另外注意,由于这两个函数调用都使用了goroutine,所以它们是同时进行的。
sm.syncer()
的代码如下:
netsync/sync.go#L46
func (sm *SyncManager) syncer() { sm.fetcher.Start() defer sm.fetcher.Stop() // ... for { select { case <-sm.newPeerCh: log.Info("New peer connected.") // Make sure we have peers to select from, then sync if sm.sw.Peers().Size() < minDesiredPeerCount { break } go sm.synchronise() // .. } }
这里混入了一个叫fetcher
的奇怪的东西,名字看起来好像是专门去抓取数据的,我们要找的是它吗?
可惜不是,fetcher
的作用是从多个peer那里拿到了区块数据之后,对数据进行整理,把有用的放到本地链上。我们在以后会研究它,所以这里不展开讨论。
接着是一个for
循环,当发现通道newPeerCh
有了新数据(也就是有了新的节点连接上了),会判断一下当前自己连着的节点是否够多(大于等于minDesiredPeerCount
,值为5
),够多的话,就会进入sm.synchronise()
,进行数据同步。
这里为什么要多等几个节点,而不是一连上就马上同步呢?我想这是希望有更多选择的机会,找到一个数据够多的节点。
sm.synchronise()
还是属于SyncManager
的方法。在真正调用到BlockKeeper
的方法之前,它还做了一些比如清理已经断开的peer,找到最适合同步数据的peer等。其中“清理peer”的工作涉及到不同的对象持有的peer集合间的同步,略有些麻烦,但对当前问题帮助不大,所以我打算把它们放在以后的某个问题中回答(比如“当一个节点断开了,比原会有什么样的处理”),这里就先省略。
sm.synchronise()
代码如下:
netsync/sync.go#L77
func (sm *SyncManager) synchronise() { log.Info("bk peer num:", sm.blockKeeper.peers.Len(), " sw peer num:", sm.sw.Peers().Size(), " ", sm.sw.Peers().List()) // ... peer, bestHeight := sm.peers.BestPeer() // ... if bestHeight > sm.chain.BestBlockHeight() { // ... sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight) } }
可以看到,首先是从众多的peers中,找到最合适的那个。什么叫Best呢?看一下BestPeer()
的定义:
netsync/peer.go#L266
func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) { // ... for _, p := range ps.peers { if bestPeer == nil || p.height > bestHeight { bestPeer, bestHeight = p.swPeer, p.height } } return bestPeer, bestHeight }
其实就是持有区块链数据最长的那个。
找到了BestPeer之后,就调用sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight)
方法,从这里,正式进入BlockKeeper
-- 也就是本文的主角 -- 的世界。
blockKeeper.BlockRequestWorker
的逻辑比较复杂,它包含了:
根据自己持有的区块数据来计算需要同步的数据
向前面找到的最佳节点发送数据请求
拿到对方发过来的区块数据
对数据进行处理
广播新状态
处理各种出错情况,等等
由于本文中只关注“发送请求”,所以一些与之关系不大的逻辑我会忽略掉,留待以后再讲。
在“发送请求”这里,实际也包含了两种情形,一种简单的,一种复杂的:
简单的:假设不存在分叉,则直接检查本地高度最高的区块,然后请求下一个区块
复杂的:考虑分叉的情况,则当前本地的区块可能就存在分叉,那么到底应该请求哪个区块,就需要慎重考虑
由于第2种情况对于本文来说过于复杂(因为需要深刻理解比原链中分叉的处理逻辑),所以在本文中将把问题简化,只考虑第1种。而分叉的处理,将放在以后讲解。
下面是把blockKeeper.BlockRequestWorker
中的代码简化成了只包含第1种情况:
netsync/block_keeper.go#L72
func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) error { num := bk.chain.BestBlockHeight() + 1 reqNum := uint64(0) reqNum = num // ... bkPeer, ok := bk.peers.Peer(peerID) swPeer := bkPeer.getPeer() // ... block, err := bk.BlockRequest(peerID, reqNum) // ... }
在这种情况下,我们可以认为bk.chain.BestBlockHeight()
中的Best
,指的是本地持有的不带分叉的区块链高度最高的那个。(需要提醒的是,如果存在分叉情况,则Best
不一定是高度最高的那个)
那么我们就可以直接向最佳peer请求下一个高度的区块,它是通过bk.BlockRequest(peerID, reqNum)
实现的:
netsync/block_keeper.go#L152
func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block, error) { var block *types.Block if err := bk.blockRequest(peerID, height); err != nil { return nil, errReqBlock } // ... for { select { case pendingResponse := <-bk.pendingProcessCh: block = pendingResponse.block // ... return block, nil // ... } } }
在上面简化后的代码中,主要分成了两个部分。一个是发送请求bk.blockRequest(peerID, height)
,这是本文的重点;它下面的for-select
部分,已经是在等待并处理对方节点的返回数据了,这部分我们今天先略过不讲。
bk.blockRequest(peerID, height)
这个方法,从逻辑上又可以分成两部分:
构造出请求的信息
把信息发送给对方节点
bk.blockRequest(peerID, height)
经过一连串的方法调用之后,使用height
构造出了一个BlockRequestMessage
对象,代码如下:
netsync/block_keeper.go#L148
func (bk *blockKeeper) blockRequest(peerID string, height uint64) error { return bk.peers.requestBlockByHeight(peerID, height) }
netsync/peer.go#L332
func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error { peer, ok := ps.Peer(peerID) // ... return peer.requestBlockByHeight(height) }
netsync/peer.go#L73
func (p *peer) requestBlockByHeight(height uint64) error { msg := &BlockRequestMessage{Height: height} p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}) return nil }
到这里,终于构造出了所需要的BlockRequestMessage
,其实主要就是把height
告诉peer。
然后,通过Peer
的TrySend()
把该信息发出去。
在TrySend
中,主要是通过github.com/tendermint/go-wire
库将其序列化,再发送给对方。看起来应该是很简单的操作吧,先预个警,还是挺绕的。
当我们进入TrySend()
后:
p2p/peer.go#L242
func (p *Peer) TrySend(chID byte, msg interface{}) bool { if !p.IsRunning() { return false } return p.mconn.TrySend(chID, msg) }
发现它把锅丢给了p.mconn.TrySend
方法,那么mconn
是什么?chID
又是什么?
mconn
是MConnection
的实例,它是从哪儿来的?它应该在之前的某个地方初始化了,否则我们没法直接调用它。所以我们先来找到它初始化的地方。
经过一番寻找,发现原来是在前一篇之后,即比原节点与另一个节点完成了身份验证之后,具体的位置在Switch
类启动的地方。
我们这次直接从Swtich
的OnStart
作为起点:
p2p/switch.go#L186
func (sw *Switch) OnStart() error { //... // Start listeners for _, listener := range sw.listeners { go sw.listenerRoutine(listener) } return nil }
p2p/switch.go#L498
func (sw *Switch) listenerRoutine(l Listener) { for { inConn, ok := <-l.Connections() // ... err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig) // ... } }
p2p/switch.go#L645
func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error { // ... peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config) // ... }
p2p/peer.go#L87
func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config) }
p2p/peer.go#L91
func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { conn := rawConn // ... if config.AuthEnc { // ... conn, err = MakeSecretConnection(conn, ourNodePrivKey) // ... } // Key and NodeInfo are set after Handshake p := &Peer{ outbound: outbound, conn: conn, config: config, Data: cmn.NewCMap(), } p.mconn = createMConnection(conn, p, reactorsByCh, chDescs, onPeerError, config.MConfig) p.BaseService = *cmn.NewBaseService(nil, "Peer", p) return p, nil }
终于找到了。上面方法中的MakeSecretConnection
就是与对方节点交换公钥并进行身份验证的地方,下面的p.mconn = createMConnection(...)
就是创建mconn
的地方。
继续进去:
p2p/peer.go#L292
func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection { onReceive := func(chID byte, msgBytes []byte) { reactor := reactorsByCh[chID] if reactor == nil { if chID == PexChannel { return } else { cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID)) } } reactor.Receive(chID, p, msgBytes) } onError := func(r interface{}) { onPeerError(p, r) } return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config) }
原来mconn
是MConnection
的实例,它是通过NewMConnectionWithConfig
创建的。
看了上面的代码,发现这个MConnectionWithConfig
与普通的net.Conn
并没有太大的区别,只不过是当收到了对方发来的数据后,会根据指定的chID
调用相应的Reactor
的Receive
方法来处理。所以它起到了将数据分发给Reactor
的作用。
为什么需要这样的分发操作呢?这是因为,在比原中,节点之间交换数据,有多种不同的方式:
一种是规定了详细的数据交互协议(比如有哪些信息类型,分别代表什么意思,什么情况下发哪个,如何应答等),在ProtocolReactor
中实现,它对应的chID
是BlockchainChannel
,值为byte(0x40)
另一种使用了与BitTorrent类似的文件共享协议,叫PEX,在PEXReactor
中实现,它对应的chID
是PexChannel
,值为byte(0x00)
所以节点之间发送信息的时候,需要知道对方发过来的数据对应的是哪一种方式,然后转交给相应的Reactor
去处理。
在比原中,前者是主要的方式,后者起到辅助作用。我们目前的文章中涉及到的都是前者,后者将在以后专门研究。
p.mconn.TrySend
当我们知道了p.mconn.TrySend
中的mconn
是什么,并且在什么时候初始化以后,下面就可以进入它的TrySend
方法了。
p2p/connection.go#L243
func (c *MConnection) TrySend(chID byte, msg interface{}) bool { // ... channel, ok := c.channelsIdx[chID] // ... ok = channel.trySendBytes(wire.BinaryBytes(msg)) if ok { // Wake up sendRoutine if necessary select { case c.send <- struct{}{}: default: } } return ok }
可以看到,它找到相应的channel后(在这里应该是ProtocolReactor
对应的channel),调用channel的trySendBytes
方法。在发送数据的时候,使用了github.com/tendermint/go-wire
库,将msg
序列化为二进制数组。
p2p/connection.go#L602
func (ch *Channel) trySendBytes(bytes []byte) bool { select { case ch.sendQueue <- bytes: atomic.AddInt32(&ch.sendQueueSize, 1) return true default: return false } }
原来它是把要发送的数据,放到了该channel对应的sendQueue
中,交由别人来发送。具体是由谁来发送,我们马上要就找到它。
细心的同学会发现,Channel
除了trySendBytes
方法外,还有一个sendBytes
(在本文中没有用上):
p2p/connection.go#L589
func (ch *Channel) sendBytes(bytes []byte) bool { select { case ch.sendQueue <- bytes: atomic.AddInt32(&ch.sendQueueSize, 1) return true case <-time.After(defaultSendTimeout): return false } }
它们两个的区别是,前者尝试把待发送数据bytes
放入ch.sendQueue
时,如果能放进去,则返回true
,否则马上失败,返回false
,所以它是非阻塞的。而后者,如果放不进去(sendQueue
已满,那边还没处理完),则等待defaultSendTimeout
(值为10
秒),然后才会失败。另外,sendQueue
的容量默认为1
。
感谢各位的阅读,以上就是“BlockKeeper的逻辑是什么”的内容了,经过本文的学习后,相信大家对BlockKeeper的逻辑是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。