美文网首页
共享变量:累加器

共享变量:累加器

作者: ryancao_b9b9 | 来源:发表于2020-05-01 21:51 被阅读0次

一、使用场景
上一篇提到了共享变量可以解决Task重复拷贝Driver端变量的问题,但广播变量在Task中只读不能修改。实际的应用场景会在Spark Streaming应用中记录某些事件的数量,广播变量无法满足需求,所以出现了累加器机制。

二、使用特性
1、累加器只能在Driver端构建及并只能是Driver读取结果,Task只能累加。
2、累加器不会改变Spark Lazy计算的特点(只会在Job触发的时候进行相关累加操作)。

三、使用方式(简单)
注意:系统自带Long和Double类型的累加器
简单示例如下:

val sparkConf = new SparkConf().setAppName("Test").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val accum = sc.longAccumulator("longAccum") //统计奇数的个数
    val sum = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).filter(n=>{
      if(n%2!=0) accum.add(1L) 
      n%2==0
    }).reduce(_+_)
    println("sum: "+sum)
    println("accum: "+accum.value)
    sc.stop()

统计结果

sum: 20
accum: 5

四、常见问题
1、少加

val accum = sc.longAccumulator("longAccum")
    val numberRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).map(n=>{
      accum.add(1L)
      n+1
    })
    println("accum: "+accum.value)

结果:

accum1:0

原因:
累加器不会改变spark的lazy的计算模型,即在打印的时候像map这样的transformation还没有真正的执行,从而累加器的值也就不会更新。

2、多加

val accum = sc.longAccumulator("longAccum")
    val numberRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).map(n=>{
      accum.add(1L)
      n+1
    })
    numberRDD.count
    println("accum1:"+accum.value)
    numberRDD.reduce(_+_)
    println("accum2: "+accum.value)

结果:

accum1:9
accum2: 18

原因:
虽然只在map里进行了累加器加1的操作,但是两次得到的累加器的值却不一样,这是由于count和reduce都是action类型的操作,触发了两次作业的提交,所以map算子实际上被执行了了两次,在reduce操作提交作业后累加器又完成了一轮计数,所以最终累加器的值为18。究其原因是因为count虽然促使numberRDD被计出来,但是由于没有对其进行缓存,所以下次再次需要使用numberRDD这个数据集是,还需要从并行化数据集的部分开始执行计算。解释到这里,这个问题的解决方法也就很清楚了,就是在count之前调用numberRDD的cache方法(或persist),这样在count后数据集就会被缓存下来,reduce操作就会读取缓存的数据集而无需从头开始计算了。
改进代码如下:

val accum = sc.longAccumulator("longAccum")
    val numberRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9),2).map(n=>{
      accum.add(1L)
      n+1
    })
    numberRDD.cache().count
    println("accum1:"+accum.value)
    numberRDD.reduce(_+_)
    println("accum2: "+accum.value)

五、自定义累加器
自定义累加器类型的功能在1.X版本中就已经提供了,但是使用起来比较麻烦,在2.0版本后,累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2来提供更加友好的自定义类型累加器的实现方式。官方同时给出了一个实现的示例:CollectionAccumulator类,这个类允许以集合的形式收集spark应用执行过程中的一些信息。例如,我们可以用这个类收集Spark处理数据时的一些细节,当然,由于累加器的值最终要汇聚到driver端,为了避免 driver端的outofmemory问题,需要对收集的信息的规模要加以控制,不宜过大。
实现自定义类型累加器需要继承AccumulatorV2并至少覆写下例中出现的方法,下面这个累加器可以用于在程序运行过程中收集一些文本类信息,最终以Set[String]的形式返回。

import java.util
import org.apache.spark.util.AccumulatorV2

class LogAccumulator extends AccumulatorV2[String, java.util.Set[String]] {
  private val _logArray: java.util.Set[String] = new java.util.HashSet[String]()
 
  override def isZero: Boolean = {
    _logArray.isEmpty
  }
 
  override def reset(): Unit = {
    _logArray.clear()
  }
 
  override def add(v: String): Unit = {
    _logArray.add(v)
  }
 
  override def merge(other: AccumulatorV2[String, java.util.Set[String]]): Unit = {
    other match {
      case o: LogAccumulator => _logArray.addAll(o.value)
    }
 
  }
 
  override def value: java.util.Set[String] = {
    java.util.Collections.unmodifiableSet(_logArray)
  }
 
  override def copy(): AccumulatorV2[String, util.Set[String]] = {
    val newAcc = new LogAccumulator()
    _logArray.synchronized{
      newAcc._logArray.addAll(_logArray)
    }
    newAcc
  }
}

测试类:

import java.util
 
import org.apache.spark.util.AccumulatorV2
 
class LogAccumulator extends AccumulatorV2[String, java.util.Set[String]] {
  private val _logArray: java.util.Set[String] = new java.util.HashSet[String]()
 
  override def isZero: Boolean = {
    _logArray.isEmpty
  }
 
  override def reset(): Unit = {
    _logArray.clear()
  }
 
  override def add(v: String): Unit = {
    _logArray.add(v)
  }
 
  override def merge(other: AccumulatorV2[String, java.util.Set[String]]): Unit = {
    other match {
      case o: LogAccumulator => _logArray.addAll(o.value)
    }
 
  }
 
  override def value: java.util.Set[String] = {
    java.util.Collections.unmodifiableSet(_logArray)
  }
 
  override def copy(): AccumulatorV2[String, util.Set[String]] = {
    val newAcc = new LogAccumulator()
    _logArray.synchronized{
      newAcc._logArray.addAll(_logArray)
    }
    newAcc
  }
}

本例中利用自定义的收集器收集过滤操作中被过滤掉的元素,当然这部分的元素的数据量不能太大。
运行结果如下:

sum; 32
7cd 4b 2a 

相关文章

  • spark之广播变量&累加器

    Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator)。累加器...

  • Spark快速入门(9) 高级话题:累加器变量

    本节我们会介绍一种在tasks之间共享可读写变量的方式,就是累加器变量。 累加器变量 累加器变量是在tasks之间...

  • Spark累加器(Accumulator)

    什么是累加器 累加器:分布式共享只写变量。(Executor和Executor之间不能读数据)累加器用来把Exec...

  • 简述spark中共享变量的基本原理和用途

    共享变量包含两个,广播和累加器。累加器(accumulator)是spark中提供的一种分布式的变量机制,其原理类...

  • 共享变量:累加器

    一、使用场景上一篇提到了共享变量可以解决Task重复拷贝Driver端变量的问题,但广播变量在Task中只读不能修...

  • Spark—累加器

    spark累加器 累加器是一种共享变量,提供了将工作节点中的值聚合到驱动器程序中的简单语法。累加器的一个常见用途是...

  • Spark—广播变量

    广播变量 Spark有两种共享变量——累加器、广播变量。广播变量可以让程序高效地向所有工作节点发送一个较大的只读值...

  • spark broadcast variables and Ac

    1. Accumulator(累加器):分布式共享只写变量 求和示例:对rdd(两个分区)数据求和,driver端...

  • Spark 之广播变量

    1. Background Spark 中有两种共享变量,其中一个是累加器,另一个是广播变量。前者解决了 Spar...

  • Spark中的共享变量---广播变量和累加器

    一.广播变量和累加器的作用累加器(集群规模之间的大变量):做Spark的全局统计使用广播变量(集群规模间的大常量)...

网友评论

      本文标题:共享变量:累加器

      本文链接:https://www.haomeiwen.com/subject/ghdcghtx.html