美文网首页好程序员大数据
大数据教程:Transformation和Action算子演示

大数据教程:Transformation和Action算子演示

作者: ab6973df9221 | 来源:发表于2019-06-17 14:42 被阅读0次

  大数据教程:Transformation和Action算子演示

  一、Transformation算子演示

  valconf=newSparkConf().setAppName("Test").setMaster("local")

  valsc=newSparkContext(conf)

  //通过并行化生成rdd

  valrdd=sc.parallelize(List(5,6,4,7,3,8,2,9,10))

  //map:对rdd里面每一个元乘以2然后排序

  valrdd2:RDD[Int]=rdd.map(_*2)

  //collect以数组的形式返回数据集的所有元素(是Action算子)

  println(rdd2.collect().toBuffer)

  //filter:该RDD由经过func函数计算后返回值为true的输入元素组成

  valrdd3:RDD[Int]=rdd2.filter(_>10)

  println(rdd3.collect().toBuffer)

  valrdd4=sc.parallelize(Array("abc","bcd"))

  //flatMap:将rdd4中的元素进行切分后压平

  valrdd5:RDD[String]=rdd4.flatMap(_.split(""))

  println(rdd5.collect().toBuffer)

  //假如:List(List("a,b","bc"),List("ec","io"))

  //压平flatMap(_.flatMap(_.split("")))

  //sample随机抽样

  //withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样

  //fraction抽样比例例如30%即0.3但是这个值是一个浮动的值不准确

  //seed用于指定随机数生成器种子默认参数不传

  valrdd5_1=sc.parallelize(1to10)

  valsample=rdd.sample(false,0.5)

  println(sample.collect().toBuffer)

  //union:求并集

  valrdd6=sc.parallelize(List(5,6,7,8))

  valrdd7=sc.parallelize(List(1,2,5,6))

  valrdd8=rdd6unionrdd7

  println(rdd8.collect.toBuffer)

  //intersection:求交集

  valrdd9=rdd6intersectionrdd7

  println(rdd9.collect.toBuffer)

  //distinct:去重出重复

  println(rdd8.distinct.collect.toBuffer)

  //join相同的key会被合并

  valrdd10_1=sc.parallelize(List(("tom",1),("jerry",3),("kitty",2)))

  valrdd10_2=sc.parallelize(List(("jerry",2),("tom",2),("dog",10)))

  valrdd10_3=rdd10_1joinrdd10_2

  println(rdd10_3.collect().toBuffer)

  //左连接和右连接

  //除基准值外是Option类型,因为可能存在空值所以使用Option

  valrdd10_4=rdd10_1leftOuterJoinrdd10_2//以左边为基准没有是null

  valrdd10_5=rdd10_1rightOuterJoinrdd10_2//以右边为基准没有是null

  println(rdd10_4.collect().toList)

  println(rdd10_5.collect().toBuffer)

  valrdd11_1=sc.parallelize(List(("tom",1),("jerry",3),("kitty",2)))

  valrdd11_2=sc.parallelize(List(("jerry",2),("tom",2),("dog",10)))

  //笛卡尔积

  valrdd11_3=rdd11_1cartesianrdd11_2

  println(rdd11_3.collect.toBuffer)

  //根据传入的参数进行分组

  valrdd11_5_1=rdd11_4.groupBy(_._1)

  println(rdd11_5_1.collect().toList)

  //按照相同key进行分组,并且可以制定分区

  valrdd11_5_2=rdd11_4.groupByKey

  println(rdd11_5_2.collect().toList)

  //根据相同key进行分组[分组的话需要二元组]

  //cogroup和groupBykey的区别

  //cogroup不需要对数据先进行合并就以进行分组得到的结果是同一个key和不同数据集中的数据集合

  //groupByKey是需要先进行合并然后在根据相同key进行分组

  valrdd11_6:RDD[(String,(Iterable[Int],Iterable[Int]))]=rdd11_1cogrouprdd11_2

  println(rdd11_6)

  二、Action算子演示

  valconf=newSparkConf().setAppName("Test").setMaster("local[*]")

  valsc=newSparkContext(conf)

  /*Action算子*/

  //集合函数

  valrdd1=sc.parallelize(List(2,1,3,6,5),2)

  valrdd1_1=rdd1.reduce(_+_)

  println(rdd1_1)

  //以数组的形式返回数据集的所有元素

  println(rdd1.collect().toBuffer)

  //返回RDD的元素个数

  println(rdd1.count())

  //取出对应数量的值默认降序,若输入0会返回一个空数组

  println(rdd1.top(3).toBuffer)

  //顺序取出对应数量的值

  println(rdd1.take(3).toBuffer)

  //顺序取出对应数量的值默认生序

  println(rdd1.takeOrdered(3).toBuffer)

  //获取第一个值等价于take(1)

  println(rdd1.first())

  //将处理过后的数据写成文件(存储在HDFS或本地文件系统)

  //rdd1.saveAsTextFile("dir/file1")

  //统计key的个数并生成mapk是key名v是key的个数

  valrdd2=sc.parallelize(List(("key1",2),("key2",1),("key3",3),("key4",6),("key5",5)),2)

  valrdd2_1:collection.Map[String,Long]=rdd2.countByKey()

  println(rdd2_1)

  //遍历数据

  rdd1.foreach(x=>println(x))

  /*其他算子*/

  //统计value的个数但是会将集合中的一个元素看做是一个vluae

  valvalue:collection.Map[(String,Int),Long]=rdd2.countByValue

  println(value)

  //filterByRange:对RDD中的元素进行过滤,返回指定范围内的数据

  valrdd3=sc.parallelize(List(("e",5),("c",3),("d",4),("c",2),("a",1)))

  valrdd3_1:RDD[(String,Int)]=rdd3.filterByRange("c","e")//包括开始和结束的

  println(rdd3_1.collect.toList)

  //flatMapValues对参数进行扁平化操作,是value的值

  valrdd3_2=sc.parallelize(List(("a","12"),("b","34")))

  println(rdd3_2.flatMapValues(_.split("")).collect.toList)

  //foreachPartition循环的是分区数据

  //foreachPartiton一般应用于数据的持久化,存入数据库,可以进行分区的数据存储

  valrdd4=sc.parallelize(List(1,2,3,4,5,6,7,8,9),3)

  rdd4.foreachPartition(x=>println(x.reduce(_+_)))

  //keyBy以传入的函数返回值作为key,RDD中的元素为value新的元组

  valrdd5=sc.parallelize(List("dog","cat","pig","wolf","bee"),3)

  valrdd5_1:RDD[(Int,String)]=rdd5.keyBy(_.length)

  println(rdd5_1.collect.toList)

  //keys获取所有的keyvalues获取所有的values

  println(rdd5_1.keys.collect.toList)

  println(rdd5_1.values.collect.toList)

  //collectAsMap将需要的二元组转换成Map

  valmap:collection.Map[String,Int]=rdd2.collectAsMap()

  println(map)

相关文章

网友评论

    本文标题:大数据教程:Transformation和Action算子演示

    本文链接:https://www.haomeiwen.com/subject/kbjafctx.html