Flink onTimer定时器

代码实例

public class OnTimerProcesFunction extends KeyedProcessFunction {
    private ValueState assetVinState;
    @Override
    public void open(Configuration parameters) throws Exception {
        assetVinState = getRuntimeContext().getState(new ValueStateDescriptor<>("asset-vin-state", JSONObject.class));
    }
    @Override
    public void processElement(JSONObject value, KeyedProcessFunction.Context ctx, Collector out) throws Exception {
        JSONObject data = assetVinState.value();
        if (data == null || data.isEmpty()) {
            out.collect(value);
            //将当前数据设置进状态,并且注册定时器
            assetVinState.update(value);
            // 定时器时间为60秒
            ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 40000L);
        } else {
            // 注意:这里更新的数据没有注册定时器,那么不会触发定时发送功能;那这里最新的更新数据,则需要在下面的onTimer中进行定时器的注册,并发送数据到下游
            assetVinState.update(value);
        }
    }
    @Override
    public void onTimer(long timestamp, KeyedProcessFunction.OnTimerContext ctx, Collector out) throws Exception {
        //提取状态数据往下游发送
        JSONObject data = assetVinState.value();
        if (data == null || data.isEmpty()) {
            return;
        }
        out.collect(data);
        // 定时器时间为60秒,这里设置一个定时器,为了将状态中vin对应的最新更新数据发送一次到下游
        ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 40000L);
    }
}


请使用浏览器的分享功能分享到微信等