阅读hyperledger fabric源码学习Go并发编程

Hyperledger (或 Hyperledger项目)是一个 开放源 的”区块链” 和相关工具的总括项目,[1] 由 Linux基金会在2015年12月发起该项目,[2] 以支持基于区块链技术的去中心化账本的写作开发。

源码地址:https://github.com/hyperledger/fabric

Go语言是为并发而生的语言,Go语言是为数不多的在语言层面实现并发的语言;也正是Go语言的并发特性,吸引了全球无数的开发者。那么我们看看hyperledger fabric是怎么实现Go并发编程的呢?

并发(concurrency)和并行(parallellism)

并发(concurrency):两个或两个以上的任务在一段时间内被执行。我们不必care这些任务在某一个时间点是否是同时执行,可能同时执行,也可能不是,我们只关心在一段时间内,哪怕是很短的时间(一秒或者两秒)是否执行解决了两个或两个以上任务。

并行(parallellism):两个或两个以上的任务在同一时刻被同时执行。

并发说的是逻辑上的概念,而并行,强调的是物理运行状态。并发“包含”并行。

Go的CSP并发模型

Go实现了两种并发形式。第一种是大家普遍认知的:多线程共享内存。其实就是Java或者C++等语言中的多线程开发。另外一种是Go语言特有的,也是Go语言推荐的:CSP(communicating sequential processes)并发模型。

Go的CSP并发模型,是通过goroutinechannel来实现的。

  • goroutine 是Go语言中并发的执行单位。有点抽象,其实就是和传统概念上的”线程“类似,可以理解为”线程“。
  • channel是Go语言中各个并发结构体(goroutine)之前的通信机制。 通俗的讲,就是各个goroutine之间通信的”管道“,有点类似于Linux中的管道。

在Frabric中fabric/gossip/gossip/channel/channel.go 就使用了CSP并发模型,

/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package channel

import (
  "bytes"
  "fmt"
  "strconv"
  "sync"
  "sync/atomic"
  "time"

  common_utils "github.com/hyperledger/fabric/common/util"
  "github.com/hyperledger/fabric/gossip/api"
  "github.com/hyperledger/fabric/gossip/comm"
  "github.com/hyperledger/fabric/gossip/common"
  "github.com/hyperledger/fabric/gossip/discovery"
  "github.com/hyperledger/fabric/gossip/election"
  "github.com/hyperledger/fabric/gossip/filter"
  "github.com/hyperledger/fabric/gossip/gossip/msgstore"
  "github.com/hyperledger/fabric/gossip/gossip/pull"
  "github.com/hyperledger/fabric/gossip/util"
  proto "github.com/hyperledger/fabric/protos/gossip"
  "github.com/op/go-logging"
  "github.com/pkg/errors"
)

// Config is a configuration item
// of the channel store
type Config struct {
  ID                          string
  PublishStateInfoInterval    time.Duration
  MaxBlockCountToStore        int
  PullPeerNum                 int
  PullInterval                time.Duration
  RequestStateInfoInterval    time.Duration
  BlockExpirationInterval     time.Duration
  StateInfoCacheSweepInterval time.Duration
}

// GossipChannel defines an object that deals with all channel-related messages
type GossipChannel interface {

  // GetPeers returns a list of peers with metadata as published by them
  GetPeers() []discovery.NetworkMember

  // PeerFilter receives a SubChannelSelectionCriteria and returns a RoutingFilter that selects
  // only peer identities that match the given criteria
  PeerFilter(api.SubChannelSelectionCriteria) filter.RoutingFilter

  // IsMemberInChan checks whether the given member is eligible to be in the channel
  IsMemberInChan(member discovery.NetworkMember) bool

  // UpdateStateInfo updates this channel's StateInfo message
  // that is periodically published
  UpdateStateInfo(msg *proto.SignedGossipMessage)

  // IsOrgInChannel returns whether the given organization is in the channel
  IsOrgInChannel(membersOrg api.OrgIdentityType) bool

  // EligibleForChannel returns whether the given member should get blocks
  // for this channel
  EligibleForChannel(member discovery.NetworkMember) bool

  // HandleMessage processes a message sent by a remote peer
  HandleMessage(proto.ReceivedMessage)

  // AddToMsgStore adds a given GossipMessage to the message store
  AddToMsgStore(msg *proto.SignedGossipMessage)

  // ConfigureChannel (re)configures the list of organizations
  // that are eligible to be in the channel
  ConfigureChannel(joinMsg api.JoinChannelMessage)

  // LeaveChannel makes the peer leave the channel
  LeaveChannel()

  // Stop stops the channel's activity
  Stop()
}

// Adapter enables the gossipChannel
// to communicate with gossipServiceImpl.
type Adapter interface {
  // GetConf returns the configuration that this GossipChannel will posses
  GetConf() Config

  // Gossip gossips a message in the channel
  Gossip(message *proto.SignedGossipMessage)

  // Forward sends a message to the next hops
  Forward(message proto.ReceivedMessage)

  // DeMultiplex de-multiplexes an item to subscribers
  DeMultiplex(interface{})

  // GetMembership returns the known alive peers and their information
  GetMembership() []discovery.NetworkMember

  // Lookup returns a network member, or nil if not found
  Lookup(PKIID common.PKIidType) *discovery.NetworkMember

  // Send sends a message to a list of peers
  Send(msg *proto.SignedGossipMessage, peers ...*comm.RemotePeer)

  // ValidateStateInfoMessage returns an error if a message
  // hasn't been signed correctly, nil otherwise.
  ValidateStateInfoMessage(message *proto.SignedGossipMessage) error

  // GetOrgOfPeer returns the organization ID of a given peer PKI-ID
  GetOrgOfPeer(pkiID common.PKIidType) api.OrgIdentityType

  // GetIdentityByPKIID returns an identity of a peer with a certain
  // pkiID, or nil if not found
  GetIdentityByPKIID(pkiID common.PKIidType) api.PeerIdentityType
}

type gossipChannel struct {
  Adapter
  sync.RWMutex
  shouldGossipStateInfo     int32
  mcs                       api.MessageCryptoService
  pkiID                     common.PKIidType
  selfOrg                   api.OrgIdentityType
  stopChan                  chan struct{}
  stateInfoMsg              *proto.SignedGossipMessage
  orgs                      []api.OrgIdentityType
  joinMsg                   api.JoinChannelMessage
  blockMsgStore             msgstore.MessageStore
  stateInfoMsgStore         *stateInfoCache
  leaderMsgStore            msgstore.MessageStore
  chainID                   common.ChainID
  blocksPuller              pull.Mediator
  logger                    *logging.Logger
  stateInfoPublishScheduler *time.Ticker
  stateInfoRequestScheduler *time.Ticker
  memFilter                 *membershipFilter
  ledgerHeight              uint64
  leftChannel               int32
}

type membershipFilter struct {
  adapter Adapter
  *gossipChannel
}

// GetMembership returns the known alive peers and their information
func (mf *membershipFilter) GetMembership() []discovery.NetworkMember {
  if mf.hasLeftChannel() {
    return nil
  }
  var members []discovery.NetworkMember
  for _, mem := range mf.adapter.GetMembership() {
    if mf.eligibleForChannelAndSameOrg(mem) {
      members = append(members, mem)
    }
  }
  return members
}

// NewGossipChannel creates a new GossipChannel
func NewGossipChannel(pkiID common.PKIidType, org api.OrgIdentityType, mcs api.MessageCryptoService,
  chainID common.ChainID, adapter Adapter, joinMsg api.JoinChannelMessage) GossipChannel {
  gc := &gossipChannel{
    selfOrg:                   org,
    pkiID:                     pkiID,
    mcs:                       mcs,
    Adapter:                   adapter,
    logger:                    util.GetLogger(util.LoggingChannelModule, adapter.GetConf().ID),
    stopChan:                  make(chan struct{}, 1),
    shouldGossipStateInfo:     int32(0),
    stateInfoPublishScheduler: time.NewTicker(adapter.GetConf().PublishStateInfoInterval),
    stateInfoRequestScheduler: time.NewTicker(adapter.GetConf().RequestStateInfoInterval),
    orgs:    []api.OrgIdentityType{},
    chainID: chainID,
  }

  gc.memFilter = &membershipFilter{adapter: gc.Adapter, gossipChannel: gc}

  comparator := proto.NewGossipMessageComparator(adapter.GetConf().MaxBlockCountToStore)

  gc.blocksPuller = gc.createBlockPuller()

  seqNumFromMsg := func(m interface{}) string {
    return fmt.Sprintf("%d", m.(*proto.SignedGossipMessage).GetDataMsg().Payload.SeqNum)
  }
  gc.blockMsgStore = msgstore.NewMessageStoreExpirable(comparator, func(m interface{}) {
    gc.blocksPuller.Remove(seqNumFromMsg(m))
  }, gc.GetConf().BlockExpirationInterval, nil, nil, func(m interface{}) {
    gc.blocksPuller.Remove(seqNumFromMsg(m))
  })

  hashPeerExpiredInMembership := func(o interface{}) bool {
    pkiID := o.(*proto.SignedGossipMessage).GetStateInfo().PkiId
    return gc.Lookup(pkiID) == nil
  }
  verifyStateInfoMsg := func(msg *proto.SignedGossipMessage, orgs ...api.OrgIdentityType) bool {
    si := msg.GetStateInfo()
    // No point in verifying ourselves
    if bytes.Equal(gc.pkiID, si.PkiId) {
      return true
    }
    peerIdentity := adapter.GetIdentityByPKIID(si.PkiId)
    if len(peerIdentity) == 0 {
      gc.logger.Warning("Identity for peer", si.PkiId, "doesn't exist")
      return false
    }
    isOrgInChan := func(org api.OrgIdentityType) bool {
      if len(orgs) == 0 {
        if !gc.IsOrgInChannel(org) {
          return false
        }
      } else {
        found := false
        for _, chanMember := range orgs {
          if bytes.Equal(chanMember, org) {
            found = true
            break
          }
        }
        if !found {
          return false
        }
      }
      return true
    }

    org := gc.GetOrgOfPeer(si.PkiId)
    if !isOrgInChan(org) {
      gc.logger.Warning("peer", peerIdentity, "'s organization(", string(org), ") isn't in the channel", string(chainID))
      return false
    }
    if err := gc.mcs.VerifyByChannel(chainID, peerIdentity, msg.Signature, msg.Payload); err != nil {
      gc.logger.Warningf("Peer %v isn't eligible for channel %s : %+v", peerIdentity, string(chainID), errors.WithStack(err))
      return false
    }
    return true
  }
  gc.stateInfoMsgStore = newStateInfoCache(gc.GetConf().StateInfoCacheSweepInterval, hashPeerExpiredInMembership, verifyStateInfoMsg)

  ttl := election.GetMsgExpirationTimeout()
  pol := proto.NewGossipMessageComparator(0)

  gc.leaderMsgStore = msgstore.NewMessageStoreExpirable(pol, msgstore.Noop, ttl, nil, nil, nil)

  gc.ConfigureChannel(joinMsg)

  // Periodically publish state info
  go gc.periodicalInvocation(gc.publishStateInfo, gc.stateInfoPublishScheduler.C)
  // Periodically request state info
  go gc.periodicalInvocation(gc.requestStateInfo, gc.stateInfoRequestScheduler.C)
  return gc
}

// Stop stop the channel operations
func (gc *gossipChannel) Stop() {
  gc.stopChan <- struct{}{}
  gc.blocksPuller.Stop()
  gc.stateInfoPublishScheduler.Stop()
  gc.stateInfoRequestScheduler.Stop()
  gc.leaderMsgStore.Stop()
  gc.stateInfoMsgStore.Stop()
  gc.blockMsgStore.Stop()
}

func (gc *gossipChannel) periodicalInvocation(fn func(), c <-chan time.Time) {
  for {
    select {
    case <-c:
      fn()
    case <-gc.stopChan:
      gc.stopChan <- struct{}{}
      return
    }
  }
}

// LeaveChannel makes the peer leave the channel
func (gc *gossipChannel) LeaveChannel() {
  atomic.StoreInt32(&gc.leftChannel, 1)
}

func (gc *gossipChannel) hasLeftChannel() bool {
  return atomic.LoadInt32(&gc.leftChannel) == 1
}

// GetPeers returns a list of peers with metadata as published by them
func (gc *gossipChannel) GetPeers() []discovery.NetworkMember {
  members := []discovery.NetworkMember{}
  if gc.hasLeftChannel() {
    return members
  }

  for _, member := range gc.GetMembership() {
    if !gc.EligibleForChannel(member) {
      continue
    }
    stateInf := gc.stateInfoMsgStore.MsgByID(member.PKIid)
    if stateInf == nil {
      continue
    }
    props := stateInf.GetStateInfo().Properties
    if props != nil && props.LeftChannel {
      continue
    }
    member.Metadata = stateInf.GetStateInfo().Metadata
    member.Properties = stateInf.GetStateInfo().Properties
    members = append(members, member)
  }
  return members
}

func (gc *gossipChannel) requestStateInfo() {
  req, err := gc.createStateInfoRequest()
  if err != nil {
    gc.logger.Warningf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err))
    return
  }
  endpoints := filter.SelectPeers(gc.GetConf().PullPeerNum, gc.GetMembership(), gc.IsMemberInChan)
  gc.Send(req, endpoints...)
}

func (gc *gossipChannel) eligibleForChannelAndSameOrg(member discovery.NetworkMember) bool {
  sameOrg := func(networkMember discovery.NetworkMember) bool {
    return bytes.Equal(gc.GetOrgOfPeer(networkMember.PKIid), gc.selfOrg)
  }
  return filter.CombineRoutingFilters(gc.EligibleForChannel, sameOrg)(member)
}

func (gc *gossipChannel) publishStateInfo() {
  if atomic.LoadInt32(&gc.shouldGossipStateInfo) == int32(0) {
    return
  }
  gc.RLock()
  stateInfoMsg := gc.stateInfoMsg
  gc.RUnlock()
  gc.Gossip(stateInfoMsg)
  if len(gc.GetMembership()) > 0 {
    atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(0))
  }
}

func (gc *gossipChannel) createBlockPuller() pull.Mediator {
  conf := pull.Config{
    MsgType:           proto.PullMsgType_BLOCK_MSG,
    Channel:           []byte(gc.chainID),
    ID:                gc.GetConf().ID,
    PeerCountToSelect: gc.GetConf().PullPeerNum,
    PullInterval:      gc.GetConf().PullInterval,
    Tag:               proto.GossipMessage_CHAN_AND_ORG,
  }
  seqNumFromMsg := func(msg *proto.SignedGossipMessage) string {
    dataMsg := msg.GetDataMsg()
    if dataMsg == nil || dataMsg.Payload == nil {
      gc.logger.Warning("Non-data block or with no payload")
      return ""
    }
    return fmt.Sprintf("%d", dataMsg.Payload.SeqNum)
  }
  adapter := &pull.PullAdapter{
    Sndr:        gc,
    MemSvc:      gc.memFilter,
    IdExtractor: seqNumFromMsg,
    MsgCons: func(msg *proto.SignedGossipMessage) {
      gc.DeMultiplex(msg)
    },
  }

  adapter.IngressDigFilter = func(digestMsg *proto.DataDigest) *proto.DataDigest {
    gc.RLock()
    height := gc.ledgerHeight
    gc.RUnlock()
    digests := digestMsg.Digests
    digestMsg.Digests = nil
    for i := range digests {
      seqNum, err := strconv.ParseUint(digests[i], 10, 64)
      if err != nil {
        gc.logger.Warningf("Can't parse digest %s : %+v", digests[i], errors.WithStack(err))
        continue
      }
      if seqNum >= height {
        digestMsg.Digests = append(digestMsg.Digests, digests[i])
      }

    }
    return digestMsg
  }

  return pull.NewPullMediator(conf, adapter)
}

// IsMemberInChan checks whether the given member is eligible to be in the channel
func (gc *gossipChannel) IsMemberInChan(member discovery.NetworkMember) bool {
  org := gc.GetOrgOfPeer(member.PKIid)
  if org == nil {
    return false
  }

  return gc.IsOrgInChannel(org)
}

// PeerFilter receives a SubChannelSelectionCriteria and returns a RoutingFilter that selects
// only peer identities that match the given criteria
func (gc *gossipChannel) PeerFilter(messagePredicate api.SubChannelSelectionCriteria) filter.RoutingFilter {
  return func(member discovery.NetworkMember) bool {
    peerIdentity := gc.GetIdentityByPKIID(member.PKIid)
    if len(peerIdentity) == 0 {
      return false
    }
    msg := gc.stateInfoMsgStore.MembershipStore.MsgByID(member.PKIid)
    if msg == nil {
      return false
    }

    return messagePredicate(api.PeerSignature{
      Message:      msg.Payload,
      Signature:    msg.Signature,
      PeerIdentity: peerIdentity,
    })
  }
}

// IsOrgInChannel returns whether the given organization is in the channel
func (gc *gossipChannel) IsOrgInChannel(membersOrg api.OrgIdentityType) bool {
  gc.RLock()
  defer gc.RUnlock()
  for _, orgOfChan := range gc.orgs {
    if bytes.Equal(orgOfChan, membersOrg) {
      return true
    }
  }
  return false
}

// EligibleForChannel returns whether the given member should get blocks
// for this channel
func (gc *gossipChannel) EligibleForChannel(member discovery.NetworkMember) bool {
  peerIdentity := gc.GetIdentityByPKIID(member.PKIid)
  if len(peerIdentity) == 0 {
    gc.logger.Warning("Identity for peer", member.PKIid, "doesn't exist")
    return false
  }
  msg := gc.stateInfoMsgStore.MsgByID(member.PKIid)
  if msg == nil {
    return false
  }
  return true
}

// AddToMsgStore adds a given GossipMessage to the message store
func (gc *gossipChannel) AddToMsgStore(msg *proto.SignedGossipMessage) {
  if msg.IsDataMsg() {
    gc.blockMsgStore.Add(msg)
    gc.blocksPuller.Add(msg)
  }

  if msg.IsStateInfoMsg() {
    gc.stateInfoMsgStore.Add(msg)
  }
}

// ConfigureChannel (re)configures the list of organizations
// that are eligible to be in the channel
func (gc *gossipChannel) ConfigureChannel(joinMsg api.JoinChannelMessage) {
  gc.Lock()
  defer gc.Unlock()

  if len(joinMsg.Members()) == 0 {
    gc.logger.Warning("Received join channel message with empty set of members")
    return
  }

  if gc.joinMsg == nil {
    gc.joinMsg = joinMsg
  }

  if gc.joinMsg.SequenceNumber() > (joinMsg.SequenceNumber()) {
    gc.logger.Warning("Already have a more updated JoinChannel message(", gc.joinMsg.SequenceNumber(), ") than", joinMsg.SequenceNumber())
    return
  }

  gc.orgs = joinMsg.Members()
  gc.joinMsg = joinMsg
  gc.stateInfoMsgStore.validate(joinMsg.Members())
}

// HandleMessage processes a message sent by a remote peer
func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
  if !gc.verifyMsg(msg) {
    gc.logger.Warning("Failed verifying message:", msg.GetGossipMessage().GossipMessage)
    return
  }
  m := msg.GetGossipMessage()
  if !m.IsChannelRestricted() {
    gc.logger.Warning("Got message", msg.GetGossipMessage(), "but it's not a per-channel message, discarding it")
    return
  }
  orgID := gc.GetOrgOfPeer(msg.GetConnectionInfo().ID)
  if len(orgID) == 0 {
    gc.logger.Debug("Couldn't find org identity of peer", msg.GetConnectionInfo())
    return
  }
  if !gc.IsOrgInChannel(orgID) {
    gc.logger.Warning("Point to point message came from", msg.GetConnectionInfo(),
      ", org(", string(orgID), ") but it's not eligible for the channel", string(gc.chainID))
    return
  }

  if m.IsStateInfoPullRequestMsg() {
    msg.Respond(gc.createStateInfoSnapshot(orgID))
    return
  }

  if m.IsStateInfoSnapshot() {
    gc.handleStateInfSnapshot(m.GossipMessage, msg.GetConnectionInfo().ID)
    return
  }

  if m.IsDataMsg() || m.IsStateInfoMsg() {
    added := false

    if m.IsDataMsg() {
      if m.GetDataMsg().Payload == nil {
        gc.logger.Warning("Payload is empty, got it from", msg.GetConnectionInfo().ID)
        return
      }
      // Would this block go into the message store if it was verified?
      if !gc.blockMsgStore.CheckValid(msg.GetGossipMessage()) {
        return
      }
      if !gc.verifyBlock(m.GossipMessage, msg.GetConnectionInfo().ID) {
        gc.logger.Warning("Failed verifying block", m.GetDataMsg().Payload.SeqNum)
        return
      }
      added = gc.blockMsgStore.Add(msg.GetGossipMessage())
    } else { // StateInfoMsg verification should be handled in a layer above
      //  since we don't have access to the id mapper here
      added = gc.stateInfoMsgStore.Add(msg.GetGossipMessage())
    }

    if added {
      // Forward the message
      gc.Forward(msg)
      // DeMultiplex to local subscribers
      gc.DeMultiplex(m)

      if m.IsDataMsg() {
        gc.blocksPuller.Add(msg.GetGossipMessage())
      }
    }
    return
  }

  if m.IsPullMsg() && m.GetPullMsgType() == proto.PullMsgType_BLOCK_MSG {
    if gc.hasLeftChannel() {
      gc.logger.Info("Received Pull message from", msg.GetConnectionInfo().Endpoint, "but left the channel", string(gc.chainID))
      return
    }
    // If we don't have a StateInfo message from the peer,
    // no way of validating its eligibility in the channel.
    if gc.stateInfoMsgStore.MsgByID(msg.GetConnectionInfo().ID) == nil {
      gc.logger.Debug("Don't have StateInfo message of peer", msg.GetConnectionInfo())
      return
    }
    if !gc.eligibleForChannelAndSameOrg(discovery.NetworkMember{PKIid: msg.GetConnectionInfo().ID}) {
      gc.logger.Warning(msg.GetConnectionInfo(), "isn't eligible for pulling blocks of", string(gc.chainID))
      return
    }
    if m.IsDataUpdate() {
      // Iterate over the envelopes, and filter out blocks
      // that we already have in the blockMsgStore, or blocks that
      // are too far in the past.
      filteredEnvelopes := []*proto.Envelope{}
      for _, item := range m.GetDataUpdate().Data {
        gMsg, err := item.ToGossipMessage()
        if err != nil {
          gc.logger.Warningf("Data update contains an invalid message: %+v", errors.WithStack(err))
          return
        }
        if !bytes.Equal(gMsg.Channel, []byte(gc.chainID)) {
          gc.logger.Warning("DataUpdate message contains item with channel", gMsg.Channel, "but should be", gc.chainID)
          return
        }
        // Would this block go into the message store if it was verified?
        if !gc.blockMsgStore.CheckValid(msg.GetGossipMessage()) {
          return
        }
        if !gc.verifyBlock(gMsg.GossipMessage, msg.GetConnectionInfo().ID) {
          return
        }
        added := gc.blockMsgStore.Add(gMsg)
        if !added {
          // If this block doesn't need to be added, it means it either already
          // exists in memory or that it is too far in the past
          continue
        }
        filteredEnvelopes = append(filteredEnvelopes, item)
      }
      // Replace the update message with just the blocks that should be processed
      m.GetDataUpdate().Data = filteredEnvelopes
    }
    gc.blocksPuller.HandleMessage(msg)
  }

  if m.IsLeadershipMsg() {
    // Handling leadership message
    added := gc.leaderMsgStore.Add(m)
    if added {
      gc.DeMultiplex(m)
    }
  }
}

func (gc *gossipChannel) handleStateInfSnapshot(m *proto.GossipMessage, sender common.PKIidType) {
  chanName := string(gc.chainID)
  for _, envelope := range m.GetStateSnapshot().Elements {
    stateInf, err := envelope.ToGossipMessage()
    if err != nil {
      gc.logger.Warningf("Channel %s : StateInfo snapshot contains an invalid message: %+v", chanName, errors.WithStack(err))
      return
    }
    if !stateInf.IsStateInfoMsg() {
      gc.logger.Warning("Channel", chanName, ": Element of StateInfoSnapshot isn't a StateInfoMessage:",
        stateInf, "message sent from", sender)
      return
    }
    si := stateInf.GetStateInfo()
    orgID := gc.GetOrgOfPeer(si.PkiId)
    if orgID == nil {
      gc.logger.Debug("Channel", chanName, ": Couldn't find org identity of peer",
        string(si.PkiId), "message sent from", string(sender))
      return
    }

    if !gc.IsOrgInChannel(orgID) {
      gc.logger.Warning("Channel", chanName, ": Peer", stateInf.GetStateInfo().PkiId,
        "is not in an eligible org, can't process a stateInfo from it, sent from", sender)
      return
    }

    expectedMAC := GenerateMAC(si.PkiId, gc.chainID)
    if !bytes.Equal(si.Channel_MAC, expectedMAC) {
      gc.logger.Warning("Channel", chanName, ": StateInfo message", stateInf,
        ", has an invalid MAC. Expected", expectedMAC, ", got", si.Channel_MAC, ", sent from", sender)
      return
    }
    err = gc.ValidateStateInfoMessage(stateInf)
    if err != nil {
      gc.logger.Warningf("Channel %s: Failed validating state info message: %v sent from %v : %+v", chanName, stateInf, sender, errors.WithStack(err))
      return
    }

    if gc.Lookup(si.PkiId) == nil {
      // Skip StateInfo messages that belong to peers
      // that have been expired
      continue
    }

    gc.stateInfoMsgStore.Add(stateInf)
  }
}

func (gc *gossipChannel) verifyBlock(msg *proto.GossipMessage, sender common.PKIidType) bool {
  if !msg.IsDataMsg() {
    gc.logger.Warning("Received from ", sender, "a DataUpdate message that contains a non-block GossipMessage:", msg)
    return false
  }
  payload := msg.GetDataMsg().Payload
  if payload == nil {
    gc.logger.Warning("Received empty payload from", sender)
    return false
  }
  seqNum := payload.SeqNum
  rawBlock := payload.Data
  err := gc.mcs.VerifyBlock(msg.Channel, seqNum, rawBlock)
  if err != nil {
    gc.logger.Warningf("Received fabricated block from %v in DataUpdate: %+v", sender, errors.WithStack(err))
    return false
  }
  return true
}

func (gc *gossipChannel) createStateInfoSnapshot(requestersOrg api.OrgIdentityType) *proto.GossipMessage {
  sameOrg := bytes.Equal(gc.selfOrg, requestersOrg)
  rawElements := gc.stateInfoMsgStore.Get()
  elements := []*proto.Envelope{}
  for _, rawEl := range rawElements {
    msg := rawEl.(*proto.SignedGossipMessage)
    orgOfCurrentMsg := gc.GetOrgOfPeer(msg.GetStateInfo().PkiId)
    // If we're in the same org as the requester, or the message belongs to a foreign org
    // don't do any filtering
    if sameOrg || !bytes.Equal(orgOfCurrentMsg, gc.selfOrg) {
      elements = append(elements, msg.Envelope)
      continue
    }
    // Else, the requester is in a different org, so disclose only StateInfo messages that their
    // corresponding AliveMessages have external endpoints
    if netMember := gc.Lookup(msg.GetStateInfo().PkiId); netMember == nil || netMember.Endpoint == "" {
      continue
    }
    elements = append(elements, msg.Envelope)
  }

  return &proto.GossipMessage{
    Channel: gc.chainID,
    Tag:     proto.GossipMessage_CHAN_OR_ORG,
    Nonce:   0,
    Content: &proto.GossipMessage_StateSnapshot{
      StateSnapshot: &proto.StateInfoSnapshot{
        Elements: elements,
      },
    },
  }
}

func (gc *gossipChannel) verifyMsg(msg proto.ReceivedMessage) bool {
  if msg == nil {
    gc.logger.Warning("Messsage is nil")
    return false
  }
  m := msg.GetGossipMessage()
  if m == nil {
    gc.logger.Warning("Message content is empty")
    return false
  }

  if msg.GetConnectionInfo().ID == nil {
    gc.logger.Warning("Message has nil PKI-ID")
    return false
  }

  if m.IsStateInfoMsg() {
    si := m.GetStateInfo()
    expectedMAC := GenerateMAC(si.PkiId, gc.chainID)
    if !bytes.Equal(expectedMAC, si.Channel_MAC) {
      gc.logger.Warning("Message contains wrong channel MAC(", si.Channel_MAC, "), expected", expectedMAC)
      return false
    }
    return true
  }

  if m.IsStateInfoPullRequestMsg() {
    sipr := m.GetStateInfoPullReq()
    expectedMAC := GenerateMAC(msg.GetConnectionInfo().ID, gc.chainID)
    if !bytes.Equal(expectedMAC, sipr.Channel_MAC) {
      gc.logger.Warning("Message contains wrong channel MAC(", sipr.Channel_MAC, "), expected", expectedMAC)
      return false
    }
    return true
  }

  if !bytes.Equal(m.Channel, []byte(gc.chainID)) {
    gc.logger.Warning("Message contains wrong channel(", m.Channel, "), expected", gc.chainID)
    return false
  }
  return true
}

func (gc *gossipChannel) createStateInfoRequest() (*proto.SignedGossipMessage, error) {
  return (&proto.GossipMessage{
    Tag:   proto.GossipMessage_CHAN_OR_ORG,
    Nonce: 0,
    Content: &proto.GossipMessage_StateInfoPullReq{
      StateInfoPullReq: &proto.StateInfoPullRequest{
        Channel_MAC: GenerateMAC(gc.pkiID, gc.chainID),
      },
    },
  }).NoopSign()
}

// UpdateStateInfo updates this channel's StateInfo message
// that is periodically published
func (gc *gossipChannel) UpdateStateInfo(msg *proto.SignedGossipMessage) {
  if !msg.IsStateInfoMsg() {
    return
  }

  gc.Lock()
  defer gc.Unlock()

  gc.stateInfoMsgStore.Add(msg)
  gc.ledgerHeight = msg.GetStateInfo().Properties.LedgerHeight
  gc.stateInfoMsg = msg
  atomic.StoreInt32(&gc.shouldGossipStateInfo, int32(1))
}

func newStateInfoCache(sweepInterval time.Duration, hasExpired func(interface{}) bool, verifyFunc membershipPredicate) *stateInfoCache {
  membershipStore := util.NewMembershipStore()
  pol := proto.NewGossipMessageComparator(0)

  s := &stateInfoCache{
    verify:          verifyFunc,
    MembershipStore: membershipStore,
    stopChan:        make(chan struct{}),
  }
  invalidationTrigger := func(m interface{}) {
    pkiID := m.(*proto.SignedGossipMessage).GetStateInfo().PkiId
    membershipStore.Remove(pkiID)
  }
  s.MessageStore = msgstore.NewMessageStore(pol, invalidationTrigger)

  go func() {
    for {
      select {
      case <-s.stopChan:
        return
      case <-time.After(sweepInterval):
        s.Purge(hasExpired)
      }
    }
  }()
  return s
}

// membershipPredicate receives a StateInfoMessage and optionally a slice of organization identifiers
// and returns whether the peer that signed the given StateInfoMessage is eligible
// to the channel or not
type membershipPredicate func(msg *proto.SignedGossipMessage, orgs ...api.OrgIdentityType) bool

// stateInfoCache is actually a messageStore
// that also indexes messages that are added
// so that they could be extracted later
type stateInfoCache struct {
  verify membershipPredicate
  *util.MembershipStore
  msgstore.MessageStore
  stopChan chan struct{}
}

func (cache *stateInfoCache) validate(orgs []api.OrgIdentityType) {
  for _, m := range cache.Get() {
    msg := m.(*proto.SignedGossipMessage)
    if !cache.verify(msg, orgs...) {
      cache.delete(msg)
    }
  }
}

// Add attempts to add the given message to the stateInfoCache,
// and if the message was added, also indexes it.
// Message must be a StateInfo message.
func (cache *stateInfoCache) Add(msg *proto.SignedGossipMessage) bool {
  if !cache.MessageStore.CheckValid(msg) {
    return false
  }
  if !cache.verify(msg) {
    return false
  }
  added := cache.MessageStore.Add(msg)
  if added {
    pkiID := msg.GetStateInfo().PkiId
    cache.MembershipStore.Put(pkiID, msg)
  }
  return added
}

func (cache *stateInfoCache) delete(msg *proto.SignedGossipMessage) {
  cache.Purge(func(o interface{}) bool {
    pkiID := o.(*proto.SignedGossipMessage).GetStateInfo().PkiId
    return bytes.Equal(pkiID, msg.GetStateInfo().PkiId)
  })
  cache.Remove(msg.GetStateInfo().PkiId)
}

func (cache *stateInfoCache) Stop() {
  cache.stopChan <- struct{}{}
}

// GenerateMAC returns a byte slice that is derived from the peer's PKI-ID
// and a channel name
func GenerateMAC(pkiID common.PKIidType, channelID common.ChainID) []byte {
  // Hash is computed on (PKI-ID || channel ID)
  preImage := append([]byte(pkiID), []byte(channelID)...)
  return common_utils.ComputeSHA256(preImage)
}

代码有点多,看起来是不是很懵逼,没关系!我们来抽丝剥茧

type gossipChannel struct {
  stopChan chan struct{}
}

// NewGossipChannel creates a new GossipChannel
func NewGossipChannel(pkiID common.PKIidType, org api.OrgIdentityType, mcs api.MessageCryptoService,
  chainID common.ChainID, adapter Adapter, joinMsg api.JoinChannelMessage) GossipChannel {
  gc := &gossipChannel{
    stopChan: make(chan struct{}, 1),  
  }
  // Periodically publish state info
  go gc.periodicalInvocation(gc.publishStateInfo, gc.stateInfoPublishScheduler.C)
  // Periodically request state info
  go gc.periodicalInvocation(gc.requestStateInfo, gc.stateInfoRequestScheduler.C)
  return gc
}

// Stop stop the channel operations
func (gc *gossipChannel) Stop() {
  gc.stopChan <- struct{}{} 
}

func (gc *gossipChannel) periodicalInvocation(fn func(), c <-chan time.Time) {
  for {
    select {
    case <-c:
      fn()
    case <-gc.stopChan:
      gc.stopChan <- struct{}{}
      return
    }
  }
}

是不是还是有点懵逼,看Demo:

package main

import (
  "fmt"
  "runtime"
  "time"
)

type Xiequna struct {
  xch chan string
  bch chan bool
}

func main() {
  x := New()
  for {
    x.sendingGoRoutine()  // 阻塞程序,模拟不同数据写入
    time.Sleep(time.Second * 2)
    x.sendingGoRoutineBool()
  }

}

func New() (x *Xiequna) {
  x = &Xiequna{
    xch: make(chan string, 10), //初始化channel,缓冲为10
    bch: make(chan bool, 10),
  }
  for i := 0; i < runtime.NumCPU(); i++ {
    go x.receivingGoRoutine()
  }
  return x
}

func (x *Xiequna) sendingGoRoutine() {
  x.xch <- "xiequan.info"
}

func (x *Xiequna) sendingGoRoutineBool() {
  x.bch <- true
}

func (x *Xiequna) receivingGoRoutine() {
  for {
    select { select 处理多个不同的channel 数据
    case v := <-x.xch:
      fmt.Println("Received value ", v)
    case u := <-x.bch:
      fmt.Println("Received bool ", u)
    default:
    }
  }
}

看看效果:

打赏作者

您的支持将鼓励我们继续创作!

[微信] 扫描二维码打赏

[支付宝] 扫描二维码打赏

1 thought on “阅读hyperledger fabric源码学习Go并发编程”

  1. 感谢分享!已推荐到《开发者头条》:https://toutiao.io/posts/dz3djl 欢迎点赞支持!
    使用开发者头条 App 搜索 4410 即可订阅《谢权blog》

Leave a Reply

Your email address will not be published. Required fields are marked *