社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
楼主在使用Java的lambda表达式,进行flink编程时报错,如下:
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function
异常信息如下:
解决方法:需要加上返回值泛型, 举例如下:
flatMap算子,返回类型String
//######flatMap算子的lambda表达式###########
//注意这里执行算子操作,有两种接受类型的写法,
//使用DataSet接受,指定的泛型,是每一有一个元素的泛型,类似于RDD中的没有一个元素
//使用Operator接受,指定类型有两个,输入参数类型,输出参数类型
DataSet<String> words = dataSource.flatMap((FlatMapFunction<String, String>) (line, out)
// final FlatMapOperator<String, String> words = dataSource.flatMap((FlatMapFunction<String, String>) (line, out)
-> {
for (String word : line.split(" ")) {
if (word.trim() != null) //过滤无效值
out.collect(word);
}
}).returns(Types.STRING);
flatMap算子,返回类型Tuple,
//######flatMap算子的lambda表达式###########
//返回值是Tuple,注意,泛型的指定,FlatMapFunction也要指定的
DataSet<Tuple2<String,Integer>> wordsandOnes = dataSource.flatMap((FlatMapFunction<String, Tuple2<String,Integer>>) (line, out)
-> {
for (String word : line.split(" ")) {
// if (word.trim() != null) //过滤无效值
Tuple2 t = new Tuple2(word,1);
out.collect(t);
}
}).returns(Types.TUPLE(Types.STRING,Types.INT));
Map算子,返回Tuple
//######Map算子的lambda表达式###########
//使用lambda表达式,注意要知道返回的类型,否则会报错,另外注意返回是Tuple类型时2,是如何指定的。
final MapOperator<String, Tuple2<String,Integer>> map1 = dataSource
.map((MapFunction<String,Tuple2<String,Integer>>)(in)
-> new Tuple2<>(in,1))
.returns(Types.TUPLE(Types.STRING,Types.INT));
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!