Environment setup
1. Start Hadoop
2. Start ZooKeeper
3. Start Kafka
4. Add the dependencies below to pom.xml
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
</dependencies>
Logging setup
import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging

/**
 * Configure log levels so streaming output is not drowned out by INFO logs.
 */
object StreamingExamples extends Logging {
  def setStreamingLogLevels(): Unit = {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    // Only show logs at ERROR level and above
    Logger.getLogger("org").setLevel(Level.ERROR)
    if (!log4jInitialized) {
      logInfo("Setting log level to [ERROR] for streaming example. " +
        "To override, add a custom log4j.properties to the classpath.")
    }
  }
}
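For comparison, the log4jInitialized check is borrowed from Spark's bundled streaming examples, where the root logger level is only overridden when the user has not configured an appender of their own. A sketch of that stricter variant (the object and method names here are just illustrative, not part of this project):

import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging

object QuietLogs extends Logging {
  def setIfUnconfigured(): Unit = {
    // hasMoreElements is true once any appender has been registered,
    // i.e. the user has supplied a log4j configuration of their own.
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      logInfo("Setting log level to [WARN] for streaming example. " +
        "To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}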
Producing Kafka data
import java.util

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import scala.util.Random

/**
 * Generate test data: every second, send a few messages to Kafka,
 * each containing several random numbers separated by spaces.
 */
object KafkaWordProducer {
  def main(args: Array[String]): Unit = {
    // Kafka broker address
    val brokers = "localhost:9092"
    // Kafka topic to write to
    val topic = "wordSender"
    // Number of messages sent per second
    val messagesPerSec = 3
    // Number of random numbers per message
    val wordsPerMessage = 5
    val props = new util.HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    while (true) {
      (1 to messagesPerSec).foreach { _ =>
        val str = (1 to wordsPerMessage).map(_ => Random.nextInt(30).toString).mkString(" ")
        println(str)
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }
      Thread.sleep(1000)
    }
  }
}
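One caveat: the send loop above runs forever, so the producer is never closed and records still sitting in its buffer can be lost when the process is killed. A minimal sketch of a JVM shutdown hook (an addition of mine, not part of the original example) that could be registered right after the producer is created:

// Hypothetical addition: close the producer on JVM shutdown.
// KafkaProducer.close() blocks until previously sent records have been delivered.
sys.addShutdownHook {
  producer.close()
}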
WordCount
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    StreamingExamples.setStreamingLogLevels()
    val sc = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    // Spark Streaming setup: the second argument is the micro-batch interval;
    // the window and slide durations below must be multiples of it.
    val ssc = new StreamingContext(sc, Seconds(1))
    // Checkpoint directory. Spark treats this as an HDFS path by default; under
    // the user's home directory you can simply write "checkpoint", and for a
    // local path use file:///...
    ssc.checkpoint("hdfs://localhost:9000/user/ghost/checkpoint")
    // ZooKeeper address
    val zkQuorum = "localhost:2181"
    // Consumer group for the topic; any name you like
    val group = "test-consumer-group"
    // Topic name(s), comma separated
    val topics = "wordSender"
    // Number of receiver threads per topic
    val numThreads = 1
    val topicMap = topics.split(",").map((_, numThreads)).toMap
    val lineMap = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    val lines = lineMap.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val pair = words.map(x => (x, 1))
    // This overload reuses counts from earlier windows (via the inverse function)
    // for efficiency; the first duration is the window length, the second is the
    // slide interval, and the last argument is the number of partitions.
    val wordCount = pair.reduceByKeyAndWindow(_ + _, _ - _, Seconds(3), Seconds(1), 2)
    wordCount.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
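The receiver-based createStream API above comes from the spark-streaming-kafka-0-8 integration. If you use the newer spark-streaming-kafka-0-10_2.11 dependency instead, that API is not available; the equivalent is a direct stream that reads from the brokers rather than ZooKeeper. A minimal sketch, assuming the 0-10 dependency and the same topic and group names:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",   // brokers, not ZooKeeper
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "test-consumer-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
// The direct stream yields ConsumerRecord objects; take the value to get the line.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Array("wordSender"), kafkaParams))
val lines = stream.map(_.value)
// The rest of the word count (flatMap, map, reduceByKeyAndWindow) is unchanged.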