社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
流计算系统中经常需要与外部系统进行交互,比如需要查询外部数据库以关联上用户的额外信息。通常,我们的实现方式是向数据库发送用户a
的查询请求,然后等待结果返回,在这之前,我们无法发送用户b
的查询请求。这是一种同步访问的模式,如下图左边所示。
图中棕色的长条表示等待时间,可以发现网络等待时间极大地阻碍了吞吐和延迟。为了解决同步访问的问题,异步模式可以并发地处理多个请求和回复。也就是说,你可以连续地向数据库发送用户a
、b
、c
等的请求,与此同时,哪个请求先返回了就处理哪个请求,从而连续的请求之间不需要阻塞等待,如上图右边所示。这也正是 Async I/O 的实现原理。
利用richmapfunction进行维表关联,就是典型的sync I/O的关联方式。两次请求之间阻塞进行。不适合并发量高的情形。
public static final class MapWithSiteInfoFunc
extends RichMapFunction<String, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(MapWithSiteInfoFunc.class);
private static final long serialVersionUID = 1L;
private transient ScheduledExecutorService dbScheduler;
// 引入缓存,减小请求次数
private Map<Integer, SiteAndCityInfo> siteInfoCache;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
siteInfoCache = new HashMap<>(1024);
// 利用定时线程,实现维度数据的周期性更新
dbScheduler = new ScheduledThreadPoolExecutor(1, r -> {
Thread thread = new Thread(r, "site-info-update-thread");
thread.setUncaughtExceptionHandler((t, e) -> {
LOGGER.error("Thread " + t + " got uncaught exception: " + e);
});
return thread;
});
dbScheduler.scheduleWithFixedDelay(() -> {
try {
QueryRunner queryRunner = new QueryRunner(JdbcUtil.getDataSource());
List<Map<String, Object>> info = queryRunner.query(SITE_INFO_QUERY_SQL, new MapListHandler());
for (Map<String, Object> item : info) {
siteInfoCache.put((int) item.get("site_id"), new SiteAndCityInfo(
(int) item.get("site_id"),
(String) item.getOrDefault("site_name", ""),
(long) item.get("city_id"),
(String) item.getOrDefault("city_name", "")
));
}
LOGGER.info("Fetched {} site info records, {} records in cache", info.size(), siteInfoCache.size());
} catch (Exception e) {
LOGGER.error("Exception occurred when querying: " + e);
}
}, 0, 10 * 60, TimeUnit.SECONDS);
}
@Override
public String map(String value) throws Exception {
JSONObject json = JSON.parseObject(value);
int siteId = json.getInteger("site_id");
String siteName = "", cityName = "";
SiteAndCityInfo info = siteInfoCache.getOrDefault(siteId, null);
if (info != null) {
siteName = info.getSiteName();
cityName = info.getCityName();
}
json.put("site_name", siteName);
json.put("city_name", cityName);
return json.toJSONString();
}
@Override
public void close() throws Exception {
// 清空缓存,关闭连接
siteInfoCache.clear();
ExecutorUtils.gracefulShutdown(10, TimeUnit.SECONDS, dbScheduler);
JdbcUtil.close();
super.close();
}
private static final String SITE_INFO_QUERY_SQL = "...";
}
Flink 1.2中引入了Async IO(异步IO)来加快flink与外部系统的交互性能,提升吞吐量。其设计的核心是对原有的每条处理后的消息发送至下游operator的执行流程进行改进。其核心实现包括生产和消费两部分,生产端引入了一个AsyncWaitOperator,在其processElement/processWatermark方法中完成对消息的维表关联,随即将未处理完的Futrue对象存入队列中;消费端引入一个Emitter线程,不断从队列中消费数据并发往下游算子。
简单的来说,使用 Async I/O 对应到 Flink 的 API 就是 RichAsyncFunction 这个抽象类,继承这个抽象类实现里面的open(初始化),asyncInvoke(数据异步调用),close(停止的一些操作)方法,最主要的是实现asyncInvoke 里面的方法。有如下示例,Kafka作为流表,存储用户浏览记录,Elasticsearch作为维表,存储用户年龄信息,利用async I/O对浏览记录进行加宽。
流表: 用户行为日志。某个用户在某个时刻点击或浏览了某个商品。自己造的测试数据,数据样例如下:
{"userID": "user_1", "eventTime": "2016-06-06 07:03:42", "eventType": "browse", "productID": 2}
维表: 用户基础信息。自己造的测试数据,数据存储在ES上,数据样例如下:
GET dim_user/dim_user/user
{
"_index": "dim_user",
"_type": "dim_user",
"_id": "user_1",
"_version": 1,
"found": true,
"_source": {
"age": 22
}
}
实现逻辑:
public class FlinkAsyncIO {
public static void main(String[] args) throws Exception{
String kafkaBootstrapServers = "localhost:9092";
String kafkaGroupID = "async-test";
String kafkaAutoOffsetReset= "latest";
String kafkaTopic = "asyncio";
int kafkaParallelism =2;
String esHost= "localhost";
Integer esPort= 9200;
String esUser = "";
String esPassword = "";
String esIndex = "dim_user";
String esType = "dim_user";
/**Flink DataStream 运行环境*/
Configuration config = new Configuration();
config.setInteger(RestOptions.PORT,8081);
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
/**添加数据源*/
Properties kafkaProperties = new Properties();
kafkaProperties.put("bootstrap.servers",kafkaBootstrapServers);
kafkaProperties.put("group.id",kafkaGroupID);
kafkaProperties.put("auto.offset.reset",kafkaAutoOffsetReset);
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(kafkaTopic, new SimpleStringSchema(), kafkaProperties);
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
SingleOutputStreamOperator<String> source = env.addSource(kafkaConsumer).name("KafkaSource").setParallelism(kafkaParallelism);
//数据转换
SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> sourceMap = source.map((MapFunction<String, Tuple4<String, String, String, Integer>>) value -> {
Tuple4<String, String, String, Integer> output = new Tuple4<>();
try {
JSONObject obj = JSON.parseObject(value);
output.f0 = obj.getString("userID");
output.f1 = obj.getString("eventTime");
output.f2 = obj.getString("eventType");
output.f3 = obj.getInteger("productID");
} catch (Exception e) {
e.printStackTrace();
}
return output;
}).returns(new TypeHint<Tuple4<String, String, String, Integer>>(){}).name("Map: ExtractTransform");
//过滤掉异常数据
SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> sourceFilter = sourceMap.filter((FilterFunction<Tuple4<String, String, String, Integer>>) value -> value.f3 != null).name("Filter: FilterExceptionData");
//Timeout: 超时时间 默认异步I/O请求超时时,会引发异常并重启或停止作业。 如果要处理超时,可以重写AsyncFunction#timeout方法。
//Capacity: 并发请求数量
/**Async IO实现流表与维表Join*/
SingleOutputStreamOperator<Tuple5<String, String, String, Integer, Integer>> result = AsyncDataStream.unorderedWait(sourceFilter, new ElasticsearchAsyncFunction(esHost,esPort,esUser,esPassword,esIndex,esType), 500, TimeUnit.MILLISECONDS, 10).name("Join: JoinWithDim");
/**结果输出*/
result.print().name("PrintToConsole");
env.execute();
}
}
ElasticsearchAsyncFunction:
public class ElasticsearchAsyncFunction extends RichAsyncFunction<Tuple4<String, String, String, Integer>, Tuple5<String, String, String, Integer, Integer>> {
private String host;
private Integer port;
private String user;
private String password;
private String index;
private String type;
public ElasticsearchAsyncFunction(String host, Integer port, String user, String password, String index, String type) {
this.host = host;
this.port = port;
this.user = user;
this.password = password;
this.index = index;
this.type = type;
}
private RestHighLevelClient restHighLevelClient;
private Cache<String, Integer> cache;
/**
* 和ES建立连接
*
* @param parameters
*/
@Override
public void open(Configuration parameters) {
//ES Client
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
restHighLevelClient = new RestHighLevelClient(
RestClient
.builder(new HttpHost(host, port))
.setHttpClientConfigCallback(httpAsyncClientBuilder -> HttpAsyncClientBuilder.create()));
//初始化缓存
cache = CacheBuilder.newBuilder().maximumSize(2).expireAfterAccess(5, TimeUnit.MINUTES).build();
}
/**
* 关闭连接
*
* @throws Exception
*/
@Override
public void close() throws Exception {
restHighLevelClient.close();
}
/**
* 异步调用
*
* @param input
* @param resultFuture
*/
@Override
public void asyncInvoke(Tuple4<String, String, String, Integer> input, ResultFuture<Tuple5<String, String, String, Integer, Integer>> resultFuture) {
// 1、先从缓存中取
Integer cachedValue = cache.getIfPresent(input.f0);
if (cachedValue != null) {
System.out.println("从缓存中获取到维度数据: key=" + input.f0 + ",value=" + cachedValue);
resultFuture.complete(Collections.singleton(new Tuple5<>(input.f0, input.f1, input.f2, input.f3, cachedValue)));
// 2、缓存中没有,则从外部存储获取
} else {
searchFromES(input, resultFuture);
}
}
/**
* 当缓存中没有数据时,从外部存储ES中获取
*
* @param input
* @param resultFuture
*/
private void searchFromES(Tuple4<String, String, String, Integer> input, ResultFuture<Tuple5<String, String, String, Integer, Integer>> resultFuture) {
// 1、构造输出对象
Tuple5<String, String, String, Integer, Integer> output = new Tuple5<>();
output.f0 = input.f0;
output.f1 = input.f1;
output.f2 = input.f2;
output.f3 = input.f3;
// 2、待查询的Key
String dimKey = input.f0;
// 3、构造Ids Query
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(index);
searchRequest.types(type);
searchRequest.source(SearchSourceBuilder.searchSource().query(QueryBuilders.idsQuery().addIds(dimKey)));
RequestOptions requestOptions = RequestOptions.DEFAULT;
// 4、用异步客户端查询数据
restHighLevelClient.searchAsync(searchRequest, RequestOptions.DEFAULT, new ActionListener<SearchResponse>() {
//成功响应时处理
@Override
public void onResponse(SearchResponse searchResponse) {
SearchHit[] searchHits = searchResponse.getHits().getHits();
if (searchHits.length > 0) {
JSONObject obj = JSON.parseObject(searchHits[0].getSourceAsString());
Integer dimValue = obj.getInteger("age");
output.f4 = dimValue;
cache.put(dimKey, dimValue);
System.out.println("将维度数据放入缓存: key=" + dimKey + ",value=" + dimValue);
}
resultFuture.complete(Collections.singleton(output));
}
//响应失败时处理
@Override
public void onFailure(Exception e) {
output.f4 = null;
resultFuture.complete(Collections.singleton(output));
}
});
}
//超时时处理
@Override
public void timeout(Tuple4<String, String, String, Integer> input, ResultFuture<Tuple5<String, String, String, Integer, Integer>> resultFuture) {
searchFromES(input, resultFuture);
}
}
Flink Async I/O又可以细分为三种,一种是有序的Ordered模式,一种是ProcessingTime 无序模式,一种是EventTime 无序。
主要区别是往下游output的顺序,有序模式会按接收的顺序继续往下游output发送,无序模式就是谁先处理完谁就先往下游发送。下图是ordered模式的原理图。
无论有序无需,都采用了Futrue/Promise设计模式,大体都遵循以下设计逻辑:
生产端:将每条消息封装成一个StreamRecordQueueEntry
(内部维护一个Future对象),放入StreamElementQueue
中
生产端:消息与外部系统交互的逻辑放入AsynInvoke方法中,将交互执行结果放入StreamRecordQueueEntry
中
消费端:启动一个emitter线程,从StreamElementQueue
中读取已经完成的StreamRecordQueueEntry
,将其结果发送至下游operator算子
下面我们分别就生产端和消费端对ordered模式进行源码分析
AsyncWaitOperator
@Internal
public class AsyncWaitOperator<IN<
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!