Handling timeouts in Flink CEP with PatternTimeoutFunction


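Flink CEP is a tool for complex event processing: it matches events against rules. For example, given the message stream A B A C B C D, if we want the occurrences of B followed by C, we can define a matching rule and run our business logic on the results. We usually also constrain when C must arrive, say within 3 s. How, then, do we get hold of the partial matches that exceeded those 3 s?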

Let's look at how Flink CEP handles timeouts.

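Straight to the demo. It matches a click event immediately followed by a buy event, limits the time between them to 5 s, and also collects the partial matches that exceed that limit.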

Source on GitHub: https://github.com/fan-code/flinkLearning/blob/master/src/main/java/myflink/demo/CepLearn01.java

import myflink.bean.UserAction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;

public class CepLearn01 {

    private static final OutputTag<UserAction> overFiveTag = new OutputTag<UserAction>("overFive") {
    };

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Pattern<UserAction, ?> pattern = Pattern.<UserAction>begin("start").where(
                new SimpleCondition<UserAction>() {
                    @Override
                    public boolean filter(UserAction event) {
                        return event.action.equals("click");
                    }
                }
        ).next("middle").where(
                new SimpleCondition<UserAction>() {
                    @Override
                    public boolean filter(UserAction subEvent) {
                        return subEvent.action.equals("buy");
                    }
                }
        ).within(Time.seconds(5));


        // Assign timestamps and watermarks first, then key by user name. Calling keyBy before
        // assignTimestampsAndWatermarks yields a non-keyed stream, so CEP would not match per user.
        KeyedStream<UserAction, String> input = env.fromElements(WORDS).map(new MapFunction<String, UserAction>() {
            @Override
            public UserAction map(String value) throws Exception {
                // Records look like "176.168.50.26,1575600181000,john,click"
                String[] split = value.toLowerCase().split(",");
                return new UserAction(split[0], split[1], split[2], split[3]);
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserAction>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(UserAction element) {
                // The timestamp column holds epoch milliseconds as text
                return Long.parseLong(element.timeStamp);
            }
        }).keyBy(new KeySelector<UserAction, String>() {
            @Override
            public String getKey(UserAction value) throws Exception {
                return value.name;
            }
        });

        PatternStream<UserAction> patternStream = CEP.pattern(input, pattern);

        // Emit complete click-buy matches on the main output and timed-out partial matches on the side output
        SingleOutputStreamOperator<String> select1 = patternStream.select(overFiveTag, new PatternTimeoutFunction<UserAction, UserAction>() {
            @Override
            public UserAction timeout(Map<String, List<UserAction>> pattern, long timeoutTimestamp) throws Exception {
                // A timed-out partial match holds only the "start" event.
                // pattern.get("middle") is null here, so dereferencing it throws a NullPointerException.
                UserAction click = pattern.get("start").iterator().next();
                System.out.println("click" + click);
                return click;
            }
        }, new PatternSelectFunction<UserAction, String>() {
            @Override
            public String select(Map<String, List<UserAction>> pattern) throws Exception {
                UserAction click = pattern.get("start").iterator().next();
                UserAction buy = pattern.get("middle").iterator().next();
                return "name:"+ click.name+"--click--"+click.timeStamp+"--buy--"+buy.timeStamp;
            }
        });
        select1.print();
        DataStream<UserAction> sideOutput = select1.getSideOutput(overFiveTag);
        sideOutput.print();

// Alternative: emit only the complete click-buy matches
//        SingleOutputStreamOperator<String> select = patternStream.select(new PatternSelectFunction<UserAction, String>() {
//           @Override
//           public String select(Map<String, List<UserAction>> map) throws Exception {
//                UserAction click = map.get("start").iterator().next();
//                UserAction buy = map.get("middle").iterator().next();
//               return "name:"+ click.name+"--click--"+click.timeStamp+"--buy--"+buy.timeStamp;
//            }
//       });
//        select.print();

        env.execute("jjjj");
    }


    public static final String[] WORDS = new String[]{
            "176.168.50.26,1575600181000,john,click",
            "176.168.50.26,1575600182000,john,buy",
            "176.168.50.26,1575600183000,john,click",
            "176.168.50.26,1575600184000,john,click",
            "176.168.50.26,1575600190000,john,buy",
            "176.168.50.26,1575600191000,jerry,click",
            "176.168.50.26,1575600194000,jerry,buy",
            "176.168.50.26,1575600199000,jerry,click",
            "176.168.50.26,1575600200000,jerry,order",
            "176.168.50.26,1575600220000,jerry,buy"
    };


}
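
The demo imports myflink.bean.UserAction, which is not shown in this post. Below is a minimal sketch of what such a bean could look like, inferred only from how the demo uses it (a four-argument constructor plus the name, action and timeStamp fields). The ip field name, the parse helper and the toString format are assumptions; the real class is in the linked repository.

```java
// Hypothetical reconstruction of myflink.bean.UserAction, based only on its usage in the demo.
// Declared package-private so the sketch compiles in any file; for Flink to treat it
// as a POJO it would need to be a public class in its own file.
class UserAction {
    public String ip;        // assumed name for the first CSV column
    public String timeStamp; // epoch milliseconds kept as text; the demo parses it with Long.parseLong
    public String name;
    public String action;

    // Flink POJOs need a public no-argument constructor.
    public UserAction() {
    }

    public UserAction(String ip, String timeStamp, String name, String action) {
        this.ip = ip;
        this.timeStamp = timeStamp;
        this.name = name;
        this.action = action;
    }

    // Mirrors the demo's MapFunction: lower-case the line, then split on commas.
    public static UserAction parse(String line) {
        String[] split = line.toLowerCase().split(",");
        return new UserAction(split[0], split[1], split[2], split[3]);
    }

    @Override
    public String toString() {
        return "UserAction{ip=" + ip + ", timeStamp=" + timeStamp
                + ", name=" + name + ", action=" + action + "}";
    }
}
```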

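Here we use the following select overload to obtain the timed-out messages. The thing to be clear about is what exactly counts as a timed-out event.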

	public <L, R> SingleOutputStreamOperator<R> select(
		final OutputTag<L> timeoutOutputTag,
		final PatternTimeoutFunction<T, L> patternTimeoutFunction,
		final PatternSelectFunction<T, R> patternSelectFunction)

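Definition of a timed-out event: a partial match that satisfied the first condition but saw no record satisfying the second condition within the allowed time. Such a match is not reported the instant the time window elapses. The demo runs in event time, where the timeout check is driven by watermarks, and watermarks only advance as later records arrive, so Flink decides whether the earlier record has timed out only once a subsequent record comes in.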

The key point: what gets checked is whether the previous record has timed out.
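
The behaviour described above can be illustrated without Flink. The following plain-Java sketch is only an approximation under stated assumptions: the class TimeoutSketch and its methods are hypothetical, and event time is assumed to equal the timestamp of the latest record (zero out-of-orderness). It shows that a pending start is reported as timed out only when a later record moves time forward.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java illustration of event-time timeout detection; not Flink code.
class TimeoutSketch {
    private final long withinMillis;
    // Timestamps of "start" events that are still waiting for their follow-up event.
    private final List<Long> pendingStarts = new ArrayList<>();

    TimeoutSketch(long withinMillis) {
        this.withinMillis = withinMillis;
    }

    /** Registers a partial match whose first event has the given timestamp. */
    void addStart(long timestamp) {
        pendingStarts.add(timestamp);
    }

    /** Advances event time and returns (and forgets) the starts whose window has expired. */
    List<Long> advanceTo(long eventTime) {
        List<Long> timedOut = new ArrayList<>();
        Iterator<Long> it = pendingStarts.iterator();
        while (it.hasNext()) {
            long start = it.next();
            if (eventTime - start >= withinMillis) {
                timedOut.add(start);
                it.remove();
            }
        }
        return timedOut;
    }

    public static void main(String[] args) {
        TimeoutSketch sketch = new TimeoutSketch(5_000L);
        sketch.addStart(184_000L);                      // a click at t = 184 s
        System.out.println(sketch.advanceTo(186_000L)); // prints [] (window still open)
        System.out.println(sketch.advanceTo(190_000L)); // prints [184000] (a later record moved time past 189 s)
    }
}
```

If no record ever arrives after the click, advanceTo is never called and the click is never reported, which is the point the paragraph above makes.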

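In this demo, a timed-out event is therefore one that matched the first condition while the second condition was not met in time, so only the start event is available. Trying to read the middle event inside PatternTimeoutFunction fails with a NullPointerException.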
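
A small plain-Java sketch of why that exception occurs: the map handed to the timeout callback has no entry for the stage that never matched, so get returns null. The class PartialMatchSketch and the helper firstOrNull are hypothetical names used for illustration, and the map contents are stand-in strings rather than real UserAction objects.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrates the shape of a timed-out partial match; not Flink code.
class PartialMatchSketch {

    /** Returns the first event bound to a stage, or null when the stage has no entry. */
    static <T> T firstOrNull(Map<String, List<T>> match, String stage) {
        List<T> events = match.get(stage);
        return (events == null || events.isEmpty()) ? null : events.get(0);
    }

    public static void main(String[] args) {
        // Only "start" is present, as in the demo's timed-out matches.
        Map<String, List<String>> partial = new HashMap<>();
        partial.put("start", Collections.singletonList("john click 1575600184000"));

        System.out.println(firstOrNull(partial, "start"));  // prints john click 1575600184000
        System.out.println(firstOrNull(partial, "middle")); // prints null

        try {
            // Dereferencing the missing "middle" entry directly
            partial.get("middle").iterator().next();
        } catch (NullPointerException e) {
            System.out.println("NullPointerException: no entry for \"middle\"");
        }
    }
}
```

A null-checking helper like this is one way to keep a timeout callback safe if it is ever reused for patterns with more stages.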

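In the demo's output, one partial match exceeds the 5 s limit: in the sample data, john's click at 1575600184000 is followed by a buy only at 1575600190000, 6 s later.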
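
To see which records the 5 s limit affects, the sample data can be replayed in plain Java. This is a rough sketch without Flink: the class ClickBuySketch and its classify method are hypothetical, time is tracked per user rather than through Flink's watermarks, and strict click-then-buy matching is only approximated. It shows the arithmetic on the data and is not a reproduction of the job's actual output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Replays the demo's sample records and classifies click -> buy pairs; not Flink code.
class ClickBuySketch {
    static final long WITHIN_MS = 5_000L;

    static final String[] WORDS = {
            "176.168.50.26,1575600181000,john,click",
            "176.168.50.26,1575600182000,john,buy",
            "176.168.50.26,1575600183000,john,click",
            "176.168.50.26,1575600184000,john,click",
            "176.168.50.26,1575600190000,john,buy",
            "176.168.50.26,1575600191000,jerry,click",
            "176.168.50.26,1575600194000,jerry,buy",
            "176.168.50.26,1575600199000,jerry,click",
            "176.168.50.26,1575600200000,jerry,order",
            "176.168.50.26,1575600220000,jerry,buy"
    };

    static List<String> classify(String[] lines) {
        Map<String, Long> pendingClick = new HashMap<>(); // user -> timestamp of the unmatched click
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            String[] f = line.split(",");
            long ts = Long.parseLong(f[1]);
            String user = f[2];
            String action = f[3];

            Long click = pendingClick.remove(user);
            if (click != null) {
                long gap = ts - click;
                if (gap >= WITHIN_MS) {
                    out.add("timeout " + user + " click@" + click);
                } else if (action.equals("buy")) {
                    out.add("match " + user + " " + click + "->" + ts);
                }
                // Otherwise a non-buy record arrived inside the window: the click is dropped.
            }
            if (action.equals("click")) {
                pendingClick.put(user, ts);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        for (String result : classify(WORDS)) {
            System.out.println(result);
        }
        // prints:
        // match john 1575600181000->1575600182000
        // timeout john click@1575600184000
        // match jerry 1575600191000->1575600194000
    }
}
```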

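Copyright notice: this article comes from CSDN, with thanks to the original author. It is published under the CC 4.0 BY-SA license; when reposting, please include a link to the original source and this notice.
Original link: https://blog.csdn.net/qq_34864753/article/details/106639907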
Published: 2020-06-28 02:34:31