社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
目录
RPC 框架的目标就是让远程服务调用更加简单、透明,RPC 框架负责屏蔽底层的传输方式(TCP 或者 UDP)、序列化方式(XML/Json/ 二进制)和通信细节。服务调用者可以像调用本地接口一样调用远程的服务提供者,而不需要关心底层通信细节和调用过程。
RPC 框架的调用原理图如下所示:
业界主流的 RPC 框架整体上分为三类:
- 支持多语言的 RPC 框架,比较成熟的有 Google 的 gRPC、Apache(Facebook)的 Thrift;
- 只支持特定语言的 RPC 框架,例如新浪微博的 Motan;
- 支持服务治理等服务化特性的分布式服务框架,其底层内核仍然是 RPC 框架, 例如阿里的 Dubbo。
随着微服务的发展,基于语言中立性原则构建微服务,逐渐成为一种主流模式,例如对于后端并发处理要求高的微服务,比较适合采用 Go 语言构建,而对于前端的 Web 界面,则更适合 Java 和 JavaScript。
因此,基于多语言的 RPC 框架来构建微服务,是一种比较好的技术选择。例如 Netflix,API 服务编排层和后端的微服务之间就采用 gRPC 进行通信。
gRPC 是一个高性能、开源和通用的 RPC 框架,面向服务端和移动端,基于 HTTP/2 设计。
gRPC 是由 Google 开发并开源的一种语言中立的 RPC 框架,当前支持 C、Java 和 Go 语言,其中 C 版本支持 C、C++、Node.js、C# 等。当前 Java 版本最新 Release 版为 1.5.0。
Git 地址如下:grpc-java
gRPC 的调用示例如下所示:
- 语言中立,支持多种语言;
- 基于 IDL 文件定义服务,通过 proto3 工具生成指定语言的数据结构、服务端接口以及客户端 Stub;
- 通信协议基于标准的 HTTP/2 设计,支持双向流、消息头压缩、单 TCP 的多路复用、服务端推送等特性,这些特性使得 gRPC 在移动端设备上更加省电和节省网络流量;
- 序列化支持 PB(Protocol Buffer)和 JSON,PB 是一种语言无关的高性能序列化框架,基于 HTTP/2 + PB, 保障了 RPC 调用的高性能。
以官方的 helloworld 为例,介绍 gRPC 服务端创建以及 service 调用流程(采用简单 RPC 模式)。
服务定义如下(helloworld.proto):
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
服务端创建代码如下(HelloWorldServer 类):
private void start() throws IOException {
/* The port on which the server should run */
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new GreeterImpl())
.build()
.start();
...
其中,服务端接口实现类(GreeterImpl)如下所示:
static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
gRPC 服务端创建采用 Build 模式,对底层服务绑定、transportServer 和 NettyServer 的创建和实例化做了封装和屏蔽,让服务调用者不用关心 RPC 调用细节,整体上分为三个过程:
- 创建 Netty HTTP/2 服务端;
- 将需要调用的服务端接口实现类注册到内部的 Registry 中,RPC 调用时,可以根据 RPC 请求消息中的服务定义信息查询到服务接口实现类;
- 创建 gRPC Server,它是 gRPC 服务端的抽象,聚合了各种 Listener,用于 RPC 消息的统一调度和处理。
下面我们看下 gRPC 服务端创建流程:
gRPC 服务端创建关键流程分析:
gRPC 的客户端请求消息由 Netty Http2ConnectionHandler 接入,由 gRPC 负责将 PB 消息(或者 JSON)反序列化为 POJO 对象,然后通过服务定义查询到该消息对应的接口实例,发起本地 Java 接口调用,调用完成之后,将响应消息反序列化为 PB(或者 JSON),通过 HTTP2 Frame 发送给客户端。
流程并不复杂,但是细节却比较多,整个 service 调用可以划分为如下四个过程:
- gRPC 请求消息接入;
- gRPC 消息头和消息体处理;
- 内部的服务路由和调用;
- 响应消息发送。
gRPC 的请求消息由 Netty HTTP/2 协议栈接入,通过 gRPC 注册的 Http2FrameListener,将解码成功之后的 HTTP Header 和 HTTP Body 发送到 gRPC 的 NettyServerHandler 中,实现基于 HTTP/2 的 RPC 请求消息接入。
gRPC 请求消息接入流程如下:
关键流程解读如下:
gRPC 消息头的处理入口是 NettyServerHandler 的 onHeadersRead(),处理流程如下所示:
处理流程如下:
gRPC 消息体的处理入口是 NettyServerHandler 的 onDataRead(),处理流程如下所示:
消息体处理比较简单,下面就关键技术点进行讲解:
wrappedExecutor.execute(new ContextRunnable(context) {
@Override
public void runInContext() {
ServerStreamListener listener = NOOP_LISTENER;
try {
ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName);
if (method == null) {
method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority());
}
因此,实际上它们是并行 + 交叉串行实行的,后续章节介绍线程模型时会介绍切换原则。
内部的服务路由和调用,主要包括如下几个步骤:
- 将请求消息体反序列为 Java 的 POJO 对象,即 IDL 中定义的请求参数对象;
- 根据请求消息头中的方法名到注册中心查询到对应的服务定义信息;
- 通过 Java 本地接口调用方式,调用服务端启动时注册的 IDL 接口实现类。
具体流程如下所示:
中间的交互流程比较复杂,涉及的类较多,但是关键步骤主要有三个:
响应消息的发送由 StreamObserver 的 onNext 触发,流程如下所示:
响应消息的发送原理如下:
需要指出的是,请求消息的接收、服务调用以及响应消息发送,多次发生 NIO 线程和应用线程之间的互相切换,以及并行处理。因此上述流程中的一些步骤,并不是严格按照图示中的顺序执行的,后续线程模型章节,会做分析和介绍。
gRPC 请求消息头处理涉及的主要类库如下:
需要说明的是,响应消息的发送由调用服务端接口的应用线程执行,在本示例中,由 SerializingExecutor 进行调用。
当请求消息头被封装成 SendResponseHeadersCommand 并被插入到 WriteQueue 之后,后续操作由 Netty 的 NIO 线程 NioEventLoop 负责处理。
应用线程继续发送响应消息体,将其封装成 SendGrpcFrameCommand 并插入到 WriteQueue 队列中,由 Netty 的 NIO 线程 NioEventLoop 处理。响应消息的发送严格按照顺序:即先消息头,后消息体。
了解 gRPC 服务端消息接入和 service 调用流程之后,针对主要的流程和类库,进行源码分析,以加深对 gRPC 服务端工作原理的了解。
基于 Netty 的 HTTP/2 协议栈,构建 gRPC 服务端,Netty HTTP/2 协议栈初始化代码如下所示(创建 NettyServerHandler,NettyServerHandler 类):
frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);
Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter);
Http2ConnectionDecoder decoder = new FixedHttp2ConnectionDecoder(connection, encoder,frameReader);
Http2Settings settings = new Http2Settings();
settings.initialWindowSize(flowControlWindow);
settings.maxConcurrentStreams(maxStreams);
settings.maxHeaderListSize(maxHeaderListSize);
return new NettyServerHandler(transportListener, streamTracerFactories, decoder, encoder, settings, maxMessageSize,
keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
keepAliveEnforcer);
创建 gRPC FrameListener,作为 Http2FrameListener,监听 HTTP/2 消息的读取,回调到 NettyServerHandler 中(NettyServerHandler 类):
decoder().frameListener(new FrameListener());
将 NettyServerHandler 添加到 Netty 的 ChannelPipeline 中,接收和发送 HTTP/2 消息(NettyServerTransport 类):
ChannelHandler negotiationHandler = protocolNegotiator.newHandler(grpcHandler);
channel.pipeline().addLast(negotiationHandler);
gRPC 服务端请求和响应消息统一由 NettyServerHandler 拦截处理,相关方法如下:
NettyServerHandler 是 gRPC 应用侧和底层协议栈的桥接类,负责将原生的 HTTP/2 消息调度到 gRPC 应用侧,同时将应用侧的消息发送到协议栈。
gRPC 服务端启动时,需要将调用的接口实现类实例注册到内部的服务注册中心,用于后续的接口调用,关键代码如下(InternalHandlerRegistry 类):
Builder addService(ServerServiceDefinition service) {
services.put(service.getServiceDescriptor().getName(), service);
return this;
}
服务接口绑定时,由 Proto3 工具生成代码,重载 bindService() 方法(GreeterImplBase 类):
@java.lang.Override
public final io.grpc.ServerServiceDefinition bindService() {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
METHOD_SAY_HELLO,
asyncUnaryCall(
new MethodHandlers<
io.grpc.examples.helloworld.HelloRequest,
io.grpc.examples.helloworld.HelloReply>(
this, METHODID_SAY_HELLO)))
.build();
}
gRPC 消息的接收
gRPC 消息的接入由 Netty HTTP/2 协议栈回调 gRPC 的 FrameListener,进而调用 NettyServerHandler 的 onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers) 和 onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream),代码如下所示:
消息头和消息体的处理,主要由 MessageDeframer 的 deliver 方法完成,相关代码如下(MessageDeframer 类):
if (inDelivery) {
return;
}
inDelivery = true;
try {
while (pendingDeliveries > 0 && readRequiredBytes()) {
switch (state) {
case HEADER:
processHeader();
break;
case BODY:
processBody();
pendingDeliveries--;
break;
default:
throw new AssertionError("Invalid state: " + state);
gRPC 请求消息(PB)的解码由 PrototypeMarshaller 负责,代码如下 (ProtoLiteUtils 类):
public T parse(InputStream stream) {
if (stream instanceof ProtoInputStream) {
ProtoInputStream protoStream = (ProtoInputStream) stream;
if (protoStream.parser() == parser) {
try {
T message = (T) ((ProtoInputStream) stream).message();
...
gRPC 响应消息发送
响应消息分为两部分发送:响应消息头和消息体,分别被封装成不同的 WriteQueue.AbstractQueuedCommand,插入到 WriteQueue 中。
消息头封装代码(NettyServerStream 类):
public void writeHeaders(Metadata headers) {
writeQueue.enqueue(new SendResponseHeadersCommand(transportState(),
Utils.convertServerHeaders(headers), false),
true);
}
消息体封装代码(NettyServerStream 类):
ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf();
final int numBytes = bytebuf.readableBytes();
onSendingBytes(numBytes);
writeQueue.enqueue(
new SendGrpcFrameCommand(transportState(), bytebuf, false),
channel.newPromise().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
transportState().onSentBytes(numBytes);
}
}), flush);
Netty 的 NioEventLoop 将响应消息发送到 ChannelPipeline,最终被 NettyServerHandler 拦截并处理。
响应消息头处理代码如下(NettyServerHandler 类):
private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd,
ChannelPromise promise) throws Http2Exception {
int streamId = cmd.stream().id();
Http2Stream stream = connection().stream(streamId);
if (stream == null) {
resetStream(ctx, streamId, Http2Error.CANCEL.code(), promise);
return;
}
if (cmd.endOfStream()) {
closeStreamWhenDone(promise, streamId);
}
encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
}
响应消息体处理代码如下(NettyServerHandler 类):
private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
ChannelPromise promise) throws Http2Exception {
if (cmd.endStream()) {
closeStreamWhenDone(promise, cmd.streamId());
}
encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise);
}
服务接口实例调用: 经过一系列预处理,最终由 ServerCalls 的 ServerCallHandler 调用服务接口实例,代码如下(ServerCalls 类):
return new EmptyServerCallListener<ReqT>() {
ReqT request;
@Override
public void onMessage(ReqT request) {
this.request = request;
}
@Override
public void onHalfClose() {
if (request != null) {
method.invoke(request, responseObserver);
responseObserver.freeze();
if (call.isReady()) {
onReady();
}
最终的服务实现类调用如下(GreeterGrpc 类):
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
case METHODID_SAY_HELLO:
serviceImpl.sayHello((io.grpc.examples.helloworld.HelloRequest) request,
(io.grpc.stub.StreamObserver<io.grpc.examples.helloworld.HelloReply>) responseObserver);
break;
default:
throw new AssertionError();
}
gRPC 的线程由 Netty 线程 + gRPC 应用线程组成,它们之间的交互和切换比较复杂,下面做下详细介绍。
它的工作流程总结如下:
Netty Server 使用的 NIO 线程实现是 NioEventLoop,它的职责如下:
gRPC 服务端调度线程为 SerializingExecutor,它实现了 Executor 和 Runnable 接口,通过外部传入的 Executor 对象,调度和处理 Runnable,同时内部又维护了一个任务队列 ConcurrentLinkedQueue,通过 run 方法循环处理队列中存放的 Runnable 对象,代码示例如下:
Netty Server I/O 线程的职责:
- gRPC 请求消息的读取、响应消息的发送
- HTTP/2 协议消息的编码和解码
- NettyServerHandler 的调度
gRPC service 线程的职责:
- 将 gRPC 请求消息(PB 码流)反序列化为接口的请求参数对象
- 将接口响应对象序列化为 PB 码流
- gRPC 服务端接口实现类调用
gRPC 的线程模型遵循 Netty 的线程分工原则,即:协议层消息的接收和编解码由 Netty 的 I/O(NioEventLoop) 线程负责;后续应用层的处理由应用线程负责,防止由于应用处理耗时而阻塞 Netty 的 I/O 线程。
基于上述分工原则,在 gRPC 请求消息的接入和响应发送过程中,系统不断的在 Netty I/O 线程和 gRPC 应用线程之间进行切换。明白了分工原则,也就能够理解为什么要做频繁的线程切换。
gRPC 线程模型存在的一个缺点,就是在一次 RPC 调用过程中,做了多次 I/O 线程到应用线程之间的切换,频繁切换会导致性能下降,这也是为什么 gRPC 性能比一些基于私有协议构建的 RPC 框架性能低的一个原因。尽管 gRPC 的性能已经比较优异,但是仍有一定的优化空间。
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!