import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("File - RDD")
val sc = new SparkContext(sparkConf)
val source: RDD[String] = sc.makeRDD(List("a","a","b","e","e","e","b","c","c","b","d","c","d"))
The above is the shared setup and source data used by every variant below.
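For a quick sanity check, the counts this data should produce can be computed with plain Scala collections, no Spark needed; every variant below should yield the same pairs, though the order of the pairs may differ from run to run:
// Expected result for the sample list: a -> 2, b -> 3, c -> 3, d -> 2, e -> 3
val expected: Map[String, Int] =
  List("a","a","b","e","e","e","b","c","c","b","d","c","d")
    .groupBy(identity)
    .map { case (word, occurrences) => (word, occurrences.size) }
println(expected)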
Method 1: groupBy + map
val tuples: Array[(String, Int)] = source.groupBy(data => data).map {
  case (k, v) => (k, v.size)
}.collect()
println(tuples.mkString(" , "))
sc.stop
Method 2: map to pairs + reduceByKey
val tuples1: Array[(String, Int)] = source.map((_,1)).reduceByKey(_+_).collect()
println(tuples1.mkString(" , "))
sc.stop
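The app name suggests the words would normally come from a file rather than an in-memory list. A minimal sketch of the same reduceByKey count over a text file, assuming whitespace-separated words and a placeholder path:
// "data/words.txt" is a hypothetical path; flatMap splits each line into words first.
val fileCounts: Array[(String, Int)] = sc.textFile("data/words.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .collect()
println(fileCounts.mkString(" , "))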
Method 3: groupBy + mapValues
val tuples2: Array[(String, Int)] = source.groupBy(data => data).mapValues(_.size).collect()
println(tuples2.mkString(" , "))
sc.stop
Method 4: aggregateByKey
val tuples3: Array[(String, Int)] = source.map( data => (data,1)).aggregateByKey(0)(_+_,_+_).collect()
println(tuples3.mkString(" , "))
sc.stop
Method 5: foldByKey
val tuples4: Array[(String, Int)] = source.map( data => (data ,1)).foldByKey(0)(_+_).collect()
println(tuples4.mkString(" , "))
sc.stop
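Methods 2, 4 and 5 differ only in how explicitly the zero value and the aggregation functions are spelled out; for this count, where the intra-partition and inter-partition functions are both `_ + _`, the three calls are interchangeable. A sketch for comparison, reusing the same source RDD:
val pairs = source.map((_, 1))
val viaReduce    = pairs.reduceByKey(_ + _)               // no zero value, one function
val viaFold      = pairs.foldByKey(0)(_ + _)              // zero value, one function
val viaAggregate = pairs.aggregateByKey(0)(_ + _, _ + _)  // zero value, separate seqOp and combOp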
Method 6: combineByKey
val tuples5: Array[(String, Int)] = source.map(data => (data, 1)).combineByKey(
  (count: Int) => count,              // createCombiner: the first 1 seen for a key starts the count
  (count: Int, v: Int) => count + v,  // mergeValue: fold further 1s into the per-partition count
  (c1: Int, c2: Int) => c1 + c2       // mergeCombiners: merge counts across partitions
).collect()
println(tuples5.mkString(" , "))
Method 7: countByKey
val stringToLong: collection.Map[String, Long] = source.map((_, 1)).countByKey()
val list: List[(String, Long)] = stringToLong.toList
println(list)
Method 8: countByValue
val stringToLong: collection.Map[String, Long] = source.countByValue()
val list: List[(String, Long)] = stringToLong.toList
println(list)
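Unlike the transformations above, countByKey and countByValue are actions: the counts come back to the driver as a local Scala Map, so they suit a small number of distinct words. Once on the driver, any further processing is plain collection code; a small sketch that sorts the result before printing:
val counts: collection.Map[String, Long] = source.countByValue()
val sorted: List[(String, Long)] = counts.toList.sortBy(_._1)  // local sort on the driver, no Spark job
println(sorted.mkString(" , "))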
Method 9: groupBy on (word, 1) pairs
val value: RDD[(String, Iterable[Int])] = source.map((_, 1)).groupBy(_._1).map {
  case (k, iter) => (k, iter.map { case (_, num) => num })
}
val value1: RDD[(String, Int)] = value.map {
  case (k, v) => (k, v.sum)
}
println(value1.collect().mkString(" , "))
Method 10: groupByKey
val value: RDD[(String, Iterable[Int])] = source.map((_,1)).groupByKey()
val value1: RDD[(String, Int)] = value.map {
  case (k, v) => (k, v.sum)
}
println(value1.collect().mkString(" , "))
Method 11: reduce over per-word Maps
val mapRDD: RDD[Map[String, Int]] = source.map(word => Map[String, Int]((word, 1)))
val map = mapRDD.reduce(
  (map1, map2) => {
    // fold every (word, count) entry of map1 into map2, adding the counts per word
    map1.foldLeft(map2)(
      (merged, kv) => {
        val word = kv._1
        val count = kv._2
        merged.updated(word, merged.getOrElse(word, 0) + count)
      }
    )
  }
)
println(map)
Method 12: fold over per-word Maps
val mapRDD: RDD[Map[String, Int]] = source.map(word => Map[String, Int]((word, 1)))
val map = mapRDD.fold(Map[String, Int]())(
  (map1, map2) => {
    map1.foldLeft(map2)(
      (merged, kv) => {
        val word = kv._1
        val count = kv._2
        merged.updated(word, merged.getOrElse(word, 0) + count)
      }
    )
  }
)
println(map)
Method 13: aggregate
val map = source.aggregate(Map[String, Int]())(
  // seqOp: add one word to the per-partition map
  (wordMap, word) => wordMap.updated(word, wordMap.getOrElse(word, 0) + 1),
  // combOp: merge the per-partition maps on the driver
  (map1, map2) => {
    map1.foldLeft(map2)(
      (merged, kv) => {
        val word = kv._1
        val count = kv._2
        merged.updated(word, merged.getOrElse(word, 0) + count)
      }
    )
  }
)
println(map)
Method 14: a custom accumulator
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable

object Test8 {
  def main(args: Array[String]): Unit = {
    // Build the configuration object
    val sparkconfig: SparkConf = new SparkConf().setMaster("local[2]").setAppName("wordCount")
    // Start the Spark environment and build the context object
    val context = new SparkContext(sparkconfig)
    val value: RDD[String] = context.makeRDD(List("a","a","b","e","e","e","b","c","c","b","d","c","d"))
    val wordAcc = new WordCountAccumulator()
    context.register(wordAcc, "wordAcc-keke")
    value.foreach(
      data => wordAcc.add(data)
    )
    println(wordAcc.map)
    context.stop()
  }
}
class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {
  var map: mutable.Map[String, Long] = mutable.Map()

  // the accumulator is "zero" when no word has been counted yet
  override def isZero: Boolean = map.isEmpty

  override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {
    val newAcc = new WordCountAccumulator
    newAcc.map = map.clone() // copy the current counts instead of returning an empty accumulator
    newAcc
  }

  override def reset(): Unit = map.clear()

  // called on the executors for every element of the RDD
  override def add(word: String): Unit = {
    map(word) = map.getOrElse(word, 0L) + 1L
  }

  // called on the driver to merge the per-task accumulators; fold the other map into this one
  // rather than mutating the other accumulator's state
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {
    other.value.foreach {
      case (word, count) => map(word) = map.getOrElse(word, 0L) + count
    }
  }

  override def value: mutable.Map[String, Long] = map
}
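For reference, when only a single running total is needed rather than per-word counts, Spark's built-in long accumulator avoids the custom class entirely; a sketch using the same RDD as Test8:
// Built-in alternative: counts all words, not words per key.
val total = context.longAccumulator("total-words")
value.foreach(_ => total.add(1L))
println(total.value) // 13 for the sample list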