Spark Streaming + Kafka WordCount

Author: 请不要问我是谁 | Published 2019-03-26 21:56

Environment Setup

1. Start Hadoop
2. Start ZooKeeper
3. Start Kafka
4. Add the following dependencies to pom.xml:

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <!-- The KafkaUtils.createStream API used below ships in the 0-8 connector;
                 a 0-10 direct-stream alternative is sketched after this snippet -->
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
    </dependencies>
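Note that the WordCount program below reads Kafka through the receiver-based KafkaUtils.createStream API, which lives in the spark-streaming-kafka-0-8 connector declared above. The 0-10 connector exposes a different, direct-stream API. A minimal sketch of the 0-10 equivalent, assuming the spark-streaming-kafka-0-10_2.11 dependency instead of the 0-8 one, and the same broker, group, and topic used later in this post:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Consumer settings for the direct stream; no ZooKeeper address is needed
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "test-consumer-group"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("wordSender"), kafkaParams))
// Each record is a ConsumerRecord; its value is the message text
val lines = stream.map(_.value)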

Logging Setup

import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging

/**
  * Configure logging for the streaming examples
  */
object StreamingExamples extends Logging {
  def setStreamingLogLevels(): Unit = {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements

    // Show only ERROR-level logs and above
    Logger.getLogger("org").setLevel(Level.ERROR)
    if (!log4jInitialized) {
      logInfo("Setting log level to [ERROR] for streaming example. " +
        "To override, add a custom log4j.properties to the classpath.")
    }
  }
}

Producing Kafka Data

import java.util

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import scala.util.Random

/**
  * Generate test data for Kafka
  */
object KafkaWordProducer {
  def main(args: Array[String]): Unit = {
    // Kafka broker address
    val brokers = "localhost:9092"
    // Kafka topic to write to
    val topic = "wordSender"
    // Send three messages per second
    val messagesPerSec = 3
    // Put five numbers in each message
    val wordsPerMessage = 5
    val props = new util.HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    while (true) {
      (1 to messagesPerSec).foreach { _ =>
        // Build one message: wordsPerMessage random numbers in [0, 30)
        val str = (1 to wordsPerMessage).map(_ => Random.nextInt(30).toString).mkString(" ")
        println(str)
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }
      Thread.sleep(1000)
    }
  }
}
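Before running the producer, make sure the wordSender topic exists (or that the broker allows automatic topic creation). A minimal sketch using Kafka's AdminClient, assuming kafka-clients 0.11+ on the classpath (AdminClient was added in 0.11; with older clients, create the topic with the kafka-topics.sh script instead) and a single local broker:

import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val admin = AdminClient.create(props)
    // One partition, replication factor 1: enough for a single local broker
    val topic = new NewTopic("wordSender", 1, 1.toShort)
    admin.createTopics(Collections.singleton(topic)).all().get()
    admin.close()
  }
}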

WordCount

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    StreamingExamples.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    // The second argument is the micro-batch interval; the window and slide
    // durations used below must be multiples of it
    val ssc = new StreamingContext(conf, Seconds(1))
    // Set the checkpoint directory. Spark treats the path as HDFS by default;
    // under the user's home directory a bare 'checkpoint' also works, and
    // local paths are written as file:///...
    ssc.checkpoint("hdfs://localhost:9000/user/ghost/checkpoint")
    // ZooKeeper connection address
    val zkQuorum = "localhost:2181"
    // Consumer group for the topic; any name you like
    val group = "test-consumer-group"
    // Topic name(s), comma-separated
    val topics = "wordSender"
    // Number of receiver threads per topic
    val numThreads = 1
    val topicMap = topics.split(",").map((_, numThreads)).toMap
    val lineMap = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    val lines = lineMap.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val pair = words.map(x => (x, 1))
    // This overload reuses the previous window's results for efficiency:
    // counts entering the window are added, counts leaving it are subtracted.
    // The first duration is the window length, the second is the slide interval.
    val wordCount = pair.reduceByKeyAndWindow(_ + _, _ - _, Seconds(3), Seconds(1), 2)
    wordCount.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
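The inverse-reduce overload used above is why the checkpoint directory is required. For comparison, a sketch of the simpler overload, which recomputes each full window from scratch and needs no checkpoint:

// Recomputes the whole 3-second window on every 1-second slide;
// no inverse function and no checkpoint directory required
val wordCount = pair.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(3), Seconds(1))

With a 3-second window sliding every 1 second, the incremental version updates each count by adding the batch that just entered the window and subtracting the one that just left, rather than re-summing all three batches.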
