社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
1、 资源隔离。就是对对应的接口分配线程池,避免说对一个接口的过多调用,因为依赖服务接口调用的失败或者延迟,导致所有的线程资源都全部耗费在这个接口上。一旦某个服务的线程资源全部耗尽可能导致服务的崩溃,甚至故障蔓延。
2、降级机制。超时降级、资源不足时(线程或信号量)降级,降级后可以配合降级接口返回托底数据。
下面是springboot和hystrix的整合案例:
下面需要用到maven依赖,主要是springboot版本和hystrix有对应关系。
<!--这是springboot的版本-->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!--下面是关于hystrix的依赖-->
<-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-hystrix -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-hystrix</artifactId>
<version>1.4.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
<version>1.4.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>3.10.0</version>
</dependency>
初始化需要用到 HystrixCommandAspect 和 ServletRegistrationBean
@Configuration
public class HystrixConfig {
//用来拦截处理HystrixCommand注解
@Bean
public HystrixCommandAspect hystrixAspect() {
return new HystrixCommandAspect();
}
//用来像监控中心Dashboard发送stream信息
@Bean
public ServletRegistrationBean hystrixMetricsStreamServlet() {
ServletRegistrationBean registration = new ServletRegistrationBean(new HystrixMetricsStreamServlet());
registration.addUrlMappings("/hystrix.stream");
return registration;
}
}
3、对需要熔断的接口进行配置
@PostMapping("/test/netflix")
@HystrixCommand(
fallbackMethod = "netflixFallback",
threadPoolProperties = {
@HystrixProperty(name = "coreSize",value = "10"),
@HystrixProperty(name = "maxQueueSize", value = "2"),
@HystrixProperty(name = "queueSizeRejectionThreshold", value = "10")},
commandProperties = {
//命令执行超时时间10s
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "10000"),
//若干10s一个窗口内失败三次, 则达到触发熔断的最少请求量
@HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "2"),
//断路3s后尝试执行, 默认为5s
@HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds", value = "3000")
}
)
public JsonVO testNetflix(@RequestBody PageVO pageVO){
LOGGER.error("/test/netflix receive a message");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new JsonVO<>(200, "success");
}
public JsonVO netflixFallback(@RequestBody PageVO pageVO){
LOGGER.error("comming netflix");
return new JsonVO<>(500, "comming netflix");
}
}
后面附上测试代码,这里创造50个并发,查看测试结果。
@Test
public void contextLoads() {
HashMap<String, Object> paramMap = new HashMap<>();
paramMap.put("pageNum", String.valueOf(1));
paramMap.put("pageSize", String.valueOf(10));
String jsonParam = JSON.toJSONString(paramMap);
CountDownLatch endCount = new CountDownLatch(50);
CountDownLatch beginCount = new CountDownLatch(1);
Thread[] threads = new Thread[50];
//起50线程测试
for(int i= 0;i < 50;i++){
int a = i;
threads[i] = new Thread(new Runnable() {
public void run() {
try {
//等待在一个信号量上,挂起
beginCount.await();
String s = httpClient.doPostJson("http://localhost:8080/filter/test/netflix", jsonParam);
JsonVO jsonVO = JSON.parseObject(s,new TypeReference<JsonVO>() {});
System.out.println("test 第"+a+"个请求"+jsonVO);
endCount.countDown();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
threads[i].start();
}
long startTime = System.currentTimeMillis();
//主线程释放开始信号量,并等待结束信号量
beginCount.countDown();
try {
endCount.await();
System.out.println("测试完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
测试结果:
test 第20个请求null
test 第4个请求null
test 第9个请求null
test 第29个请求null
test 第17个请求null
test 第22个请求null
test 第0个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第23个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第8个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第35个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第31个请求null
test 第43个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第27个请求null
test 第11个请求null
test 第18个请求null
test 第39个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第15个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第19个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第47个请求null
test 第7个请求null
test 第34个请求null
test 第21个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第41个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第1个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第37个请求null
test 第48个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第13个请求null
test 第49个请求null
test 第3个请求null
test 第32个请求null
test 第26个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第25个请求null
test 第12个请求null
test 第33个请求null
test 第30个请求null
test 第6个请求null
test 第42个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第46个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第2个请求null
test 第45个请求null
test 第24个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第40个请求null
test 第44个请求null
test 第16个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第14个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第38个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第28个请求null
test 第5个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第36个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第10个请求JsonVO(statusCode=200, errorMessage=success, data=null)
测试完毕
可以看到,当测试发生熔断之后接口又主动调用hystrix的回调接口,但是依然有很多接口出现null。这个时候去看启动接口,发现服务接口报错。
HystrixRuntimeException: Command fallback execution rejected.
com.netflix.hystrix.exception.HystrixRuntimeException: TestCommand fallback execution rejected.
执行错误了,本应该去执行fallback方法,可是却被reject了,为什么呢?
这种情况下,一般来说是command已经熔断了,所有请求都进入fallback导致的,因为fallback默认是有个并发最大处理的限制,fallback.isolation.semaphore.maxConcurrentRequests,默认是10,这个方法及时很简单,处理很快,可是QPS如果很高,还是很容易达到10这个阈值,导致后面的被拒绝。
解决方法也很简单:
1、fallback尽可能的简单,不要有耗时操作,可以直接返回。
2、设置fallback.isolation.semaphore.maxConcurrentRequests更大一点,将这个值改为50之后,发现再也没有返回为null的了。
@HystrixCommand(
fallbackMethod = "netflixFallback",
threadPoolProperties = {
@HystrixProperty(name = "coreSize",value = "10"),
@HystrixProperty(name = "maxQueueSize", value = "2"),
@HystrixProperty(name = "queueSizeRejectionThreshold", value = "10")},
commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "10000"), //命令执行超时时间
@HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "2"), //若干10s一个窗口内失败三次, 则达到触发熔断的最少请求量
@HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds", value = "3000"),
@HystrixProperty(name = "fallback.isolation.semaphore.maxConcurrentRequests", value = "50")
}
)
test 第27个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第39个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第31个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第7个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第6个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第20个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第24个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第48个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第15个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第16个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第17个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第49个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第45个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第25个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第5个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第2个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第29个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第1个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第18个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第13个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第46个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第21个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第4个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第12个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第14个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第30个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第8个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第32个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第0个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第28个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第36个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第19个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第11个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第43个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第3个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第47个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第23个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第35个请求JsonVO(statusCode=500, errorMessage=comming netflix, data=null)
test 第26个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第38个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第9个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第22个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第41个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第37个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第44个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第42个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第10个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第33个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第40个请求JsonVO(statusCode=200, errorMessage=success, data=null)
test 第34个请求JsonVO(statusCode=200, errorMessage=success, data=null)
当然这里还有很多参数设置没有演示完整,如果想配置更多参数可以参考
https://blog.csdn.net/tongtong_use/article/details/78611225
附上http的请求工具类:
@Component
public class HttpClient {
private final Logger logger = LoggerFactory.getLogger(HttpClient.class);
private static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
private static final MediaType XML = MediaType.parse("application/xml; charset=utf-8");
@Autowired
private OkHttpClient okHttpClient;
/**
* get 请求
* @param url 请求url地址
* @return string
* */
public String doGet(String url) {
return doGet(url, null, null);
}
/**
* get 请求
* @param url 请求url地址
* @param params 请求参数 map
* @return string
* */
public String doGet(String url, Map<String, String> params) {
return doGet(url, params, null);
}
/**
* get 请求
* @param url 请求url地址
* @param headers 请求头字段 {k1, v1 k2, v2, ...}
* @return string
* */
public String doGet(String url, String[] headers) {
return doGet(url, null, headers);
}
/**
* get 请求
* @param url 请求url地址
* @param params 请求参数 map
* @param headers 请求头字段 {k1, v1 k2, v2, ...}
* @return string
* */
public String doGet(String url, Map<String, String> params, String[] headers) {
StringBuilder sb = new StringBuilder(url);
if (params != null && params.keySet().size() > 0) {
boolean firstFlag = true;
for (String key : params.keySet()) {
if (firstFlag) {
sb.append("?").append(key).append("=").append(params.get(key));
firstFlag = false;
} else {
sb.append("&").append(key).append("=").append(params.get(key));
}
}
}
Request.Builder builder = new Request.Builder();
if (headers != null && headers.length > 0) {
if (headers.length % 2 == 0) {
for (int i = 0; i < headers.length; i = i + 2) {
builder.addHeader(headers[i], headers[i + 1]);
}
} else {
logger.warn("headers's length[{}] is error.", headers.length);
}
}
Request request = builder.url(sb.toString()).build();
logger.info("do get request and url[{}]", sb.toString());
return execute(request);
}
/**
* post 请求
* @param url 请求url地址
* @param params 请求参数 map
* @return string
*/
public String doPost(String url, Map<String, String> params) {
FormBody.Builder builder = new FormBody.Builder();
if (params != null && params.keySet().size() > 0) {
for (String key : params.keySet()) {
builder.add(key, params.get(key));
}
}
Request request = new Request.Builder().url(url).post(builder.build()).build();
logger.info("do post request and url[{}]", url);
return execute(request);
}
/**
* post 请求, 请求数据为 json 的字符串
* @param url 请求url地址
* @param json 请求数据, json 字符串
* @return string
*/
public String doPostJson(String url, String json) {
logger.info("do post request and url[{}]", url);
return exectePost(url, json, JSON);
}
/**
* post 请求, 请求数据为 xml 的字符串
* @param url 请求url地址
* @param xml 请求数据, xml 字符串
* @return string
*/
public String doPostXml(String url, String xml) {
logger.info("do post request and url[{}]", url);
return exectePost(url, xml, XML);
}
private String exectePost(String url, String data, MediaType contentType) {
RequestBody requestBody = RequestBody.create(contentType, data);
Request request = new Request.Builder().url(url).post(requestBody).build();
return execute(request);
}
private String execute(Request request) {
Response response = null;
try {
response = okHttpClient.newCall(request).execute();
if (response.isSuccessful()) {
return response.body().string();
}
} catch (Exception e) {
logger.error(e.getMessage());
} finally {
if (response != null) {
response.close();
}
}
return "";
}
}
application.properties 关于http的配置
ok.http.connect-timeout=30
ok.http.read-timeout=30
ok.http.write-timeout=30
# 连接池中整体的空闲连接的最大数量
ok.http.max-idle-connections=200
# 连接空闲时间最多为 300 秒
ok.http.keep-alive-duration=300
@Configuration
public class OkHttpConfiguration {
@Value("${ok.http.connect-timeout}")
private Integer connectTimeOut;
@Value("${ok.http.read-timeout}")
private Integer readTimeOut;
@Value("${ok.http.write-timeout}")
private Integer writeTimeOut;
@Value("${ok.http.max-idle-connections}")
private Integer maxIdleConnections;
@Value("${ok.http.keep-alive-duration}")
private Long keepAliveDuration;
@Bean
public X509TrustManager x509TrustManager(){
return new X509TrustManager() {
@Override
public void checkClientTrusted(X509Certificate[] x509Certificates, String authType) throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
};
}
@Bean
@Primary
public SSLSocketFactory sslSocketFactory(){
try {
//信任任何连接
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, new TrustManager[]{x509TrustManager()}, new SecureRandom());
return sslContext.getSocketFactory();
} catch (NoSuchAlgorithmException | KeyManagementException e) {
e.printStackTrace();
}
return null;
}
@Bean
public ConnectionPool pool(){
return new ConnectionPool(maxIdleConnections, keepAliveDuration, TimeUnit.SECONDS);
}
@Bean
public OkHttpClient okHttpClient(){
return new OkHttpClient().newBuilder()
.sslSocketFactory(sslSocketFactory(), x509TrustManager())
//是否开启缓存
.retryOnConnectionFailure(false)
.connectionPool(pool())
.connectTimeout(connectTimeOut, TimeUnit.SECONDS)
.readTimeout(readTimeOut,TimeUnit.SECONDS)
.writeTimeout(writeTimeOut,TimeUnit.SECONDS)
.hostnameVerifier((hostname, session) -> true)
// 拦截器
// .addInterceptor()
.build();
}
}
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!