gRPC 服务端和客户端源码分析(golang) - Go语言中文社区

gRPC 服务端和客户端源码分析(golang)


第一部分 gRPC介绍

gRPC是什么,A high-performance, open-source universal RPC framework。RPC是什么,remote procedure call,远程过程调用,应用程序之间使用RPC通信,函数调用与本地调用无异。
gRPC提供一套server/client模型通信机制,其特点如下:

  • 使用http2.0作为底层传输协议,支持流式通信,多路复用( 多路复用原理:http1.1基于“文本分割”,服务端读取直至遇到分隔符,一次只能处理一个请求或响应,且数据解析无法预知内存大小。http2.0基于二进制“帧”,定义不同类型帧,请求和响应交错复用。流式通信原理:http2.0连接上独立的、双向的帧序列交流。帧首部6-9字节表流ID,用来标识帧所属的流 );
  • 可使用protobuf定义接口数据,二进制编码,减少需要传输的数据量( protobuf轻量:与json比较,json为key-value,每个key都占用多个字节,int类型固定字节数。protobuf使用tag,编号标记key,仅占一字节,支持Varint,数据按实际长度存储)
  • 解决不同语言间通信的复杂性以及环境的不同
  • 一次性在一个.proto文件中定义服务,并使用任何支持它的语言去实现客户端和服务器

gRPC的使用场景与restful API相同。

第二部分 gRPC通信过程源码分析

gRPC提供一套server/client端通信机制,源码分析立足于分析利用gRPC进行模块调用整个通信过程。结合golang grpc使用样例文章中的代码,梳理gRPC通信过程源码。此部分共分为三个小节,分别是:

  • 客户端及服务端入口代码解析
  • 客户端请求逻辑
  • 服务端处理逻辑

1:Server端及Client端入口代码解析
客户端、服务端代码逻辑
如果项目中,你只需要简单的利用gRPC远程调用函数。由上图可知,用户只需关心一下几点:
服务端:

  • 注册并开启服务
  • 持续监听client请求

客户端:

  • 与server端建立连接
  • 初始化客户端并构建msg
  • 请求服务端,并接收响应数据

其中,客户端请求c.SayHello()指定调用 "/proto.Hello/SayHello"服务端定义的helloService.SayHello()。由helloService.SayHello()的执行逻辑可知,其将client端请求的“hello world!”消息直接返回client端。
那么,client端的request是如何发送值server端,server端接收消息,调用helloService.SayHello()函数后,是如何将消息返回client端的,是接下来我们具体研究的内容。

2、客户端请求逻辑
client调用grpc.Dial(),返回一个ClientConn,底层本质上调用newHTTP2Client,与server建立http2连接。

func newHTTP2Client(), onClose func()) (_ *http2Client, err error) {
    //dial一个tcp连接
	conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
    
    //循环读取帧,并按类型分发入流
	go t.reader()
	// Send connection preface to server.
	n, err := t.conn.Write(clientPreface)
	/*************省略***********/
}

调用grpc.Dial()之后,client和server的连接已经建立起来了,此时需初始化客户端对象:

c := pb.NewHelloClient(conn)//创建helloClient实例

func NewHelloClient(cc grpc.ClientConnInterface) HelloClient {
	return &helloClient{cc}
}

初始化的helloClient向server发起一次请求msg,即调用hello.pb.go文件中的helloClient.SayHello()方法,其最终还是调用grpc.Invoke()。

	r, err := c.SayHello(context.Background(), reqBody)//调用helloClient对象的的SayHello()方法,并传入参数

func (c *helloClient) SayHello(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Reply, error) {
	out := new(Reply)
	err := c.cc.Invoke(ctx, "/proto.Hello/SayHello", in, out, opts...)//
	if err != nil {
		return nil, err
	}
	return out, nil
}

grpc.Invoke()最终调用的是invoke()方法,invoke()表示一次请求过程,包括:

  • 新建clientStream对象,标记调用“/proto.Hello/SayHello” Method,监听用户是否关闭此连接
  • 发送请求SendMsg(),底层调用http2client.Write方法发送req。底层调用函数依次为clientStream.SendMsg()->csAttempt.sendMsg()->http2Client.Write()
  • 接收响应数据RecvMsg(),底层调用函数关系依次为RecvMsg()->recvMsg()->recv()->Stream.Read()->io.ReadFull()->io.ReadAtLeast() ->transportReader.Read()->recvBufferReader.Read()->recvBufferReader.readClient()。readClient从接收数据channel里读取数据。接收数据channel数据依赖http2协议写入。
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
	cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
	if err != nil {
		return err
	}
	if err := cs.SendMsg(req); err != nil {
		return err
	}
	return cs.RecvMsg(reply)
}
/**********************SendMsg**********************/
func (cs *clientStream) SendMsg(m interface{}) (err error) {
	err := a.sendMsg(m, hdr, payload, data)
}

func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
	 err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams})
}

func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
	df := &dataFrame{
		streamID:  s.id,
		endStream: opts.Last,
	}
	return t.controlBuf.put(df)
}

/**********************RecvMsg**********************/
func (cs *clientStream) RecvMsg(m interface{}) error {
		return a.recvMsg(m, recvInfo)
}
func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
	select {
	case <-r.ctxDone:
		m := <-r.recv.get()
		return r.readAdditional(m, p)
	case m := <-r.recv.get():
		return r.readAdditional(m, p)
	}
}
//从接收数据channel中读取数据
func (b *recvBuffer) get() <-chan recvMsg {
	return b.c
}

3、服务端处理逻辑
grpc_server.go文件开启服务监听,接收并处理客户端发送的请求

s.Serve(listen)//阻塞监听客户端请求

func (s *Server) Serve(lis net.Listener) error {
     for {
     	rawConn, err := lis.Accept()
     	go func() {
			s.handleRawConn(rawConn)
		}()
     }
}

接下来我们看看server端如何处理客户端发送的请求。调用Server.handleRawConn完成客户端身份认证并进行stream处理。stream处理主要进行handler函数匹配并发送response。

func (s *Server) handleRawConn(rawConn net.Conn) {
	//身份认证并完成握手连接
	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
	st := s.newHTTP2Transport(conn, authInfo)
	if st == nil {
		return
	}
	//stream处理
	go func() {
		s.serveStreams(st)
	}()
}

func (s *Server) serveStreams(st transport.ServerTransport) {
	defer st.Close()
	var wg sync.WaitGroup
	st.HandleStreams(func(stream *transport.Stream) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.handleStream(st, stream, s.traceInfo(st, stream))
		}()
	}, func(ctx context.Context, method string) context.Context {
		if !EnableTracing {
			return ctx
		}
		tr := trace.New("grpc.Recv."+methodFamily(method), method)
		return trace.NewContext(ctx, tr)
	})
	wg.Wait()
}

// HandleStreams receives incoming streams using the given handler.
func (ht *serverHandlerTransport) HandleStreams() {
 	handleStream()
 	ht.runStream()
}

HandleStreams调用handleStream()
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
	s.processUnaryRPC(t, stream, srv, md, trInfo)
}

func (s *Server) processUnaryRPC(){
    调用处理函数
    reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
    发送处理数据
    err := s.sendResponse(t, stream, reply, cp, opts, comp)
}

//sendResponse()调用Write()构建发送Msg
func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
	return ht.do(func() {
		ht.writeCommonHeaders(s)
		ht.rw.Write(hdr)
		ht.rw.Write(data)
		ht.rw.(http.Flusher).Flush()
	})
}

总结:gRPC提供一套server/client模型通信机制,底层通信依赖http2。样例中server与client端通信依赖protobuf文件,封装http协议客户端与服务端信息交互机制,client端最终调用http2.Write()发送数据,server端接收request,匹配handler函数,发送response结果给client。

下一篇,将对HTTP协议server端与client源码进行分析。

文章同步公众号,欢迎扫码关注
在这里插入图片描述

版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/stayfoolish_yj/article/details/104394702
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
  • 发表于 2020-06-28 00:28:19
  • 阅读 ( 2182 )
  • 分类:Go

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢