Flink flatMap 使用lambda表达式异常问题

代码示例:

.flatMap((FlatMapFunction) (message, out) -> {
        String mes = message.replaceAll("\\\\", "");
        if (StringUtils.isNotEmpty(mes)) {
            out.collect(JSON.parseObject(mes));
        }
}).name("主流格式化")

错误信息:

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
	at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:371)
	at org.apache.flink.api.java.typeutils.TypeExtractionUtils.extractTypeFromLambda(TypeExtractionUtils.java:188)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:557)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:174)
	at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:612)
	at com.vsoc.realtime.schedule.check.EmulationRealCheckJob.main(EmulationRealCheckJob.java:127)

解决方案:

.flatMap((FlatMapFunction) (message, out) -> {
        String mes = message.replaceAll("\\\\", "");
        if (StringUtils.isNotEmpty(mes)) {
            out.collect(JSON.parseObject(mes));
        }
}).name("主流数式化")
.returns(new TypeHint() {
})

错误原因:

这个错误是由于Flink在处理使用Java泛型的lambda表达式时,无法自动提取足够的类型信息。一个简单的解决方法是使用一个实现了org.apache.flink.api.common.functions.FlatMapFunction接口的(匿名)类,而不是lambda表达式。否则,类型必须使用类型信息明确指定





请使用浏览器的分享功能分享到微信等