Spark WordCount Examples

By 无来无去_A | Published 2020-06-08 13:46
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("File - RDD")
    val sc = new SparkContext(sparkConf)
    val source: RDD[String] = sc.makeRDD(List("a", "a", "b", "e", "e", "e", "b", "c", "c", "b", "d", "c", "d"))

Source data: the small word list above. Every approach below reuses this SparkConf and SparkContext; call sc.stop() once when you are done.

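For this input, every approach produces the same counts: (a,2), (b,3), (c,3), (d,2), (e,3). Only the ordering of the collected results may differ from run to run.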
Approach 1: groupBy + map

    val tuples: Array[(String, Int)] = source.groupBy(word => word).map {
      case (word, group) => (word, group.size)
    }.collect()

    println(tuples.mkString(" , "))
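groupBy shuffles every element across the network and materializes each group in memory, so this is the most straightforward but least efficient way to count.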

Approach 2: reduceByKey

    val tuples1: Array[(String, Int)] = source.map((_, 1)).reduceByKey(_ + _).collect()

    println(tuples1.mkString(" , "))
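reduceByKey is the idiomatic word count: it combines values map-side before the shuffle, so only partial sums travel across the network.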

Approach 3: groupBy + mapValues

    val tuples2: Array[(String, Int)] = source.groupBy(word => word).mapValues(_.size).collect()

    println(tuples2.mkString(" , "))
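mapValues transforms only the values and leaves the keys untouched, which makes this slightly cleaner than the pattern-matching map of the first approach.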

Approach 4: aggregateByKey

    val tuples3: Array[(String, Int)] = source.map(word => (word, 1)).aggregateByKey(0)(_ + _, _ + _).collect()

    println(tuples3.mkString(" , "))
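aggregateByKey takes a zero value plus two functions: a seqOp applied within each partition and a combOp that merges the per-partition results. For a plain sum both functions are identical, which motivates the next approach.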

Approach 5: foldByKey

    val tuples4: Array[(String, Int)] = source.map(word => (word, 1)).foldByKey(0)(_ + _).collect()

    println(tuples4.mkString(" , "))
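foldByKey is exactly aggregateByKey for the case where seqOp and combOp are the same function, so it only needs the zero value and a single operator.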

Approach 6: combineByKey

    val tuples5: Array[(String, Int)] = source.map(word => (word, 1)).combineByKey(
      (count: Int) => count,                   // createCombiner: initial combiner for a key's first value
      (acc: Int, count: Int) => acc + count,   // mergeValue: fold another value into the combiner
      (acc1: Int, acc2: Int) => acc1 + acc2    // mergeCombiners: merge combiners from different partitions
    ).collect()

    println(tuples5.mkString(" , "))
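In word count all three combineByKey functions collapse into addition, which hides what each one is for. Here is a minimal sketch (not from the original article) where the combiner type differs from the value type, so the three roles become visible: a per-key average. Since every value in this RDD is 1, each average comes out as 1.0; it is the shape of the computation that matters.

    // Hypothetical example: average value per key.
    // createCombiner builds a (sum, count) pair, a different type from Int.
    val avg: Array[(String, Double)] = source.map((_, 1)).combineByKey(
      (v: Int) => (v, 1),                                           // first value for a key -> (sum, count)
      (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),        // fold a value into (sum, count)
      (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)  // merge partial (sum, count) pairs
    ).map { case (word, (sum, n)) => (word, sum.toDouble / n) }.collect()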

Approach 7: countByKey

    val stringToLong: collection.Map[String, Long] = source.map((_, 1)).countByKey()

    val list: List[(String, Long)] = stringToLong.toList

    println(list)
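countByKey is an action, not a transformation: it returns a plain Scala Map to the driver, so it should only be used when the number of distinct keys is small enough to fit in driver memory.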

Approach 8: countByValue

    val stringToLong: collection.Map[String, Long] = source.countByValue()

    val list: List[(String, Long)] = stringToLong.toList

    println(list)
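countByValue counts each distinct element of the RDD directly, so the explicit mapping to (word, 1) pairs is no longer needed. Like countByKey, it is an action that collects the result to the driver.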

Approach 9: groupBy on pairs

    val value: RDD[(String, Iterable[Int])] = source.map((_, 1)).groupBy(_._1).map {
      case (word, pairs) => (word, pairs.map { case (_, count) => count })
    }
    val value1: RDD[(String, Int)] = value.map {
      case (word, counts) => (word, counts.sum)
    }

    println(value1.collect().mkString(" , "))
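This is essentially the groupByKey pipeline of the next approach written out by hand with groupBy: the keys are duplicated inside each group and have to be stripped off before summing.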

Approach 10: groupByKey

    val value: RDD[(String, Iterable[Int])] = source.map((_, 1)).groupByKey()

    val value1: RDD[(String, Int)] = value.map {
      case (word, counts) => (word, counts.sum)
    }
    println(value1.collect().mkString(" , "))
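groupByKey shuffles every (word, 1) pair before anything is summed, whereas reduceByKey pre-aggregates within each partition. For counting, reduceByKey is almost always the better choice.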

Approach 11: reduce over single-entry Maps

    val mapRDD: RDD[Map[String, Int]] = source.map(word => Map[String, Int]((word, 1)))

    val map = mapRDD.reduce(
      (map1, map2) => {
        map1.foldLeft(map2)(
          (merged, kv) => {
            val word = kv._1
            val count = kv._2
            merged.updated(word, merged.getOrElse(word, 0) + count)
          }
        )
      }
    )
    println(map)
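reduce merges the single-entry maps pairwise on the executors and finishes the merge on the driver; the foldLeft folds one map's entries into the other, adding up counts for words that appear in both.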

Approach 12: fold over single-entry Maps

    val mapRDD: RDD[Map[String, Int]] = source.map(word => Map[String, Int]((word, 1)))

    val map = mapRDD.fold(Map[String, Int]())(
      (map1, map2) => {
        map1.foldLeft(map2)(
          (merged, kv) => {
            val word = kv._1
            val count = kv._2
            merged.updated(word, merged.getOrElse(word, 0) + count)
          }
        )
      }
    )
    println(map)
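fold works like reduce but takes a zero value. The zero must be neutral for the merge function, which the empty Map is here; note that Spark may apply the zero once per partition as well as once on the driver.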
 

Approach 13: aggregate

    val map = source.aggregate(Map[String, Int]())(
      (acc, word) => {
        acc.updated(word, acc.getOrElse(word, 0) + 1)
      },
      (map1, map2) => {
        map1.foldLeft(map2)(
          (merged, kv) => {
            val word = kv._1
            val count = kv._2
            merged.updated(word, merged.getOrElse(word, 0) + count)
          }
        )
      }
    )
    println(map)
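aggregate skips the per-word Map entirely: the first function folds raw words into a per-partition Map, and the second merges the per-partition Maps. Like fold, its zero value is applied once per partition and once on the driver.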

Approach 14: a custom accumulator

import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Test8 {

  def main(args: Array[String]): Unit = {
    // Build the configuration object
    val sparkConfig: SparkConf = new SparkConf().setMaster("local[2]").setAppName("wordCount")

    // Start the Spark environment and build the context object
    val context = new SparkContext(sparkConfig)

    val value: RDD[String] = context.makeRDD(List("a", "a", "b", "e", "e", "e", "b", "c", "c", "b", "d", "c", "d"))

    val wordAcc = new WordCountAccumulator()

    // Register the accumulator so Spark can track and merge it across tasks
    context.register(wordAcc, "wordAcc-keke")

    value.foreach(
      word => wordAcc.add(word)
    )
    // Read the merged result back on the driver
    println(wordAcc.value)

    context.stop()
  }

}


class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {

  var map: mutable.Map[String, Long] = mutable.Map()

  // The accumulator is at its zero state when no word has been counted yet.
  override def isZero: Boolean = map.isEmpty

  // Produce a new accumulator carrying the current state.
  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    val newAcc = new WordCountAccumulator
    newAcc.map = map.clone()
    newAcc
  }

  override def reset(): Unit = map.clear()

  // Called on the executors: count one occurrence of a word.
  override def add(word: String): Unit = {
    map(word) = map.getOrElse(word, 0L) + 1L
  }

  // Called on the driver: merge another task's partial counts into this map.
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    other.value.foreach {
      case (word, count) => map(word) = map.getOrElse(word, 0L) + count
    }
  }

  override def value: mutable.Map[String, Long] = map
}
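Accumulators should only be updated inside actions such as foreach; inside transformations the update may be applied more than once if a task is retried or a stage recomputed. When only a single total is needed rather than per-word counts, Spark's built-in longAccumulator is enough. A minimal sketch, assuming the same `context` and `value` as in Test8 above:

    // Built-in accumulator: no custom class required.
    val total = context.longAccumulator("total-words")
    value.foreach(_ => total.add(1L))
    println(total.value) // 13 for this input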
