Spark Streaming&Flume&Ka

作者: 董二弯 | 来源:发表于2019-05-28 22:14 被阅读4次

前几章一起学习了Spark Streaming整合FlumeSpark Streaming整合Kafka。这一章一起学习三者的整合搭建一个流处理平台环境。整体数据流向和处理流程如下:

image.png

整合日志输出到Flume

  • Flume agent的配置
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.30.131
a1.sources.r1.port = 41414

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

注意:这里sink的类型为logger,在测试log4j能够把日志输入到flume后在把sink改为KafkaSink。在开发中做一步测一步是较好的习惯,如果串联了整个流程测试,出现了问题不方便定位。

启动agent
flume-ng agent --name a1 --conf ../conf --conf-file ../conf/log4j_to_flume.conf -Dflume.root.logger=INFO,console
  • log4j配置
    添加pom依赖整合flume
<dependency>
            <groupId>org.apache.flume.flume-ng-clients</groupId>
            <artifactId>flume-ng-log4jappender</artifactId>
            <version>1.9.0</version>
        </dependency>

配置

log4j.rootLogger=INFO,stdout,flume

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n


log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname=192.168.30.131
log4j.appender.flume.Port=41414
log4j.appender.flume.UnsafeMode=true
log4j.appender.flume.layout=org.apache.log4j.PatternLayout

此处的Hostname和Port要和flume agent配置中的相同

  • 模拟日志产生
public class LoggerGenerator2 {

    private static Logger logger = Logger.getLogger(LoggerGenerator2.class.getName());

    public static void main(String[] args) throws Exception{
        int index = 0;
        while(true) {
            Thread.sleep(1000);
            logger.info("value : " + index++);
        }
    }
}

注意
我在启动程序后遇到了以下问题

image.png
经过排查,是因为我的pom文件中引入了spark streaming的依赖包,其中包含了avro的依赖,和flume-ng-log4jappender的avro依赖起了冲突,通过以下方式解决了问题
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>2.4.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.avro</groupId>
                    <artifactId>avro</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
  • 测试
    启动程序,观察agent的客户端,发现可以打印出日志信息,表示log4j到flume的整合成功。

整合Flume到Kafka

这一步把Flume收集的日志输出到Kafka,关键的一步就是把上一把agent的类型配置为KafkaSink

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.30.131
a1.sources.r1.port = 41414

# Describe the sink

#设置Kafka接收器
a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
#设置Kafka的broker地址和端口号
a1.sinks.k1.brokerList=192.168.30.131:9092
#设置Kafka的Topic
a1.sinks.k1.topic=kafka_spark
#设置序列化方式
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
#一批中要处理的消息数。较大批量可提高吞吐量,同时增加延迟
a1.sinks.k1.batchSize = 3
a1.sinks.k1.requiredAcks = 1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动kafka环境,并启动一个消费者。启动日志模拟生成的程序,观察消费者客户端,出现以下内容说明日志已经输出到了Kafka。


image.png

整合Kafka到Spark Streaming

Spark Streaming整合Kafka已经做了详细介绍,这里简单回顾
核心pom

 <!-- Spark Streaming 依赖-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>2.4.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.avro</groupId>
                    <artifactId>avro</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
 <!-- Spark Streaming 集成 kafka-->
 <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>2.4.3</version>
</dependency>

scala程序

object Test {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("DirectKafka")
      .setMaster("local[2]")

    val ssc = new StreamingContext(conf, Seconds(2))

    val topicsSet = Array("kafka_spark")
    val kafkaParams = mutable.HashMap[String, String]()
    //必须添加以下参数,否则会报错
    kafkaParams.put("bootstrap.servers", "192.168.30.131:9092")
    kafkaParams.put("group.id", "group1")
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams
      )
    )
    // 业务逻辑处理,这里为简单打印
   val lines = messages.map(_.value)
    lines.print()
    //开始计算
    ssc.start()
    ssc.awaitTermination()
  }
}

先根据以上流程,启动Kafka环境、agent、模拟日志生成程序。启动Spark Streaming程序,观察控制台


image.png

根据百度查询添加以下配置

val conf = new SparkConf().setAppName("SparkHistoryTags").setMaster("local")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

重新启动


image.png

成功

Spark Streaming对接收到的数据进行处理

根据以上步骤,已经成功的把日志信息输出到了spark streaming。此时可以通过Spark Streaming对接收到的数据进行处理。把处理后的结果保存到关系型数据库(如MySQL)。在通过Javaweb程序把结果可视化即可。后面的文章将一起学习如果使用Spark Streaming对日志进行处理。

相关文章

  • Spark Streaming&Flume&Ka

    前几章一起学习了Spark Streaming整合Flume,Spark Streaming整合Kafka。这一章...

  • 大数据处理的平台之争

    大数据圈过去五年不断涌现很多优秀的平台和产品。。。而真正起飞的是Spark和Kafka,这也是为何Spark和Ka...

  • Flink 读取Kafka数据

    一直以来都是使用spark structstreaming 读取kafka数据,最近一直在尝试用flink读取ka...

  • kafka 基础知识整理(三)kafka + spark str

    Kafka为一个分布式的消息队列,spark流操作kafka有两种方式:一种是利用接收器(receiver)和ka...

  • 完整集群搭建,hadoop,spark,zookeeper,ka

    环境变量配置 在根目录下新建一个soft 文件夹 以下为安装包,全部放到该文件夹下,解压,并软连接0 jdk-8u...

  • 篇五|ClickHouse数据导入(Flink、Spark、Ka

    本文分享主要是ClickHouse的数据导入方式,本文主要介绍如何使用Flink、Spark、Kafka、MySQ...

  • 五十音第二课

    子音:k音 ka ki ku ke ko 颜文字好好玩 练习:ka ke ki ka ko ku ka ke ki...

  • KA

    我花了很长的时间才发现,我喜欢的是剧中的KA,不是演他们的人,而是演他们的角色。 故事其实特别简单,就是开头小冤家...

  • ka

    啊啊啊 居然没有办法进行复制打卡了! 这是什么啊啊啊 居然没有办法进行复制打卡了! 这是什么情况 啊啊啊 居然没有...

  • 2017-11-01

    1 .humankind n. [U] / ‚hjuːmən'kaɪnd ; ˋhjumən͵kaɪnd / ...

网友评论

    本文标题:Spark Streaming&Flume&Ka

    本文链接:https://www.haomeiwen.com/subject/chaftctx.html