美文网首页
5.1. 需求2分析和实现

5.1. 需求2分析和实现

作者: yayooo | 来源:发表于2019-08-04 15:46 被阅读0次

Top10 热门品类中 Top10 活跃 Session 统计

一、需求分析


数据流转换

二、代码实现

1)创建样例类Datas.scala

package com.atguigu.sparkmall.common.model

/**
  * One user-visit action record.
  *
  * The offline job builds instances from the 13 comma-separated columns of a
  * log line, in declaration order.
  *
  * Notes grounded in how the offline job uses the fields:
  *  - `click_category_id == -1` is treated as "this record is not a click".
  *  - `order_category_ids` / `pay_category_ids` are split on "-" to obtain the
  *    individual category ids.
  *
  * NOTE(review): the exact meaning/format of the remaining fields (e.g. the
  * layout of `date` and `action_time`) is not visible here -- confirm against
  * the data generator.
  */
case class UserVisitAction(
  date: String,
  user_id: String,
  session_id: String,
  page_id: Long,
  action_time: String,
  search_keyword: String,
  click_category_id: Long,
  click_product_id: Long,
  order_category_ids: String,
  order_product_ids: String,
  pay_category_ids: String,
  pay_product_ids: String,
  city_id: String
)
/**
  * Aggregated counters for one category, as produced by the Top-10 hot
  * category ranking.
  *
  * @param taskId     identifier of the job run that produced this row
  * @param categoryId category the counters belong to
  * @param clickCount number of click actions counted for the category
  * @param orderCount number of order actions counted for the category
  * @param payCount   number of payment actions counted for the category
  */
case class CategoryTop10(
  taskId: String,
  categoryId: String,
  clickCount: Long,
  orderCount: Long,
  payCount: Long
)

/**
  * Click count of one session within one category; one result row of the
  * "Top-10 sessions per Top-10 category" statistic.
  *
  * @param taskId     identifier of the job run that produced this row
  * @param categoryId category the session clicked in
  * @param sessionId  session the clicks belong to
  * @param clickCount number of clicks of this session in this category
  */
case class CategoryTop10SessionTop10(
  taskId: String,
  categoryId: String,
  sessionId: String,
  clickCount: Long
)

2)主程序

package com.atguigu.sparkmall.offline

/**
  * Top10 热门品类中 Top10 活跃 Session 统计
  */

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.UUID

import com.atguigu.sparkmall.common.model.{CategoryTop10, CategoryTop10SessionTop10, UserVisitAction}
import com.atguigu.sparkmall.common.util.StringUtil
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.{immutable, mutable}


/**
  * Requirement 2: the Top-10 most active sessions inside each of the Top-10
  * hot categories.
  *
  * The job runs in two phases:
  *  1. Count click / order / pay actions per category through an accumulator,
  *     rank the categories (clicks, then orders, then payments, all
  *     descending) and keep the first 10.
  *  2. For click records that belong to those categories, count clicks per
  *     (category, session), keep the 10 sessions with the most clicks in each
  *     category and insert the rows into MySQL.
  *
  * Input columns (0-based, comma separated) used here: 6 = click category id
  * ("-1" when the record is not a click), 8 = ordered category ids and
  * 10 = paid category ids (both "-"-separated).
  */
object Req2CategoryTop10SessionTop10Application {

  /**
    * Job entry point.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // NOTE(review): the app name still reads "Req1..." although this is the
    // requirement-2 job; kept unchanged in case monitoring keys on it.
    val conf = new SparkConf().setMaster("local[*]").setAppName("Req1CategoryTop10Application")
    val sc = new SparkContext(conf)

    try {
      // ------------------------------------------------------------------
      // Phase 1: Top-10 hot categories
      // ------------------------------------------------------------------

      // Raw log lines.
      val lineDataRDD: RDD[String] = sc.textFile("input/user_visit_action.csv")

      // Accumulator keyed by "<categoryId>_<click|order|pay>".
      val accumulator = new CategoryAccumulator
      sc.register(accumulator, "accumulator")

      lineDataRDD.foreach { line =>
        // limit = -1 keeps trailing empty columns so indices stay stable.
        val datas: Array[String] = line.split(",", -1)
        // The column is a String, so it must be compared with "-1"; comparing
        // with the Int -1 is always "not equal" and would count every record
        // as a click.
        if (datas(6) != "-1") {
          // Click record.
          accumulator.add(datas(6) + "_click")
        } else if (StringUtil.isNotEmpty(datas(8))) {
          // Order record: one count per ordered category.
          datas(8).split("-").foreach(id => accumulator.add(id + "_order"))
        } else if (StringUtil.isNotEmpty(datas(10))) {
          // Payment record: one count per paid category.
          datas(10).split("-").foreach(id => accumulator.add(id + "_pay"))
        }
      }

      // Accumulated ("<categoryId>_<action>" -> count) pairs.
      val accumulatorVal: mutable.HashMap[String, Long] = accumulator.value

      // Group the counters by their category id (the part before '_').
      val categoryToMap: Map[String, mutable.HashMap[String, Long]] = accumulatorVal.groupBy {
        case (key, _) => key.split("_")(0)
      }

      // One id shared by every row produced by this run.
      val taskId = UUID.randomUUID().toString

      // Fold each category's three counters into a CategoryTop10 row.
      val categoryTop10: immutable.Iterable[CategoryTop10] = categoryToMap.map {
        case (category, counts) =>
          CategoryTop10(
            taskId,
            category,
            counts.getOrElse(category + "_click", 0L),
            counts.getOrElse(category + "_order", 0L),
            counts.getOrElse(category + "_pay", 0L)
          )
      }

      // Rank by clicks, then orders, then payments (all descending); keep 10.
      val result: List[CategoryTop10] = categoryTop10.toList
        .sortBy(c => (c.clickCount, c.orderCount, c.payCount))(Ordering[(Long, Long, Long)].reverse)
        .take(10)

      // ------------------------------------------------------------------
      // Phase 2: Top-10 sessions per hot category
      // ------------------------------------------------------------------

      // Parse every raw line into a UserVisitAction (columns in declaration order).
      val actionRDD: RDD[UserVisitAction] = lineDataRDD.map { line =>
        val datas: Array[String] = line.split(",", -1)
        UserVisitAction(
          datas(0),
          datas(1),
          datas(2),
          datas(3).toLong,
          datas(4),
          datas(5),
          datas(6).toLong,
          datas(7).toLong,
          datas(8),
          datas(9),
          datas(10),
          datas(11),
          datas(12)
        )
      }

      // Ids of the Top-10 categories, shipped once to every executor.
      // A Set gives constant-time membership checks inside the filter.
      val ids: Set[String] = result.map(_.categoryId).toSet
      val broadcastIds: Broadcast[Set[String]] = sc.broadcast(ids)

      // Keep only click records whose category is one of the Top-10.
      val filterRDD: RDD[UserVisitAction] = actionRDD.filter { action =>
        action.click_category_id != -1 &&
          broadcastIds.value.contains(action.click_category_id.toString)
      }

      // ((categoryId, sessionId), 1) -- a tuple key avoids re-splitting a
      // concatenated string, which would break on ids containing '_'.
      val mapRDD: RDD[((String, String), Long)] = filterRDD.map { action =>
        ((action.click_category_id.toString, action.session_id), 1L)
      }

      // ((categoryId, sessionId), clickCount)
      val reduceRDD: RDD[((String, String), Long)] = mapRDD.reduceByKey(_ + _)

      // (categoryId, (sessionId, clickCount))
      val mapRDD1: RDD[(String, (String, Long))] = reduceRDD.map {
        case ((category, session), sum) => (category, (session, sum))
      }

      // (categoryId, all (sessionId, clickCount) pairs of that category)
      val groupRDD: RDD[(String, Iterable[(String, Long)])] = mapRDD1.groupByKey()

      // Per category: sort sessions by click count (descending), keep 10 and
      // flatten into result rows.
      val realResultRDD: RDD[CategoryTop10SessionTop10] = groupRDD.flatMap {
        case (category, sessions) =>
          sessions.toList
            .sortBy(_._2)(Ordering[Long].reverse)
            .take(10)
            .map { case (sessionId, sum) =>
              CategoryTop10SessionTop10(taskId, category, sessionId, sum)
            }
      }

      // ------------------------------------------------------------------
      // Persist to MySQL
      // ------------------------------------------------------------------
      realResultRDD.foreachPartition(datas => {
        // This closure runs on the executors, so the JDBC connection has to be
        // created here rather than on the driver. Empty partitions skip the
        // connection entirely.
        if (datas.nonEmpty) {
          // NOTE(review): connection settings and credentials are hard-coded;
          // consider moving them to configuration.
          val driver = "com.mysql.jdbc.Driver"
          val url = "jdbc:mysql://hadoop102:3306/sparkmall190311"
          val userName = "root"
          val passWd = "111111"

          Class.forName(driver)
          val connection: Connection = DriverManager.getConnection(url, userName, passWd)
          try {
            val sql = "insert into category_top10_session_count(taskId, categoryId, sessionId, clickCount) values (?,?,?,?)"
            val preparedStatement: PreparedStatement = connection.prepareStatement(sql)
            try {
              datas.foreach { obj =>
                // JDBC parameter indices are 1-based, one per '?' placeholder.
                preparedStatement.setString(1, obj.taskId)
                preparedStatement.setString(2, obj.categoryId)
                preparedStatement.setString(3, obj.sessionId)
                preparedStatement.setLong(4, obj.clickCount)
                preparedStatement.executeUpdate()
              }
            } finally {
              preparedStatement.close()
            }
          } finally {
            // Always release the connection, even when an insert fails.
            connection.close()
          }
        }
      })
    } finally {
      // Release Spark resources whether or not the job succeeded.
      sc.stop()
    }
  }
}

相关文章

网友评论

      本文标题:5.1. 需求2分析和实现

      本文链接:https://www.haomeiwen.com/subject/egiddctx.html