本文共 22847 字,大约阅读时间需要 76 分钟。
本文微信公众号链接:
memberlist是go语言开发的,基于Gossip协议来传播消息,用来管理分布式集群内节点发现、 节点失效探测、节点列表的软件包。
对于Gossip协议之前写过一篇文章:
源码地址 https://github.com/hashicorp/memberlist
为了学习memberlist的原理设计,遵循个人从低版本代码研究的习惯。这里一提交号fe04265为分析。
再次备注:学习早期版本,只是为了学习开源代码的设计原理,底层工作原理。以及版本在进化过程中,源码的改进。
源码目录:
整体代码风格像面向对象c的风格。模块划分刚好以文件名为划分
1、broadcast.go :广播模块
2、net.go:传输与协议处理模块
3、state.go:节点状态管理模块
4、memberlist.go:主模块
github.com/hashicorp/memberlist/memberlist.go
Memberlist
在结构体Memberlist中,成员变量也是按照功能不同分隔
type Memberlist struct {config *Config //配置shutdown bool //本地服务关闭的标志位leave bool //本节点退出的标志位udpListener *net.UDPConntcpListener *net.TCPListener//udp和tcp的链接管理。对应的net.go,传输与协议管理sequenceNum uint32 // Local sequence number//本地seq numincarnation uint32 // Local incarnation number//本地inc numnodeLock sync.RWMutexnodes []*NodeState // Known nodesnodeMap map[string]*NodeState // Maps Addr.String() -> NodeState//node管理以及state管理对应state.gotickerLock sync.Mutextickers []*time.TickerstopTick chan struct{}probeIndex intackLock sync.MutexackHandlers map[uint32]*ackHandlerbroadcastLock sync.MutexbcQueue broadcasts//broadcast管理,对应broadcast.go}
Config
在config中,前面是一些基本的配置项,注释也都有解释。
type Config struct {Name string // Node name (FQDN)BindAddr string // Binding addressUDPPort int // UDP port to listen onTCPPort int // TCP port to listen onTCPTimeout time.Duration // TCP timeoutIndirectChecks int // Number of indirect checks to useRetransmitMult int // Retransmits = RetransmitMult * log(N+1)SuspicionMult int // Suspicion time = SuspcicionMult * log(N+1) * IntervalPushPullInterval time.Duration // How often we do a Push/Pull updateRTT time.Duration // 99% precentile of round-trip-timeProbeInterval time.Duration // Failure probing interval lengthGossipNodes int // Number of nodes to gossip to per GossipIntervalGossipInterval time.Duration // Gossip interval for non-piggyback messages (only if GossipNodes > 0)JoinCh chan<- *NodeLeaveCh chan<- *Node}
在最后两行
JoinCh:这个是对外提供的一个接口,用于做新增node的时候,作为外部注册通知处理
LeaveCh:这个是对外提供的一个接口,用于做对去除一个node的时候,做为外部注册通知处理
这两个chan,在更早的版本中是在结构体memberlist中。后来移到了config中。
开始进入流程
Create
// Create will start memberlist and create a new gossip pool, but// will not connect to an existing node. This should only be used// for the first node in the cluster.func Create(conf *Config) (*Memberlist, error) { m, err := newMemberlist(conf)//newMemberlist,中开启了tcplisten和udplisten if err != nil { return nil, err } if err := m.setAlive(); err != nil { m.Shutdown() return nil, err } m.schedule()//schedule中开启了三个服务:probe、pushpull、gossip return m, nil}
这里面有两个重要步骤
1、newMemberlist
2、m.schedule
newMemberlist
// newMemberlist creates the network listeners.// Does not schedule exeuction of background maintenence.func newMemberlist(conf *Config) (*Memberlist, error) { tcpAddr := fmt.Sprintf("%s:%d", conf.BindAddr, conf.TCPPort) tcpLn, err := net.Listen("tcp", tcpAddr) if err != nil { return nil, fmt.Errorf("Failed to start TCP listener. Err: %s", err) }//上面是创建tcplisten udpAddr := fmt.Sprintf("%s:%d", conf.BindAddr, conf.UDPPort) udpLn, err := net.ListenPacket("udp", udpAddr) if err != nil { tcpLn.Close() return nil, fmt.Errorf("Failed to start UDP listener. Err: %s", err) }//上面是创建udplisten m := &Memberlist{config: conf, udpListener: udpLn.(*net.UDPConn), tcpListener: tcpLn.(*net.TCPListener), nodeMap: make(map[string]*NodeState), stopTick: make(chan struct{}, 32), ackHandlers: make(map[uint32]*ackHandler), }//构建Memberlist实例 go m.tcpListen() //开启tcp服务 go m.udpListen() //开启udp服务 return m, nil}
在newMemberlist中,最主要的动作就是开启了tcp服务和udp服务
那么就看看net服务(tcp和udp)
github.com/hashicorp/memberlist/net.go
tcp
tcplisten
// tcpListen listens for and handles incoming connectionsfunc (m *Memberlist) tcpListen() { for { //tcp accept conn, err := m.tcpListener.AcceptTCP() if err != nil { if m.shutdown { break } log.Printf("[ERR] Error accepting TCP connection: %s", err) continue } //每个链接都有一个处理部分handleConn go m.handleConn(conn) }}
继续看
handleConn
// handleConn handles a single incoming TCP connectionfunc (m *Memberlist) handleConn(conn *net.TCPConn) { defer conn.Close()//读取Remote的状态 remoteNodes, err := readRemoteState(conn) if err != nil { log.Printf("[ERR] Failed to receive remote state: %s", err) return }//发送本地节点的状态 if err := m.sendLocalState(conn); err != nil { log.Printf("[ERR] Failed to push local state: %s", err) }//将收到的Remote状态进行更新 m.mergeState(remoteNodes)}
tcp服务提供的功能就是:同步节点状态。
readRemoteState
读取节点状态信息,并返回
// recvRemoteState is used to read the remote state from a connectionfunc readRemoteState(conn net.Conn) ([]pushNodeState, error) {// Read the message type//读取数据 buf := []byte{0} if _, err := conn.Read(buf); err != nil { return nil, err }//读取消息类型 msgType := uint8(buf[0])// Quit if not push/pull//支持push和pull消息 if msgType != pushPullMsg { err := fmt.Errorf("received invalid msgType (%d)", msgType) return nil, err }// Read the push/pull header//解码 var header pushPullHeader hd := codec.MsgpackHandle{} dec := codec.NewDecoder(conn, &hd) if err := dec.Decode(&header); err != nil { return nil, err }// Allocate space for the transfer//解码所有的节点信息 remoteNodes := make([]pushNodeState, header.Nodes)// Try to decode all the states for i := 0; i < header.Nodes; i++ { if err := dec.Decode(&remoteNodes[i]); err != nil { return remoteNodes, err } }//返回节点状态信息 return remoteNodes, nil}
sendLocalState
发送本地存储的节点状态信息
// sendLocalState is invoked to send our local state over a tcp connectionfunc (m *Memberlist) sendLocalState(conn net.Conn) error {// Prepare the local node state//收集本地存储的节点状态信息 m.nodeLock.RLock() localNodes := make([]pushNodeState, len(m.nodes)) for idx, n := range m.nodes { localNodes[idx].Name = n.Name localNodes[idx].Addr = n.Addr localNodes[idx].Incarnation = n.Incarnation localNodes[idx].State = n.State } m.nodeLock.RUnlock()//添加头部信息// Send our node state header := pushPullHeader{Nodes: len(localNodes)} hd := codec.MsgpackHandle{} enc := codec.NewEncoder(conn, &hd)// Begin state push conn.Write([]byte{pushPullMsg})//编码并发送 if err := enc.Encode(&header); err != nil { return err } for i := 0; i < header.Nodes; i++ { if err := enc.Encode(&localNodes[i]); err != nil { return err } } return nil}
mergeState
更新节点状态
// mergeState is invoked by the network layer when we get a Push/Pull// state transferfunc (m *Memberlist) mergeState(remote []pushNodeState) { for _, r := range remote { // Look for a matching local node m.nodeLock.RLock() local, ok := m.nodeMap[r.Name] m.nodeLock.RUnlock() // Skip if we agree on states if ok && local.State == r.State { //若状态与本地存储状态一直,则跳过 continue }//三种状态 switch r.State {//StateAlive case StateAlive: a := alive{Incarnation: r.Incarnation, Node: r.Name, Addr: r.Addr} m.aliveNode(&a)//StateSupect case StateSuspect: s := suspect{Incarnation: r.Incarnation, Node: r.Name} m.suspectNode(&s)//StateDead case StateDead: d := dead{Incarnation: r.Incarnation, Node: r.Name} m.deadNode(&d) } }}
存在三种状态:
StateAlive:处理函数aliveNode
StateSupect:处理函数supectNode
StateDead:处理函数deadNode
这三者的处理在后续解读
tcp小结:
tcp链接,主要处理节点状态信息的同步与更新。
udp
udpListen
代码还是很简单的,不断读取数据,然后交给handleCommand处理
// udpListen listens for and handles incoming UDP packetsfunc (m *Memberlist) udpListen() { mainBuf := make([]byte, udpBufSize) var n int var addr net.Addr var err error for { // Reset buffer buf := mainBuf[0:udpBufSize] // Read a packet//不断从udplisten中读取数据 n, addr, err = m.udpListener.ReadFrom(buf) if err != nil { if m.shutdown { break } log.Printf("[ERR] Error reading UDP packet: %s", err) continue } // Check the length if n < 1 { log.Printf("[ERR] UDP packet too short (%d bytes). From: %s", len(buf), addr) continue } // Handle the command//真正处理部分 m.handleCommand(buf[:n], addr) }}
handleCommand
func (m *Memberlist) handleCommand(buf []byte, from net.Addr) { // Decode the message type//解码消息类型 msgType := uint8(buf[0]) buf = buf[1:] // Switch on the msgType//根据消息不同消息类型,进行不同的处理switch msgType { case compoundMsg: m.handleCompound(buf, from) case pingMsg: m.handlePing(buf, from) case indirectPingMsg: m.handleIndirectPing(buf, from) case ackRespMsg: m.handleAck(buf, from) case suspectMsg: m.handleSuspect(buf, from) case aliveMsg: m.handleAlive(buf, from) case deadMsg: m.handleDead(buf, from) default: log.Printf("[ERR] UDP msg type (%d) not supported. From: %s", msgType, from) }}
一共有:
compoundMsg:处理函数为handleCompound
多个消息聚合在一起,进行分割,然后再重新调用handleCommand
func (m *Memberlist) handleCompound(buf []byte, from net.Addr) { // Decode the parts//消息分割 trunc, parts, err := decodeCompoundMessage(buf) if err != nil { log.Printf("[ERR] Failed to decode compound request: %s", err) return } // Log any truncation if trunc > 0 { log.Printf("[WARN] Compound request had %d truncated messages", trunc) } // Handle each message for _, part := range parts {//分割的消息重新调用handleCommand m.handleCommand(part, from) }}
pingMsg:处理函数为:handlePing
indirectPingMsg:处理函数为handleindirectPing
ackRespMsg:处理函数为handleAck
上面三个消息,都比较简单
suspectMsg:处理函数handleSuspect
调用的函数为suspectNode
func (m *Memberlist) handleSuspect(buf []byte, from net.Addr) { var sus suspect if err := decode(buf, &sus); err != nil { log.Printf("[ERR] Failed to decode suspect message: %s", err) return } m.suspectNode(&sus)}
aliveMsg:处理函数handleAlive
调用的函数为aliveNode
func (m *Memberlist) handleAlive(buf []byte, from net.Addr) { var live alive if err := decode(buf, &live); err != nil { log.Printf("[ERR] Failed to decode alive message: %s", err) return } m.aliveNode(&live)}
deadMsg:处理函数handleDead
调用的函数为deadNode
func (m *Memberlist) handleDead(buf []byte, from net.Addr) { var d dead if err := decode(buf, &d); err != nil { log.Printf("[ERR] Failed to decode dead message: %s", err) return } m.deadNode(&d)}
udp小结:
udp服务提供了一些基本的Command操作
github.com/hashicorp/memberlist/state.go
节点状态信息管理
在github.com/hashicorp/memberlist/memberlist.go中
Create,最后调用的函数schedule
// Schedule is used to ensure the Tick is performed periodicallyfunc (m *Memberlist) schedule() { m.tickerLock.Lock() defer m.tickerLock.Unlock() // Create a new probeTicker//开启了probe协程 if m.config.ProbeInterval > 0 { t := time.NewTicker(m.config.ProbeInterval) go m.triggerFunc(t.C, m.probe) m.tickers = append(m.tickers, t) } // Create a push pull ticker if needed//开启了pushpull协程 if m.config.PushPullInterval > 0 { t := time.NewTicker(m.config.PushPullInterval) go m.triggerFunc(t.C, m.pushPull) m.tickers = append(m.tickers, t) } // Create a gossip ticker if needed//开启了gossip协程 if m.config.GossipNodes > 0 { t := time.NewTicker(m.config.GossipInterval) go m.triggerFunc(t.C, m.gossip) m.tickers = append(m.tickers, t) }}
在这里面一共开启了三个定时任务
probe、pushpull、gossip
probe
当节点启动后,每隔一定时间间隔,会选取一个节点对其发送PING消息,当PING消息失败后,会随机选取 IndirectChecks 个节点发起间接PING的请求和直接更其再发起一个tcp PING消息。 收到间接PING请求的节点会根据请求中的地址发起一个PING消息,将PING的结果返回给间接请求的源节点。 如果探测超时之间内,本节点没有收到任何一个要探测节点的ACK消息,则标记要探测的节点状态为suspect。
https://www.colabug.com/1010287.html
// Tick is used to perform a single round of failure detection and gossipfunc (m *Memberlist) probe() { // Track the number of indexes we've considered probing numCheck := 0START: // Make sure we don't wrap around infinitely if numCheck >= len(m.nodes) { return } // Handle the wrap around case//probeIndex是node索引,循环进行探测 if m.probeIndex >= len(m.nodes) { m.resetNodes() m.probeIndex = 0 numCheck++ goto START } // Determine if we should probe this node skip := false var node *NodeState m.nodeLock.RLock() node = m.nodes[m.probeIndex] if node.Name == m.config.Name { skip = true//当node在配置文件中} else if node.State == StateDead { skip = true//当node为dead时候} // Potentially skip m.nodeLock.RUnlock() if skip {//node在配置文件中或者为dead时候则跳过 numCheck++ m.probeIndex++ goto START } // Probe the specific node//进行probem.probeNode(node)}
// probeNode handles a single round of failure checking on a nodefunc (m *Memberlist) probeNode(node *NodeState) { // Send a ping to the node ping := ping{SeqNo: m.nextSeqNo()} destAddr := &net.UDPAddr{IP: node.Addr, Port: m.config.UDPPort} // Setup an ack handler ackCh := make(chan bool, m.config.IndirectChecks+1) m.setAckChannel(ping.SeqNo, ackCh, m.config.ProbeInterval) // Send the ping message//发送pingMsg if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil { log.Printf("[ERR] Failed to send ping: %s", err) return } // Wait for response or round-trip-time select { case v := <-ackCh: if v == true { return } case <-time.After(m.config.RTT): } // Get some random live nodes m.nodeLock.RLock() excludes := []string{m.config.Name, node.Name}//随机获取一些节点 kNodes := kRandomNodes(m.config.IndirectChecks, excludes, m.nodes) m.nodeLock.RUnlock() // Attempt an indirect ping ind := indirectPingReq{SeqNo: ping.SeqNo, Target: node.Addr} for _, peer := range kNodes { destAddr := &net.UDPAddr{IP: peer.Addr, Port: m.config.UDPPort}//发送indirectPingMsg if err := m.encodeAndSendMsg(destAddr, indirectPingMsg, &ind); err != nil { log.Printf("[ERR] Failed to send indirect ping: %s", err) } } // Wait for the acks or timeout select { case v := <-ackCh: if v == true { return } } // No acks received from target, suspect s := suspect{Incarnation: node.Incarnation, Node: node.Name}//若探测结果失败则将node设置为suspect m.suspectNode(&s)}
pushpull
每隔一个时间间隔,随机选取一个节点,跟它建立tcp连接,然后将本地的全部节点 状态、用户数据发送过去,然后对端将其掌握的全部节点状态、用户数据发送回来,然后完成2份数据的合并。 此动作可以加速集群内信息的收敛速度。
https://www.jianshu.com/p/e2173b44db65
// pushPull is invoked periodically to randomly perform a state// exchange. Used to ensure a high level of convergence.func (m *Memberlist) pushPull() { // Get a random live node m.nodeLock.RLock() excludes := []string{m.config.Name}//随机选取1个节点 nodes := kRandomNodes(1, excludes, m.nodes) m.nodeLock.RUnlock() // If no nodes, bail if len(nodes) == 0 { return } node := nodes[0] // Attempt a push pull//调用pushPullNodeif err := m.pushPullNode(node.Addr); err != nil { log.Printf("[ERR] Push/Pull with %s failed: %s", node.Name, err) }}
上面随机选取一个节点
// pushPullNode is invoked to do a state exchange with// a given nodefunc (m *Memberlist) pushPullNode(addr []byte) error { // Attempt to send and receive with the node//发送并获取状态信息 remote, err := m.sendAndReceiveState(addr) if err != nil { return nil } // Merge the state//合并更新节点状态信息 m.mergeState(remote) return nil}
// sendState is used to initiate a push/pull over TCP with a remote nodefunc (m *Memberlist) sendAndReceiveState(addr []byte) ([]pushNodeState, error) { // Attempt to connect//创建tcp client链接 dialer := net.Dialer{Timeout: m.config.TCPTimeout} dest := net.TCPAddr{IP: addr, Port: m.config.TCPPort} conn, err := dialer.Dial("tcp", dest.String()) if err != nil { return nil, err } // Send our state//发送本地节点状态信息 if err := m.sendLocalState(conn); err != nil { return nil, err } // Read remote state//读取Remote节点状态信息并返回remote, err := readRemoteState(conn) if err != nil { return nil, err } // Return the remote state return remote, nil}
gossip
节点通过udp协议向K个节点发送消息,节点从广播队列里面获取消息,广播队列里的消息发送失败超过一定次数后,消息就会被丢弃。发送次数参考Config 里的 RetransmitMul的注释。
https://www.jianshu.com/p/e2173b44db65
// gossip is invoked every GossipInterval period to broadcast our gossip// messages to a few random nodes.func (m *Memberlist) gossip() { // Get some random live nodes m.nodeLock.RLock() excludes := []string{m.config.Name}//随机获取gossipNodes配置项个数的节点 kNodes := kRandomNodes(m.config.GossipNodes, excludes, m.nodes) m.nodeLock.RUnlock() // Compute the bytes available bytesAvail := udpSendBuf - compoundHeaderOverhead for _, node := range kNodes { // Get any pending broadcasts//获取能够广播消息大小 msgs := m.getBroadcasts(compoundOverhead, bytesAvail) if len(msgs) == 0 { return } // Create a compound message//创建一个合并的消息 compound := makeCompoundMessage(msgs) // Send the compound message destAddr := &net.UDPAddr{IP: node.Addr, Port: m.config.UDPPort}//发送消息 if err := m.rawSendMsg(destAddr, compound); err != nil { log.Printf("[ERR] Failed to send gossip to %s: %s", destAddr, err) } }}
那么节点的三个状态有
alive
用于标识活跃节点
aliveNode
// aliveNode is invoked by the network layer when we get a message// about a live nodefunc (m *Memberlist) aliveNode(a *alive) { m.nodeLock.Lock() defer m.nodeLock.Unlock()在节点信息中进行查找 state, ok := m.nodeMap[a.Node] // Check if we've never seen this node before if !ok {//以下为添加新的节点 state = &NodeState{ Node: Node{ Name: a.Node, Addr: a.Addr, }, State: StateDead, } // Add to map m.nodeMap[a.Node] = state // Get a random offset. This is important to ensure // the failure detection bound is low on average. If all // nodes did an append, failure detection bound would be // very high. n := len(m.nodes) offset := randomOffset(n) // Add at the end and swap with the node at the offset m.nodes = append(m.nodes, state) m.nodes[offset], m.nodes[n] = m.nodes[n], m.nodes[offset] } // Bail if the incarnation number is old//inc若是更新的则返回if a.Incarnation <= state.Incarnation { return }//inc信息为旧的,则广播alivemsg // Re-Broadcast m.encodeAndBroadcast(a.Node, aliveMsg, a) // Update the state and incarnation number oldState := state.State state.Incarnation = a.Incarnation if state.State != StateAlive { state.State = StateAlive state.StateChange = time.Now() }//节点状态发生变化,则通知到joinch // if Dead -> Alive, notify of join if oldState == StateDead { notify(m.config.JoinCh, &state.Node) }}
suspect
当探测一些节点失败时,或者suspect某个节点的信息时,会将本地对应的信息标记为suspect,然后启动一个 定时器,并发出一个suspect广播,此期间内如果收到其他节点发来的相同的suspect信息时,将本地suspect的 确认数+1,当定时器超时后,该节点信息仍然不是alive的,且确认数达到要求,会将该节点标记为dead。 当本节点收到别的节点发来的suspect消息时,会发送alive广播,从而清除其他节点上的suspect标记。
https://www.colabug.com/1010287.html
suspectNode
// suspectNode is invoked by the network layer when we get a message// about a suspect nodefunc (m *Memberlist) suspectNode(s *suspect) { m.nodeLock.Lock() defer m.nodeLock.Unlock() state, ok := m.nodeMap[s.Node] // If we've never heard about this node before, ignore it if !ok { return } // Ignore old incarnation numbers if s.Incarnation < state.Incarnation { return } // Ignore non-alive nodes if state.State != StateAlive { return } // If this is us we need to refute, otherwise re-broadcast if state.Name == m.config.Name { inc := m.nextIncarnation() a := alive{Incarnation: inc, Node: state.Name, Addr: state.Addr} m.encodeAndBroadcast(s.Node, aliveMsg, a) state.Incarnation = inc return // Do not mark ourself suspect } else { m.encodeAndBroadcast(s.Node, suspectMsg, s) } // Update the state state.Incarnation = s.Incarnation state.State = StateSuspect changeTime := time.Now() state.StateChange = changeTime // Setup a timeout for this timeout := suspicionTimeout(m.config.SuspicionMult, len(m.nodes), m.config.ProbeInterval) time.AfterFunc(timeout, func() { if state.State == StateSuspect && state.StateChange == changeTime { m.suspectTimeout(state) } })}
dead
当本节点离开集群时或者本地探测的其他节点超时被标记死亡,会向集群发送本节点dead广播。收到dead广播 消息的节点会跟本地的记录比较,当本地记录也是dead时会忽略消息,当本地的记录不是dead时,会删除本地 的记录再将dead消息再次广播出去,形成再次传播。 如果从其他节点收到自身的dead广播消息时,说明本节点相对于其他节点网络分区,此时会发起一个alive广播 以修正其他节点上存储的本节点数据。
https://www.colabug.com/1010287.html
deadNode
// deadNode is invoked by the network layer when we get a message// about a dead nodefunc (m *Memberlist) deadNode(d *dead) { m.nodeLock.Lock() defer m.nodeLock.Unlock() state, ok := m.nodeMap[d.Node] // If we've never heard about this node before, ignore it if !ok { return } // Ignore old incarnation numbers if d.Incarnation < state.Incarnation { return } // Ignore if node is already dead if state.State == StateDead { return } // If this is us we need to refute, otherwise re-broadcast if state.Name == m.config.Name && !m.leave { inc := m.nextIncarnation() a := alive{Incarnation: inc, Node: state.Name, Addr: state.Addr} m.encodeAndBroadcast(d.Node, aliveMsg, a) state.Incarnation = inc return // Do not mark ourself dead } else { m.encodeAndBroadcast(d.Node, deadMsg, d) } // Update the state state.Incarnation = d.Incarnation state.State = StateDead state.StateChange = time.Now() // Notify of death//将dead节点信息通知到LeaveCh notify(m.config.LeaveCh, &state.Node)}
github.com/hashicorp/memberlist/broadcast.go
broadcast模块是广播模块
提供了三个函数,最主要的函数是
getBroadcasts
返回一个广播的最大size,主要是用于填充udp包。很简单代码如下
// getBroadcasts is used to return a slice of broadcasts to send up to// a maximum byte size, while imposing a per-broadcast overhead. This is used// to fill a UDP packet with piggybacked datafunc (m *Memberlist) getBroadcasts(overhead, limit int) []*bytes.Buffer { m.broadcastLock.Lock() defer m.broadcastLock.Unlock() transmitLimit := retransmitLimit(m.config.RetransmitMult, len(m.nodes)) bytesUsed := 0 var toSend []*bytes.Buffer for i := len(m.bcQueue) - 1; i >= 0; i-- { // Check if this is within our limits b := m.bcQueue[i] if bytesUsed+overhead+b.msg.Len() > limit { continue } // Add to slice to send bytesUsed += overhead + b.msg.Len() toSend = append(toSend, b.msg) // Check if we should stop transmission b.transmits++ if b.transmits >= transmitLimit { n := len(m.bcQueue) m.bcQueue[i], m.bcQueue[n-1] = m.bcQueue[n-1], nil m.bcQueue = m.bcQueue[:n-1] } } // If we are sending anything, we need to re-sort to deal // with adjusted transmit counts if len(toSend) > 0 { m.bcQueue.Sort() } return toSend}
总结:
上面这个代码版本,算是一个比较小集的版本,功能模块分的比较清晰
但其还未提供对用户的使用接口。在其后续的版本中,模块代码做了一
些调整,并抽象出了一个接口,供给使用者。
关于Delegate的接口使用,后续会有新的文章案例来说明。
龚浩华
月牙寂道长
qq:29185807
2019年06月13日
如果你觉得本文对你有帮助,可以转到你的朋友圈,让更多人一起学习。
第一时间获取文章,可以关注本人公众号:月牙寂道长,也可以扫码关注
转载地址:http://ntebi.baihongyu.com/