kafka
1.介绍
1.1 概念
KafKa是一个消息队列(生产者,消费者模式)
Kafka是一个分布式流媒体平台
发布和订阅记录流,类似于消息队列或企业消息传递系统.
以容错的持久方式存储记录流.
记录发生时处理流
server 与 clent 之间的通信是通过简单,高性能,语言无关的Tcp协议完成的
1.2 作用
1.作为缓冲 --> 削峰还谷 可以缓解数据库压力
2.异构
3.解耦合 ---> 可也减少程序与程序之间的耦合度
1.3 特性
1.高吞吐量.低延迟 kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒
2.可扩展性 Kafka集群支持热扩展
3.持久性.可扩展性 消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
4.容错性 循序集群中节点失败 (若副本数量为n,则允许n-1失败节点)
1.4 Kafka架构

Broker 代理者
Producer 生产者
Consumer 消费者
partition 分片
2.操作
2.1 控制台操作
- 创建一个Topic
- /kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 2 --partitions 3 --topic admin
- 启动一个生产者 提供数据
- kafka-console-producer.sh --broker-list hadoop01:9092 --topic admin
- 启动一个消费者 消费数据
- kafka-console-consumer.sh --zookeeper:2181 --from-beginning --topic admin
2.2 使用JavaApi操作
producer
Properties props = new Properties();
//设置配置参数
props.put("bootstrap.servers", "node01:9092"); //连接集群
props.put("acks", "all"); //返回状态为-1
props.put("retries", 0); //重试 0不重试 3
props.put("batch.size", 16384); //一批次的数量 阈值
props.put("linger.ms", 1); //阈值 1ms
props.put("buffer.memory", 33554432); //缓冲区的大小
//我们的数据发送到kafka集群的时候需要先序列化
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
//1.创建kafka生产者对象 需要配置参数 <key(数据的标识,默认可以是给null),value(发送的数据)>
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
//2.生产者生产数据,发送消息 ProducerRecord是有多个参数的(1:topic 2:partition ,3 key ,4value)
ProducerRecord<String, String> record = new ProducerRecord<String, String>("order", "订单2");
kafkaProducer.send(record); //提交数据
kafkaProducer.flush(); //刷新数据
kafkaProducer.close(); //关闭连接 --> 关闭前会提交数据
consumer
Properties props = new Properties();
//设置连接参数
props.put("bootstrap.servers", "路径"); //连接集群
props.put("group.id", "test"); //组名称
props.put("enable.auto.commit", "true");//是否自动向kafka集群中提交偏移量
props.put("auto.commit.interval.ms", "1000");//多长时间提交一次
//反序列化
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
//1.创建kafka消费者对象
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(props);
//2.消费数据
//2.1订阅topic集合
kafkaConsumer.subscribe(Arrays.asList("主题名称"));
while (true){ //循环拉取数据
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);//拉取数据间隔
for (ConsumerRecord<String, String> record : records) { //遍利拿出主题中的值
System.out.println(record.value()); //输出值
}
}
3.Apache Kafka原理
3.1 分片与副本机制
- 分片
- 当数据量非常大的时候,一个服务器放不下了,就将数据分成多个部分,分布式存储数据。这个过程就是分片
- 副本
- 当数据只保存一份的时候,有丢失的风险。为了更好的容错和容灾,将数据备份几份,分别存储不同的服务器上
3.2 消息储存与查询机制
- 消息储存
- 一个segment中有两个核心的文件 一个是log(存储-数据存储),一个是index(索引-查找使用),log文件默认是存1G的数据,当数据达到阈值时 就会写入到写一个segment中
- Kafka定期会删除存储的segment 默认时间是7天
- 查询机制
- segment+有序offset+稀疏索引+二分查找+顺序查找等多种手段来高效的查找数据
3.3 生产者数据分发机制
- 指定 partition (分片) 当调用方法有具体的分片数时,就可以直接发送到具体的Partition上
- 指定 Key 通过传入的Key 算出hash值 进行发送 但如果Key值不变的话 那这个hash也是一样的
- 轮询模式 没有指定Partition与Key值时 使用轮询方式发送数据
3.4 消费者负载均衡机制
一个分片只能被一个组中的一个消费者所消费,但可以与其他组的一个消费者同时消费
分片与消费组中成员的数量应该成正比 不然会有人不干活(浪费资源)
3.5 消息不丢失机制
- producer 消息不丢失 ASCs状态码 0.提交无反应 1 提交给leader all -1
- Borker 消息不丢失 有备份
- consumer 消息不丢失 消息并列
扩展
消息队列的通信模式
-
点对点模式
- 点对点模式通常是基于拉取或者轮询的消息传送模型,这个模型的特点是发送到队列的消息被一个且只有一个消费者进行处理。生产者将消息放入消息队列后,由消费者主动的去拉取消息进行消费。点对点模型的的优点是消费者拉取消息的频率可以由自己控制。但是消息队列是否有消息需要消费,在消费者端无法感知,所以在消费者端需要额外的线程去监控
- 发布于订阅模式

- 发布订阅模式是一个基于消息送的消息传送模型,改模型可以有多种不同的订阅者。生产者将消息放入消息队列后,队列会将消息推送给订阅过该类消息的消费者(类似微信公众号)。由于是消费者被动接收推送,所以无需感知消息队列是否有待消费的消息!但是broker1、broker2、broker3由于机器性能不一样,所以处理消息的能力也会不一样,但消息队列却无法感知消费者消费的速度!所以推送的速度成了发布订阅模模式的一个问题!假设三个消费者处理速度分别是8M/s、5M/s、2M/s,如果队列推送的速度为5M/s,则broker3无法承受!如果队列推送的速度为2M/s,则broker1、broker2、会出现资源的极大浪费!
Kafka分区的目的
-
方便扩展 因为一个topic可以有多个partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据量
-
提高并发 以partition为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率
Message结构
我们在producer往kafka写入的也是一条一条的message,那存储在log中的message是什么样子的呢?消息主要包含消息体、消息大小、offset、压缩类型……等等!我们重点需要知道的是下面三个:
1、 offset:offset是一个占8byte的有序id号,它可以唯一确定每条消息在parition内的位置!
2、 消息大小:消息大小占用4byte,用于描述消息的大小。
3、 消息体:消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。
存储策略
1、 基于时间,默认配置是168小时(7天)。
2、 基于大小,默认配置是1073741824。
需要注意的是,kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能!
Kafka direct 跟receiver 方式接收数据的区别?
Receiver是使用Kafka的高层次Consumer API来实现的。
Receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。
然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。
如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。
该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。
所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复,但是效率会下降。
Direct这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。
当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。
这种方式有如下优点:
1、简化并行读取:
如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。
Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。
2、高性能:
如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。
这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。
而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。
3、一次且仅一次的事务机制:
基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。
这是消费Kafka数据的传统方式。
这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。
因为Spark和ZooKeeper之间可能是不同步的。
基于direct的方式,使用kafka的简单api,SparkStreaming自己就负责追踪消费的offset,并保存在checkpoint中。
Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。
kafka中的topic的分区和sparkstreaming中生成的rdd的分区没有啥关系,在kafkaUtils.createStream中增加分区 数量只会增加单个reciver的线程数,不会增加spark的并行度
可以创建多个kafka的输入DStream, 使用不同的group和topic, 使用多个receiver并行接收数据
从哪里开始消费数据[latest,earliest,none]
- latest: 当各个分区下有已递交的Offset时,从递交的offset开始消费,无递交的offset时,从头开始消费
- earliest:当各个分区下有已递交的offset时,从递交的offset开始消费,无递交的offset时,消费新生产的该分区下 的数据
- none:当各分区都存在已递交的offset时,从递交的offset开始消费,只要有一个分区不存在已递交的offset,则抛出异常
网友评论