美文网首页
kafka 优雅实践

kafka 优雅实践

作者: wyh1791 | 来源:发表于2019-10-11 20:31 被阅读0次

本文分别从生产端和消费端分别说明

1.生产端优化

生产端通过如下提高并发和可靠性

  • 设置大缓冲区100M

  • 缓冲区延迟1s发送

  • 缓冲区最大批量200000

  • 发送端消息进行压缩

  • 失败重试1000次

2.消费端优化(针对图片处理)

消费端通过如下方式进行优化

  • 消息手动确认, 提高可靠性, 避免消息丢失

  • 一次只拉去一个消息, 因为图片处理慢, 所以一次只拉去一个消息

  • 拉取一次最大处理时间30min(保证图片处理时间充足)

  • 避免reblance 超时时间5min, 心跳间隔3s, 超过5min没有心跳才reblance

  • 任何情况必须ack消息

  • 并发控制为5

  • kafka用户名&密码配置化, 去除秘钥文件

  • toptic 统一配置


#kafka基础配置,不要变动

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.acks=1

spring.kafka.producer.retries=1000

spring.kafka.producer.compression-type=gzip

spring.kafka.producer.properties.linger.ms=1000

spring.kafka.producer.properties.batch.size=200000

spring.kafka.producer.properties.max.block.ms=600000

spring.kafka.producer.properties.buffer.memory=100554432

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.enable-auto-commit=false

spring.kafka.consumer.max-poll-records=1

spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.consumer.properties.max.poll.interval.ms=1800000

spring.kafka.consumer.properties.rebalance.timeout.ms=300000

spring.kafka.consumer.properties.session.timeout.ms=300000

spring.kafka.consumer.properties.heartbeat.interval.ms=3000

spring.kafka.properties.security.protocol=SASL_PLAINTEXT

spring.kafka.properties.sasl.mechanism=PLAIN

spring.kafka.properties.request.timeout.ms=301000

spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE

spring.kafka.listener.concurrency=5


#kafka账号

spring.kafka.bootstrap-servers=172.16.97.161:2093

spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="用户名" password="密码";

#kafka topic

spring.topics.imageTransformMessage.topic=image_transform_message_prod

spring.topics.imageTransformMessage.group=image_transform_message_prod

spring.topics.imageTransformResult.topic=image_transform_result_prod

spring.topics.imageTransformResult.group=image_transform_result_prod

3.代码实例

3.1发送端代码


@Slf4j

@Component

public class KafkaProducer {

    @Autowired

    private KafkaTemplate kafkaTemplate;

    @Value("${spring.topics.imageTransformResult.topic}")

    private String topic;

    /**

    * 相同的key发到同一个partition

    * <p>

    * 由于kafka是根据key的hash值取模去分的partition 导致肯能分布不均,所以此处随机去partition的值

    * @param key

    * @param data

    * @param <T>

    * @return

    */

    public <T> boolean sendMessage(String key, T data) {

        String jsonData = JSONObject.toJSONString(data);

        UUID uuid = UUID.randomUUID();

        String suuid = StringUtils.remove(uuid.toString(), "-");

        try {

            int partitionSize = kafkaTemplate.partitionsFor(topic).size();

            int randomPartition = (int) (System.currentTimeMillis() % partitionSize);

            Header header = new RecordHeader("UUID", suuid.getBytes());

            ProducerRecord producerRecord = new ProducerRecord(topic, randomPartition, key, jsonData, Arrays.asList(header));

            log.info("begin send key {}, data {}, uuid {}", key, data, suuid);

            kafkaTemplate.send(producerRecord);

            log.info("after send uuid {}", suuid);

            return true;

        } catch (Exception e) {

            log.error("sendMessage error, suuid {}, key {}, data {}", suuid, key, jsonData, e);

            String message = "商品同步kafka消息发送失败:" + suuid;

            return false;

        }

    }

}

3.2消费端代码


@Slf4j

@Component

public class KafkaConsumer {

    @Autowired

    private ImageBiz imageBiz;

    @Autowired

    private KafkaProducer kafkaProducer;

    @KafkaListener(topics = "#{'${spring.topics.imageTransformMessage.topic}'}", groupId = "#{'${spring.topics.imageTransformMessage.group}'}")

    public void processMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {

        Stopwatch stopwatch = Stopwatch.createStarted();

        try {

            String key = record.key();

            String data = record.value();

            log.info("kafka receive message, key {}, data {}", key, data);

            if (BaseUtil.isEmpty(data)) {

                acknowledgment.acknowledge();

                return;

            }

            acknowledgment.acknowledge();

        } catch (Exception e) {

            String suuid = MDC.get("UUID") == null ? "" : MDC.get("UUID");

            log.error("消费消息异常,请线上查找原因: {}", suuid, e);

        }

    }

}

4.消费端场景

kafka消费端主要有两种场景

  1. 消息数量不多, 但是处理每一消息的时间比较长

  2. 消息数量很多, 处理每一个消息的时间很短

场景1

和上面介绍的图片处理类似, 每次拉去少量消息, 给消息处理留足够时间

场景2

场景2可以转化为场景1, 把原来的1000个消息组织为一个消息, 批量处理

如果发送方很分散, 并且只能一个个的发消息, 可以使用批量监听消息

配置修改


#一次拉取1000消息

spring.kafka.consumer.max-poll-records=1000

#批量消费模式

spring.kafka.listener.type=batch

消费端代码


@Slf4j

@Component

public class KafkaConsumer {

    @Autowired

    private ImageBiz imageBiz;

    @Autowired

    private KafkaProducer kafkaProducer;

    @KafkaListener(topics = "#{'${spring.topics.imageTransformMessage.topic}'}", groupId = "#{'${spring.topics.imageTransformMessage.group}'}")

    public void processMessage(List<ConsumerRecord<String, String>> record, Acknowledgment acknowledgment) {

        Stopwatch stopwatch = Stopwatch.createStarted();

        try {

            acknowledgment.acknowledge();

        } catch (Exception e) {

            String suuid = MDC.get("UUID") == null ? "" : MDC.get("UUID");

            log.error("消费消息异常,请线上查找原因: {}", suuid, e);

        }

    }

}

参考:

相关文章

  • kafka 优雅实践

    本文分别从生产端和消费端分别说明 1.生产端优化 生产端通过如下提高并发和可靠性 设置大缓冲区100M 缓冲区延迟...

  • 【kafka】kafka理论之partition & repli

    深入理解Kafka:核心设计与实践原理 Kafka理论之Partition & Replication 基于分区和...

  • Kafka实践

    kafka基本概念: Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为...

  • 简读笔记-深入理解kafka-第一部分

    第一章 初始kafka 参考书籍: 朱小厮--深入理解Kafka 核心设计与实践原理 Kafka体系结构 Kaf...

  • 聊聊 Kafka:Kafka 消息重复的场景以及最佳实践

    一、前言 上一篇我们讲了 聊聊 Kafka:Kafka 消息丢失的场景以及最佳实践[https://blog.cs...

  • kafka安装与启动(转)

    kafka的背景知识已经讲了很多了,让我们现在开始实践吧,假设你现在没有Kafka和ZooKeeper环境。 St...

  • kafka搭建

    Kafka实践 提前准备: 安装Java 安装zookeeper 一、kafka集群安装 分别在h1、h2、h3三...

  • Kafka API实践

    系统学习三步骤走:理解原理、搭建系统、Api练习。从哪里找到Api?Document和git。例如,Kafka在g...

  • Kafka最佳实践

    作者:Sriharsha Chintalapani, Jay Kumar SenSharma译者:java达人来源...

  • Kafka Consumer实践

    背景 生产环境采用默认5个分区的设置 kafka已经经过一层封装, 实现了自动增减consumer的逻辑 问题 J...

网友评论

      本文标题:kafka 优雅实践

      本文链接:https://www.haomeiwen.com/subject/lwixmctx.html