9.spark core之共享变量

作者: java大数据编程 | 来源:发表于2018-08-27 15:59 被阅读6次

简介

  spark执行操作时,可以使用驱动器程序Driver中定义的变量,但有时这种默认的使用方式却并不理想。

  • 集群中运行的每个任务都会连接驱动器获取变量。如果获取的变量比较大,执行效率会非常低下。
  • 每个任务都会得到这些变量的一份新的副本,更新这些副本的值不会影响驱动器中的对应变量。如果驱动器需要获取变量的结果值,这种方式是不可行的。

  spark为了解决这两个问题,提供了两种类型的共享变量:广播变量(broadcast variable)和累加器(accumulator)。

  • 广播变量用于高效分发较大的对象。会在每个执行器本地缓存一份大对象,而避免每次都连接驱动器获取。
  • 累加器用于在驱动器中对数据结果进行聚合。

广播变量

原理

广播变量.png
  • 广播变量只能在Driver端定义,不能在Executor端定义。
  • 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
  • 如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本;如果使用广播变量在每个Executor中只有一份Driver端的变量副本。

用法

  • 通过对一个类型T的对象调用SparkContext.broadcast创建出一个BroadCast[T]对象,任何可序列化的类型都可以这么实现。
  • 通过value属性访问该对象的值
  • 变量只会被发到各个节点一次,应作为只读值处理。(修改这个值不会影响到别的节点)

实例

  查询每个国家的呼号个数

python

# 将呼号前缀(国家代码)作为广播变量
signPrefixes = sc.broadcast(loadCallSignTable())

def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)

countryContactCounts = (contactCounts.map(processSignCount).reduceByKey((lambda x, y: x+y)))

countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")

scala

// 将呼号前缀(国家代码)作为广播变量
val signPrefixes = sc.broadcast(loadCallSignTable())

def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)

val countryContactCounts = contactCounts.map{case (sign, count) => {
    val country = lookupInArray(sign, signPrefixes.value)
    (country, count)
    }}.reduceByKey((x, y) => x+y)
    
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")

java

// 将呼号前缀(国家代码)作为广播变量
final Broadcast<String[]> signPrefixes = sc.broadcast(loadCallSignTable());

JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
    public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {
        String sign = callSignCount._1();
        String country = lookupCountry(sign, signPrefixes.value());
        return new Tuple2(country, callSignCount._2()); 
    }
}).reduceByKey(new SumInts());

countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");

累加器

原理

累加器.png
  • 累加器在Driver端定义赋初始值。
  • 累加器只能在Driver端读取最后的值,在Excutor端更新。

用法

  • 通过调用sc.accumulator(initivalValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumulator[T]对象,其中T是初始值initialValue的类型。
  • Spark闭包里的执行器代码可以使用累加器的+=方法增加累加器的值
  • 驱动器程序可以调用累加器的value属性来访问累加器的值

实例

  累加空行

python

file = sc.textFile(inputFile)
# 创建Accumulator[Int]并初始化为0
blankLines = sc.accumulator(0)

def extractCallSigns(line):
    global blankLines # 访问全局变量
    if (line == ""):
        blankLines += 1
    return line.split(" ")

callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns")
print "Blank lines: %d" % blankLines.value

scala

val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0) //创建Accumulator[Int]并初始化为0

val callSigns = file.flatMap(line => {
    if (line == "") {
        blankLines += 1 //累加器加1
    }
    line.split(" ")
})

callSigns.saveAsTextFile("output.txt")
println("Blank lines:" + blankLines.value)

java

JavaRDD<String> rdd = sc.textFile(args[1]);

final Accumulator<Integer> blankLines = sc.accumulator(0);

JavaRDD<String> callSigns = rdd.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String line) {
        if ("".equals(line)) {
            blankLines.add(1);
        }
        return Arrays.asList(line.split(" "));
    }
});

callSigns.saveAsTextFile("output.text");
System.out.println("Blank lines:" + blankLines.value());

相关文章

  • 9.spark core之共享变量

    简介   spark执行操作时,可以使用驱动器程序Driver中定义的变量,但有时这种默认的使用方式却并不理想。 ...

  • volatile

    作用 只能保证共享变量的可见性,不能保证共享变量操作的原子性。 原理 线程: A B C ,共享变量:s 例:三...

  • Spark共享变量

    共享变量分类 共享变量官网解释 Normally, when a function passed to a Spa...

  • 共享变量

    共享变量 Spark一个非常重要的特性就是共享变量。 默认情况下,如果在一个算子的函数中使用到了某个外部的变量,那...

  • ThreadLocal模式

    多个线程同时读写同一个共享变量存在并发问题。其实可以突破共享变量,没有共享变量就不会有并发问题。没有共享,就没有伤...

  • 共享变量:广播变量

    一、使用场景如果我们要在分布式计算里面分发大对象(如:字典,集合,黑白名单等),由Driver端进行分发。如果这个...

  • Spark-broadcast

    参见Spark相关--共享变量-广播变量-broadcast

  • JMM之Synchronized&Volatile

    内存可见性 可见性:如果一个线程对共享变量值的修改,能够及时的被其他线程看到,那么这个共享变量就是可见的 共享变量...

  • Nginx内置变量

    nginx内置变量内置变量存放在 ngx_http_core_module 模块中,变量的命名方式和apache...

  • iOS开发-Core Animation (核心动画)

    iOS开发之Core Animation (核心动画) Core Animation简介 Core Animati...

网友评论

    本文标题:9.spark core之共享变量

    本文链接:https://www.haomeiwen.com/subject/iyhtwftx.html