创建P2P server

func (n *Node) Start() error {...// Initialize the p2p server. This creates the node key and// discovery databases.n.serverConfig = n.config.P2Pn.serverConfig.PrivateKey = n.config.NodeKey()n.serverConfig.Name = n.config.NodeName()n.serverConfig.Logger = n.logif n.serverConfig.StaticNodes == nil {n.serverConfig.StaticNodes = n.config.StaticNodes()}if n.serverConfig.TrustedNodes == nil {n.serverConfig.TrustedNodes = n.config.TrustedNodes()}if n.serverConfig.NodeDatabase == "" {n.serverConfig.NodeDatabase = n.config.NodeDB()}running := &p2p.Server{Config: n.serverConfig}n.log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)....
}

代码首先做了一些检查工作:加锁、判断结点是否已经运行、检查datadir是否可以打开,然后初始化P2P server配置,最后用该配置创建了一个p2p.Server实例。首先初始化Node中的services字段,然后遍历serviceFuncs,也就是之前注册的所有Service的构造函数列表。在创建Service实例之前,先为每个Service创建一个ServiceContext,之前提到过,ServiceContext里存储的是从Node继承过来的一些信息。接着通过构造函数创建Service实例,然后加入到service这个map中。

创建Service

// Otherwise copy and specialize the P2P configurationservices := make(map[reflect.Type]Service)for _, constructor := range n.serviceFuncs {// Create a new context for the particular servicectx := &ServiceContext{config:         n.config,services:       make(map[reflect.Type]Service),EventMux:       n.eventmux,AccountManager: n.accman,}for kind, s := range services { // copy needed for threaded accessctx.services[kind] = s}// Construct and save the serviceservice, err := constructor(ctx)if err != nil {return err}kind := reflect.TypeOf(service)if _, exists := services[kind]; exists {return &DuplicateServiceError{Kind: kind}}services[kind] = service}

首先初始化Node中的services字段,然后遍历serviceFuncs,也就是之前注册的所有Service的构造函数列表。在创建Service实例之前,先为每个Service创建一个ServiceContext,之前提到过,ServiceContext里存储的是从Node继承过来的一些信息。接着通过构造函数创建Service实例,然后加入到service这个map中。

启动P2P server

// Gather the protocols and start the freshly assembled P2P server  for _, service := range services {  running.Protocols = append(running.Protocols, service.Protocols()...)  }  if err := running.Start(); err != nil {  return convertFileLockError(err)  }  

首先把所有Service支持的协议集合到一起,然后调用p2p.Server的Start()方法启动P2P server(代码位于p2p/server.go)。P2P server会绑定一个UDP端口和一个TCP端口,端口号是相同的(默认30303)。UDP端口主要用于结点发现,TCP端口主要用于业务数据传输,基于RLPx加密传输协议。所以具体来说,Start()方法做了以下几件事情:

  • 侦听UDP端口:用于结点发现

  • 发起UDP请求获取结点表:内部会启动goroutine来完成

  • 侦听TCP端口:用于业务数据传输,基于RLPx协议

  • 发起TCP请求连接到其他结点:也是启动goroutine完成

// p2p/server.go
// Servers can not be re-used after stopping.
func (srv *Server) Start() (err error) {srv.lock.Lock()defer srv.lock.Unlock()if srv.running {return errors.New("server already running")}srv.running = truesrv.log = srv.Config.Loggerif srv.log == nil {srv.log = log.New()}srv.log.Info("Starting P2P networking")// static fieldsif srv.PrivateKey == nil {return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")}if srv.newTransport == nil {srv.newTransport = newRLPX}if srv.Dialer == nil {srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}}srv.quit = make(chan struct{})srv.addpeer = make(chan *conn)srv.delpeer = make(chan peerDrop)srv.posthandshake = make(chan *conn)srv.addstatic = make(chan *discover.Node)srv.removestatic = make(chan *discover.Node)srv.peerOp = make(chan peerOpFunc)srv.peerOpDone = make(chan struct{})var (conn      *net.UDPConnsconn     *sharedUDPConnrealaddr  *net.UDPAddrunhandled chan discover.ReadPacket)if !srv.NoDiscovery || srv.DiscoveryV5 {addr, err := net.ResolveUDPAddr("udp", srv.ListenAddr)if err != nil {return err}conn, err = net.ListenUDP("udp", addr)if err != nil {return err}realaddr = conn.LocalAddr().(*net.UDPAddr)if srv.NAT != nil {if !realaddr.IP.IsLoopback() {go nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")}// TODO: react to external IP changes over time.if ext, err := srv.NAT.ExternalIP(); err == nil {realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}}}}if !srv.NoDiscovery && srv.DiscoveryV5 {unhandled = make(chan discover.ReadPacket, 100)sconn = &sharedUDPConn{conn, unhandled}}// node tableif !srv.NoDiscovery {cfg := discover.Config{PrivateKey:   srv.PrivateKey,AnnounceAddr: realaddr,NodeDBPath:   srv.NodeDatabase,NetRestrict:  srv.NetRestrict,Bootnodes:    srv.BootstrapNodes,Unhandled:    unhandled,}ntab, err := discover.ListenUDP(conn, cfg)if err != nil {return err}srv.ntab = ntab}if srv.DiscoveryV5 {var (ntab *discv5.Networkerr  error)if sconn != nil {ntab, err = discv5.ListenUDP(srv.PrivateKey, sconn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)} else {ntab, err = discv5.ListenUDP(srv.PrivateKey, conn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)}if err != nil {return err}if err := ntab.SetFallbackNodes(srv.BootstrapNodesV5); err != nil {return err}srv.DiscV5 = ntab}dynPeers := srv.maxDialedConns()dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)// handshakesrv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)}for _, p := range srv.Protocols {srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())}// listen/dialif srv.ListenAddr != "" {if err := srv.startListening(); err != nil {return err}}if srv.NoDial && srv.ListenAddr == "" {srv.log.Warn("P2P server will be useless, neither dialing nor listening")}srv.loopWG.Add(1)go srv.run(dialer)srv.running = truereturn nil
}

启动Service

// Start each of the servicesstarted := []reflect.Type{}for kind, service := range services {// Start the next service, stopping all previous upon failureif err := service.Start(running); err != nil {for _, kind := range started {services[kind].Stop()}running.Stop()return err}// Mark the service started for potential cleanupstarted = append(started, kind)}

主要就是依次调用每个Service的Start()方法,然后把启动的Service的类型存储到started表中。之前提到 Ethereum 作为一个service,被Node注册进去。Node start的时候会启动其注册的所有服务,Ethereum service也是一样。

ethereum service

ethereum service的初始化

eth/backend.go
func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {if config.SyncMode == downloader.LightSync {return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum")}if !config.SyncMode.IsValid() {return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)}chainDb, err := CreateDB(ctx, config, "chaindata")if err != nil {return nil, err}chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {return nil, genesisErr}log.Info("Initialised chain configuration", "config", chainConfig)eth := &Ethereum{config:         config,chainDb:        chainDb,chainConfig:    chainConfig,eventMux:       ctx.EventMux,accountManager: ctx.AccountManager,engine:         CreateConsensusEngine(ctx, &config.Ethash, chainConfig, chainDb),shutdownChan:   make(chan bool),networkId:      config.NetworkId,gasPrice:       config.GasPrice,etherbase:      config.Etherbase,bloomRequests:  make(chan chan *bloombits.Retrieval),bloomIndexer:   NewBloomIndexer(chainDb, params.BloomBitsBlocks),}log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)if !config.SkipBcVersionCheck {bcVersion := rawdb.ReadDatabaseVersion(chainDb)if bcVersion != core.BlockChainVersion && bcVersion != 0 {return nil, fmt.Errorf("Blockchain DB version mismatch (%d / %d). Run geth upgradedb.\n", bcVersion, core.BlockChainVersion)}rawdb.WriteDatabaseVersion(chainDb, core.BlockChainVersion)}var (vmConfig    = vm.Config{EnablePreimageRecording: config.EnablePreimageRecording}cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieNodeLimit: config.TrieCache, TrieTimeLimit: config.TrieTimeout})eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig)if err != nil {return nil, err}// Rewind the chain in case of an incompatible config upgrade.if compat, ok := genesisErr.(*params.ConfigCompatError); ok {log.Warn("Rewinding chain to upgrade configuration", "err", compat)eth.blockchain.SetHead(compat.RewindTo)rawdb.WriteChainConfig(chainDb, genesisHash, chainConfig)}eth.bloomIndexer.Start(eth.blockchain)if config.TxPool.Journal != "" {config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)}eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {return nil, err}eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)eth.miner.SetExtra(makeExtraData(config.ExtraData))eth.APIBackend = &EthAPIBackend{eth, nil}gpoParams := config.GPOif gpoParams.Default == nil {gpoParams.Default = config.GasPrice}eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)return eth, nil
}
  • 如果config.SyncMode 是 downloader.LightSync,走的是les/backend.go的初始化方法。
  • chainDb, err := CreateDB(ctx, config, “chaindata”)打开leveldb,leveldb是eth存储数据库。
  • stopDbUpgrade := upgradeDeduplicateData(chainDb) 检查chainDb版本,如果需要的话,启动后台进程进行升级。
  • chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)装载创世区块。 根据节点条件判断是从数据库里面读取,还是从默认配置文件读取,还是从自定义配置文件读取,或者是从代码里面获取默认值。并返回区块链的config和创世块的hash。
  • 装载Etherum struct的各个成员。eventMux和accountManager 是Node 启动 eth service的时候传入的。eventMux可以认为是一个全局的事件多路复用器,accountManager认为是一个全局的账户管理器。engine创建共识引擎。etherbase 配置此Etherum的主账号地址。初始化bloomRequests 通道和bloom过滤器。
  • 判断客户端版本号和数据库版本号是否一致
  • eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig) 初始化eth的blockchain,也就是eth的区块链
  • eth.blockchain.SetHead(compat.RewindTo) 根据创始区块设置区块头
  • eth.bloomIndexer.Start(eth.blockchain)启动bloomIndexer
  • eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain) 初始化eth 区块链的交易池,存储本地生产的和P2P网络同步过来的交易。
  • eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb)初始化以太坊协议管理器,用于区块链P2P通讯
  • miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine) 初始化矿工
  • eth.ApiBackend.gpo = gasprice.NewOracle(eth.ApiBackend, gpoParams) 创建预言最新gasprice的预言机

ethereum service 启动

func (s *Ethereum) Start(srvr *p2p.Server) error {// Start the bloom bits servicing goroutiness.startBloomHandlers()// Start the RPC services.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())// Figure out a max peers count based on the server limitsmaxPeers := srvr.MaxPeersif s.config.LightServ > 0 {if s.config.LightPeers >= srvr.MaxPeers {return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)}maxPeers -= s.config.LightPeers}// Start the networking layer and the light server if requesteds.protocolManager.Start(maxPeers)if s.lesServer != nil {s.lesServer.Start(srvr)}return nil
}

首先启动bloom过滤器 eth 的net 相关Api 加入RPC 服务。
s.protocolManager.Start(maxPeers) 设置最大同步节点数,并启动eth P2P通讯。
如果ethereum service 出问题了才会启动lesServer。

ProtocolManager 以太坊P2P通讯协议管理

ethereum service的初始化 也会调用 NewProtocolManager

func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
...if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {return nil, err}....
}

ProtocolManager 的初始化方法

func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkId uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {// Create the protocol manager with the base fieldsmanager := &ProtocolManager{networkId:   networkId,eventMux:    mux,txpool:      txpool,blockchain:  blockchain,chainconfig: config,peers:       newPeerSet(),newPeerCh:   make(chan *peer),noMorePeers: make(chan struct{}),txsyncCh:    make(chan *txsync),quitSync:    make(chan struct{}),}// Figure out whether to allow fast sync or notif mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {log.Warn("Blockchain not empty, fast sync disabled")mode = downloader.FullSync}if mode == downloader.FastSync {manager.fastSync = uint32(1)}// Initiate a sub-protocol for every implemented version we can handlemanager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))for i, version := range ProtocolVersions {// Skip protocol version if incompatible with the mode of operationif mode == downloader.FastSync && version < eth63 {continue}// Compatible; initialise the sub-protocolversion := version // Closure for the runmanager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{Name:    ProtocolName,Version: version,Length:  ProtocolLengths[i],Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {peer := manager.newPeer(int(version), p, rw)select {case manager.newPeerCh <- peer:manager.wg.Add(1)defer manager.wg.Done()return manager.handle(peer)case <-manager.quitSync:return p2p.DiscQuitting}},NodeInfo: func() interface{} {return manager.NodeInfo()},PeerInfo: func(id discover.NodeID) interface{} {if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {return p.Info()}return nil},})}if len(manager.SubProtocols) == 0 {return nil, errIncompatibleConfig}// Construct the different synchronisation mechanismsmanager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)validator := func(header *types.Header) error {return engine.VerifyHeader(blockchain, header, true)}heighter := func() uint64 {return blockchain.CurrentBlock().NumberU64()}inserter := func(blocks types.Blocks) (int, error) {// If fast sync is running, deny importing weird blocksif atomic.LoadUint32(&manager.fastSync) == 1 {log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())return 0, nil}atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher importreturn manager.blockchain.InsertChain(blocks)}manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)return manager, nil
}
  • peers 为以太坊临近的同步网络节点,newPeerCh、noMorePeers、txsyncCh、quitSync对应同步的通知
  • manager.SubProtocols 创建以太坊 P2P server 的 通讯协议,通常只有一个值。manager.SubProtocols,在Node start的时候传给以太坊 P2P server并同时start P2P server。协议里面三个函数指针(Run、NodeInfo、PeerInfo)非常重要,后面会用到。
  • manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
    创建了一个下载器,从远程网络节点中获取hashes和blocks。
  • manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)收集网络其他以太坊节点发过来的同步通知,进行验证,并做出相应的处理。初始化传入的几个参数 都是用于处理同步区块链数据的函数指针

Ethereum service 启动的时候会同时启动 ProtocolManager。

ProtocolManager的start()方法:

func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers

// broadcast transactions
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop()// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()// start sync handlers
go pm.syncer()
go pm.txsyncLoop()

}

  • 创建一个新交易的订阅通道,并启动交易广播的goroutine
  • 创建一个挖坑的订阅通道,并启动
  • pm.syncer() 启动同步goroutine,定时的和网络其他节点同步,并处理网络节点的相关通知
  • pm.txsyncLoop() 启动交易同步goroutine,把新的交易均匀的同步给网路节点

ProtocolManager主动向网络节点广播

ProtocolManager Start()方法里面的4个goroutine都是处理ProtocolManager向以太坊网络节点进行广播的。

  • pm.txBroadcastLoop()方法
func (pm *ProtocolManager) txBroadcastLoop() {for {select {case event := <-pm.txsCh:pm.BroadcastTxs(event.Txs)// Err() channel will be closed when unsubscribing.case <-pm.txsSub.Err():return}}
}

core/tx_pool.go 产生新的交易的时候会send self.txCh,这时候会激活 self.BroadcastTx(event.Tx.Hash(), event.Tx)

func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) {// Broadcast transaction to a batch of peers not knowing about itpeers := pm.peers.PeersWithoutTx(hash)//FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]for _, peer := range peers {peer.SendTransactions(types.Transactions{tx})}log.Trace("Broadcast transaction", "hash", hash, "recipients", len(peers))
}

向缓存的没有这个交易hash的网络节点广播此次交易。

  • pm.minedBroadcastLoop()方法
// Mined broadcast loop
func (self *ProtocolManager) minedBroadcastLoop() {// automatically stops if unsubscribefor obj := range self.minedBlockSub.Chan() {switch ev := obj.Data.(type) {case core.NewMinedBlockEvent:self.BroadcastBlock(ev.Block, true)  // First propagate block to peersself.BroadcastBlock(ev.Block, false) // Only then announce to the rest}}
}

收到 miner.go 里面 NewMinedBlockEvent 挖到新区块的事件通知,激活self.BroadcastBlock(ev.Block, true)

func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {hash := block.Hash()peers := pm.peers.PeersWithoutBlock(hash)// If propagation is requested, send to a subset of the peerif propagate {// Calculate the TD of the block (it's not imported yet, so block.Td is not valid)var td *big.Intif parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))} else {log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)return}// Send the block to a subset of our peerstransfer := peers[:int(math.Sqrt(float64(len(peers))))]for _, peer := range transfer {peer.SendNewBlock(block, td)}log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))return}// Otherwise if the block is indeed in out own chain, announce itif pm.blockchain.HasBlock(hash, block.NumberU64()) {for _, peer := range peers {peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})}log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))}
}

如果propagate为true 向网络节点广播整个挖到的block,为false 只广播挖到的区块的hash值和number值。广播的区块还包括这个区块打包的所有交易。

  • pm.syncer() 方法
func (pm *ProtocolManager) syncer() {// Start and ensure cleanup of sync mechanismspm.fetcher.Start()defer pm.fetcher.Stop()defer pm.downloader.Terminate()// Wait for different events to fire synchronisation operationsforceSync := time.NewTicker(forceSyncCycle)defer forceSync.Stop()for {select {case <-pm.newPeerCh:// Make sure we have peers to select from, then syncif pm.peers.Len() < minDesiredPeerCount {break}go pm.synchronise(pm.peers.BestPeer())case <-forceSync.C:// Force a sync even if not enough peers are presentgo pm.synchronise(pm.peers.BestPeer())case <-pm.noMorePeers:return}}
}

pm.fetcher.Start()启动 fetcher,辅助同步区块数据

当P2P server执行 ProtocolManager 的p2p.Protocol 的Run指针的时候会send pm.newPeerCh,这时候选择最优的网络节点(TD 总难度最大的)启动pm.synchronise(pm.peers.BestPeer()) goroutine。

  • pm.txsyncLoop()方法
func (pm *ProtocolManager) txsyncLoop() {var (pending = make(map[discover.NodeID]*txsync)sending = false               // whether a send is activepack    = new(txsync)         // the pack that is being sentdone    = make(chan error, 1) // result of the send)// send starts a sending a pack of transactions from the sync.send := func(s *txsync) {// Fill pack with transactions up to the target size.size := common.StorageSize(0)pack.p = s.ppack.txs = pack.txs[:0]for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {pack.txs = append(pack.txs, s.txs[i])size += s.txs[i].Size()}// Remove the transactions that will be sent.s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]if len(s.txs) == 0 {delete(pending, s.p.ID())}// Send the pack in the background.s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)sending = truego func() { done <- pack.p.SendTransactions(pack.txs) }()}// pick chooses the next pending sync.pick := func() *txsync {if len(pending) == 0 {return nil}n := rand.Intn(len(pending)) + 1for _, s := range pending {if n--; n == 0 {return s}}return nil}for {select {case s := <-pm.txsyncCh:pending[s.p.ID()] = sif !sending {send(s)}case err := <-done:sending = false// Stop tracking peers that cause send failures.if err != nil {pack.p.Log().Debug("Transaction send failed", "err", err)delete(pending, pack.p.ID())}// Schedule the next send.if s := pick(); s != nil {send(s)}case <-pm.quitSync:return}}
}

当从网络节点同步过来最新的交易数据后,本地也会把新同步下来的交易数据广播给网络中的其他节点。这四个goroutine 基本上就在不停的做广播区块、广播交易,同步到区块、同步到交易,再广播区块、广播交易。

以太坊源码解析 - 以太坊P2P协议相关推荐

  1. 以太坊源码解析 - RLP(理论)

    RLP(Recursive Length Prefix),叫递归长度前缀编码,它是以太坊序列化所采用的编码方式.RLP主要用于以太坊中数据的网络传输和持久化存储. 定义 RLP实际只给以下两种类型数据 ...

  2. 【以太坊源码】以太坊黄皮书参数

    由于参数比较多,直接贴出来比较杂乱,我将其整理在百度脑图,按照黄皮书章节顺序进行分类,并调整了部分常数顺序 需要查阅特定的参数,可以使用ctrl+F直接查询,由于百度脑图不支持下标(或者我不知道),所 ...

  3. 以太坊Geth 共识算法源码解析

    共识算法 目前以太坊中有两个公式算法的实现,分别为clique和ethash.其中clique是PoA共识的实现,ethash是PoW共识的实现,其相应的代码位于go-ethereum/consens ...

  4. 以太坊源码阅读2——RLP编码

    以太坊源码阅读2--RLP编码 RLP介绍 目前网上的资料都是RLP(Recursive Length prefix),叫递归长度前缀编码,但目前源码的doc.go的第20行里面的注释写的是 The ...

  5. 3 v4 中心节点固定_死磕以太坊源码分析之p2p节点发现

    死磕以太坊源码分析之p2p节点发现 在阅读节点发现源码之前必须要理解kadmilia算法,可以参考:KAD算法详解. 节点发现概述 节点发现,使本地节点得知其他节点的信息,进而加入到p2p网络中. 以 ...

  6. go-ethereum-code-analysis 以太坊源码分析

    分析go-ethereum的过程,我希望从依赖比较少的底层技术组件开始,慢慢深入到核心逻辑. 目录 go-ethereum代码阅读环境搭建 以太坊黄皮书 符号索引 rlp源码解析 trie源码分析 e ...

  7. 以太坊源码学习(一) 正本清源

    以太坊源码学习(一)正本清源 背景 geth源码一直在不断增加,优化,发展到现在已经非常庞大,第一次看geth源码,会有不小的难度.虽然如此,还是可以从geth仓库的第一个commit开始,这时的代码 ...

  8. 以太坊源码分析(2)——以太坊APP对象

    前言 从这一节开始,我将开始以太坊代码全覆盖讲解,讲解的流程是: 以太坊程序入口 基本框架 以太坊协议 发送一笔交易后发生了什么 启动挖矿 以太坊共识 p2p 网络 阅读本系列文章,将默认读者具备一定 ...

  9. 以太坊源码分析-交易

    以太坊源码分析-交易 机理 先说一点区块链转账的基本概念和流程 用户输入转账的地址和转入的地址和转出的金额 系统通过转出的地址的私钥对转账信息进行签名(用于证明这 笔交易确实有本人进行) 系统对交易信 ...

最新文章

  1. demo17 clean-webpack-plugin (清除模式)
  2. HUD 1043 Eight 八数码问题 A*算法 1667 The Rotation Game IDA*算法
  3. 基于Apache Thrift的公路涵洞数据交互实现原理
  4. idea启动多个tomcat失败
  5. Unsafe类方法详解
  6. Bootstrap4+MySQL前后端综合实训-Day07-PM【用户信息管理页面——功能展示(分页显示数据、添加用户、批量删除用户、编辑用户信息)、servlet项目代码整理汇总】
  7. Redis数据库(二)——Redis高可用、持久化及性能管理
  8. java akka_用于大型事件处理的Akka Java
  9. Linux NULL定义
  10. 同一台电脑安装python2python3
  11. java获取tomcat路径
  12. 微软 Edge bug 导致黑客窃取用户在任意站点的机密信息,颁发2万美元奖金
  13. C++ 智能指针shared_ptr、weak_ptr的简单实现
  14. 计算机表格外边框颜色怎么设置,#表格外部框线设置颜色#如何把excel里所有边框颜色改变...
  15. Hibernate【映射】续篇
  16. 绘画教程:日式温泉场景怎么画?露天浴场的正确画法!
  17. AD936x 系列快速入口
  18. Android修改默认打开WLAN随时都可扫描
  19. 操作系统 计算机操作系统教程笔记
  20. Python数据可视化(处理天气数据)

热门文章

  1. python(十四)--Django学习快速入门
  2. ssh登录、配置免密登录
  3. centos 服务器性能测试工具UnixBench
  4. Captcha验证码使用,算术,中文,数字
  5. 【常见电路】稳压电路以及元器件的选型
  6. 百度智能云:AI工业化时代的方法论
  7. Handlebar嵌套遍历数据
  8. 领域建模应对软件复杂性初体验
  9. Leetcode100——判断两树相同——c++版本循序渐进学习
  10. AutoLeaders控制组——C语言指针和字符串学习笔记