美文网首页
Flink写kafka性能调优总结

Flink写kafka性能调优总结

作者: 和平菌 | 来源:发表于2020-01-08 10:09 被阅读0次

一、参数调优
//batch.size当批量的数据大小达到设定值后,就会立即发送,不顾下面的linger.ms
properties.put("batch.size", /16384 * 100/ 1024 * 128); //16384是默认值 16k 改成128k

properties.put("linger.ms", 10);//延迟1ms发送,这项设置将通过增加小的延迟来完成--即,不是立即发送一条记录,producer将会等待给定的延迟时间以允许其他消息记录发送
以上两个参数是配置batch提交策略,缓存满提交OR定时flash

properties.put("buffer.memory", 128 * 1024 * 1024 ); //默认值为:33554432合计为32M 修改为128M
properties.put("max.request.size",10 * 1024 * 1024); //每次发送给Kafka服务器请求的最大大小 默认是1M 调整为10M

二、修改sink分区策略
自定义分区器

public class RandomKafkaPartitioner<T> extends FlinkKafkaPartitioner<T> {
    private static final long serialVersionUID = -3785320239953858777L;

 private static final ThreadLocalRandom ran = ThreadLocalRandom.current();

 public RandomKafkaPartitioner() {
    }

    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        Preconditions.checkArgument(partitions != null && partitions.length > 0, "Partitions of the target topic is empty.");
 return partitions[this.ran.nextInt(1000000) % partitions.length];
 }

    public boolean equals(Object o) {
        return this == o || o instanceof RandomKafkaPartitioner;
 }

    public int hashCode() {
        return RandomKafkaPartitioner.class.hashCode();
 }
}

创建Sink 算子

new FlinkKafkaProducer011<>("LOGWRITTER_OTHER_TEST",
 new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
 propertiesResult,
 Optional.of(new RandomKafkaPartitioner<>()),
 FlinkKafkaProducer011.Semantic.AT_LEAST_ONCE,
 10)

相关文章

网友评论

      本文标题:Flink写kafka性能调优总结

      本文链接:https://www.haomeiwen.com/subject/ihbjactx.html