Spark Memory Management

Author: 喵星人ZC | Published 2019-06-01 19:36

Spark memory management falls into two categories (illustrated in the sketch after this list):

  • execution: memory used for computation in shuffles, joins, sorts, and aggregations
  • storage: memory used for caching RDD data and broadcast variables
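
As a rough illustration, here is a hypothetical job (sc is assumed to be an existing SparkContext, and the input path is made up): caching the RDD consumes the storage pool, while the shuffle behind reduceByKey consumes the execution pool.

// Hypothetical job: storage vs. execution memory in action.
val rdd = sc.textFile("hdfs:///tmp/input")  // path is illustrative only
val cached = rdd.cache()                    // storage memory: cached RDD blocks

val counts = cached
  .map(line => (line, 1))
  .reduceByKey(_ + _)                       // execution memory: shuffle buffers
counts.count()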

Note

  • In Spark, execution and storage share a unified region (M). When no execution memory is in use, storage can take all of it for caching data.
  • Execution may evict storage, but only until total storage memory usage falls below a certain threshold (the protected storage region R); a simplified model of this rule follows the list.
  • Storage cannot evict execution, because of the complexity that would add to the implementation.
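
The following is a minimal, self-contained model of those borrowing rules. It is my own simplification with toy numbers, not Spark's actual source:

// Simplified model of the unified region's borrowing rules (not Spark source).
object BorrowingSketch {
  var storageRegionSize = 500L  // protected storage region R (toy byte counts)
  var storagePoolSize   = 800L  // storage currently holds 800 (300 borrowed)
  var executionPoolSize = 200L

  // Execution may evict storage, but only down to storageRegionSize.
  def growExecutionPool(needed: Long): Unit = {
    val reclaimable = math.max(0L, storagePoolSize - storageRegionSize)
    val reclaimed   = math.min(needed, reclaimable)
    storagePoolSize   -= reclaimed  // cached blocks beyond the region get evicted
    executionPoolSize += reclaimed
  }

  def main(args: Array[String]): Unit = {
    growExecutionPool(400)  // ask for more than the borrowed 300
    // Prints storage=500, execution=500: only the borrowed 300 came back.
    println(s"storage=$storagePoolSize, execution=$executionPoolSize")
  }
}

Storage, by contrast, may only grow into execution memory that happens to be free; it can never force execution to shrink.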

Spark has two memory managers: StaticMemoryManager (static) and UnifiedMemoryManager (unified).
Since Spark 1.6 the default has been UnifiedMemoryManager; earlier versions used StaticMemoryManager.

1. Memory Allocation: StaticMemoryManager

Under the static memory management mechanism Spark originally used, the sizes of storage memory, execution memory, and other memory are all fixed while a Spark application runs, but the user can configure them before the application starts.


SparkEnv chooses the memory manager based on spark.memory.useLegacyMode:

val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
    if (useLegacyMemoryManager) {
        // legacy (static) memory manager
        new StaticMemoryManager(conf, numUsableCores)
    } else {
        // new (unified) memory manager -- the default since Spark 1.6
        UnifiedMemoryManager(conf, numUsableCores)
    }

Suppose there is 10 GB of memory. Under static memory management, execution can use 1.6 GB and storage can use 5.4 GB by default. In this mode execution and storage cannot borrow memory from each other, so if your program has no shuffles, joins, or aggregations and you want to give the memory to caching, the only option is to adjust the fraction parameters, for example:
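
A hedged sketch of shifting the static split toward storage; the fraction values are illustrative, not recommendations, and spark.memory.useLegacyMode applies to Spark 1.6-2.x:

import org.apache.spark.SparkConf

// Illustrative values only: favor caching over shuffle in legacy mode.
val conf = new SparkConf()
  .set("spark.memory.useLegacyMode", "true")   // opt back into StaticMemoryManager
  .set("spark.storage.memoryFraction", "0.7")  // default 0.6
  .set("spark.shuffle.memoryFraction", "0.1")  // default 0.2
// With a 10 GB heap: storage = 10G * 0.7 * 0.9 = 6.3G, execution = 10G * 0.1 * 0.8 = 0.8G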

// StaticMemoryManager.getMaxExecutionMemory: execution's fixed share
private def getMaxExecutionMemory(conf: SparkConf): Long = {
  val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)

  if (systemMaxMemory < MIN_MEMORY_BYTES) {
    throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
      s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
      s"option or spark.driver.memory in Spark configuration.")
  }
  if (conf.contains("spark.executor.memory")) {
    val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
    if (executorMemory < MIN_MEMORY_BYTES) {
      throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
        s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
        s"--executor-memory option or spark.executor.memory in Spark configuration.")
    }
  }
  val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
  val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)

  // 10G * 0.2 * 0.8 = 1.6G
  (systemMaxMemory * memoryFraction * safetyFraction).toLong
}

// StaticMemoryManager.getMaxStorageMemory: storage's fixed share
private def getMaxStorageMemory(conf: SparkConf): Long = {
  val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
  val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
  val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
  // 10G * 0.6 * 0.9 = 5.4G
  (systemMaxMemory * memoryFraction * safetyFraction).toLong
}

Formulas:
available storage memory   = systemMaxMemory * spark.storage.memoryFraction (0.6) * spark.storage.safetyFraction (0.9)
available execution memory = systemMaxMemory * spark.shuffle.memoryFraction (0.2) * spark.shuffle.safetyFraction (0.8)
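
Plugging in the 10 GB example as a quick check:

// Quick check of the static-mode arithmetic for a 10 GB heap.
val systemMaxMemory = 10L * 1024 * 1024 * 1024
val storage   = (systemMaxMemory * 0.6 * 0.9).toLong  // 5.4 GB
val execution = (systemMaxMemory * 0.2 * 0.8).toLong  // 1.6 GB
// The remaining ~3 GB is left as "other" memory for Spark internals and user code.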

2. Unified Memory Management: UnifiedMemoryManager

The unified memory management mechanism introduced in Spark 1.6 differs from static memory management in that storage and execution memory share the same region and can dynamically occupy each other's free space, as the UnifiedMemoryManager source below shows.
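
Two configuration knobs shape this shared region; a minimal sketch spelling out their default values (the same defaults read by the source below):

import org.apache.spark.SparkConf

// Defaults written out explicitly for illustration.
val conf = new SparkConf()
  .set("spark.memory.fraction", "0.6")         // share of (heap - 300 MB) given to execution + storage
  .set("spark.memory.storageFraction", "0.5")  // share of that region protected from eviction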


object UnifiedMemoryManager {

  // Set aside a fixed amount of memory for non-storage, non-execution purposes.
  // This serves a function similar to `spark.memory.fraction`, but guarantees that we reserve
  // sufficient memory for the system even for small heaps. E.g. if we have a 1GB JVM, then
  // the memory used for execution and storage will be (1024 - 300) * 0.6 = 434MB by default.
  private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024

  def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
    val maxMemory = getMaxMemory(conf)
    new UnifiedMemoryManager(
      conf,
      maxHeapMemory = maxMemory,
      onHeapStorageRegionSize =
        (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
      numCores = numCores)
  }

  /**
   * Return the total amount of memory shared between execution and storage, in bytes.
   */
  private def getMaxMemory(conf: SparkConf): Long = {
    val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
    val reservedMemory = conf.getLong("spark.testing.reservedMemory",
      if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
    val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
    if (systemMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"System memory $systemMemory must " +
        s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
        s"option or spark.driver.memory in Spark configuration.")
    }
    // SPARK-12759 Check executor memory to fail fast if memory is insufficient
    if (conf.contains("spark.executor.memory")) {
      val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
      if (executorMemory < minSystemMemory) {
        throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
          s"$minSystemMemory. Please increase executor memory using the " +
          s"--executor-memory option or spark.executor.memory in Spark configuration.")
      }
    }
    // usable memory: 10G - 300M in our example
    val usableMemory = systemMemory - reservedMemory

    val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
    // (10G - 300M) * 60%
    (usableMemory * memoryFraction).toLong
  }
}
// The storage region inside the unified region:
// (10G - 300M) * 60% * 50% ≈ 2.9G is set aside for storage by default
onHeapStorageRegionSize =
    (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
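
Putting the unified-mode arithmetic together for the same 10 GB heap:

// Worked numbers for a 10 GB heap under UnifiedMemoryManager defaults.
val systemMemory  = 10L * 1024 * 1024 * 1024
val reserved      = 300L * 1024 * 1024
val usable        = systemMemory - reserved       // ~9.7 GB
val unifiedRegion = (usable * 0.6).toLong         // ~5.8 GB shared by execution and storage
val storageRegion = (unifiedRegion * 0.5).toLong  // ~2.9 GB protected from eviction by default
// At runtime either side may borrow the other's free share of the ~5.8 GB region.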
