代码实例
public class OnTimerProcesFunction extends KeyedProcessFunction{ private ValueState assetVinState; @Override public void open(Configuration parameters) throws Exception { assetVinState = getRuntimeContext().getState(new ValueStateDescriptor<>("asset-vin-state", JSONObject.class)); } @Override public void processElement(JSONObject value, KeyedProcessFunction .Context ctx, Collector out) throws Exception { JSONObject data = assetVinState.value(); if (data == null || data.isEmpty()) { out.collect(value); //将当前数据设置进状态,并且注册定时器 assetVinState.update(value); // 定时器时间为60秒 ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 40000L); } else { // 注意:这里更新的数据没有注册定时器,那么不会触发定时发送功能;那这里最新的更新数据,则需要在下面的onTimer中进行定时器的注册,并发送数据到下游 assetVinState.update(value); } } @Override public void onTimer(long timestamp, KeyedProcessFunction .OnTimerContext ctx, Collector out) throws Exception { //提取状态数据往下游发送 JSONObject data = assetVinState.value(); if (data == null || data.isEmpty()) { return; } out.collect(data); // 定时器时间为60秒,这里设置一个定时器,为了将状态中vin对应的最新更新数据发送一次到下游 ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 40000L); } }