美文网首页Spark 应用sparkSpark深入学习
(基于最新的Kafka version 0.10.2 new

(基于最新的Kafka version 0.10.2 new

作者: 俺是亮哥 | 来源:发表于2017-04-19 00:16 被阅读3770次

本文基于Spark2.1.0、Kafka 0.10.2、Scala 2.11.8版本

背景:

Kafka做为一款流行的分布式发布订阅消息系统,以高吞吐、低延时、高可靠的特点著称,已经成为Spark Streaming常用的流数据来源。

常用的ETL架构


1,Kafka Topic的消费保证:

流数据,顾名思义,就是无界的、流动的、快速的、连续到达的数据序列,所以它不像可靠的文件系统(如HDFS)在计算出现故障时,可以随时恢复数据来重新计算。

那么,如何保证流数据可靠的传递呢?我们先了解下面的概念:

Producer通过Broker传递消息给Consumer,Consumer消费消息,

P-B-C 3者之间的传输,主要有以下几种可能的场景:

At most once(最多传输一次): 消息可能会丢,但绝不会重复传输;

At least one  (至少传输一次):  消息绝不会丢,但可能会重复传输;

Exactly once(精确传输一次):  每条消息肯定会被传输一次且仅传输一次,不会重复.

3种场景,适合不同生产环境的需要,相关介绍网上很多,这里就不多说了。

本文的重点是,如何用Spark 2.1.0、Kafka 0.10.2、spark-streaming-kafka-0-10_2.11-2.1.0.jar、HBase 1.3.0来配合实现消息Exactly once(精确一次)的传递和消费。网上相关的scala或者java代码,都是基于老版本的API,目前没有发现基于new Kafka consumer API的实现,所以看到本文觉得有收获的同学,就给点个赞吧。

2,Kafka 0.10.2版本介绍:

Kafka 0.10.2版本,为了和Zookeeper的解耦,较之前的版本有了很大的变化,老版本的高级API和简单API的说法不见了,取而代之的是New Consumer API及New Consumer Configs,相关接口的参数及P-B-C 3者的配置参数有了很多改动。

Spark官方与之配合的工具包spark-streaming-kafka-0-10_2.11-2.1.0.jar 也做了相应的改变,取消了KafkaCluster类、取消了ZkUtils.updatePersistentPath等多个方法,也都是为了不在将Topic offset由zookeeper自动保存,而由用户灵活的选择Kafka和Spark 2.1.0官方提供的几种方法来保存offset,最好的使用情况下,端到端的业务可以达到精确一次的消费保证。

(为了美观,本文相关的java代码都用贴图方式展现了,最终实现的端到端精确一次消费消息的源码见文末的链接)

3,Kafka官方提供的多种消费保证:

Consumer的3个重要的配置,需要配合使用,来达到Broker到Consumer之间精确一次的消费保证。

请看这些参数的组合(有点绕,请仔细看)

(enable.auto.commit:false) + (auto.offset.reset:earliest):

在Broker到Consumer之间实现了至少一次语义,因为不使用Kafka提供的自动保存offset功能,每次应用程序启动时,都是从Topic的初始位置来获取消息。也就是说,应用程序因为故障失败,或者是人为的停止,再次启动应用程序时,都会从初始位置把指定的Topic所有的消息都消费一遍,这就导致了Consumer会重复消费。

(enable.auto.commit:false) + (auto.offset.reset:latest):

在Broker到Consumer之间实现了至多一次语义,因为不使用Kafka提供的自动保存offset功能,每次应用程序启动时,都是从Topic的末尾位置来获取消息。也就是说,应用程序因为故障失败,或者是人为的停止后,如果Producer向Broker发送新的消息,当再次启动应用程序时,Consumer从指定的Topic的末尾来开始消费,这就导致了这部分新产生的消息丢失。

(enable.auto.commit:true)+(auto.offset.reset:earliest)+(auto.commit.interval.ms) :

在Broker到Consumer之间实现了精确一次语义,因为使用了Kafka提供的自动保存offset功能,当应用程序第一次启动时,首先从Topic的初试位置来获取消息,原有的消息一个都没有丢失;紧接着,在auto.commit.interval.ms时间后,Kafka会使用coordinator协议commit当前的offset(topic的每个分区的offset)。当应用程序因为故障失败,或者是人为的停止,再次启动应用程序时,都会从coordinator模块获取Topic的offset,从上一次消费结束的位置继续消费,所以不会重复消费已经消费过的消息,也不会丢失在应用程序停止期间新产生的消息,做到了Broker到Consumer之间精确一次的传递。

下面是Kafka 0.10.2 ConsumerCoordinator.java的源码片段,用户配置enable.auto.commit:true对应的代码是autoCommitEnabled为true,最终调用doAutoCommitOffsetsAsync,使用coordinator协议保存offset(注意,最新版本已经和zookeeper解耦,不会把offset保存在zookeeper中,所以通过zkCli.sh是看不到相关topic的)

下面是实现的Spark Streaming代码。

当然,这还远远不够,因为这样的方式,会出现业务两段性的后果:

1,读完消息先commit再处理消息。这种模式下,如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once;

2,读完消息先处理再commit。这种模式下,如果处理完了消息在commit之前consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于At least once。

所以,要想实现端到端消息的精确一次消费,还需要耐心往后看。

3,Spark官方提供的多种消费保证:(基于spark-streaming-kafka-0-10_2.11-2.1.0.jar,相比前一个版本有很多改变)

CheckPoint:

通过设置Driver程序的checkpoint,来保存topic offset。这种方法很简单,但是缺陷也很大:应用程序有改变时,无法使用原来的checkpoint来恢复offset;只能满足Broker到Consumer之间精确一次的传递。

当应用程序第一次启动时,首先从Topic的初试位置来获取消息,原有的消息一个都没有丢失;紧接着,在batch时间到达后,Spark会使用checkpoint保存当前的offset(topic的每个分区的offset)。当应用程序失败或者人为停止后,再次启动应用程序时,都会从checkpoint恢复Topic的offset,从上一次消费结束的位置继续消费,所以不会重复消费已经消费过的消息,也不会丢失在应用程序停止期间新产生的消息。

实现的Spark Streaming代码如下(注意:Spark 1.6.3之后,检查checkpoint的实现已经不在用JavaStreamingContextFactory工厂操作了,请细看我的代码是怎么做的)

Kafka itself:

和前面提到的enable.auto.commit:true异曲同工,不过这里用commitAsync方法异步的把offset提交给Kafka 。当应用程序第一次启动时,首先从Topic的初试位置来获取消息,原有的消息一个都没有丢失;紧接着,用commitAsync方法异步的把offset提交给Kafka(topic的每个分区的offset)。当应用程序失败或者人为停止后,再次启动应用程序时,都会从kafka恢复Topic的offset,从上一次消费结束的位置继续消费,所以不会重复消费已经消费过的消息,也不会丢失在应用程序停止期间新产生的消息。

与checkpoint相比,应用程序代码的更改不会影响offset的存储和获取。然而,这样的操作不是事务性的,由于是异步提交offset,当提交offset过程中应用程序crash,则无法保存正确的offset,会导致消息丢失或者重复消费。

实现的Spark Streaming代码如下:

Your own data store:(当当当当,好戏出场)

如果要做到消息端到端的Exactly once消费,就需要事务性的处理offset和实际操作的输出。

经典的做法让offset和操作输出存在同一个地方,会更简洁和通用。比如,consumer把最新的offset和加工后的数据一起写到HBase中,那就可以保证数据的输出和offset的更新要么都成功,要么都失败,间接实现事务性,最终做到消息的端到端的精确一次消费。(新版本的官网中只字未提使用Zookeeper保存offset,是有多嫌弃😁)

实现的Spark Streaming代码如下(ConsumerRecord类不能序列化,使用时要注意,不要分发该类到其他工作节点上,避免错误打印)

其实说白了,官方提供的思路就是,把JavaInputDStream转换为OffsetRange对象,该对象具有topic对应的分区的所有信息,每次batch处理完,Spark Streaming都会自动更新该对象,所以你只需要找个合适的地方保存该对象(比如HBase、HDFS),就可以愉快的操纵offset了。

4,相关链接

本文实现的精确一次消费的Java源代码

Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)

Kafka 0.10.2 Documentation

(如需转载,请标明作者和出处)

相关文章

网友评论

  • f1c9bdf49821:特地登陆上来评论,解惑了,谢谢。我还说怎么0.10的没有kafkaCLuster了呢
  • dd9e42924b74:如何一个batch中同一份数据需要进行多个job操作,每个job又都有输出操作该如何做到精确一次呢?
  • lishiyu_cn:[ERROR] 2018-04-17 15:32:49,785 method:org.apache.spark.internal.Logging$class.logError(Logging.scala:70)
    group.id is null, you should probably set it
    Exception in thread "main" java.lang.Exception: ============()
    还是获取不到kafka中的消息
  • 484d72eac86b:数据处理写入存储介质成功, offset 写入hbase也有可能出现问题, 也不能精确一次
    8db017a311da:作者的意思为把结果数据和offset写入hbase打包在一个事务里进行提交
  • b67a1ab502ad:作者你好,请问,kafka0.10没有在zk上存储 consumer offset 信息,那么我想在不修改应用程序的情况下,修改 offset(如重跑前100条记录)应如何做呢?
  • 97e2cf8d9d84:Hbase的行键设计的有问题吧,如果多个消费者组消费这个数据,那么你之前的消费者组消费的数据偏移量就被覆盖了,得加上groupid,不然没法唯一标识
    b85451aadbe5:感觉用mysql存offset就挺好,用scalikeJDBC也比较方便
  • c9b9398e3996:Kafka itself:

    和前面提到的enable.auto.commit:true异曲同工,不过这里用commitAsync方法异步的把offset提交给Kafka 。当应用程序第一次启动时,首先从Topic的初试位置来获取消息,原有的消息一个都没有丢失;紧接着,用commitAsync方法异步的把offset提交给Kafka(topic的每个分区的offset)。当应用程序失败或者人为停止后,再次启动应用程序时,都会从kafka恢复Topic的offset,从上一次消费结束的位置继续消费,所以不会重复消费已经消费过的消息,也不会丢失在应用程序停止期间新产生的消息。

    与checkpoint相比,应用程序代码的更改不会影响offset的存储和获取。然而,这样的操作不是事务性的,由于是异步提交offset,当提交offset过程中应用程序crash,则无法保存正确的offset,会导致消息丢失或者重复消费。

    l大佬大佬 那这个异步提交的方式到底能还是不能保证exactly once啊啊啊 小弟要爆炸了了了。。。
    b85451aadbe5:不行,前面说的都是铺垫,后面按照官网的处理思路才是正道,将提交偏移量和数据写出放到一个事务中才可以。
  • c9b9398e3996:老哥有没有保存到zk上面的 kafka010版本的 或者有什么文章推荐也可以 链接什么的 万分感谢
  • 9f93f43cd8cf:JavaDStream<String> jpds = stream.map(
    new Function<ConsumerRecord<String, String>, String>() {
    @Override
    public String call(ConsumerRecord<String, String> record) {
    System.out.println(record.value());
    return record.value();
    }
    });
    为什么这个打印不出来呢
    b85451aadbe5:可以用log方式打印出来
    97e2cf8d9d84:@狗蛋闷 你这个打印代码写在逻辑里面了,而这段逻辑又是在excutor端执行的,你去UI界面的executor的stdout就能看到了
  • 9f93f43cd8cf:没有porm吗,这个最重要了,光一个代码跑不起来呀
    俺是亮哥:@狗蛋闷 不行吧,只有2.x的spark版本的streaming才支持kafka0.10的api
    9f93f43cd8cf:windows上开发用的spark-streaming_2.10的2.1.0,集群上装的是spark 1.6环境,这样可以跑吗?
    俺是亮哥:当时直接idea用组件本身的lib或jars目录下jar包,没有单独写pom.xml,那几个import的包都是常用的,你从官网用例的pom.xml抠一下吧
  • bf3fb1672aac:文章写的不错,大致都是翻译官网的内容,还是给了自己的例子。看了最后一个exactly once例子,发现楼主这个貌似也做不到。怎么在一个hbase的put操作中进行不同表的数据插入?
    假设确实可以做到,在存在多个输出操作时,怎么决定offset和哪一个的输出操作进行事物操作?
    e826090df225:@俺是亮哥 如果输出结果和offset都写入同一张表,offset 写成某个列,streaming 怎么获取上次读到哪里了?
    俺是亮哥:假设输出结果并同时更新offset没有办法做到事务操作,那么就考虑设计成幂等吧。其实可以把offset作为列族中某一列,这样用一张表就可以了。
  • b4b615e03d68:之前就是kafka0.10的版本使用时接受不到数据 , 现在看了这篇文章应该就能解决问题咯

本文标题:(基于最新的Kafka version 0.10.2 new

本文链接:https://www.haomeiwen.com/subject/uultzttx.html