2020-12.1-Spark-9(Spark-Core)

Author: 冰菓_ | Published: 2020-12-07 08:17

1. RDD lineage
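
Each RDD records the parent RDDs it was derived from, and Spark replays that chain to rebuild a lost partition. Below is a minimal sketch for inspecting the chain, assuming a local master. The names LineageSketch and dependencyKinds and the sample words are illustrative and not part of the original notes; toDebugString and dependencies are standard RDD members.

```scala
import org.apache.spark.{OneToOneDependency, ShuffleDependency, SparkConf, SparkContext}

object LineageSketch {
  // Returns a pair of flags: (map step has a one-to-one narrow dependency,
  // reduceByKey step has a shuffle dependency).
  def dependencyKinds(sc: SparkContext): (Boolean, Boolean) = {
    val words = sc.makeRDD(List("scala", "spark", "scala", "flink"), 2)
    val pairs = words.map(word => (word, 1))
    val counts = pairs.reduceByKey(_ + _)

    // toDebugString prints the whole chain of ancestors of `counts`
    println(counts.toDebugString)

    // dependencies lists only the direct parents of an RDD
    (pairs.dependencies.head.isInstanceOf[OneToOneDependency[_]],
      counts.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]])
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("lineage-sketch").setMaster("local[*]"))
    println(dependencyKinds(sc))
    sc.stop()
  }
}
```

The shuffle dependency is where Spark cuts the job into stages, while narrow dependencies are pipelined inside one stage.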


2. Persistence: cache, persist, checkpoint

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object Test5 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("").setMaster("local[*]"))
    sc.setCheckpointDir("src/ff")
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 1, 2, 3, 4))
    val mardd = rdd.map(data => {
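      // Without persistence this runs again for every action (each collect), repeating the work
      println("*******")
      (data, 1)
    })
    // Options for persisting mardd:
    // cache() keeps the data in memory only
    // mardd.cache()
    // persist() takes a storage level, so the data can also go to disk
    // mardd.persist(StorageLevel.DISK_ONLY)
    // checkpoint() saves the data to the directory set with setCheckpointDir
    mardd.checkpoint()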
    mardd.groupByKey().collect.foreach(println)
    mardd.reduceByKey(_ + _).collect.foreach(println)
    sc.stop()
  }
}
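
checkpoint() on its own still recomputes the RDD, because the checkpoint files are written by a separate job after the first action finishes. The Spark API docs recommend persisting the RDD in memory before checkpointing it. The sketch below combines the two and counts how often the map function runs. The names CacheThenCheckpoint and countMapCalls, the temporary checkpoint directory, and the use of a LongAccumulator as a counter are assumptions made for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object CacheThenCheckpoint {
  // Runs two actions on a cached and checkpointed RDD and returns the total
  // number of map invocations. Expects sc.setCheckpointDir to be set already.
  def countMapCalls(sc: SparkContext): Long = {
    val mapCalls = sc.longAccumulator("mapCalls")
    val pairs = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 1, 2, 3, 4)).map { n =>
      mapCalls.add(1) // one increment per element actually computed
      (n, 1)
    }
    pairs.cache()      // keep the computed partitions in memory
    pairs.checkpoint() // mark for checkpointing; written after the first action
    pairs.groupByKey().collect()
    pairs.reduceByKey(_ + _).collect()
    mapCalls.sum
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cache-then-checkpoint").setMaster("local[*]"))
    sc.setCheckpointDir(Files.createTempDirectory("spark-ckpt").toString)
    println(s"map invocations: ${countMapCalls(sc)}")
    sc.stop()
  }
}
```

As long as the cached blocks are not evicted, the count should match the number of elements, since both the checkpoint job and the second action read from the cache. Dropping the cache() call should raise it, because the checkpoint job then recomputes the map.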

3. Custom partitioner (pattern matching)

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object Test6 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("test6").setMaster("local[*]"))
    val rdd = sc.makeRDD(List("scala", "spark", "scala", "flink"), 2)
    val myrdd = rdd.map(data => (data, 1)).partitionBy(new MyPartitioner)
    myrdd.saveAsTextFile("src/bbb")
    sc.stop()
  }
}

class MyPartitioner extends Partitioner {
  override def numPartitions: Int = 2

  override def getPartition(key: Any): Int = {
    key match {
      case "scala" => 1
      case _ => 0
    }
  }
}
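
To check where each key lands without opening the output files, the partition index can be attached to every record. This is a minimal sketch. KeywordPartitioner is a hypothetical copy of the routing rule above with equals and hashCode added, so that Spark can tell two instances describe the same partitioning; PartitionCheck and placement are illustrative names as well.

```scala
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

// Hypothetical partitioner: "scala" goes to partition 1, everything else to 0
class KeywordPartitioner extends Partitioner {
  override def numPartitions: Int = 2

  override def getPartition(key: Any): Int = key match {
    case "scala" => 1
    case _ => 0
  }

  // Two instances route keys identically, so treat them as equal
  override def equals(other: Any): Boolean = other.isInstanceOf[KeywordPartitioner]
  override def hashCode: Int = numPartitions
}

object PartitionCheck {
  // Returns (partition index, word) for every record after partitionBy
  def placement(sc: SparkContext): Array[(Int, String)] =
    sc.makeRDD(List("scala", "spark", "scala", "flink"), 2)
      .map(word => (word, 1))
      .partitionBy(new KeywordPartitioner)
      .mapPartitionsWithIndex((index, records) => records.map { case (word, _) => (index, word) })
      .collect()

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("partition-check").setMaster("local[*]"))
    placement(sc).foreach(println)
    sc.stop()
  }
}
```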

4. Accumulators
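
An accumulator is a shared variable that tasks can only add to and whose merged value is read on the driver. The original notes leave this section empty, so the following is a minimal sketch using the built-in longAccumulator; AccumulatorSketch and sumWithAccumulator are illustrative names.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorSketch {
  // Sums the elements on the executors and reads the total on the driver
  def sumWithAccumulator(sc: SparkContext): Long = {
    val total = sc.longAccumulator("total")
    // foreach is an action, so the updates are applied when it runs
    sc.makeRDD(List(1, 2, 3, 4)).foreach(n => total.add(n))
    total.sum
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("accumulator-sketch").setMaster("local[*]"))
    println(sumWithAccumulator(sc))
    sc.stop()
  }
}
```

Updates made inside a transformation such as map are applied only once an action runs, and may be applied more than once if a task is re-executed, so updating inside an action is the safer choice.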

5. Broadcast variables
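
A broadcast variable ships a read-only value to each executor once, instead of serializing it with every task. The original notes leave this section empty, so the following is a minimal sketch; BroadcastSketch, lookup and the weights map are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastSketch {
  // Looks each word up in a broadcast map, defaulting to 0 for unknown words
  def lookup(sc: SparkContext): Array[(String, Int)] = {
    val weights = sc.broadcast(Map("scala" -> 3, "spark" -> 5))
    sc.makeRDD(List("scala", "spark", "flink"), 2)
      .map(word => (word, weights.value.getOrElse(word, 0))) // read via .value on the executor
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("broadcast-sketch").setMaster("local[*]"))
    lookup(sc).foreach(println)
    sc.stop()
  }
}
```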

Article link: https://www.haomeiwen.com/subject/gimcwktx.html