Source code analysis of the open-source memberlist library
Published: 2019-05-24



 

memberlist is a package written in Go that uses the Gossip protocol to propagate messages. It handles node discovery, failure detection, and the membership list of a distributed cluster.

 

I previously wrote an article about the Gossip protocol:

 

Source code: https://github.com/hashicorp/memberlist

 

To understand memberlist's design, I follow my usual habit of studying an early version of the code. The analysis here is based on commit fe04265.

 

A note, once more: studying an early version is only about learning the design principles and the underlying mechanics of the open-source code, and about seeing how the source improved as the project evolved.

 

Source tree:

 

The overall code style resembles object-oriented C, and the modules map neatly onto individual files:

1. broadcast.go: broadcast module

2. net.go: transport and protocol handling module

3. state.go: node state management module

4. memberlist.go: main module

 

 

github.com/hashicorp/memberlist/memberlist.go

Memberlist

In the Memberlist struct, the fields are also grouped by function:

type Memberlist struct {
    config   *Config // configuration
    shutdown bool    // flag: the local service has shut down
    leave    bool    // flag: this node is leaving

    udpListener *net.UDPConn
    tcpListener *net.TCPListener
    // UDP and TCP connection management; see net.go (transport and protocol handling)

    sequenceNum uint32 // Local sequence number
    incarnation uint32 // Local incarnation number

    nodeLock sync.RWMutex
    nodes    []*NodeState          // Known nodes
    nodeMap  map[string]*NodeState // Maps Addr.String() -> NodeState
    // node and state management; see state.go

    tickerLock sync.Mutex
    tickers    []*time.Ticker
    stopTick   chan struct{}
    probeIndex int

    ackLock     sync.Mutex
    ackHandlers map[uint32]*ackHandler

    broadcastLock sync.Mutex
    bcQueue       broadcasts
    // broadcast management; see broadcast.go
}

 

 

Config

In Config, the leading fields are basic configuration items; the comments explain each of them.

type Config struct {
    Name             string        // Node name (FQDN)
    BindAddr         string        // Binding address
    UDPPort          int           // UDP port to listen on
    TCPPort          int           // TCP port to listen on
    TCPTimeout       time.Duration // TCP timeout
    IndirectChecks   int           // Number of indirect checks to use
    RetransmitMult   int           // Retransmits = RetransmitMult * log(N+1)
    SuspicionMult    int           // Suspicion time = SuspicionMult * log(N+1) * Interval
    PushPullInterval time.Duration // How often we do a Push/Pull update
    RTT              time.Duration // 99% percentile of round-trip-time
    ProbeInterval    time.Duration // Failure probing interval length
    GossipNodes      int           // Number of nodes to gossip to per GossipInterval
    GossipInterval   time.Duration // Gossip interval for non-piggyback messages (only if GossipNodes > 0)

    JoinCh  chan<- *Node
    LeaveCh chan<- *Node
}

 

The last two fields deserve a closer look.

JoinCh: an externally exposed channel that notifies a registered consumer whenever a new node joins.

LeaveCh: an externally exposed channel that notifies a registered consumer whenever a node is removed.

In even earlier versions these two channels lived in the Memberlist struct; they were later moved into Config.
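To make this concrete, here is a minimal caller-side sketch of my own (not from the article) showing how these two channels could be wired up. It assumes the early-version Config and Node shown above; the node name, ports, intervals, and buffer sizes are arbitrary values chosen purely for illustration:

package main

import (
    "log"
    "time"

    "github.com/hashicorp/memberlist"
)

func main() {
    // Buffered so notifications are less likely to be missed or to block,
    // depending on how the library's internal notify helper behaves.
    joinCh := make(chan *memberlist.Node, 16)
    leaveCh := make(chan *memberlist.Node, 16)

    conf := &memberlist.Config{
        Name:             "node-1",
        BindAddr:         "0.0.0.0",
        TCPPort:          7946,
        UDPPort:          7946,
        TCPTimeout:       10 * time.Second,
        IndirectChecks:   3,
        RetransmitMult:   4,
        SuspicionMult:    5,
        PushPullInterval: 30 * time.Second,
        RTT:              20 * time.Millisecond,
        ProbeInterval:    1 * time.Second,
        GossipNodes:      3,
        GossipInterval:   200 * time.Millisecond,
        JoinCh:           joinCh,
        LeaveCh:          leaveCh,
    }

    m, err := memberlist.Create(conf) // starts the TCP/UDP listeners and the timers
    if err != nil {
        log.Fatalf("create failed: %s", err)
    }
    defer m.Shutdown()

    // React to membership changes pushed through the two channels.
    for {
        select {
        case n := <-joinCh:
            log.Printf("node joined: %s", n.Name)
        case n := <-leaveCh:
            log.Printf("node left: %s", n.Name)
        }
    }
}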

 

Now let's walk through the flow.

Create

// Create will start memberlist and create a new gossip pool, but
// will not connect to an existing node. This should only be used
// for the first node in the cluster.
func Create(conf *Config) (*Memberlist, error) {
    m, err := newMemberlist(conf) // newMemberlist starts the TCP and UDP listeners
    if err != nil {
        return nil, err
    }
    if err := m.setAlive(); err != nil {
        m.Shutdown()
        return nil, err
    }
    m.schedule() // schedule starts three background tasks: probe, pushPull, gossip
    return m, nil
}
There are two important steps here:
1. newMemberlist
2. m.schedule

 

newMemberlist
// newMemberlist creates the network listeners.
// Does not schedule execution of background maintenance.
func newMemberlist(conf *Config) (*Memberlist, error) {
    tcpAddr := fmt.Sprintf("%s:%d", conf.BindAddr, conf.TCPPort)
    tcpLn, err := net.Listen("tcp", tcpAddr)
    if err != nil {
        return nil, fmt.Errorf("Failed to start TCP listener. Err: %s", err)
    }
    // The TCP listener is created above

    udpAddr := fmt.Sprintf("%s:%d", conf.BindAddr, conf.UDPPort)
    udpLn, err := net.ListenPacket("udp", udpAddr)
    if err != nil {
        tcpLn.Close()
        return nil, fmt.Errorf("Failed to start UDP listener. Err: %s", err)
    }
    // The UDP listener is created above

    m := &Memberlist{
        config:      conf,
        udpListener: udpLn.(*net.UDPConn),
        tcpListener: tcpLn.(*net.TCPListener),
        nodeMap:     make(map[string]*NodeState),
        stopTick:    make(chan struct{}, 32),
        ackHandlers: make(map[uint32]*ackHandler),
    }
    // Build the Memberlist instance

    go m.tcpListen() // start the TCP service
    go m.udpListen() // start the UDP service
    return m, nil
}
The main thing newMemberlist does is start the TCP and UDP services.
So let's look at the network services (TCP and UDP).
github.com/hashicorp/memberlist/net.go
tcp
tcpListen
// tcpListen listens for and handles incoming connections
func (m *Memberlist) tcpListen() {
    for {
        // TCP accept
        conn, err := m.tcpListener.AcceptTCP()
        if err != nil {
            if m.shutdown {
                break
            }
            log.Printf("[ERR] Error accepting TCP connection: %s", err)
            continue
        }
        // Each connection is handled by handleConn
        go m.handleConn(conn)
    }
}
Next, look at
handleConn
// handleConn handles a single incoming TCP connection
func (m *Memberlist) handleConn(conn *net.TCPConn) {
    defer conn.Close()

    // Read the remote node states
    remoteNodes, err := readRemoteState(conn)
    if err != nil {
        log.Printf("[ERR] Failed to receive remote state: %s", err)
        return
    }

    // Send our local node states
    if err := m.sendLocalState(conn); err != nil {
        log.Printf("[ERR] Failed to push local state: %s", err)
    }

    // Merge the received remote states into local state
    m.mergeState(remoteNodes)
}
The TCP service has one job: synchronizing node state.
readRemoteState
Reads the remote node states and returns them.
// readRemoteState is used to read the remote state from a connection
func readRemoteState(conn net.Conn) ([]pushNodeState, error) {
    // Read the message type
    buf := []byte{0}
    if _, err := conn.Read(buf); err != nil {
        return nil, err
    }
    msgType := uint8(buf[0])

    // Quit if not push/pull; only pushPullMsg is accepted here
    if msgType != pushPullMsg {
        err := fmt.Errorf("received invalid msgType (%d)", msgType)
        return nil, err
    }

    // Read and decode the push/pull header
    var header pushPullHeader
    hd := codec.MsgpackHandle{}
    dec := codec.NewDecoder(conn, &hd)
    if err := dec.Decode(&header); err != nil {
        return nil, err
    }

    // Allocate space for the transfer and decode all the node states
    remoteNodes := make([]pushNodeState, header.Nodes)

    // Try to decode all the states
    for i := 0; i < header.Nodes; i++ {
        if err := dec.Decode(&remoteNodes[i]); err != nil {
            return remoteNodes, err
        }
    }

    // Return the node state information
    return remoteNodes, nil
}
 
sendLocalState
Sends the locally stored node states.
// sendLocalState is invoked to send our local state over a tcp connection
func (m *Memberlist) sendLocalState(conn net.Conn) error {
    // Prepare the local node state: collect the locally stored node states
    m.nodeLock.RLock()
    localNodes := make([]pushNodeState, len(m.nodes))
    for idx, n := range m.nodes {
        localNodes[idx].Name = n.Name
        localNodes[idx].Addr = n.Addr
        localNodes[idx].Incarnation = n.Incarnation
        localNodes[idx].State = n.State
    }
    m.nodeLock.RUnlock()

    // Send our node state, prefixed with a header
    header := pushPullHeader{Nodes: len(localNodes)}
    hd := codec.MsgpackHandle{}
    enc := codec.NewEncoder(conn, &hd)

    // Begin state push
    conn.Write([]byte{pushPullMsg})

    // Encode and send
    if err := enc.Encode(&header); err != nil {
        return err
    }
    for i := 0; i < header.Nodes; i++ {
        if err := enc.Encode(&localNodes[i]); err != nil {
            return err
        }
    }
    return nil
}
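sendLocalState fills two small helper types that the article never defines. Judging only from the assignments above (so this is a best guess, not the verbatim source, and the types of Addr and State are assumptions), they presumably look something like this, assuming net imports:

// Best-guess shapes, inferred from sendLocalState / readRemoteState above.
type pushPullHeader struct {
    Nodes int // number of pushNodeState records that follow
}

type pushNodeState struct {
    Name        string
    Addr        net.IP // or []byte; mirrors Node.Addr
    Incarnation uint32
    State       int // same constants as NodeState.State
}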
 
mergeState
Merges the received states into the local node state.
// mergeState is invoked by the network layer when we get a Push/Pull
// state transfer
func (m *Memberlist) mergeState(remote []pushNodeState) {
    for _, r := range remote {
        // Look for a matching local node
        m.nodeLock.RLock()
        local, ok := m.nodeMap[r.Name]
        m.nodeLock.RUnlock()

        // Skip if we agree on states (the remote state matches our local record)
        if ok && local.State == r.State {
            continue
        }

        // Dispatch on the three possible states
        switch r.State {
        case StateAlive:
            a := alive{Incarnation: r.Incarnation, Node: r.Name, Addr: r.Addr}
            m.aliveNode(&a)
        case StateSuspect:
            s := suspect{Incarnation: r.Incarnation, Node: r.Name}
            m.suspectNode(&s)
        case StateDead:
            d := dead{Incarnation: r.Incarnation, Node: r.Name}
            m.deadNode(&d)
        }
    }
}
There are three node states:
StateAlive: handled by aliveNode
StateSuspect: handled by suspectNode
StateDead: handled by deadNode
These three handlers are covered later; a sketch of the NodeState type they operate on follows below.
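The article never shows NodeState, Node, or the state constants themselves. Pieced together from how they are used in the code above, they presumably look roughly like the following (a best-guess sketch assuming net and time imports, not the verbatim definitions; the exact types of State and Addr in particular are assumptions):

// Best-guess reconstruction from the fields referenced above.
const (
    StateAlive = iota
    StateSuspect
    StateDead
)

type Node struct {
    Name string
    Addr net.IP // used directly as the IP of a net.UDPAddr in probeNode
}

type NodeState struct {
    Node                  // embedded: Name and Addr
    Incarnation uint32    // last known incarnation number
    State       int       // StateAlive, StateSuspect or StateDead
    StateChange time.Time // when the state last changed
}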
tcp summary:
The TCP connection is mainly used to synchronize and update node state information.
 
udp
udpListen
The code is straightforward: it keeps reading packets and hands each one to handleCommand.
// udpListen listens for and handles incoming UDP packets
func (m *Memberlist) udpListen() {
    mainBuf := make([]byte, udpBufSize)
    var n int
    var addr net.Addr
    var err error
    for {
        // Reset buffer
        buf := mainBuf[0:udpBufSize]

        // Read a packet; keep pulling data off the UDP listener
        n, addr, err = m.udpListener.ReadFrom(buf)
        if err != nil {
            if m.shutdown {
                break
            }
            log.Printf("[ERR] Error reading UDP packet: %s", err)
            continue
        }

        // Check the length
        if n < 1 {
            log.Printf("[ERR] UDP packet too short (%d bytes). From: %s", len(buf), addr)
            continue
        }

        // Handle the command; this is where the real work happens
        m.handleCommand(buf[:n], addr)
    }
}
 
handleCommand
func (m *Memberlist) handleCommand(buf []byte, from net.Addr) {
    // Decode the message type
    msgType := uint8(buf[0])
    buf = buf[1:]

    // Switch on the msgType and dispatch to the matching handler
    switch msgType {
    case compoundMsg:
        m.handleCompound(buf, from)
    case pingMsg:
        m.handlePing(buf, from)
    case indirectPingMsg:
        m.handleIndirectPing(buf, from)
    case ackRespMsg:
        m.handleAck(buf, from)
    case suspectMsg:
        m.handleSuspect(buf, from)
    case aliveMsg:
        m.handleAlive(buf, from)
    case deadMsg:
        m.handleDead(buf, from)
    default:
        log.Printf("[ERR] UDP msg type (%d) not supported. From: %s", msgType, from)
    }
}
The message types are:
compoundMsg: handled by handleCompound
    Several messages aggregated into one packet are split apart, and each part is fed back into handleCommand (a framing sketch follows the code below).
func (m *Memberlist) handleCompound(buf []byte, from net.Addr) {
    // Decode the parts: split the compound message
    trunc, parts, err := decodeCompoundMessage(buf)
    if err != nil {
        log.Printf("[ERR] Failed to decode compound request: %s", err)
        return
    }

    // Log any truncation
    if trunc > 0 {
        log.Printf("[WARN] Compound request had %d truncated messages", trunc)
    }

    // Handle each message: feed every part back into handleCommand
    for _, part := range parts {
        m.handleCommand(part, from)
    }
}
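decodeCompoundMessage and its counterpart makeCompoundMessage are not shown in the article, so I cannot vouch for the library's exact wire format. As a generic illustration of the idea (pack several small messages into one UDP payload, then split them apart again), a simple framing of [1-byte part count][2-byte big-endian length per part][part bytes...] could look like this, assuming at most 255 parts of at most 64 KB each and a fmt import:

// packCompound concatenates parts behind a count and a length table.
func packCompound(parts [][]byte) []byte {
    buf := []byte{byte(len(parts))}
    for _, p := range parts {
        buf = append(buf, byte(len(p)>>8), byte(len(p)))
    }
    for _, p := range parts {
        buf = append(buf, p...)
    }
    return buf
}

// unpackCompound reverses packCompound and returns the original parts.
func unpackCompound(buf []byte) ([][]byte, error) {
    if len(buf) < 1 {
        return nil, fmt.Errorf("missing part count")
    }
    n := int(buf[0])
    buf = buf[1:]
    if len(buf) < n*2 {
        return nil, fmt.Errorf("truncated length table")
    }
    lengths := make([]int, n)
    for i := 0; i < n; i++ {
        lengths[i] = int(buf[2*i])<<8 | int(buf[2*i+1])
    }
    buf = buf[n*2:]
    parts := make([][]byte, 0, n)
    for _, l := range lengths {
        if len(buf) < l {
            return nil, fmt.Errorf("truncated part")
        }
        parts = append(parts, buf[:l])
        buf = buf[l:]
    }
    return parts, nil
}

unpackCompound(packCompound(parts)) yields the original parts, which mirrors what handleCompound does before re-dispatching each part to handleCommand.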
pingMsg: handled by handlePing
indirectPingMsg: handled by handleIndirectPing
ackRespMsg: handled by handleAck
    These three handlers are fairly simple.
suspectMsg: handled by handleSuspect
    which calls suspectNode
func (m *Memberlist) handleSuspect(buf []byte, from net.Addr) {
    var sus suspect
    if err := decode(buf, &sus); err != nil {
        log.Printf("[ERR] Failed to decode suspect message: %s", err)
        return
    }
    m.suspectNode(&sus)
}
 
aliveMsg: handled by handleAlive
    which calls aliveNode
func (m *Memberlist) handleAlive(buf []byte, from net.Addr) {
    var live alive
    if err := decode(buf, &live); err != nil {
        log.Printf("[ERR] Failed to decode alive message: %s", err)
        return
    }
    m.aliveNode(&live)
}
 
deadMsg: handled by handleDead
    which calls deadNode
func (m *Memberlist) handleDead(buf []byte, from net.Addr) {
    var d dead
    if err := decode(buf, &d); err != nil {
        log.Printf("[ERR] Failed to decode dead message: %s", err)
        return
    }
    m.deadNode(&d)
}
 
udp summary:
The UDP service provides the basic command handling.
 
github.com/hashicorp/memberlist/state.go
Node state management
In github.com/hashicorp/memberlist/memberlist.go,
the last thing Create calls is schedule:
// Schedule is used to ensure the Tick is performed periodically
func (m *Memberlist) schedule() {
    m.tickerLock.Lock()
    defer m.tickerLock.Unlock()

    // Create a new probe ticker; starts the probe goroutine
    if m.config.ProbeInterval > 0 {
        t := time.NewTicker(m.config.ProbeInterval)
        go m.triggerFunc(t.C, m.probe)
        m.tickers = append(m.tickers, t)
    }

    // Create a push pull ticker if needed; starts the pushPull goroutine
    if m.config.PushPullInterval > 0 {
        t := time.NewTicker(m.config.PushPullInterval)
        go m.triggerFunc(t.C, m.pushPull)
        m.tickers = append(m.tickers, t)
    }

    // Create a gossip ticker if needed; starts the gossip goroutine
    if m.config.GossipNodes > 0 {
        t := time.NewTicker(m.config.GossipInterval)
        go m.triggerFunc(t.C, m.gossip)
        m.tickers = append(m.tickers, t)
    }
}
This starts three periodic tasks:
probe, pushPull, and gossip.
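schedule hands each ticker channel to triggerFunc together with the function to run. The article does not show triggerFunc; my assumption is that it is a small loop that fires the callback on every tick until stopTick is signalled, roughly:

// Assumed shape of triggerFunc: run f on every tick, stop when stopTick fires.
func (m *Memberlist) triggerFunc(c <-chan time.Time, f func()) {
    for {
        select {
        case <-c:
            f()
        case <-m.stopTick:
            return
        }
    }
}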
 
probe

After a node starts, at every probe interval it picks one node and sends it a PING message. If the PING fails, it randomly selects IndirectChecks nodes and asks them to issue indirect PINGs toward the target, and also sends the target a direct TCP PING. A node that receives an indirect-PING request pings the address in the request and relays the result back to the requesting node. If the probing node receives no ACK for the target within the probe timeout, it marks the target node as suspect.

https://www.colabug.com/1010287.html

 

// Tick is used to perform a single round of failure detection and gossip
func (m *Memberlist) probe() {
    // Track the number of indexes we've considered probing
    numCheck := 0
START:
    // Make sure we don't wrap around infinitely
    if numCheck >= len(m.nodes) {
        return
    }

    // Handle the wrap around case; probeIndex walks through the node list in a loop
    if m.probeIndex >= len(m.nodes) {
        m.resetNodes()
        m.probeIndex = 0
        numCheck++
        goto START
    }

    // Determine if we should probe this node
    skip := false
    var node *NodeState
    m.nodeLock.RLock()
    node = m.nodes[m.probeIndex]
    if node.Name == m.config.Name {
        skip = true // this is the local node itself (name matches our config)
    } else if node.State == StateDead {
        skip = true // the node is already dead
    }
    m.nodeLock.RUnlock()

    // Potentially skip: it is ourselves or a dead node
    if skip {
        numCheck++
        m.probeIndex++
        goto START
    }

    // Probe the specific node
    m.probeNode(node)
}
// probeNode handles a single round of failure checking on a node
func (m *Memberlist) probeNode(node *NodeState) {
    // Send a ping to the node
    ping := ping{SeqNo: m.nextSeqNo()}
    destAddr := &net.UDPAddr{IP: node.Addr, Port: m.config.UDPPort}

    // Setup an ack handler
    ackCh := make(chan bool, m.config.IndirectChecks+1)
    m.setAckChannel(ping.SeqNo, ackCh, m.config.ProbeInterval)

    // Send the ping message (pingMsg)
    if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil {
        log.Printf("[ERR] Failed to send ping: %s", err)
        return
    }

    // Wait for response or round-trip-time
    select {
    case v := <-ackCh:
        if v == true {
            return
        }
    case <-time.After(m.config.RTT):
    }

    // Get some random live nodes
    m.nodeLock.RLock()
    excludes := []string{m.config.Name, node.Name}
    kNodes := kRandomNodes(m.config.IndirectChecks, excludes, m.nodes)
    m.nodeLock.RUnlock()

    // Attempt an indirect ping (indirectPingMsg) through those nodes
    ind := indirectPingReq{SeqNo: ping.SeqNo, Target: node.Addr}
    for _, peer := range kNodes {
        destAddr := &net.UDPAddr{IP: peer.Addr, Port: m.config.UDPPort}
        if err := m.encodeAndSendMsg(destAddr, indirectPingMsg, &ind); err != nil {
            log.Printf("[ERR] Failed to send indirect ping: %s", err)
        }
    }

    // Wait for the acks or timeout
    select {
    case v := <-ackCh:
        if v == true {
            return
        }
    }

    // No acks received from target: mark the node as suspect
    s := suspect{Incarnation: node.Incarnation, Node: node.Name}
    m.suspectNode(&s)
}
 
pushPull

At every push/pull interval, the node randomly selects one peer, opens a TCP connection to it, sends its entire local node state and user data, receives the peer's full node state and user data in return, and merges the two. This exchange speeds up convergence of information across the cluster.

https://www.jianshu.com/p/e2173b44db65

// pushPull is invoked periodically to randomly perform a state
// exchange. Used to ensure a high level of convergence.
func (m *Memberlist) pushPull() {
    // Get a random live node: pick one node at random
    m.nodeLock.RLock()
    excludes := []string{m.config.Name}
    nodes := kRandomNodes(1, excludes, m.nodes)
    m.nodeLock.RUnlock()

    // If no nodes, bail
    if len(nodes) == 0 {
        return
    }
    node := nodes[0]

    // Attempt a push pull via pushPullNode
    if err := m.pushPullNode(node.Addr); err != nil {
        log.Printf("[ERR] Push/Pull with %s failed: %s", node.Name, err)
    }
}
The code above picks one node at random.
 
// pushPullNode is invoked to do a state exchange with
// a given node
func (m *Memberlist) pushPullNode(addr []byte) error {
    // Attempt to send and receive with the node
    remote, err := m.sendAndReceiveState(addr)
    if err != nil {
        return nil
    }

    // Merge the state: update local node states with the remote ones
    m.mergeState(remote)
    return nil
}
// sendState is used to initiate a push/pull over TCP with a remote node
func (m *Memberlist) sendAndReceiveState(addr []byte) ([]pushNodeState, error) {
    // Attempt to connect: create the TCP client connection
    dialer := net.Dialer{Timeout: m.config.TCPTimeout}
    dest := net.TCPAddr{IP: addr, Port: m.config.TCPPort}
    conn, err := dialer.Dial("tcp", dest.String())
    if err != nil {
        return nil, err
    }

    // Send our state: the locally stored node states
    if err := m.sendLocalState(conn); err != nil {
        return nil, err
    }

    // Read remote state: read the remote node states
    remote, err := readRemoteState(conn)
    if err != nil {
        return nil, err
    }

    // Return the remote state
    return remote, nil
}
 
gossip

The node sends messages over UDP to K peers. Messages are taken from the broadcast queue; once a message has been transmitted more than a certain number of times it is dropped from the queue. The transmit count is governed by RetransmitMult in Config (see its comment).

https://www.jianshu.com/p/e2173b44db65
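The transmit cap mentioned above comes from RetransmitMult and the cluster size. The helper retransmitLimit is not shown in the article; going by the Config comment "Retransmits = RetransmitMult * log(N+1)", here is a sketch of my own (the logarithm base and rounding in the real code may differ) plus a worked example, assuming a math import:

// Sketch of the retransmit cap, following the Config comment
// "Retransmits = RetransmitMult * log(N+1)". Base-10 log assumed here.
func retransmitLimit(retransmitMult, n int) int {
    return retransmitMult * int(math.Ceil(math.Log10(float64(n+1))))
}

// Example: RetransmitMult = 4 and a 30-node cluster gives
// 4 * ceil(log10(31)) = 4 * 2 = 8 transmissions before a
// broadcast is dropped from the queue (see getBroadcasts below).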

// gossip is invoked every GossipInterval period to broadcast our gossip
// messages to a few random nodes.
func (m *Memberlist) gossip() {
    // Get some random live nodes: GossipNodes of them
    m.nodeLock.RLock()
    excludes := []string{m.config.Name}
    kNodes := kRandomNodes(m.config.GossipNodes, excludes, m.nodes)
    m.nodeLock.RUnlock()

    // Compute the bytes available
    bytesAvail := udpSendBuf - compoundHeaderOverhead
    for _, node := range kNodes {
        // Get any pending broadcasts that fit in the available space
        msgs := m.getBroadcasts(compoundOverhead, bytesAvail)
        if len(msgs) == 0 {
            return
        }

        // Create a compound message out of them
        compound := makeCompoundMessage(msgs)

        // Send the compound message
        destAddr := &net.UDPAddr{IP: node.Addr, Port: m.config.UDPPort}
        if err := m.rawSendMsg(destAddr, compound); err != nil {
            log.Printf("[ERR] Failed to send gossip to %s: %s", destAddr, err)
        }
    }
}
 
 
A node, then, can be in one of three states.
alive
Marks a node that is alive.
aliveNode
// aliveNode is invoked by the network layer when we get a message
// about a live node
func (m *Memberlist) aliveNode(a *alive) {
    m.nodeLock.Lock()
    defer m.nodeLock.Unlock()

    // Look this node up in our node map
    state, ok := m.nodeMap[a.Node]

    // Check if we've never seen this node before; if so, add it
    if !ok {
        state = &NodeState{
            Node: Node{
                Name: a.Node,
                Addr: a.Addr,
            },
            State: StateDead,
        }

        // Add to map
        m.nodeMap[a.Node] = state

        // Get a random offset. This is important to ensure
        // the failure detection bound is low on average. If all
        // nodes did an append, failure detection bound would be
        // very high.
        n := len(m.nodes)
        offset := randomOffset(n)

        // Add at the end and swap with the node at the offset
        m.nodes = append(m.nodes, state)
        m.nodes[offset], m.nodes[n] = m.nodes[n], m.nodes[offset]
    }

    // Bail if the incarnation number is old (the message is not newer than our record)
    if a.Incarnation <= state.Incarnation {
        return
    }

    // The message is newer, so re-broadcast the aliveMsg
    m.encodeAndBroadcast(a.Node, aliveMsg, a)

    // Update the state and incarnation number
    oldState := state.State
    state.Incarnation = a.Incarnation
    if state.State != StateAlive {
        state.State = StateAlive
        state.StateChange = time.Now()
    }

    // If Dead -> Alive, notify JoinCh of the join
    if oldState == StateDead {
        notify(m.config.JoinCh, &state.Node)
    }
}
 
suspect

When probing a node fails, or when a suspect message about some node is received, the local record for that node is marked suspect, a timer is started, and a suspect broadcast is sent out. During that window, every matching suspect message received from other nodes increments the local confirmation count. If the timer expires, the node is still not alive, and the confirmation count meets the requirement, the node is marked dead. When a node receives a suspect message about itself, it broadcasts an alive message, which clears the suspect marks held by other nodes.

https://www.colabug.com/1010287.html

suspectNode

// suspectNode is invoked by the network layer when we get a message
// about a suspect node
func (m *Memberlist) suspectNode(s *suspect) {
    m.nodeLock.Lock()
    defer m.nodeLock.Unlock()
    state, ok := m.nodeMap[s.Node]

    // If we've never heard about this node before, ignore it
    if !ok {
        return
    }

    // Ignore old incarnation numbers
    if s.Incarnation < state.Incarnation {
        return
    }

    // Ignore non-alive nodes
    if state.State != StateAlive {
        return
    }

    // If this is us we need to refute, otherwise re-broadcast
    if state.Name == m.config.Name {
        inc := m.nextIncarnation()
        a := alive{Incarnation: inc, Node: state.Name, Addr: state.Addr}
        m.encodeAndBroadcast(s.Node, aliveMsg, a)
        state.Incarnation = inc
        return // Do not mark ourself suspect
    } else {
        m.encodeAndBroadcast(s.Node, suspectMsg, s)
    }

    // Update the state
    state.Incarnation = s.Incarnation
    state.State = StateSuspect
    changeTime := time.Now()
    state.StateChange = changeTime

    // Setup a timeout for this
    timeout := suspicionTimeout(m.config.SuspicionMult, len(m.nodes), m.config.ProbeInterval)
    time.AfterFunc(timeout, func() {
        if state.State == StateSuspect && state.StateChange == changeTime {
            m.suspectTimeout(state)
        }
    })
}
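The timer armed at the end of suspectNode uses suspicionTimeout, which the article omits. Following the Config comment "Suspicion time = SuspicionMult * log(N+1) * Interval", here is a hedged sketch of my own (again, the log base and rounding are assumptions) with a worked example, assuming math and time imports:

// Sketch of the suspicion timeout, following the Config comment
// "Suspicion time = SuspicionMult * log(N+1) * Interval". Base-10 log assumed.
func suspicionTimeout(suspicionMult, n int, interval time.Duration) time.Duration {
    nodeScale := math.Ceil(math.Log10(float64(n + 1)))
    return time.Duration(suspicionMult) * time.Duration(nodeScale) * interval
}

// Example: SuspicionMult = 5, 30 nodes, ProbeInterval = 1s gives
// 5 * ceil(log10(31)) * 1s = 5 * 2 * 1s = 10s before suspectTimeout
// declares the node dead.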
 
dead

When this node leaves the cluster, or a node it probes times out and is declared dead, a dead broadcast for that node is sent to the cluster. A node receiving the dead broadcast compares it with its local record: if the local record is already dead the message is ignored; otherwise the local record is removed and the dead message is rebroadcast, so it keeps propagating. If a node receives a dead broadcast about itself, it has effectively been partitioned from the others, so it issues an alive broadcast to correct the data other nodes hold about it.

https://www.colabug.com/1010287.html

 

deadNode
// deadNode is invoked by the network layer when we get a message
// about a dead node
func (m *Memberlist) deadNode(d *dead) {
    m.nodeLock.Lock()
    defer m.nodeLock.Unlock()
    state, ok := m.nodeMap[d.Node]

    // If we've never heard about this node before, ignore it
    if !ok {
        return
    }

    // Ignore old incarnation numbers
    if d.Incarnation < state.Incarnation {
        return
    }

    // Ignore if node is already dead
    if state.State == StateDead {
        return
    }

    // If this is us we need to refute, otherwise re-broadcast
    if state.Name == m.config.Name && !m.leave {
        inc := m.nextIncarnation()
        a := alive{Incarnation: inc, Node: state.Name, Addr: state.Addr}
        m.encodeAndBroadcast(d.Node, aliveMsg, a)
        state.Incarnation = inc
        return // Do not mark ourself dead
    } else {
        m.encodeAndBroadcast(d.Node, deadMsg, d)
    }

    // Update the state
    state.Incarnation = d.Incarnation
    state.State = StateDead
    state.StateChange = time.Now()

    // Notify of death: send the dead node to LeaveCh
    notify(m.config.LeaveCh, &state.Node)
}
github.com/hashicorp/memberlist/broadcast.go
broadcast.go is the broadcast module.
It provides three functions, the most important of which is
getBroadcasts
It returns a slice of pending broadcasts that fit within a maximum byte size, used to fill a UDP packet with piggybacked data. The code is simple:
// getBroadcasts is used to return a slice of broadcasts to send up to
// a maximum byte size, while imposing a per-broadcast overhead. This is used
// to fill a UDP packet with piggybacked data
func (m *Memberlist) getBroadcasts(overhead, limit int) []*bytes.Buffer {
    m.broadcastLock.Lock()
    defer m.broadcastLock.Unlock()

    transmitLimit := retransmitLimit(m.config.RetransmitMult, len(m.nodes))
    bytesUsed := 0
    var toSend []*bytes.Buffer

    for i := len(m.bcQueue) - 1; i >= 0; i-- {
        // Check if this is within our limits
        b := m.bcQueue[i]
        if bytesUsed+overhead+b.msg.Len() > limit {
            continue
        }

        // Add to slice to send
        bytesUsed += overhead + b.msg.Len()
        toSend = append(toSend, b.msg)

        // Check if we should stop transmission
        b.transmits++
        if b.transmits >= transmitLimit {
            n := len(m.bcQueue)
            m.bcQueue[i], m.bcQueue[n-1] = m.bcQueue[n-1], nil
            m.bcQueue = m.bcQueue[:n-1]
        }
    }

    // If we are sending anything, we need to re-sort to deal
    // with adjusted transmit counts
    if len(toSend) > 0 {
        m.bcQueue.Sort()
    }
    return toSend
}
 
 
Summary:
This version of the code is a fairly compact snapshot, and its functional modules are cleanly separated. However, it does not yet expose a user-facing API. In later versions the modules were reorganized and an interface was abstracted out for users.

How to use the Delegate interface will be covered in a follow-up article.
 

 

 

 

龚浩华

月牙寂道长

qq:29185807

June 13, 2019

