美文网首页
kafka生产者连接池模式

kafka生产者连接池模式

作者: 会飞的蜗牛66666 | 来源:发表于2019-08-16 17:45 被阅读0次

package com.ky.produce;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**

  • @author xwj

  • 线程池生产者
    */
    public class ProduceThreadPool {

    private static Properties properties = new Properties();

    private static KafkaProducer<String, String> producer;

    private static ThreadPoolExecutor service;

    private static TimeUnit timeUnit = TimeUnit.SECONDS;

    private static BlockingQueue blockingQueue = new LinkedBlockingQueue<Runnable>();

    static {
    int corePoolSize = 40;
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.1.11.110:6667,10.1.11.111:6667,10.1.11.112:6667");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
    properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 131072);
    properties.put(ProducerConfig.ACKS_CONFIG, "all");
    producer = new KafkaProducer<>(properties);
    int maximumPoolSize = 100;
    long keepAliveTime = 60;
    service = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, blockingQueue);
    System.out.println("init ok....");
    }

//发送数据
public static void sendData(String topic, String msg) {
    try {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, String.valueOf(System.currentTimeMillis()), msg);
        service.submit(new ProducerThread(producer, record));
    } catch (Exception e) {
        e.printStackTrace();
    }
}

}

package com.ky.produce;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**

  • @author xwj

  • <p>

  • 生产者线程
    */
    public class ProducerThread implements Runnable {

    private Logger log = LoggerFactory.getLogger(ProducerThread.class);

    private KafkaProducer<String, String> producer;
    private ProducerRecord<String, String> record;

    ProducerThread(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) {
    this.producer = producer;
    this.record = record;
    }

    @Override
    public void run() {
    producer.send(record, (metadata, e) -> {
    if (null != e) {
    e.printStackTrace();
    }
    if (null != metadata) {
    log.info("消息发送成功 : " + String.format("offset: %s, partition:%s, topic:%s timestamp:%s",
    metadata.offset(), metadata.partition(), metadata.topic(), metadata.timestamp()));
    }
    });
    }

}

相关文章

  • kafka生产者连接池模式

    package com.ky.produce; import org.apache.kafka.clients.p...

  • 2022-01-19

    ```/** * kafka produce 单例模式只初始化一个生产者 */publicclassKafkaPr...

  • kafka0.8

    1、Kafka分为:生产者(producer),消费者(consumer) 2、生产者提交消息,给Kafka集群,...

  • Kafka - 生产者初步学习

    Kafka - 生产者初步学习 一、kafka生产者组件 我们从创建一个 ProducerRecord 对象开始,...

  • Kafka消费者API总结

    相对于Kafka的生产者API,消费者的API略显繁杂,本文总结了0.11.0版本的kafka消费者的几种消费模式...

  • Kafka生产者:写消息到Kafka

    本章我们将会讨论Kafka生产者是如何发送消息到Kafka的。Kafka项目有一个生产者客户端,我们可以通过这个客...

  • [kafka系列]之producer端消息发送

    本小节我们来讨论Kafka生产者是如何发送消息到Kafka的, Kafka项目有一个生产者客户端,我们可以通过这个...

  • Kafka生产者:写消息到Kafka

    本章我们将会讨论Kafka生产者是如何发送消息到Kafka的。Kafka项目有一个生产者客户端,我们可以通过这个客...

  • Kafka生产者:写消息到Kafka

    本章我们将会讨论Kafka生产者是如何发送消息到Kafka的。Kafka项目有一个生产者客户端,我们可以通过这个客...

  • 2020-04-21springboot2.x rabbitmq

    之前使用kafka只有2种模式1.生产者消费者2.发布订阅 而ribbitmq却有三种模式 fanout,topi...

网友评论

      本文标题:kafka生产者连接池模式

      本文链接:https://www.haomeiwen.com/subject/efiqsctx.html