go-kit实践之5:go-kit微服务请求跟踪实现 - Go语言中文社区

go-kit实践之5:go-kit微服务请求跟踪实现


一、介绍

go-kit 提供了两种tracing请求跟踪

1、opentracing【跟踪标准】

2、zipkin【zipkin的go封装】 
我们下面来介绍下zipkin在go-kit中的使用方法。

二、zipkin安装启动

1、ZipKin入门介绍

Zipkin是一款开源的分布式实时数据追踪系统(Distributed Tracking System),基于 Google Dapper的论文设计而来,由 Twitter 公司开发贡献。其主要功能是聚集来自各个异构系统的实时监控数据。分布式跟踪系统还有其他比较成熟的实现,例如:Naver的Pinpoint、Apache的HTrace、阿里的鹰眼Tracing、京东的Hydra、新浪的Watchman,美团点评的CAT,skywalking等。

2、ZipKin架构

ZipKin可以分为两部分,一部分是zipkin server,用来作为数据的采集存储、数据分析与展示;zipkin client是zipkin基于不同的语言及框架封装的一些列客户端工具,这些工具完成了追踪数据的生成与上报功能,架构如下:

Zipkin Server主要包括四个模块:
(1)Collector 接收或收集各应用传输的数据
(2)Storage 存储接受或收集过来的数据,当前支持Memory,MySQL,Cassandra,ElasticSearch等,默认存储在内存中。
(3)API(Query) 负责查询Storage中存储的数据,提供简单的JSON API获取数据,主要提供给web UI使用
(4)Web 提供简单的web界面
服务追踪流程如下:

┌─────────────┐ ┌───────────────────────┐  ┌─────────────┐  ┌──────────────────┐
│ User Code   │ │ Trace Instrumentation │  │ Http Client │  │ Zipkin Collector │
└─────────────┘ └───────────────────────┘  └─────────────┘  └──────────────────┘
       │                 │                         │                 │
           ┌─────────┐
       │ ──┤GET /foo ├─▶ │ ────┐                   │                 │
           └─────────┘         │ record tags
       │                 │ ◀───┘                   │                 │
                           ────┐
       │                 │     │ add trace headers │                 │
                           ◀───┘
       │                 │ ────┐                   │                 │
                               │ record timestamp
       │                 │ ◀───┘                   │                 │
                             ┌─────────────────┐
       │                 │ ──┤GET /foo         ├─▶ │                 │
                             │X-B3-TraceId: aa │     ────┐
       │                 │   │X-B3-SpanId: 6b  │   │     │           │
                             └─────────────────┘         │ invoke
       │                 │                         │     │ request   │
                                                         │
       │                 │                         │     │           │
                                 ┌────────┐          ◀───┘
       │                 │ ◀─────┤200 OK  ├─────── │                 │
                           ────┐ └────────┘
       │                 │     │ record duration   │                 │
            ┌────────┐     ◀───┘
       │ ◀──┤200 OK  ├── │                         │                 │
            └────────┘       ┌────────────────────────────────┐
       │                 │ ──┤ asynchronously report span     ├────▶ │
                             │                                │
                             │{                               │
                             │  "traceId": "aa",              │
                             │  "id": "6b",                   │
                             │  "name": "get",                │
                             │  "timestamp": 1483945573944000,│
                             │  "duration": 386000,           │
                             │  "annotations": [              │
                             │--snip--                        │
                             └────────────────────────────────┘

Instrumented client和server是分别使用了ZipKin Client的服务,Zipkin Client会根据配置将追踪数据发送到Zipkin Server中进行数据存储、分析和展示。

3、ZipKin几个概念

在追踪日志中,有几个基本概念spanId、traceId、parentId

traceId:用来确定一个追踪链的16字符长度的字符串,在某个追踪链中保持不变。

spanId:区域Id,在一个追踪链中spanId可能存在多个,每个spanId用于表明在某个服务中的身份,也是16字符长度的字符串。

parentId:在跨服务调用者的spanId会传递给被调用者,被调用者会将调用者的spanId作为自己的parentId,然后自己再生成spanId。

4、window安装zipkin

(1)下载:

下载地址:https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec

(2)运行:

进入到下载的文件路径下,执行:java -jar xxx  (注:xxx:为下载的文件名,譬如下载的版本为:zipkin-server-2.12.8-exec.jar,则执行:java -jar zipkin-server-2.12.8-exec.jar 。特别说明:Java-jar 需要安装jdk)

(3)访问:zipkin Server 运行后默认的访问地址:http://localhost:9411

(4)调用链分析

三、go-kit的zipkin

1、核心代码说明

服务端trace:

//创建zipkin上报管理器
reporter := http.NewReporter("http://localhost:9411/api/v2/spans")

//运行结束,关闭上报管理器的for-select协程
defer reporter.Close()  

//创建trace跟踪器
zkTracer, err := opzipkin.NewTracer(reporter)  

//添加grpc请求的before after finalizer 事件对应要处理的trace操作方法
zkServerTrace := zipkin.GRPCServerTrace(zkTracer)  

//通过options的方式运行trace
bookListHandler := grpctransport.NewServer(  
   bookListEndPoint,  
   decodeRequest,  
   encodeResponse,  
   zkServerTrace,  
)

客户端trance:

与服务端trace的区别在于kitzipkin.GRPCClientTrace

reporter := http.NewReporter("http://localhost:9411/api/v2/spans")  
defer reporter.Close()  

zkTracer, err := opzipkin.NewTracer(reporter)  
zkClientTrace := zipkin.GRPCClientTrace(zkTracer)

可以通过span组装span结构树

parentSpan := zkTracer.StartSpan("bookCaller")  
defer parentSpan.Flush()  
ctx = opzipkin.NewContext(context.Background(), parentSpan)

2、实例

1、protobuf文件及生成对应的go文件


syntax = "proto3";
 
// 请求书详情的参数结构  book_id 32位整形
message BookInfoParams {
    int32 book_id = 1;
}
 
 
// 书详情信息的结构   book_name字符串类型
message BookInfo {
    int32 book_id = 1;
    string  book_name = 2;
}
 
// 请求书列表的参数结构  page、limit   32位整形
message BookListParams {
    int32 page = 1;
    int32 limit = 2;
}
 
 
// 书列表的结构    BookInfo结构数组
message BookList {
    repeated BookInfo book_list = 1;
}
// 定义 获取书详情  和 书列表服务   入参出参分别为上面所定义的结构
service BookService {
    rpc GetBookInfo (BookInfoParams) returns (BookInfo) {}
    rpc GetBookList (BookListParams) returns (BookList) {}
}

生成对应的go语言代码文件:protoc --go_out=plugins=grpc:. book.proto  (其中:protobuf文件名为:book.proto)

2、Server端代码

package main

import (
	"MyKit"
	"context"
	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/ratelimit"
	"github.com/go-kit/kit/sd/etcdv3"
	"github.com/go-kit/kit/tracing/zipkin"
	grpctransport "github.com/go-kit/kit/transport/grpc"
	opzipkin "github.com/openzipkin/zipkin-go"
	"github.com/openzipkin/zipkin-go/reporter/http"
	"golang.org/x/time/rate"
	"google.golang.org/grpc"
	"math/rand"
	"net"
	"time"
)

type BookServer struct {
	bookListHandler grpctransport.Handler
	bookInfoHandler grpctransport.Handler
}

//通过grpc调用GetBookInfo时,GetBookInfo只做数据透传, 调用BookServer中对应Handler.ServeGRPC转交给go-kit处理
func (s *BookServer) GetBookInfo(ctx context.Context, in *book.BookInfoParams) (*book.BookInfo, error) {
	_, rsp, err := s.bookInfoHandler.ServeGRPC(ctx, in)
	if err != nil {
		return nil, err
	}
	return rsp.(*book.BookInfo), err
}

//通过grpc调用GetBookList时,GetBookList只做数据透传, 调用BookServer中对应Handler.ServeGRPC转交给go-kit处理
func (s *BookServer) GetBookList(ctx context.Context, in *book.BookListParams) (*book.BookList, error) {
	_, rsp, err := s.bookListHandler.ServeGRPC(ctx, in)
	if err != nil {
		return nil, err
	}
	return rsp.(*book.BookList), err
}

//创建bookList的EndPoint
func makeGetBookListEndpoint() endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (interface{}, error) {
		rand.Seed(time.Now().Unix())
		randInt := rand.Int63n(200)
		time.Sleep(time.Duration(randInt) * time.Millisecond)
		//请求列表时返回 书籍列表
		bl := new(book.BookList)
		bl.BookList = append(bl.BookList, &book.BookInfo{BookId: 1, BookName: "Go入门到精通"})
		bl.BookList = append(bl.BookList, &book.BookInfo{BookId: 2, BookName: "微服务入门到精通"})
		bl.BookList = append(bl.BookList, &book.BookInfo{BookId: 2, BookName: "区块链入门到精通"})
		return bl, nil
	}
}

//创建bookInfo的EndPoint
func makeGetBookInfoEndpoint() endpoint.Endpoint {
	return func(ctx context.Context, request interface{}) (interface{}, error) {
		rand.Seed(time.Now().Unix())
		randInt := rand.Int63n(200)
		time.Sleep(time.Duration(randInt) * time.Microsecond)
		//请求详情时返回 书籍信息
		req := request.(*book.BookInfoParams)
		b := new(book.BookInfo)
		b.BookId = req.BookId
		b.BookName = "Go入门系列"
		return b, nil
	}
}

func decodeRequest(_ context.Context, req interface{}) (interface{}, error) {
	return req, nil
}

func encodeResponse(_ context.Context, rsp interface{}) (interface{}, error) {
	return rsp, nil
}

func main() {

	var (
		//etcd服务地址
		etcdServer = "127.0.0.1:2379"
		//服务的信息目录
		prefix = "/services/book/"
		//当前启动服务实例的地址
		instance = "127.0.0.1:50051"
		//服务实例注册的路径
		key = prefix + instance
		//服务实例注册的val
		value = instance
		ctx   = context.Background()
		//服务监听地址
		serviceAddress = ":50051"
	)

	//etcd的连接参数
	options := etcdv3.ClientOptions{
		DialTimeout:   time.Second * 3,
		DialKeepAlive: time.Second * 3,
	}
	//创建etcd连接
	client, err := etcdv3.NewClient(ctx, []string{etcdServer}, options)
	if err != nil {
		panic(err)
	}

	// 创建注册器
	registrar := etcdv3.NewRegistrar(client, etcdv3.Service{
		Key:   key,
		Value: value,
	}, log.NewNopLogger())

	// 注册器启动注册
	registrar.Register()
	//启动追踪
	reporter := http.NewReporter("http://localhost:9411/api/v2/spans") //追踪地址
	defer reporter.Close()
	zkTracer, err := opzipkin.NewTracer(reporter)     //实例化追踪器
	zkServerTrace := zipkin.GRPCServerTrace(zkTracer) //追踪器Server端
	bookServer := new(BookServer)
	bookListEndPoint := makeGetBookListEndpoint()
	//创建限流器 1r/s  limiter := rate.NewLimiter(rate.Every(time.Second * 1), 100000)
	//通过DelayingLimiter中间件,在bookListEndPoint的外层再包裹一层限流的endPoint
	limiter := rate.NewLimiter(rate.Every(time.Second*1), 1) //限流1秒,临牌数:1
	bookListEndPoint = ratelimit.NewDelayingLimiter(limiter)(bookListEndPoint)

	bookListHandler := grpctransport.NewServer(
		bookListEndPoint,
		decodeRequest,
		encodeResponse,
		zkServerTrace, //添加追踪
	)
	bookServer.bookListHandler = bookListHandler

	bookInfoEndPoint := makeGetBookInfoEndpoint()
	//通过DelayingLimiter中间件,在bookListEndPoint的外层再包裹一层限流的endPoint
	bookInfoEndPoint = ratelimit.NewDelayingLimiter(limiter)(bookInfoEndPoint)
	bookInfoHandler := grpctransport.NewServer(
		bookInfoEndPoint,
		decodeRequest,
		encodeResponse,
		zkServerTrace,
	)
	bookServer.bookInfoHandler = bookInfoHandler

	ls, _ := net.Listen("tcp", serviceAddress)
	gs := grpc.NewServer(grpc.UnaryInterceptor(grpctransport.Interceptor))
	book.RegisterBookServiceServer(gs, bookServer)
	gs.Serve(ls)
}

3、Client端代码

package main

import (
	"MyKit"
	"context"
	"fmt"
	"github.com/afex/hystrix-go/hystrix"
	"github.com/go-kit/kit/circuitbreaker"
	"github.com/go-kit/kit/endpoint"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/sd"
	"github.com/go-kit/kit/sd/etcdv3"
	"github.com/go-kit/kit/sd/lb"
	"github.com/go-kit/kit/tracing/zipkin"
	grpctransport "github.com/go-kit/kit/transport/grpc"
	opzipkin "github.com/openzipkin/zipkin-go"
	"github.com/openzipkin/zipkin-go/reporter/http"
	"google.golang.org/grpc"
	"io"
	"time"
)

func main() {
	commandName := "my-endpoint"
	hystrix.ConfigureCommand(commandName, hystrix.CommandConfig{
		Timeout:                1000 * 30,
		ErrorPercentThreshold:  1,
		SleepWindow:            10000,
		MaxConcurrentRequests:  1000,
		RequestVolumeThreshold: 5,
	})
	breakerMw := circuitbreaker.Hystrix(commandName)

	var (
		//注册中心地址
		etcdServer = "127.0.0.1:2379"
		//监听的服务前缀
		prefix = "/services/book/"
		ctx    = context.Background()
	)
	options := etcdv3.ClientOptions{
		DialTimeout:   time.Second * 3,
		DialKeepAlive: time.Second * 3,
	}
	//连接注册中心
	client, err := etcdv3.NewClient(ctx, []string{etcdServer}, options)
	if err != nil {
		panic(err)
	}
	logger := log.NewNopLogger()
	//创建实例管理器, 此管理器会Watch监听etc中prefix的目录变化更新缓存的服务实例数据
	instancer, err := etcdv3.NewInstancer(client, prefix, logger)
	if err != nil {
		panic(err)
	}
	//创建端点管理器, 此管理器根据Factory和监听的到实例创建endPoint并订阅instancer的变化动态更新Factory创建的endPoint
	endpointer := sd.NewEndpointer(instancer, reqFactory, logger)
	//创建负载均衡器
	balancer := lb.NewRoundRobin(endpointer)

	/**
	我们可以通过负载均衡器直接获取请求的endPoint,发起请求
	reqEndPoint,_ := balancer.Endpoint()
	*/
	/**
	也可以通过retry定义尝试次数进行请求
	*/
	reqEndPoint := lb.Retry(3, 100*time.Second, balancer)

	//增加熔断中间件
	reqEndPoint = breakerMw(reqEndPoint)
	//现在我们可以通过 endPoint 发起请求了

	req := struct{}{}
	for i := 1; i <= 30; i++ {
		if _, err = reqEndPoint(ctx, req); err != nil {
			fmt.Println(err)
		}
	}
}

//通过传入的 实例地址  创建对应的请求endPoint
func reqFactory(instanceAddr string) (endpoint.Endpoint, io.Closer, error) {
	return func(ctx context.Context, request interface{}) (interface{}, error) {
		fmt.Println("请求服务: ", instanceAddr, "当前时间: ", time.Now().Format("2006-01-02 15:04:05.99"))
		conn, err := grpc.Dial(instanceAddr, grpc.WithInsecure())
		if err != nil {
			fmt.Println(err)
			panic("connect error")
		}

		//追踪设置
		reporter := http.NewReporter("http://localhost:9411/api/v2/spans") //追踪地址
		defer reporter.Close()

		zkTracer, err := opzipkin.NewTracer(reporter)     //新建追踪器
		zkClientTrace := zipkin.GRPCClientTrace(zkTracer) //启动追踪器Client端

		bookInfoRequest := grpctransport.NewClient(
			conn,
			"BookService",
			"GetBookInfo",
			func(_ context.Context, in interface{}) (interface{}, error) { return nil, nil },
			func(_ context.Context, out interface{}) (interface{}, error) {
				return out, nil
			},
			book.BookInfo{},
			zkClientTrace, //追踪客户端
		).Endpoint()

		bookListRequest := grpctransport.NewClient(
			conn,
			"BookService",
			"GetBookList",
			func(_ context.Context, in interface{}) (interface{}, error) { return nil, nil },
			func(_ context.Context, out interface{}) (interface{}, error) {
				return out, nil
			},
			book.BookList{},
			zkClientTrace,
		).Endpoint()

		parentSpan := zkTracer.StartSpan("bookCaller")
		defer parentSpan.Flush()

		ctx = opzipkin.NewContext(ctx, parentSpan)
		infoRet, _ := bookInfoRequest(ctx, request)
		bi := infoRet.(*book.BookInfo)
		fmt.Println("获取书籍详情")
		fmt.Println("bookId: 1", " => ", "bookName:", bi.BookName)

		listRet, _ := bookListRequest(ctx, request)
		bl := listRet.(*book.BookList)
		fmt.Println("获取书籍列表")
		for _, b := range bl.BookList {
			fmt.Println("bookId:", b.BookId, " => ", "bookName:", b.BookName)
		}

		return nil, nil
	}, nil, nil
}

3、运行

1、启动etcd   

2、启动zipkin

3、运行Server端

4、运行Client端

5、查看zipkin中的记录

访问http://localhost:9411/zipkin/ 

之后就可以看到记录的数据。

 

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/weixin_42117918/article/details/89233859
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-03-01 22:00:23
  • 阅读 ( 1274 )
  • 分类:Go

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢