Setting watermarks for multiple streams in Flink


Author: apophisdeity | Published: 2022-04-14 00:00

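Scenario: an event-time job that periodically reads data-cleaning rules from MySQL, broadcasts them as a broadcast stream, and applies them to upstream Kafka data for real-time ETL.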

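  1. Only the Kafka data should drive watermark generation, so the broadcast stream must not affect the watermark. Otherwise the combined watermark never advances and the job cannot process data in event time.
  2. Flink's low-watermark mechanism takes the minimum of the input watermarks, so when setting the broadcast stream's watermark separately, it is enough to set it to the maximum value (Long.MAX_VALUE). The minimum is then always the Kafka stream's watermark.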
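A minimal pure-JDK sketch of item 2, assuming a single two-input operator (for example the one created when the Kafka stream is connected to the broadcast stream). The class and method names are hypothetical and only model the minimum rule, not Flink's internal implementation. It compares a broadcast input that never emits a watermark with one that emits Long.MAX_VALUE:

```java
// Pure-JDK model (no Flink dependency) of how a two-input operator combines
// watermarks. Names are hypothetical; this is not Flink's operator code.
public class TwoInputWatermarkDemo {

    static final class CombinedWatermark {
        // An input that has not emitted a watermark yet counts as Long.MIN_VALUE.
        private long kafkaWatermark = Long.MIN_VALUE;
        private long broadcastWatermark = Long.MIN_VALUE;
        private long combined = Long.MIN_VALUE;

        void onKafkaWatermark(long wm) {
            kafkaWatermark = wm;
            advance();
        }

        void onBroadcastWatermark(long wm) {
            broadcastWatermark = wm;
            advance();
        }

        // The combined watermark is the minimum of both inputs and only moves forward.
        private void advance() {
            long newMin = Math.min(kafkaWatermark, broadcastWatermark);
            if (newMin > combined) {
                combined = newMin;
            }
        }

        long current() {
            return combined;
        }

        // An event-time timer fires once the combined watermark reaches its timestamp.
        boolean timerFires(long timerTimestamp) {
            return combined >= timerTimestamp;
        }
    }

    public static void main(String[] args) {
        long timer = 5_000L; // hypothetical event-time timer

        // Case 1: the broadcast (MySQL rule) stream never emits a watermark.
        CombinedWatermark withoutMax = new CombinedWatermark();
        withoutMax.onKafkaWatermark(10_000L);
        System.out.println("without MAX: " + withoutMax.current()
                + ", timer fires: " + withoutMax.timerFires(timer));

        // Case 2: the broadcast stream emits Long.MAX_VALUE, as item 2 suggests.
        CombinedWatermark withMax = new CombinedWatermark();
        withMax.onBroadcastWatermark(Long.MAX_VALUE);
        withMax.onKafkaWatermark(10_000L);
        System.out.println("with MAX: " + withMax.current()
                + ", timer fires: " + withMax.timerFires(timer));
    }
}
```

Running main prints "without MAX: -9223372036854775808, timer fires: false" and then "with MAX: 10000, timer fires: true". With no broadcast watermark the minimum stays at Long.MIN_VALUE, while with Long.MAX_VALUE the Kafka watermark alone drives event time.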

Flink 1.10 (AssignerWithPeriodicWatermarks):

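import javax.annotation.Nullable;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Assign to the broadcast (MySQL rule) stream only.
public class QueryStreamAssigner<T> implements AssignerWithPeriodicWatermarks<T> {

    // Called every auto-watermark interval: always report the maximum
    // watermark so this input never lowers the operator's minimum.
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return Watermark.MAX_WATERMARK;
    }

    // Rule records carry no meaningful event time, so a constant is enough.
    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        return 0;
    }
}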

Flink 1.12 (WatermarkStrategy):

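import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Assign to the broadcast (MySQL rule) stream only.
public class BroadcastStreamAssigner<T> implements WatermarkStrategy<T> {
    private static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);

    @Override
    public WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new WatermarkGenerator<T>() {
            // Individual rule records do not affect the watermark.
            @Override
            public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
            }

            // Called every auto-watermark interval: always emit the maximum
            // so this input never lowers the operator's minimum watermark.
            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                output.emitWatermark(MAX_WATERMARK);
            }
        };
    }

    // Rule records carry no meaningful event time, so a constant is enough.
    @Override
    public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return (element, recordTimestamp) -> 0;
    }
}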

Article link: https://www.haomeiwen.com/subject/caxasrtx.html