1. 在 pom.xml 的依赖下新添加以下 Kafka 依赖 jar 包
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
---------------------------------------------------------------------------------------------------
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.1</version>
</dependency>
2.生产者
package cn.example.restfulapi.sys.task;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Properties;
import java.util.concurrent.Future;
/**
 * Demo Kafka producer: a Spring scheduled task that publishes a batch of
 * test JSON messages to the {@code resource_data_sync_test} topic once a minute.
 *
 * @author Mr.Hao
 * @date 2020-05-09
 */
@Component
public class Producer {

    /** Topic the demo messages are published to. */
    private static String topic = "resource_data_sync_test";

    /**
     * Builds the producer configuration (String key/value serialization).
     * NOTE(review): acks=all combined with retries=0 means a transient broker
     * error fails the send immediately instead of retrying — confirm intended.
     *
     * @return properties for a {@code KafkaProducer<String, String>}
     */
    private Properties producerProp() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("client.id", "");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 100);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    /**
     * Scheduled job: sends nine demo JSON messages and blocks on each
     * acknowledgement. The producer is created per run and closed via
     * try-with-resources, so it is released even when a send fails
     * (the original leaked it on the exception path and also contained a
     * stray empty statement and raw generic types).
     *
     * @throws Exception if closing the producer fails
     */
    @Scheduled(cron = "1 * * * * *")
    public void sendChannelMess() throws Exception {
        System.out.println("----------------------开始定时任务-------------------------");
        // Typed generics; local renamed so it no longer shadows the KafkaProducer class.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProp())) {
            for (int i = 1; i < 10; i++) {
                String msg = "{\"operation\":1,\"docid\":\"1234567" + i + "\",\"data\":{\"sec_title\":null,\"publish_time\":\"2019-02-18 01:00:00\",\"series\":null,\"brand\":\"\",\"series_id\":\"46,23,78\",\"content\":\"1月8日,市长刘大群\"}}";
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
                try {
                    // get() surfaces broker-side send errors synchronously.
                    Future<?> result = producer.send(record);
                    System.out.println(msg);
                    System.out.println(result.get());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
效果
3. 消费者
package cn.example.restfulapi.sys.task;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.*;
/**
 * Demo Kafka consumer: polls the {@code resource_data_sync_test} topic and
 * prints every record it receives.
 *
 * NOTE(review): the class name violates the UpperCamelCase convention; it is
 * kept so existing callers keep compiling.
 *
 * @author Mr.Hao
 * @date 2020-05-11
 */
@Component
public class consumer extends Thread {

    /** Topic this consumer subscribes to. */
    private static String topic = "resource_data_sync_test";

    /**
     * Guards against concurrent polling loops: run() is both the Thread body
     * and a @Scheduled target, so it can be triggered repeatedly.
     */
    private static volatile boolean running = false;

    /**
     * Builds the consumer configuration (String deserialization, auto-commit,
     * earliest offset for a new group).
     *
     * @return properties for a {@code KafkaConsumer<String, String>}
     */
    private Properties getProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "1");
        // props.put("client.id", "");
        props.put("enable.auto.commit", true);
        props.put("auto.offset.reset", "earliest");//"latest";"earliest"
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.records", 5000);
        props.put("max.partition.fetch.bytes", 5252880);
        props.put("auto.commit.interval.ms", "100");
        return props;
    }

    /** Starts the consumer loop on a dedicated thread. */
    public void initKafkaConsumer() {
        new consumer().start();
    }

    /**
     * Polling loop. Fixes over the original:
     * - only one loop may run at a time (the original started a fresh
     *   KafkaConsumer with an endless while-loop on every scheduled trigger,
     *   leaking consumers and permanently blocking scheduler threads);
     * - the consumer is closed via try-with-resources when the loop exits;
     * - removed the no-op {@code seekToBeginning(new ArrayList<>())} (an empty
     *   collection seeks nothing) and the unused {@code listTopics()} results.
     */
    @Override
    @Scheduled(cron = "1 * * * * *")
    public void run() {
        synchronized (consumer.class) {
            if (running) {
                return; // a polling loop is already active
            }
            running = true;
        }
        System.out.println("----------------------开始定时任务-------------------------");
        long pollTime = 5000;
        try (Consumer<String, String> kafkaConsumer = new KafkaConsumer<>(getProperties())) {
            kafkaConsumer.subscribe(Arrays.asList(topic));
            while (true) {
                System.out.println("------ pull "+topic+" start -----");
                try {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(pollTime);
                    if (records.count() > 0) {
                        System.out.println("poll record size:"+ records.count());
                        process(records);
                    }
                } catch (Exception e) {
                    System.out.println(e.getMessage());
                }
                System.out.println("------ pull "+topic+" finish -----");
            }
        } finally {
            running = false;
        }
    }

    /**
     * Prints topic/partition/offset/value for every record in the batch.
     *
     * @param records batch returned by poll(); empty batches are ignored
     */
    public void process(ConsumerRecords<String, String> records) {
        if (records.count() == 0) {
            return;
        }
        records.forEach(record -> {
            System.out.println("topic ="+record.topic());
            System.out.println("partition ="+record.partition());
            System.out.println("offset ="+record.offset());
            System.out.println("record ="+record.value());
        });
    }
}
效果
网友评论