Spark Asynchronous Actions

Author: Codlife | Published 2016-09-18 14:46

Spark actions are always synchronous: if we perform two actions one after the other, they always execute sequentially. But what if we want to execute two actions concurrently on different RDDs?
Let's look at an example:

val rdd = sc.parallelize(List(32, 34, 2, 3, 4, 54, 3), 4)
rdd.collect().map{ x => println("Items in the lists:" + x)}
val rddCount = sc.parallelize(List(434, 3, 2, 43, 45, 3, 2), 4)
println("Number of items in the list" + rddCount.count())

In the above example, two actions are performed one after the other, collect and count, and both execute synchronously. So count will always start only after collect has finished. The output of the above code is as follows:

[Screenshot: console output of the two synchronous jobs, run sequentially]
The question now is: how can we run Spark jobs concurrently, in an asynchronous fashion?
The answer is simple: Apache Spark also provides asynchronous actions for concurrent execution of jobs. A few of the asynchronous actions Spark provides are:
- collectAsync() -> returns a future for retrieving all elements of this RDD.
- countAsync() -> returns a future for counting the number of elements in the RDD.
- foreachAsync(f) -> applies a function f to all elements of this RDD.
- foreachPartitionAsync(f) -> applies a function f to each partition of this RDD.
- takeAsync(num) -> returns a future for retrieving the first num elements of the RDD.
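As a quick, self-contained sketch of how these return types behave (assuming a local master; in spark-shell the `sc` context already exists and the first two lines are unnecessary):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.concurrent.Await
import scala.concurrent.duration._

val sc = new SparkContext(new SparkConf().setAppName("async-sketch").setMaster("local[*]"))

val rdd = sc.parallelize(List(32, 34, 2, 3, 4, 54, 3), 4)

// Both calls return immediately with a FutureAction instead of blocking.
val firstTwo = rdd.takeAsync(2) // FutureAction[Seq[Int]]
val total    = rdd.countAsync() // FutureAction[Long]

// FutureAction extends scala.concurrent.Future, so we can block on it when needed.
val firstTwoResult = Await.result(firstTwo, 30.seconds)
val totalResult    = Await.result(total, 30.seconds)
println("First two items: " + firstTwoResult)
println("Total count: " + totalResult)
```

Because `FutureAction` is a standard Scala `Future`, all the usual combinators (`map`, `onComplete`, `zip`, `Await`) work on it.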
Now let us see what happens when we use async actions:

import scala.concurrent.ExecutionContext.Implicits.global // needed for Future.map

val rdd = sc.parallelize(List(32, 34, 2, 3, 4, 54, 3), 4)
rdd.collectAsync().map { items => items.foreach(x => println("Items in the list: " + x)) }
val rddCount = sc.parallelize(List(434, 3, 2, 43, 45, 3, 2), 4)
rddCount.countAsync().map { x => println("Number of items in the list: " + x) }

The output of the above code is as follows:

[Screenshot: console output of the two asynchronous jobs]
You can see in the output above that the result of the second job appears first: the first job returns a future immediately, so the second job can be submitted right away. But notice that the jobs still execute one after the other; a single job uses all the resources of the cluster, so the other job is delayed.
To take full advantage of asynchronous actions, we need to configure the job scheduler.
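One practical detail the snippet above glosses over: both actions are fire-and-forget, so a standalone driver program may exit before either job finishes. A minimal sketch of waiting on both futures at once (a self-contained example with its own local context; in spark-shell `sc` already exists):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val sc = new SparkContext(new SparkConf().setAppName("async-wait").setMaster("local[*]"))
val rdd      = sc.parallelize(List(32, 34, 2, 3, 4, 54, 3), 4)
val rddCount = sc.parallelize(List(434, 3, 2, 43, 45, 3, 2), 4)

// Fire both jobs; neither call blocks the driver thread.
val collected = rdd.collectAsync()    // FutureAction[Seq[Int]]
val counted   = rddCount.countAsync() // FutureAction[Long]

// Combine the two futures and block once, so the driver waits for both
// jobs instead of exiting while they are still running.
val both: Future[(Seq[Int], Long)] = collected.zip(counted)
val (items, n) = Await.result(both, 60.seconds)
println(s"Collected ${items.size} items; count = $n")
```

`zip` here is the standard `scala.concurrent.Future` combinator; any other way of joining the two futures (`for`-comprehension, `Future.sequence` over a homogeneous list) works equally well.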
Job Scheduling
By default the Spark scheduler runs jobs in FIFO (First In, First Out) fashion: priority is given to the first job, then the second, and so on. If the first job does not need the whole cluster, the second job can also run in parallel; but if the first job is large, later jobs may wait a long time even if they themselves would take very little time to execute. As a solution, Spark provides a fair scheduler, under which jobs share the cluster resources in a round-robin fashion.
To enable the fair scheduler, we set the following configuration:
val conf = new SparkConf().setAppName("spark_auth").setMaster("local[*]").set("spark.scheduler.mode", "FAIR")
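Beyond the scheduler mode, jobs can also be assigned to named scheduler pools via the `spark.scheduler.pool` thread-local property. A hedged sketch (the pool name `pool_a` is an example of my own; pools can additionally be configured in a fairscheduler.xml file):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.concurrent.Await
import scala.concurrent.duration._

val conf = new SparkConf()
  .setAppName("fair-sketch")
  .setMaster("local[*]")
  .set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

// Jobs inherit the scheduler pool set on the submitting thread.
// "pool_a" is a hypothetical name; pool weights and minimum shares
// can be tuned further in fairscheduler.xml.
sc.setLocalProperty("spark.scheduler.pool", "pool_a")
val f = sc.parallelize(1 to 100, 4).countAsync()
sc.setLocalProperty("spark.scheduler.pool", null) // revert to the default pool

val n = Await.result(f, 60.seconds)
println("Count from pool_a job: " + n)
```

Submitting jobs from separate threads (or via async actions) into separate pools is what lets the fair scheduler interleave them.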
After configuring FAIR scheduling, you can see that both jobs run concurrently and share the resources of the Spark cluster. The output of the same code is now as follows:
[Screenshot: interleaved console output of the two concurrent jobs]
You can see in the result above that both jobs run concurrently; neither action waits for the other to produce its result.
For the full code, you can check out: https://github.com/knoldus/spark-scala-async


Original article: https://www.haomeiwen.com/subject/dffgettx.html