An Analysis of Deadlines in gRPC - Go语言中文社区



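  1. The core problem a deadline solves is bounding how long a client waits on a request when something goes wrong, so that it does not block unnecessarily.
  2. gRPC does not use the traditional timeout approach. It uses a deadline mechanism instead, and the main differences are roughly as follows: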



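    First, a timeout example. Suppose the timeout is 200 ms and the call has to go through three services, A, B and C. By the time B is being processed the timeout has already elapsed, so the client throws a TimeoutException. Note, however, that only the client reacts: the server is actually still working!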



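    At this point it no longer matters whether the server returns anything, because the client has already abandoned the call. Yet the server keeps doing work that nobody will consume, which wastes a lot of resources.
    Conclusion: this exposes the core problem that deadlines are meant to solve. Both ends must agree on the timeout, and the timeout must be propagated to the other (waiting) services.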
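The following is a minimal sketch of the idea. All names are hypothetical, and it is not grpc-java's `io.grpc.Deadline`. The 200 ms budget from the example is converted into an absolute deadline once. After that, each of services A, B and C checks only the time that is still left instead of starting a fresh timer. A fake clock keeps the run deterministic. Each service is also assumed to know its cost up front, which is a simplification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical, simplified model of an absolute deadline (not io.grpc.Deadline).
final class SimpleDeadline {
  private final LongSupplier clock;  // nanosecond clock, injectable so the demo is deterministic
  private final long deadlineNanos;  // absolute point in time on that clock

  private SimpleDeadline(LongSupplier clock, long deadlineNanos) {
    this.clock = clock;
    this.deadlineNanos = deadlineNanos;
  }

  // Convert the relative timeout into an absolute deadline exactly once, at the edge.
  static SimpleDeadline after(long duration, TimeUnit unit, LongSupplier clock) {
    return new SimpleDeadline(clock, clock.getAsLong() + unit.toNanos(duration));
  }

  // What each downstream hop receives: only the time that is still left.
  long timeRemaining(TimeUnit unit) {
    long left = deadlineNanos - clock.getAsLong();
    return unit.convert(Math.max(0L, left), TimeUnit.NANOSECONDS);
  }
}

final class CallChainDemo {
  // Calls hypothetical services A, B, C in order; each needs costMs of simulated work.
  // A service that cannot finish within the remaining budget fails fast instead of running on.
  static List<String> run(long budgetMs, long costMs) {
    long[] now = {0L};  // fake clock, in nanoseconds
    SimpleDeadline deadline = SimpleDeadline.after(budgetMs, TimeUnit.MILLISECONDS, () -> now[0]);
    List<String> log = new ArrayList<>();
    for (String service : new String[] {"A", "B", "C"}) {
      long remainingMs = deadline.timeRemaining(TimeUnit.MILLISECONDS);
      if (remainingMs < costMs) {
        log.add(service + ":DEADLINE_EXCEEDED");  // the shared budget cannot cover this hop
        break;                                    // nothing downstream is started
      }
      now[0] += TimeUnit.MILLISECONDS.toNanos(costMs);  // simulate the service doing its work
      log.add(service + ":OK");
    }
    return log;
  }
}
```

Take a 200 ms budget with 120 ms per service. A completes, then B fails fast with DEADLINE_EXCEEDED, so C is never started. With a 500 ms budget, all three services complete.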

    The client registers a listener for itself. Here, to show that the client and server agree, the EXCEEDED error is reported by the server side.

Design approach

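  1. When the client issues a request, the deadline carried by the request is bound to the Context. A scheduled task is started that fires at the current time plus the deadline duration.
  2. Services run one after another according to their dependencies. If the deadline is exceeded, the cancel task runs: it interrupts the current task, removes all listeners, and so on.
  3. A DEADLINE_EXCEEDED error is returned to the server to signal the timeout, and the client stops waiting.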
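The three steps can be sketched as follows. This is a hypothetical, simplified context and not grpc-java's `Context`; status values are plain strings. A `ScheduledExecutorService` task fires at now + timeout. Cancellation happens at most once, and listeners are notified and then cleared.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of the three design steps; not grpc-java's Context.
final class DeadlineContext {
  private final List<Consumer<String>> listeners = new ArrayList<>();
  private String cancelStatus;                // null while the context is still active
  private volatile ScheduledFuture<?> timer;  // pending deadline task, if any

  // Step 1: bind the deadline and schedule a task that fires at now + timeout.
  static DeadlineContext withDeadlineAfter(long timeout, TimeUnit unit,
                                           ScheduledExecutorService scheduler) {
    DeadlineContext ctx = new DeadlineContext();
    ctx.timer = scheduler.schedule(() -> ctx.cancel("DEADLINE_EXCEEDED"), timeout, unit);
    return ctx;
  }

  void addListener(Consumer<String> listener) {
    String status;
    synchronized (this) {
      if (cancelStatus == null) {
        listeners.add(listener);
        return;
      }
      status = cancelStatus;
    }
    listener.accept(status);  // already cancelled: notify immediately
  }

  synchronized boolean isCancelled() {
    return cancelStatus != null;
  }

  // Step 2: cancel at most once, then notify and clear all listeners.
  void cancel(String status) {
    List<Consumer<String>> toNotify;
    synchronized (this) {
      if (cancelStatus != null) {
        return;
      }
      cancelStatus = status;
      toNotify = new ArrayList<>(listeners);
      listeners.clear();
    }
    ScheduledFuture<?> t = timer;
    if (t != null) {
      t.cancel(false);  // an explicit cancel makes the deadline task unnecessary
    }
    // Step 3: listeners report the status so that the waiting side stops waiting.
    for (Consumer<String> listener : toNotify) {
      listener.accept(status);
    }
  }
}
```

Notifying listeners outside the lock keeps a slow listener from blocking other threads that try to register or cancel.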
Setting the deadline on the client

stub.withDeadlineAfter(long duration, TimeUnit unit)
public final S withDeadlineAfter(long duration, TimeUnit unit) {
  return build(channel, callOptions.withDeadlineAfter(duration, unit));
}
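It is defined in

public abstract class AbstractStub<S extends AbstractStub<S>>

build is an abstract method of this class. Its implementation lives in the code produced by the gRPC code generator.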

Here is the implementation from the example:

@Override
protected GreeterStub build(io.grpc.Channel channel,
    io.grpc.CallOptions callOptions) {
  return new GreeterStub(channel, callOptions);
}
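This simply creates a new stub object from the current channel and callOptions. (If the deadline is set repeatedly, many stubs are created. Does that hurt performance? What was the intent behind this design, and why can't a stub be reused?)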
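The question about creating many stubs can be explored with a stripped-down model. The classes below are hypothetical stand-ins for `CallOptions`, `AbstractStub` and a generated stub. In this model, `withDeadlineAfter` never mutates anything. It returns a new stub that shares the same expensive channel object but carries new, cheap, immutable options. Under that assumption, creating a stub per call costs only a small allocation. Immutability is also what lets one configured stub be shared safely across threads.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for CallOptions.
final class MiniCallOptions {
  static final MiniCallOptions DEFAULT = new MiniCallOptions(null);

  final Long deadlineNanos;  // null means "no deadline"

  MiniCallOptions(Long deadlineNanos) {
    this.deadlineNanos = deadlineNanos;
  }

  // Copy-on-write: returns a new instance with an absolute deadline, never mutates this one.
  MiniCallOptions withDeadlineAfter(long duration, TimeUnit unit) {
    return new MiniCallOptions(System.nanoTime() + unit.toNanos(duration));
  }
}

// Hypothetical stand-in for AbstractStub.
abstract class MiniAbstractStub<S extends MiniAbstractStub<S>> {
  final Object channel;              // shared, expensive resource (connections)
  final MiniCallOptions callOptions;  // cheap, immutable per-call settings

  MiniAbstractStub(Object channel, MiniCallOptions callOptions) {
    this.channel = channel;
    this.callOptions = callOptions;
  }

  // Supplied by the "generated" subclass, mirroring AbstractStub.build.
  protected abstract S build(Object channel, MiniCallOptions callOptions);

  public final S withDeadlineAfter(long duration, TimeUnit unit) {
    return build(channel, callOptions.withDeadlineAfter(duration, unit));
  }
}

// Hypothetical stand-in for a generated stub such as GreeterStub.
final class MiniGreeterStub extends MiniAbstractStub<MiniGreeterStub> {
  MiniGreeterStub(Object channel, MiniCallOptions callOptions) {
    super(channel, callOptions);
  }

  @Override
  protected MiniGreeterStub build(Object channel, MiniCallOptions callOptions) {
    return new MiniGreeterStub(channel, callOptions);
  }
}
```

One consequence is worth keeping in mind. In grpc-java, `CallOptions.withDeadlineAfter` turns the duration into an absolute `Deadline` at the moment it is called. A stub derived this way should therefore be created right before the call rather than cached, or its deadline may already be in the past.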

 

Context 

Context is a linked-list-like structure. Each Context records its parent Context, and the root's parent is null.
Each thread's current Context is stored in a ThreadLocal:

final class ThreadLocalContextStorage extends Context.Storage {
  /**
   * Currently bound context.
   */
  private static final ThreadLocal<Context> localContext = new ThreadLocal<Context>();
  // ... (the storage methods read and write localContext)
}
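The parent chain and the ThreadLocal can be modeled in a few lines. `MiniContext` below is hypothetical and much simpler than `io.grpc.Context`, with no cancellation and only string keys. It shows two things. A lookup walks up the parent links. `attach()` returns the previously current context so that `detach()` can restore it, the same pairing as the `attach()`/`detach(Context)` API mentioned for the server.

```java
// Hypothetical miniature of io.grpc.Context: parent links plus a ThreadLocal "current" slot.
final class MiniContext {
  static final MiniContext ROOT = new MiniContext(null, null, null);
  private static final ThreadLocal<MiniContext> CURRENT = new ThreadLocal<>();

  final MiniContext parent;  // null only for ROOT
  private final String key;  // the single entry this context adds
  private final Object value;

  private MiniContext(MiniContext parent, String key, Object value) {
    this.parent = parent;
    this.key = key;
    this.value = value;
  }

  // The context bound to this thread, or ROOT if nothing has been attached.
  static MiniContext current() {
    MiniContext c = CURRENT.get();
    return c == null ? ROOT : c;
  }

  // Derive a child; the parent is never modified.
  MiniContext withValue(String key, Object value) {
    return new MiniContext(this, key, value);
  }

  // Look the key up on this context, then on each ancestor in turn.
  Object get(String lookupKey) {
    for (MiniContext c = this; c != null; c = c.parent) {
      if (lookupKey.equals(c.key)) {
        return c.value;
      }
    }
    return null;
  }

  // Bind this context to the current thread and return the one it replaces.
  MiniContext attach() {
    MiniContext previous = current();
    CURRENT.set(this);
    return previous;
  }

  // Restore the context that attach() returned.
  void detach(MiniContext toRestore) {
    CURRENT.set(toRestore);
  }
}
```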
 

Binding the context on the server

When the server starts, it first creates a ServerImpl:

public final class ServerImpl extends io.grpc.Server implements WithLogId
This class creates a Context whenever a new stream is created, in streamCreated:
@Override
public void streamCreated(
    final ServerStream stream, final String methodName, final Metadata headers) {

  final StatsTraceContext statsTraceCtx = Preconditions.checkNotNull(
      stream.statsTraceContext(), "statsTraceCtx not present from stream");

  final Context.CancellableContext context = createContext(stream, headers, statsTraceCtx);
  // ...
}
Here is the body of the createContext method:

private Context.CancellableContext createContext(
    final ServerStream stream, Metadata headers, StatsTraceContext statsTraceCtx) {
  Long timeoutNanos = headers.get(TIMEOUT_KEY);

  Context baseContext = statsTraceCtx.serverFilterContext(rootContext);

  if (timeoutNanos == null) {
    // The client did not specify a timeout, so create a cancellable context with no deadline.
    // This is presumably for backward compatibility: Context originally had no notion of a
    // deadline and was extended later.
    return baseContext.withCancellation();
  }

  // Otherwise a timer is installed as well: once this context's deadline passes, it is
  // cancelled automatically, which satisfies the cancellation-propagation requirement.
  Context.CancellableContext context =
      baseContext.withDeadlineAfter(timeoutNanos, NANOSECONDS, timeoutService);
  context.addListener(new Context.CancellationListener() {
    @Override
    public void cancelled(Context context) {
      Status status = statusFromCancelled(context);
      if (DEADLINE_EXCEEDED.getCode().equals(status.getCode())) {
        // This should rarely get run, since the client will likely cancel the stream before
        // the timeout is reached.
        stream.cancel(status);
      }
    }
  }, directExecutor());

  return context;
}
The server can also set up a Context itself when needed; see the API methods public Context attach() and public void detach(Context toAttach).

The official source comment on setting the deadline explains this very clearly:

/**
 * If the parent deadline is before the given deadline there is no need to install the value
 * or listen for its expiration as the parent context will already be listening for it.
 */
In other words, a new cancellation timer is created only when the newly requested deadline is earlier than the parent context's deadline.
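The rule in that comment amounts to taking the earlier of the two deadlines. Here is a hypothetical helper. It assumes deadlines are absolute `System.nanoTime()`-style values and that `null` means the parent has no deadline.

```java
// Hypothetical helper illustrating the rule quoted above; not part of grpc-java.
final class DeadlineInheritance {
  // Compare via subtraction, the overflow-safe idiom for nanoTime values.
  private static boolean isBefore(long a, long b) {
    return a - b < 0;
  }

  // A child needs its own timer only if it would expire strictly before its parent.
  static boolean needsOwnTimer(Long parentDeadline, long requestedDeadline) {
    return parentDeadline == null || isBefore(requestedDeadline, parentDeadline);
  }

  // The deadline the child effectively has: the earlier of the two.
  static long effectiveDeadline(Long parentDeadline, long requestedDeadline) {
    return needsOwnTimer(parentDeadline, requestedDeadline) ? requestedDeadline : parentDeadline;
  }
}
```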

 

 

 

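Client-side listener code. The path starts in the client stream implementation (clientStreamImpl) and jumps through many calls, so only the final cancellation code is shown here. ExecutableListener is an inner class of Context, which is why it can refer to Context.this.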

/**
 * Stores listener and executor pair. On the client, this listener is run on its executor
 * to cancel the task.
 */
private class ExecutableListener implements Runnable {
  private final Executor executor;
  private final CancellationListener listener;

  private ExecutableListener(Executor executor, CancellationListener listener) {
    this.executor = executor;
    this.listener = listener;
  }

  private void deliver() {
    try {
      executor.execute(this);
    } catch (Throwable t) {
      log.log(Level.INFO, "Exception notifying context listener", t);
    }
  }

  @Override
  public void run() {
    listener.cancelled(Context.this);
  }
}
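Pairing each listener with an executor means every listener is notified on the executor it chose. It also means a failing executor does not stop the other listeners from being notified. Below is a hypothetical, single-threaded sketch of that pattern. Status values are strings, and `DIRECT` only approximates `directExecutor()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical stand-in for the listener/executor pairing shown above.
final class ListenerRegistry {
  // Runs the task on the calling thread, similar in spirit to directExecutor().
  static final Executor DIRECT = Runnable::run;

  private static final class ExecutableListener {
    private final Executor executor;
    private final Consumer<String> listener;

    ExecutableListener(Executor executor, Consumer<String> listener) {
      this.executor = executor;
      this.listener = listener;
    }

    void deliver(String status) {
      try {
        executor.execute(() -> listener.accept(status));
      } catch (RuntimeException e) {
        // e.g. a RejectedExecutionException from a shut-down executor:
        // report it and keep notifying the remaining listeners.
        System.err.println("Exception notifying context listener: " + e);
      }
    }
  }

  private final List<ExecutableListener> listeners = new ArrayList<>();

  void addListener(Consumer<String> listener, Executor executor) {
    listeners.add(new ExecutableListener(executor, listener));
  }

  // Called once on cancellation; each listener runs on the executor it registered with.
  void cancel(String status) {
    for (ExecutableListener l : listeners) {
      l.deliver(status);
    }
    listeners.clear();
  }
}
```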
 

Calling listener.cancelled(Context.this) inspects the current state and produces the corresponding status:

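// Inside statusFromCancelled(Context context), once the context is known to be cancelled:
Throwable cancellationCause = context.cancellationCause();
if (cancellationCause == null) {
  return Status.CANCELLED;
}
if (cancellationCause instanceof TimeoutException) {
  return Status.DEADLINE_EXCEEDED
      .withDescription(cancellationCause.getMessage())
      .withCause(cancellationCause);
}
// ... (other causes are handled further down in the method)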
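Taken together, the status depends only on the cause the context was cancelled with. In the hypothetical sketch below, the deadline timer is assumed to cancel with a `TimeoutException`, which is what the `instanceof` check above expects. An explicit `cancel(null)` yields CANCELLED, and the first cancellation wins. Unlike the real code, every other cause is simply mapped to CANCELLED.

```java
import java.util.concurrent.TimeoutException;

// Hypothetical mirror of the mapping above, using a plain enum instead of io.grpc.Status.
final class CancellableSketch {
  enum Code { CANCELLED, DEADLINE_EXCEEDED }

  private boolean cancelled;
  private Throwable cancellationCause;

  // Only the first cancellation counts; its cause decides the status.
  synchronized void cancel(Throwable cause) {
    if (!cancelled) {
      cancelled = true;
      cancellationCause = cause;
    }
  }

  // What a deadline timer does when it fires (assumed: cancel with a TimeoutException).
  void onDeadlineReached() {
    cancel(new TimeoutException("context timed out"));
  }

  // null while not cancelled; otherwise CANCELLED or DEADLINE_EXCEEDED.
  synchronized Code status() {
    if (!cancelled) {
      return null;
    }
    if (cancellationCause instanceof TimeoutException) {
      return Code.DEADLINE_EXCEEDED;
    }
    return Code.CANCELLED;  // simplified: any other cause also maps to CANCELLED here
  }
}
```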
 
// reason is the cancellation status (Status.CANCELLED or DEADLINE_EXCEEDED) reported back to the caller.
@Override
protected void sendCancel(Status reason) {
  synchronized (lock) {
    if (cancelSent) {
      return;
    }
    cancelSent = true;
    if (pendingData != null) {
      // stream is pending: release it from the transport.
      transport.removePendingStream(this);
      // release holding data, so they can be GCed or returned to pool earlier.
      requestHeaders = null;
      for (PendingData data : pendingData) {
        data.buffer.clear();
      }
      pendingData = null;
      transportReportStatus(reason, true, new Metadata());
    } else {
      // If pendingData is null, start must have already been called, which means synStream has
      // been called as well. End the stream on the transport.
      transport.finishStream(id(), reason, ErrorCode.CANCEL);
    }
  }
}
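The two branches of `sendCancel` differ in whether the peer already knows about the stream. The hypothetical model below records transport actions as strings. A stream that never started only needs its buffered data dropped and its status reported locally. A started stream must be reset on the wire, in HTTP/2 terms with an RST_STREAM carrying CANCEL, and that is how the server learns the client has stopped waiting.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the two branches in sendCancel; strings stand in for transport actions.
final class MiniClientStream {
  private final Object lock = new Object();
  private boolean cancelSent;
  private List<byte[]> pendingData = new ArrayList<>();  // non-null until the stream is started
  final List<String> actions = new ArrayList<>();        // what the "transport" did, for illustration

  void write(byte[] data) {
    synchronized (lock) {
      if (pendingData != null) {
        pendingData.add(data);  // buffered until the stream is started
      } else {
        actions.add("DATA");
      }
    }
  }

  void start() {
    synchronized (lock) {
      if (pendingData == null) {
        return;  // already started, or cancelled before starting
      }
      actions.add("HEADERS");
      for (int i = 0; i < pendingData.size(); i++) {
        actions.add("DATA");  // flush what was buffered before start
      }
      pendingData = null;
    }
  }

  void sendCancel(String reason) {
    synchronized (lock) {
      if (cancelSent) {
        return;  // cancellation is idempotent
      }
      cancelSent = true;
      if (pendingData != null) {
        // Never reached the wire: drop buffered data and report the status locally.
        pendingData.clear();
        pendingData = null;
        actions.add("LOCAL_STATUS " + reason);
      } else {
        // The peer knows this stream: tell it to stop (an HTTP/2 RST_STREAM with CANCEL).
        actions.add("RST_STREAM CANCEL");
      }
    }
  }
}
```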
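Copyright notice: this article comes from Jianshu; thanks to the blogger for the original article. It is licensed under CC 4.0 BY-SA; when reposting, include a link to the original source and this notice.
Original link: https://www.jianshu.com/p/f67be6287c1d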
Published: 2020-02-02 14:54:38