springboot整合hystrix对请求进行降级 - Go语言中文社区

springboot整合hystrix对请求进行降级


hystrix功能:

1、 资源隔离。就是对对应的接口分配线程池,避免说对一个接口的过多调用,因为依赖服务接口调用的失败或者延迟,导致所有的线程资源都全部耗费在这个接口上。一旦某个服务的线程资源全部耗尽可能导致服务的崩溃,甚至故障蔓延。
2、降级机制。超时降级、资源不足时(线程或信号量)降级,降级后可以配合降级接口返回托底数据。

下面是springboot和hystrix的整合案例:
下面需要用到maven依赖,主要是springboot版本和hystrix有对应关系。

<!--这是springboot的版本-->
  <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <!--下面是关于hystrix的依赖-->
  <-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-hystrix -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-hystrix</artifactId>
            <version>1.4.5.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
            <version>1.4.5.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>com.squareup.okhttp3</groupId>
            <artifactId>okhttp</artifactId>
            <version>3.10.0</version>
        </dependency>

初始化需要用到 HystrixCommandAspect 和 ServletRegistrationBean

@Configuration
public class HystrixConfig {
//用来拦截处理HystrixCommand注解
@Bean
public HystrixCommandAspect hystrixAspect() {
    return new HystrixCommandAspect();
    }
//用来像监控中心Dashboard发送stream信息
 @Bean
public ServletRegistrationBean hystrixMetricsStreamServlet() {
ServletRegistrationBean registration = new ServletRegistrationBean(new HystrixMetricsStreamServlet());
 registration.addUrlMappings("/hystrix.stream");
 return registration;
}
}

3、对需要熔断的接口进行配置

   @PostMapping("/test/netflix")
    @HystrixCommand(
            fallbackMethod = "netflixFallback",
            threadPoolProperties = {
                    @HystrixProperty(name = "coreSize",value = "10"),
                    @HystrixProperty(name = "maxQueueSize", value = "2"),
                    @HystrixProperty(name = "queueSizeRejectionThreshold", value = "10")},
            commandProperties = {
            //命令执行超时时间10s
                    @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "10000"), 
                    //若干10s一个窗口内失败三次, 则达到触发熔断的最少请求量
                    @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "2"), 
                    //断路3s后尝试执行, 默认为5s
                    @HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds", value = "3000") 
            }

    )
    public JsonVO testNetflix(@RequestBody PageVO pageVO){
        LOGGER.error("/test/netflix receive a message");
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new JsonVO<>(200, "success");
    }
    public JsonVO netflixFallback(@RequestBody PageVO pageVO){
        LOGGER.error("comming netflix");
       return new JsonVO<>(500, "comming netflix");
    }
}

后面附上测试代码,这里创造50个并发,查看测试结果。

  @Test
    public void contextLoads() {
        HashMap<String, Object> paramMap = new HashMap<>();
        paramMap.put("pageNum", String.valueOf(1));
        paramMap.put("pageSize", String.valueOf(10));
        String jsonParam = JSON.toJSONString(paramMap);

        CountDownLatch endCount = new CountDownLatch(50);
        CountDownLatch beginCount = new CountDownLatch(1);


        Thread[] threads = new Thread[50];
        //起50线程测试
        for(int i= 0;i < 50;i++){
            int a = i;
            threads[i] = new Thread(new  Runnable() {

                public void run() {
                    try {
                        //等待在一个信号量上,挂起
                        beginCount.await();
                        String s = httpClient.doPostJson("http://localhost:8080/filter/test/netflix", jsonParam);
                        JsonVO jsonVO = JSON.parseObject(s,new TypeReference<JsonVO>() {});
                        System.out.println("test 第"+a+"个请求"+jsonVO);
                        endCount.countDown();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });
            threads[i].start();

        }


        long startTime = System.currentTimeMillis();
        //主线程释放开始信号量,并等待结束信号量
        beginCount.countDown();
        try {
            endCount.await();
            System.out.println("测试完毕");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }

测试结果:

test 第20个请求null
test 第4个请求null
test 第9个请求null
test 第29个请求null
test 第17个请求null
test 第22个请求null
test 第0个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第23个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第8个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第35个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第31个请求null
test 第43个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第27个请求null
test 第11个请求null
test 第18个请求null
test 第39个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第15个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第19个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第47个请求null
test 第7个请求null
test 第34个请求null
test 第21个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第41个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第1个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第37个请求null
test 第48个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第13个请求null
test 第49个请求null
test 第3个请求null
test 第32个请求null
test 第26个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第25个请求null
test 第12个请求null
test 第33个请求null
test 第30个请求null
test 第6个请求null
test 第42个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第46个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第2个请求null
test 第45个请求null
test 第24个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第40个请求null
test 第44个请求null
test 第16个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第14个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第38个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第28个请求null
test 第5个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第36个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第10个请求JsonVO(statusCode=200, errorMessage=success, data=null)
测试完毕

可以看到,当测试发生熔断之后接口又主动调用hystrix的回调接口,但是依然有很多接口出现null。这个时候去看启动接口,发现服务接口报错。

HystrixRuntimeException: Command fallback execution rejected.

com.netflix.hystrix.exception.HystrixRuntimeException: TestCommand fallback execution rejected.

在这里插入图片描述
执行错误了,本应该去执行fallback方法,可是却被reject了,为什么呢?

这种情况下,一般来说是command已经熔断了,所有请求都进入fallback导致的,因为fallback默认是有个并发最大处理的限制,fallback.isolation.semaphore.maxConcurrentRequests,默认是10,这个方法及时很简单,处理很快,可是QPS如果很高,还是很容易达到10这个阈值,导致后面的被拒绝。

解决方法也很简单:

1、fallback尽可能的简单,不要有耗时操作,可以直接返回。
2、设置fallback.isolation.semaphore.maxConcurrentRequests更大一点,将这个值改为50之后,发现再也没有返回为null的了。

  @HystrixCommand(
            fallbackMethod = "netflixFallback",
            threadPoolProperties = {
                    @HystrixProperty(name = "coreSize",value = "10"),
                    @HystrixProperty(name = "maxQueueSize", value = "2"),
                    @HystrixProperty(name = "queueSizeRejectionThreshold", value = "10")},
            commandProperties = {
                    @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "10000"), //命令执行超时时间
                    @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "2"), //若干10s一个窗口内失败三次, 则达到触发熔断的最少请求量
                    @HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds", value = "3000"),
                    @HystrixProperty(name = "fallback.isolation.semaphore.maxConcurrentRequests", value = "50")
            }

    )
test 第27个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第39个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第31个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第7个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第6个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第20个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第24个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第48个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第15个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第16个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第17个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第49个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第45个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第25个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第5个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第2个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第29个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第1个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第18个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第13个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第46个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第21个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第4个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第12个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第14个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第30个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第8个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第32个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第0个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第28个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第36个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第19个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第11个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第43个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第3个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第47个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第23个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第35个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第26个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第38个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第9个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第22个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第41个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第37个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第44个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第42个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第10个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第33个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第40个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第34个请求JsonVO(statusCode=200, errorMessage=success, data=null)

当然这里还有很多参数设置没有演示完整,如果想配置更多参数可以参考

https://blog.csdn.net/tongtong_use/article/details/78611225

附上http的请求工具类:

@Component
public class HttpClient {

    private final Logger logger = LoggerFactory.getLogger(HttpClient.class);

    private static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
    private static final MediaType XML = MediaType.parse("application/xml; charset=utf-8");

    @Autowired
    private OkHttpClient okHttpClient;

    /**
     * get 请求
     * @param url       请求url地址
     * @return string
     * */
    public String doGet(String url) {
        return doGet(url, null, null);
    }


    /**
     * get 请求
     * @param url       请求url地址
     * @param params    请求参数 map
     * @return string
     * */
    public String doGet(String url, Map<String, String> params) {
        return doGet(url, params, null);
    }

    /**
     * get 请求
     * @param url       请求url地址
     * @param headers   请求头字段 {k1, v1 k2, v2, ...}
     * @return string
     * */
    public String doGet(String url, String[] headers) {
        return doGet(url, null, headers);
    }


    /**
     * get 请求
     * @param url       请求url地址
     * @param params    请求参数 map
     * @param headers   请求头字段 {k1, v1 k2, v2, ...}
     * @return string
     * */
    public String doGet(String url, Map<String, String> params, String[] headers) {
        StringBuilder sb = new StringBuilder(url);
        if (params != null && params.keySet().size() > 0) {
            boolean firstFlag = true;
            for (String key : params.keySet()) {
                if (firstFlag) {
                    sb.append("?").append(key).append("=").append(params.get(key));
                    firstFlag = false;
                } else {
                    sb.append("&").append(key).append("=").append(params.get(key));
                }
            }
        }

        Request.Builder builder = new Request.Builder();
        if (headers != null && headers.length > 0) {
            if (headers.length % 2 == 0) {
                for (int i = 0; i < headers.length; i = i + 2) {
                    builder.addHeader(headers[i], headers[i + 1]);
                }
            } else {
                logger.warn("headers's length[{}] is error.", headers.length);
            }

        }

        Request request = builder.url(sb.toString()).build();
        logger.info("do get request and url[{}]", sb.toString());
        return execute(request);
    }

    /**
     * post 请求
     * @param url       请求url地址
     * @param params    请求参数 map
     * @return string
     */
    public String doPost(String url, Map<String, String> params) {
        FormBody.Builder builder = new FormBody.Builder();

        if (params != null && params.keySet().size() > 0) {
            for (String key : params.keySet()) {
                builder.add(key, params.get(key));
            }
        }
        Request request = new Request.Builder().url(url).post(builder.build()).build();
        logger.info("do post request and url[{}]", url);

        return execute(request);
    }


    /**
     * post 请求, 请求数据为 json 的字符串
     * @param url       请求url地址
     * @param json      请求数据, json 字符串
     * @return string
     */
    public String doPostJson(String url, String json) {
        logger.info("do post request and url[{}]", url);
        return exectePost(url, json, JSON);
    }

    /**
     * post 请求, 请求数据为 xml 的字符串
     * @param url       请求url地址
     * @param xml       请求数据, xml 字符串
     * @return string
     */
    public String doPostXml(String url, String xml) {
        logger.info("do post request and url[{}]", url);
        return exectePost(url, xml, XML);
    }


    private String exectePost(String url, String data, MediaType contentType) {
        RequestBody requestBody = RequestBody.create(contentType, data);
        Request request = new Request.Builder().url(url).post(requestBody).build();

        return execute(request);
    }

    private String execute(Request request) {
        Response response = null;
        try {
            response = okHttpClient.newCall(request).execute();
            if (response.isSuccessful()) {
                return response.body().string();
            }
        } catch (Exception e) {
            logger.error(e.getMessage());
        } finally {
            if (response != null) {
                response.close();
            }
        }
        return "";
    }

}

application.properties 关于http的配置

ok.http.connect-timeout=30
ok.http.read-timeout=30
ok.http.write-timeout=30
# 连接池中整体的空闲连接的最大数量
ok.http.max-idle-connections=200
# 连接空闲时间最多为 300 秒
ok.http.keep-alive-duration=300
@Configuration
public class OkHttpConfiguration {

    @Value("${ok.http.connect-timeout}")
    private Integer connectTimeOut;

    @Value("${ok.http.read-timeout}")
    private Integer readTimeOut;

    @Value("${ok.http.write-timeout}")
    private Integer writeTimeOut;


    @Value("${ok.http.max-idle-connections}")
    private Integer maxIdleConnections;

    @Value("${ok.http.keep-alive-duration}")
    private Long keepAliveDuration;




    @Bean
    public X509TrustManager x509TrustManager(){
        return new X509TrustManager() {
            @Override
            public void checkClientTrusted(X509Certificate[] x509Certificates, String authType) throws CertificateException {
            }

            @Override
            public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {

            }

            @Override
            public X509Certificate[] getAcceptedIssuers() {
                return new X509Certificate[0];
            }
        };
    }

    @Bean
    @Primary
    public SSLSocketFactory sslSocketFactory(){
        try {
        //信任任何连接
            SSLContext sslContext = SSLContext.getInstance("TLS");
            sslContext.init(null, new TrustManager[]{x509TrustManager()}, new SecureRandom());
            return sslContext.getSocketFactory();
        } catch (NoSuchAlgorithmException | KeyManagementException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Bean
    public ConnectionPool pool(){
        return new ConnectionPool(maxIdleConnections, keepAliveDuration, TimeUnit.SECONDS);
    }

    @Bean
    public OkHttpClient okHttpClient(){
        return new OkHttpClient().newBuilder()
                .sslSocketFactory(sslSocketFactory(), x509TrustManager())
                //是否开启缓存
                .retryOnConnectionFailure(false)
                .connectionPool(pool())
                .connectTimeout(connectTimeOut, TimeUnit.SECONDS)
                .readTimeout(readTimeOut,TimeUnit.SECONDS)
                .writeTimeout(writeTimeOut,TimeUnit.SECONDS)
                .hostnameVerifier((hostname, session) -> true)
                // 拦截器
//                .addInterceptor()
                .build();
    }
}
版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/fajing_feiyue/article/details/103543921
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢