Top10 热门品类中 Top10 活跃 Session 统计
一、需求分析

二、代码实现
1)创建样例类Datas.scala
package com.atguigu.sparkmall.common.model
/**
 * One row of the user-action log (one CSV record per visit action).
 * Field names and order must match the CSV column layout.
 */
case class UserVisitAction(
    date: String,               // day of the action
    user_id: String,
    session_id: String,
    page_id: Long,
    action_time: String,
    search_keyword: String,
    click_category_id: Long,    // -1 when the action is not a click
    click_product_id: Long,
    order_category_ids: String, // "-"-separated list, empty when not an order
    order_product_ids: String,
    pay_category_ids: String,   // "-"-separated list, empty when not a payment
    pay_product_ids: String,
    city_id: String
)

/** Aggregated click/order/pay counts for one category within one task run. */
case class CategoryTop10(taskId: String,
                         categoryId: String,
                         clickCount: Long,
                         orderCount: Long,
                         payCount: Long)

/** One (category, session) pair with its click count, for the Top10-session result. */
case class CategoryTop10SessionTop10(taskId: String,
                                     categoryId: String,
                                     sessionId: String,
                                     clickCount: Long)
2)主程序
package com.atguigu.sparkmall.offline
/**
* Top10 热门品类中 Top10 活跃 Session 统计
*/
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.UUID
import com.atguigu.sparkmall.common.model.{CategoryTop10, CategoryTop10SessionTop10, UserVisitAction}
import com.atguigu.sparkmall.common.util.StringUtil
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.{immutable, mutable}
object Req2CategoryTop10SessionTop10Application {

  /**
   * Requirement: for each of the Top10 hot categories (ranked by click, then
   * order, then pay count), find the Top10 most active sessions by click
   * count, and persist the result into MySQL.
   */
  def main(args: Array[String]): Unit = {

    // ----- Requirement 1: Top10 categories by click / order / pay counts -----
    val conf = new SparkConf().setMaster("local[*]")
      // BUGFIX: the appName previously said "Req1CategoryTop10Application";
      // keep it consistent with this object.
      .setAppName("Req2CategoryTop10SessionTop10Application")
    val sc = new SparkContext(conf)

    // Read the raw action log.
    val lineDataRDD: RDD[String] = sc.textFile("input/user_visit_action.csv")
    // The raw data is traversed twice (accumulator pass + session pass);
    // cache it to avoid re-reading the file.
    lineDataRDD.cache()

    // Create and register the custom accumulator that counts occurrences of
    // "<categoryId>_click" / "<categoryId>_order" / "<categoryId>_pay".
    val accumulator = new CategoryAccumulator
    sc.register(accumulator, "accumulator")

    // Feed every log line into the accumulator (runs on the executors).
    lineDataRDD.foreach { line =>
      val datas: Array[String] = line.split(",")
      // BUGFIX: datas(6) is a String; comparing it to the Int -1 is always
      // true in Scala, which made the order/pay branches unreachable.
      // Compare against the string "-1" instead.
      if (datas(6) != "-1") {
        // click action
        accumulator.add(datas(6) + "_click")
      } else if (StringUtil.isNotEmpty(datas(8))) {
        // order action: one record may carry several category ids
        // (side effect only, so foreach — not map)
        datas(8).split("-").foreach(id => accumulator.add(id + "_order"))
      } else if (StringUtil.isNotEmpty(datas(10))) {
        // pay action
        datas(10).split("-").foreach(id => accumulator.add(id + "_pay"))
      }
    }

    // Accumulator value: ("<categoryId>_<action>", count)
    val accumulatorVal: mutable.HashMap[String, Long] = accumulator.value

    // Group the counters by category id (the part before the "_").
    val categoryToMap: Map[String, mutable.HashMap[String, Long]] =
      accumulatorVal.groupBy { case (k, _) => k.split("_")(0) }

    // One task id per run; shared by both result tables.
    val taskId = UUID.randomUUID().toString

    // Convert each category group into a CategoryTop10 record.
    val categoryTop10: immutable.Iterable[CategoryTop10] = categoryToMap.map {
      case (category, map) =>
        CategoryTop10(
          taskId,
          category,
          map.getOrElse(category + "_click", 0L),
          map.getOrElse(category + "_order", 0L),
          map.getOrElse(category + "_pay", 0L)
        )
    }

    // Sort descending by (clickCount, orderCount, payCount) and keep Top10.
    val result: List[CategoryTop10] = categoryTop10.toList.sortWith { (left, right) =>
      if (left.clickCount != right.clickCount) {
        left.clickCount > right.clickCount
      } else if (left.orderCount != right.orderCount) {
        left.orderCount > right.orderCount
      } else {
        left.payCount > right.payCount
      }
    }.take(10)

    // ----- Requirement 2: Top10 sessions within the Top10 categories -----

    // Parse the raw log into UserVisitAction records.
    val actionRDD: RDD[UserVisitAction] = lineDataRDD.map { line =>
      val datas: Array[String] = line.split(",")
      UserVisitAction(
        datas(0), datas(1), datas(2), datas(3).toLong, datas(4), datas(5),
        datas(6).toLong, datas(7).toLong, datas(8), datas(9), datas(10),
        datas(11), datas(12)
      )
    }

    // Category ids of the Top10 categories.
    val ids: List[String] = result.map(_.categoryId)

    // Broadcast the small id list to the executors instead of closing over it.
    val broadcastIds: Broadcast[List[String]] = sc.broadcast(ids)

    // Keep only click actions that hit one of the Top10 categories.
    // BUGFIX: the original wrote `if (…== -1) false else {}` whose result was
    // discarded, so the -1 guard had no effect; express the predicate as a
    // single boolean expression.
    val filterRDD: RDD[UserVisitAction] = actionRDD.filter { action =>
      action.click_category_id != -1 &&
        broadcastIds.value.contains(action.click_category_id.toString)
    }

    // ("<categoryId>_<sessionId>", 1)
    val mapRDD: RDD[(String, Long)] = filterRDD.map { action =>
      (action.click_category_id + "_" + action.session_id, 1L)
    }

    // ("<categoryId>_<sessionId>", clickCount)
    val reduceRDD: RDD[(String, Long)] = mapRDD.reduceByKey(_ + _)

    // Restructure: (categoryId, (sessionId, clickCount))
    val mapRDD1: RDD[(String, (String, Long))] = reduceRDD.map {
      case (k, sum) =>
        val ks: Array[String] = k.split("_")
        (ks(0), (ks(1), sum))
    }

    // Group by category: (categoryId, Iterable[(sessionId, clickCount)])
    val groupRDD: RDD[(String, Iterable[(String, Long)])] = mapRDD1.groupByKey()

    // Within each category keep the 10 sessions with the highest click counts.
    val resultRDD: RDD[(String, List[(String, Long)])] = groupRDD.mapValues {
      datas => datas.toList.sortWith(_._2 > _._2).take(10)
    }

    // Flatten into the final CategoryTop10SessionTop10 records.
    val realResultRDD: RDD[CategoryTop10SessionTop10] = resultRDD.flatMap {
      case (category, list) =>
        list.map { case (sessionId, sum) =>
          CategoryTop10SessionTop10(taskId, category, sessionId, sum)
        }
    }

    // Persist to MySQL. The JDBC objects are created inside foreachPartition
    // so they are instantiated on the executors (Connection is not
    // serializable) and one connection is reused per partition.
    realResultRDD.foreachPartition { datas =>
      val driver = "com.mysql.jdbc.Driver"
      val url = "jdbc:mysql://hadoop102:3306/sparkmall190311"
      val userName = "root"
      val passWd = "111111"
      Class.forName(driver)
      val connection: Connection = DriverManager.getConnection(url, userName, passWd)
      try {
        val sql = "insert into category_top10_session_count(taskId, categoryId, sessionId, clickCount) values (?,?,?,?)"
        val preparedStatement: PreparedStatement = connection.prepareStatement(sql)
        try {
          datas.foreach { obj =>
            // BUGFIX: the original bound every value to parameter index 1,
            // leaving indices 2-4 unset; bind to indices 1..4.
            preparedStatement.setObject(1, obj.taskId)
            preparedStatement.setObject(2, obj.categoryId)
            preparedStatement.setObject(3, obj.sessionId)
            preparedStatement.setObject(4, obj.clickCount)
            preparedStatement.executeUpdate()
          }
        } finally {
          // Close JDBC resources even if an insert fails.
          preparedStatement.close()
        }
      } finally {
        connection.close()
      }
    }

    // Release Spark resources.
    sc.stop()
  }
}
网友评论