大数据教程:Transformation和Action算子演示
一、Transformation算子演示
valconf=newSparkConf().setAppName("Test").setMaster("local")
valsc=newSparkContext(conf)
//通过并行化生成rdd
valrdd=sc.parallelize(List(5,6,4,7,3,8,2,9,10))
//map:对rdd里面每一个元乘以2然后排序
valrdd2:RDD[Int]=rdd.map(_*2)
//collect以数组的形式返回数据集的所有元素(是Action算子)
println(rdd2.collect().toBuffer)
//filter:该RDD由经过func函数计算后返回值为true的输入元素组成
valrdd3:RDD[Int]=rdd2.filter(_>10)
println(rdd3.collect().toBuffer)
valrdd4=sc.parallelize(Array("abc","bcd"))
//flatMap:将rdd4中的元素进行切分后压平
valrdd5:RDD[String]=rdd4.flatMap(_.split(""))
println(rdd5.collect().toBuffer)
//假如:List(List("a,b","bc"),List("ec","io"))
//压平flatMap(_.flatMap(_.split("")))
//sample随机抽样
//withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样
//fraction抽样比例例如30%即0.3但是这个值是一个浮动的值不准确
//seed用于指定随机数生成器种子默认参数不传
valrdd5_1=sc.parallelize(1to10)
valsample=rdd.sample(false,0.5)
println(sample.collect().toBuffer)
//union:求并集
valrdd6=sc.parallelize(List(5,6,7,8))
valrdd7=sc.parallelize(List(1,2,5,6))
valrdd8=rdd6unionrdd7
println(rdd8.collect.toBuffer)
//intersection:求交集
valrdd9=rdd6intersectionrdd7
println(rdd9.collect.toBuffer)
//distinct:去重出重复
println(rdd8.distinct.collect.toBuffer)
//join相同的key会被合并
valrdd10_1=sc.parallelize(List(("tom",1),("jerry",3),("kitty",2)))
valrdd10_2=sc.parallelize(List(("jerry",2),("tom",2),("dog",10)))
valrdd10_3=rdd10_1joinrdd10_2
println(rdd10_3.collect().toBuffer)
//左连接和右连接
//除基准值外是Option类型,因为可能存在空值所以使用Option
valrdd10_4=rdd10_1leftOuterJoinrdd10_2//以左边为基准没有是null
valrdd10_5=rdd10_1rightOuterJoinrdd10_2//以右边为基准没有是null
println(rdd10_4.collect().toList)
println(rdd10_5.collect().toBuffer)
valrdd11_1=sc.parallelize(List(("tom",1),("jerry",3),("kitty",2)))
valrdd11_2=sc.parallelize(List(("jerry",2),("tom",2),("dog",10)))
//笛卡尔积
valrdd11_3=rdd11_1cartesianrdd11_2
println(rdd11_3.collect.toBuffer)
//根据传入的参数进行分组
valrdd11_5_1=rdd11_4.groupBy(_._1)
println(rdd11_5_1.collect().toList)
//按照相同key进行分组,并且可以制定分区
valrdd11_5_2=rdd11_4.groupByKey
println(rdd11_5_2.collect().toList)
//根据相同key进行分组[分组的话需要二元组]
//cogroup和groupBykey的区别
//cogroup不需要对数据先进行合并就以进行分组得到的结果是同一个key和不同数据集中的数据集合
//groupByKey是需要先进行合并然后在根据相同key进行分组
valrdd11_6:RDD[(String,(Iterable[Int],Iterable[Int]))]=rdd11_1cogrouprdd11_2
println(rdd11_6)
二、Action算子演示
valconf=newSparkConf().setAppName("Test").setMaster("local[*]")
valsc=newSparkContext(conf)
/*Action算子*/
//集合函数
valrdd1=sc.parallelize(List(2,1,3,6,5),2)
valrdd1_1=rdd1.reduce(_+_)
println(rdd1_1)
//以数组的形式返回数据集的所有元素
println(rdd1.collect().toBuffer)
//返回RDD的元素个数
println(rdd1.count())
//取出对应数量的值默认降序,若输入0会返回一个空数组
println(rdd1.top(3).toBuffer)
//顺序取出对应数量的值
println(rdd1.take(3).toBuffer)
//顺序取出对应数量的值默认生序
println(rdd1.takeOrdered(3).toBuffer)
//获取第一个值等价于take(1)
println(rdd1.first())
//将处理过后的数据写成文件(存储在HDFS或本地文件系统)
//rdd1.saveAsTextFile("dir/file1")
//统计key的个数并生成mapk是key名v是key的个数
valrdd2=sc.parallelize(List(("key1",2),("key2",1),("key3",3),("key4",6),("key5",5)),2)
valrdd2_1:collection.Map[String,Long]=rdd2.countByKey()
println(rdd2_1)
//遍历数据
rdd1.foreach(x=>println(x))
/*其他算子*/
//统计value的个数但是会将集合中的一个元素看做是一个vluae
valvalue:collection.Map[(String,Int),Long]=rdd2.countByValue
println(value)
//filterByRange:对RDD中的元素进行过滤,返回指定范围内的数据
valrdd3=sc.parallelize(List(("e",5),("c",3),("d",4),("c",2),("a",1)))
valrdd3_1:RDD[(String,Int)]=rdd3.filterByRange("c","e")//包括开始和结束的
println(rdd3_1.collect.toList)
//flatMapValues对参数进行扁平化操作,是value的值
valrdd3_2=sc.parallelize(List(("a","12"),("b","34")))
println(rdd3_2.flatMapValues(_.split("")).collect.toList)
//foreachPartition循环的是分区数据
//foreachPartiton一般应用于数据的持久化,存入数据库,可以进行分区的数据存储
valrdd4=sc.parallelize(List(1,2,3,4,5,6,7,8,9),3)
rdd4.foreachPartition(x=>println(x.reduce(_+_)))
//keyBy以传入的函数返回值作为key,RDD中的元素为value新的元组
valrdd5=sc.parallelize(List("dog","cat","pig","wolf","bee"),3)
valrdd5_1:RDD[(Int,String)]=rdd5.keyBy(_.length)
println(rdd5_1.collect.toList)
//keys获取所有的keyvalues获取所有的values
println(rdd5_1.keys.collect.toList)
println(rdd5_1.values.collect.toList)
//collectAsMap将需要的二元组转换成Map
valmap:collection.Map[String,Int]=rdd2.collectAsMap()
println(map)










网友评论