fabric源码解析16——peer的gossip服务之测试 - Go语言中文社区

fabric源码解析16——peer的gossip服务之测试


fabric源码解析16——peer的gossip服务之测试

彻底一点儿

原本打算在《fabric源码分析15》中将gossip服务的主题文章就此结束,毕竟前前后后有一个月了,但是确实仍留有一种不透彻的感觉。最近突然意识到,这种感觉是只看代码却没有测试验证想法所产生的。在《fabric源码分析1》中,就已经把fabric源码分为三类,其中一类就是辅助研究源码的,主要指项目中的test文件,一直没有好好利用。这里需要理解的是,test与源码之间相互关联,要看懂test,最起码对源码有一定理解的基础,否则连test在测什么内容都不知道。这篇文章就借着上篇文章不透彻的困扰,一方面介绍一下如何使用fabric中的test文件,方便读者自行测试自己感到困惑的地方;另一方面佐证对gossip源码的分析,驱除困扰。这里有一些前情概要:

  • 这里叙述的操作系统是Ubuntu16.04,64位。
  • 姑且假设各位和笔者一样,属于go菜鸟(如果大神觉得下文某些地方太小白的话)。各位的go编译已经部署到位。
  • 仍以block消息的传播为例。若涉及的步骤或模块不清楚,请参看《fabric源码分析14》和《fabric源码分析15》。

fabric test

  1. fabric源码使用到的第三方库不用担心,都在源码目录的vendor中。可以将vendor目录下的所有文件直接复制到GOPATH/src下。查看自己系统的GOPATH路径,可以执行go env查看。
  2. fabric源码中import进自身源码的路径均如github.com/hyperledger/fabric/...,因此要在GOPATH/src/github.com中新建hyperledger文件夹,然后将fabric项目整个文件夹以fabric的名字放入hyperledger中。
  3. fabric在编译过程中(test也是先编译然后在执行test)可能会需要一些系统/usr/下的头文件或库,基本上都是vendor下go的第三方库所用到的。这个顺序就是,fabric用go标准库和vendor,vendor用go标准库和另外一些第三方库。各位的系统中并不一定会有vendor使用的一些第三方的库,在编译过程中若出现这样的问题,就需要安装了。这个问题就需要根据错误提示自行安装了,基本上在联网状态下使用apt install xxx就可以搞定安装问题。
  4. go test命令,这里我们主要用到的是单测某个测试函数单测某个测试文件。单测某个测试函数,如/fabric/gossip/gossip/gossip_test.go中的TestDissemination,使用的命令是go test -test.run TestDissemination;单测某个测试文件,如/fabric/gossip/gossip/gossip_test.go,使用的命令是go test gossip_test.go XXX,其中XXX指gossip_test.go测试代码直接用到的各个源码文件。我们这里只使用单测一个测试函数的命令。

gossip test

测试的目的在于验证我们研究的gossip传播消息的过程,最基本的方法就是在适当的地方设置打印点打印输出信息。为了佐证,输出的信息基本上要围绕两点:(1)谁把哪些块消息在什么时候发送给了谁。(2)谁在什么时候接收了来自谁的哪些块消息

测试的难点在于,gossip服务是一个由多个模块相互配合完成各个结点间的消息传播的,各类模块和类型消息可能对打印点形成干扰。而且除了正常的消息散播,还有pull机制等非散播方式的消息传输的影响。应该在何处收集信息来打印还是需要注意的。还有,项目中所给的测试函数的本意不一定是我们想测试的结果,或者为了方便,需要我们进行稍微的修改。接下来以gossip_test.go中的TestDissemination测试函数为例,进行消息散播的测试。

测试函数TestDissemination

TestDissemination从名字上就可以看出来,是专用于测试消息散播的测试函数(以下的代码片段都引自该函数中),所做的是:

  1. 分为上下两部分,上部分主要用于测试传播DataMessage类型的GossipMessage,下部分主要用于LeadershipMessage类型的GossipMessage,中间还夹杂着对最后一个结点lastPeer的UpdateChannelMetadata的测试。这里我们只以DataMessage为例,也即可以指我们所说的block消息。所以可以将//Sending leadership messagest.Log("Stopping peers")之间关于测试LeadershipMessage消息的代码注释掉。
  2. 创建了10个gossip服务实例,一个角色为bootstrap的gossip服务实例,因为是测试,所以就以gossip服务实例代表结点。fabric中,bootstrap角色本是用于新加入频道的结点向bootstrap结点直接索要数据以快速与其他结点达到状态一致的情景的,这里却类似于DeliverService的作用,即作为gossip所散播的消息的来源和起点。而且,这里为了简便和容易分析,我们将结点数发送的消息条数都改为4个,即n := 10改为n := 4msgsCount2Send := 10改为msgsCount2Send := 4,相应的,boot := newGossipInstance(portPrefix, 0, 100)中的100要改为16(100=10x10,16=4x4),这个值指定的是msgstore模块最多存储的block消息数。
  3. for i := 1; i <= n; i++ { pI :=... }每次循环中,创建一个gossip服务实例pI,然后调用pI.Accept从pI中订阅一个专用于接收DataMessage消息的通道acceptChan,并新启一个goroutine,不断接收从acceptChan发来的消息,每接收一条消息,就将pI的接收消息数量槽(参看下文第7步)的值+1,一旦pI收满4条消息,立即wg.Done()并结束这个goroutine。这样,我们有了名为p0-p4五个结点,其中p0为bootstrap结点。对应使用的地址都是本机,端口为3610-3614。
  4. waitUntilOrFail(...,predfunc)函数用于等待,最多等待timeout(全局变量,180s),每隔3s执行一下predfunc,若该函数返回true则立即结束等待。checkPeersMembership(t, peers, n)则是检查每个结点所存储的成员关系是否已经处于饱和(discovery模块要干的事情)。这里所说的饱和指除自己外其余4个结点(包含bootstrap)的身份信息都已存在。有了这些总名单,gossip服务才能从中甄选一些进行散播消息。注意,这里测试中进行了成员关系饱和的等待,而实际操作中,一个结点的消息的散播不会等到成员关系饱和才开始。
  5. 待各个结点成员关系饱和后,for i := 1; i <= msgsCount2Send; i++ { boot.Gossip(...) },用bootstrap结点boot依次散播4条DataMessage消息。这里的DataMessage由createDataMsg生成,只标记了消息的Channel,Tag和SeqNum。第一条消息则SeqNum为1,第二条消息则SeqNum为2,…。
  6. waitUntilOrFailBlocking(t, wg.Wait),等待wg.Wait完成。第3步中每个结点新启的goroutine结束后,即每个结点接收到了4条消息,都会执行wg.Done()。等4个结点都Done过,wg.Wait就完成了,也就结束等待了。所以,这一步是在等待消息的完全传播,即等待每个结点都接收到4条消息。
  7. 接收消息数量槽指的是receivedMessages数组,容量是n,4个结点各占一个,用来表示每个结点接收到的消息数量。assert.Equal(t, msgsCount2Send, receivedMessages[i])就是用来验证在第6步的等待后,每个结点是否都接收到了4条消息,验证的方法就是看看每个结点的接收消息数量槽的值是否等于4。
  8. 对看/fabric/gossip/gossip/channel/channel.go中HandleMessageif m.IsDataMsg() || m.IsStateInfoMsg()分支,这里是每个结点最终处理接收到的DataMessage消息的地方,补充说三点。(1)每个结点只有调用gc.DeMultiplex(m)将接收到的DataMessage出版出来,上文第3步订阅DataMessage消息的acceptChan才会收到消息。而gc.DeMultiplex(m)if added { ... }分支中,即只有added = gc.blockMsgStore.Add()添加消息成功,才会进一步把消息出版。而blockMsgStore(msgstore模块)的存储机制上,当添加一条DataMessage时会验证该条消息是否已经存储,若已经存储则返回false。因此,当成功添加4条消息,说明添加了4条不同的消息,而bootstrap结点又只散播了SeqNum1-SeqNum4,即一个结点成功添加4条消息就是添加的SeqNum1-SeqNum4这4条消息,然后会在每次成功添加后出版这4条消息。如此,一个结点的acceptChan收到4次消息时(此时该节点接收消息数量槽的值为4),证明这个结点已经接收到了SeqNum1-SeqNum4这4条消息。(2)同样的,在添加成功的情况下,一个结点才会进一步调用gc.Gossip()将收到的4条消息进一步向其他结点散播。(3)第6步中,每隔3秒检查一下每个结点是否接收到4条消息,一旦接收到,等待立即结束,接着就会调用waitUntilOrFailBlocking(t, stop)开始停止每个gossip服务实例(关于测试leadership的代码已经注释掉),由于都是本机上的实例,这个操作是非常快的。而我们要测试查看的是每个结点的散播情况,所以这里会有这么一种情况:当最后一个结点刚刚接收完第4条消息时还没来得及进一步gc.Gossip()或没有到达我们的设置的出口打印点(参看下文),就进行完了第6步中的检查并并把这个结点给停掉了,因此我们看到的散播过程也不会完整。所以,这里我们在停止结点前等待3秒(足够了),即在t.Log("Stopping peers")之后紧接着插入一句time.Sleep(time.Second*3)

设置打印点

知道了TestDissemination所做的事情并做了适当的修改后,回归到我们打印点输出信息所要围绕的两点内容。这两点内容可以定位在一个结点发送消息的出口接收消息的入口,在这两个口处设置打印点最容易收集消息,收集的消息也最能表明和佐证gosisp散播消息的过程和方向。发送消息的出口为gossip/gossip_impl.go中的gossipInChan,接收消息的入口为gossip/channel/channel.go中HandleMessageif m.IsDataMsg() || m.IsStateInfoMsg()分支。

1.发送消息出口打印点代码:

    //行首有#的为原始代码,可据此定位。其余的即为自己写的打印点代码。
    str := fmt.Sprintf("33[41;32m%s have ",g.conf.ID)
    for _,mem := range membership {
        str += fmt.Sprintf(" p%s,",mem.Endpoint[11:])
    }
    str = str[:len(str)-1]
    str += "33[0m"
    strstr := fmt.Sprintf("33[41;32m%s select ",g.conf.ID)
    for _,mm := range peers2Send {
        strstr += fmt.Sprintf("p%s,",mm.PKIID[13:])
    }
    strstr += "send "
    msgnum := 0
#   // Send the messages to the remote peers
#   for _, msg := range messagesOfChannel {
#       g.comm.Send(msg, peers2Send...)
        mmm := msg.GetDataMsg()
        if mmm != nil && mmm.Payload != nil {
            strstr += fmt.Sprintf("SeqNum %d,",mmm.Payload.SeqNum)
            msgnum++
        }
#   }
    if msgnum != 0 {
        strstr += fmt.Sprintf("time:%v33[0m",time.Now().UnixNano())
        fmt.Println(str)
        fmt.Println(strstr) 
    }

发送消息出口这个打印点做了3件事:(1)将一个结点有哪些成员名单记录到str中,形成类似“p0 have p0 have p4, p2, p3, p1”的字符串。由于测试代码中执行了checkPeersMembership等待,所以每个节点都会有其余4个结点的信息。(2)将一个结点把哪些块消息在什么时候(以纳秒计)发送给了谁记录在strstr中,形成类似“p0 select p1,p4,p2,send SeqNum 1,SeqNum 2,SeqNum 3,SeqNum 4,time:1505618519599724473”的字符串。(3)若msgnum>0,则说明发送了>=1条的DataMessage消息,则同时打印str和strstr。这里还需要解释的是,这个出口是很多类型消息的出口,因此4条消息有可能不会在一批消息中发送,可能emitter模块发来的一批消息中只有2条是DataMessage类型的消息。代码中的33[41;32m33[0m是带颜色输出格式符号,下同。

2.接收消息入口打印点代码:

    //行首有#的为自己写的打印点代码,其余为原始代码,可据此定位。
    if m.IsDataMsg() || m.IsStateInfoMsg() {
        ...
        if added {
#           if m.IsDataMsg() {
#               strstr := fmt.Sprintf("33[42;31m%s recive SeqNum %d from p%s,time:%v33[0m",
#                       gc.GetConf().ID,
#                       m.GetDataMsg().Payload.SeqNum,
#                       msg.GetConnectionInfo().ID[13:],
#                       time.Now().UnixNano())
#               fmt.Println(strstr)
#           }
            // Forward the message
            gc.Gossip(msg.GetGossipMessage())
            // DeMultiplex to local subscribers
            gc.DeMultiplex(m)
            ...
        }
    }

接收消息入口这个打印点只做了1件事:将一个结点在什么时候接收了来自谁的哪条消息记录到strstr中,形成类似“p3 recive SeqNum 1 from p0,time:1505627597658625898”的字符串并打印出来。这里还需要解释的是,应该在if added分支中(说明消息添加成功了,同时也说明这是第一次收到这条消息,重复收到同一条消息的话进不了这个分支,也不会有进一步操作),并判断接收的是DataMessage类型的消息时才打印信息。

如果时间更长,消息更多,结点更多且每个结点处理消息的效率差异更大,则有可能需要在if m.IsPullMsg() && m.GetPullMsgType()分支中设置打印点,以查看一个结点是否通过pull机制向其他结点索要block消息(因为这样的话这个结点正常的gossip散播步骤可能缺失一部分)。更复杂的,msgstore模块还会自动清理自认为过期信息,等等,这些都会影响上面所设置的打印点的输出。

测试并分析打印信息

接下来我们就可以执行测试了,在终端中切换到gossip_test.go所在的目录中,执行go test -test.run TestDissemination命令,输出的原始信息如下(成功的情况下,因每个结点的散播都是同时进行的,散播过程也是随机的,输出终端又只有一个,所以每次散播的顺序基本上不会相同):

TestDissemination.png

红底绿字的为发送消息出口打印点输出的信息,绿底红字的为接收消息入口打印点输出的信息(大红配大绿,最炫民族风)。从上面的信息可以看出:

  • 每个结点都有其余4个结点的信息。
  • 每个结点都会随机选择3个结点散播消息。这次测试中每个结点只散播了1次,SeqNum1-SeqNum4没有出现分批散播的情况。有时候一个结点因其他类型的消息侵占了emitter模块一批缓存的大部分空间,会把SeqNum1-SeqNum4分两次或更多次散播。
  • 从上到下,从左到右,时间递增。从左到右的时间递增由gossip散播中循环的方法,打印点代码的方法和测试函数散播消息的循环的方法共同保证。

利用excel表对原始输出信息进行整理,把每个结点have的身份信息行删除,把时间点共有的前缀15056275976删除后按升序排列,整理如下:

excel_TestDissemination.png

以p3结点为例,在第3,4,5,7行,p3先后接收了来自p0的SeqNum1,SeqNum2,SeqNum3,SeqNum4消息,并依次调用了gc.Gossip()向emitter模块的缓存中添加消息,以求继续散播这4条消息。到了第21行的时间点,emitter模块由于攒足了一批消息(测试函数中设置的是一批20条,如果设置小点儿,应该会看到更零散的send发送信息),正好这4条DataMessage消息都在这一批消息中,所以p3随机选择了p4,p1,p2三个结点继续散播这4条消息。由于p4,p1,p2三个结点在21行之前均接收了4条消息,所以p3这次继续散播的消息即便到达了这三个结点中,这三个结点也不会added成功,也就不会再继续散播,而从p3来的这次继续散播也就到此为止了(第15,20,22行三个结点散播消息的动作不是因为p3的这次继续散播)。

其余的结点的接收和继续散播遵循同样的规则,均能相互映证。因而,测试对应验证了《fabric源码分析15》中所述的内容。

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/idsuf698987/article/details/78014924
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-03-01 22:27:50
  • 阅读 ( 1086 )
  • 分类:Go

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢