引子:笔者有一段时间学习使用 spark 图算法实现 One ID 的工作,看到一篇文章打算翻译,今天得空可以还债了。下面便是翻译正文。原文地址https://mapr.com/blog/how-get-started-using-apache-spark-graphx-scala/
这篇博客将帮助你在 MapR 沙箱环境开始学习Scala语言实现的Apache Spark GraphX的使用方法。GraphX 是图并行计算的 Apache Spark 组件,基于图理论的数学分支构建。它是在 Spark 核心上的分布式图计算处理框架。
图计算的概念简介
图是用于表示对象之间模型关系的数学结构。图由顶点和连接顶点的边构成。顶点是对象,而边是对象之间的关系。

有向图是顶点之间的边是有方向的。有向图的例子如 Twitter 上的关注者。用户 Bob 关注了用户 Carol ,而 Carol 并没有关注 Bob。

正则图是每个顶点都有相同数量的边。正则图的例子就是 Facebook 的朋友关系。如果 Bob 是 Carol 的朋友,那么 Carol 也是 Bob 的朋友。
GraphX 属性图
GraphX 通过弹性分布式属性图扩展了 Sprak RDD。
这种属性图是一种有向多重图,它有多条平行的边。每个边和顶点都有用户定义的属性。平行的边允许相同顶点有多种关系。


软件
本教程将运行在 MapR 沙箱中,它会包含 Spark 。
你可以下载代码和数据来运行这些例子。链接:https://github.com/caroljmcdonald/sparkgraphxexample
启动 spark-shell 命令后,这篇博客的例子都会运行在 spark shell 下
你也可以运行这些代码作为一个独立应用,更多介绍在Getting Started with Spark on MapR Sandbox
启动 Spark 交互式壳程序
登录到 MapR 沙箱,如Getting Started with Spark on MapR Sandbox介绍,使用用户 user01 ,密码是 mapr 。启动 spark shell 使用如下命令
$ spark-shell
定义顶点
首先我们将引入 GraphX 包
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._
我们定义机场为顶点。顶点有 id 和 相关属性。每个顶点的构成如下
顶点 id -> Id [Long]
顶点属性 -> name[String]
机场顶点表格

我们使用上面的属性定义一个 RDD 来表示顶点
val vertices=Array((1L, ("SFO")),(2L, ("ORD")),(3L,("DFW")))
val vRDD= sc.parallelize(vertices)
vRDD.take(1)
val nowhere = “nowhere"
定义边
边是机场之间的路线。每条边必须有一个起点,一个目的地,并且可以有属性。在我们的例子里,边的构成如下
边起点 id -> src [Long]
边终点 id -> dest [Long]
边属性距离 -> distance [Long]
路线的边表

我们使用上面用于描述边的属性定义一个RDD。边的RDD数据形式如 [src id, dest id, distance]。
val edges = Array(Edge(1L,2L,1800),Edge(2L,3L,800),Edge(3L,1L,1400))
val eRDD= sc.parallelize(edges)
eRDD.take(2)
创建属性图
想要创建一个图,你需要有 Vertex RDD, Edge RDD 和 一个默认顶点。
创建属性图名为 graph
val graph = Graph(vRDD,eRDD, nowhere)
graph.vertices.collect.foreach(println)
graph.edges.collect.foreach(println)
1. 这有多少个飞机场?
val numairports = graph.numVertices
2. 这有多少路线?
val numroutes = graph.numEdges
3. 哪些线路大于 1000 英里?
graph.edges.filter { case Edge(src, dst, prop) => prop > 1000 }.collect.foreach(println)
4. 边三元组类继承自 Edge 类通过增加 srcAttr 和 dstAttr 成员,各自包含了源和目的属性。
graph.triplets.take(3).foreach(println)
5. 排序并打印最长距离路线
graph.triplets.sortBy(_.attr, ascending = false).map(triplet => “Dsitance “ + triplet.attr.toString + “ from “ + triplet.srcAttr + “ to “ + triplet.dstAttr + “.”).collect.foreach(println)
使用 GraphX 分析真正的航班数据
场景
我们的数据来自http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time。我们使用 2015 年 1 月的航班信息。对于每一趟航班,我们有如下的信息。

在这个场景,我们将会以航班作为顶点,路线作为边。我们兴趣点在可视化航班和路线,并且想要看到将要起飞和到达的数量数据。
你可以从如下链接下载代码和数据去运行这些例子。
https://github.com/caroljmcdonald/sparkgraphxexample
登录 MapR 沙箱,可参考 Getting Started with Spark on MapR Sandbox,使用用户id user01,密码 mapr。使用 scp 拷贝样例数据文件 rita2014jan.csv 到你的沙箱 home 文件夹 /user/user01
启动 Spark shell
$ spark-shell
定义顶点
首先我们将引入 GraphX 软件包
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.IntParam
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators
下面我们使用 Scala case class 定义对应 csv 数据文件的航班模式
case class Flight(dofM:String, dofW:String, carrier:String, tailnum:String, flnum:Int, org_id:Long, origin:String, dest_id:Long, dest:String, crsdeptime:Double, deptime:Double, depdelaymins:Double, crsarrtime:Double, arrtime:Double, arrdelay:Double,crselapsedtime:Double,dist:Int)
下面的函数从数据文件中解释每一行数据到飞行类中。
def parseFlight(str: String): Flight = {
val line = str.split(",")
Flight(line(0), line(1), line(2), line(3), line(4).toInt, line(5).toLong, line(6), line(7).toLong, line(8), line(9).toDouble, line(10).toDouble, line(11).toDouble, line(12).toDouble, line(13).toDouble, line(14).toDouble, line(15).toDouble, line(16).toInt)
}
下面我们将载入 csv 文件到弹性分布式数据集(RDD)。RDD 有 transformations 和 actions 两种操作, first() 操作会返回 RDD 的第一个元素
val textRDD = sc.textFile("/user/user01/data/rita2014jan.csv”)
val flightsRDD = textRDD.map(parseFlight).cache()
我们定义飞机场为顶点。顶点具有属性,每个顶点的属性如下:
Airport name [String]
飞机场顶点表

我们使用上面的属性定义一个 RDD 用来表示顶点
val airports = flightsRDD.map(flight => (flight.org_id, flight.origin)).distinct
airports.take(1)
val nowhere = “nowwhere”
val airportMap = airports.map { case ((org_id),name) => (org_id -> name)}.collect.toList.toMap
定义边
边是机场之间的路线。每条边必须有一个起点,一个目的地,并且可以有属性。在我们的例子里,边的构成如下
边起点 id -> src [Long]
边终点 id -> dest [Long]
边属性距离 -> distance [Long]
路线的边表

我们使用上面用于描述边的属性定义一个RDD。边的RDD数据形式如 [src id, dest id, distance]。
val routes = flightsRDD.map(flight => ((flight.org_id, flight.dest_id), flight.dist)).distinctdistinct
routes.take(2)
val edges = routes.map { case((org_id, dest_id),distance) => Edge(org_id.toLong, dest_id.toLong, distance)}
edges.take(1)
创建属性图
想要创建一个图,你需要有 Vertex RDD, Edge RDD 和 一个默认顶点。
创建属性图名为 graph
val graph = Graph(airports, edges, nowhere)
graph.vertices.take(2)
graph.edges.take(2)
6. 有多少个飞机场?
val numairports = graph.numVertices
7. 有多少路线?
val numroutes = graph.numEdges
8. 有多少路线距离大于 1000 英里
graph.edges.filter { case ( Edge(org_id, dest_id, distance) ) => distance > 1000 }.take(3)
9. 边三元组类继承自 Edge 类通过增加 srcAttr 和 dstAttr 成员,各自包含了源和目的属性。
graph.triplets.take(3).foreach(println)
10. 排序并打印最长距离路线
graph.triplets.sortBy(_.attr, ascending = false).map(triplet => “Dsitance “ + triplet.attr.toString + “ from “ + triplet.srcAttr + “ to “ + triplet.dstAttr + “.”).take(10).foreach(println)
11. 计算最高度的顶点
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = { if (a._2 > b._2) a else b}
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)airportMap(10397)
12. 哪个航班收入最高?
val maxIncoming = graph.inDegrees.collect.sortWith(_._2 > _._2).map(x => (airportMap(x._1), x._2)).take(3)
maxIncoming.foreach(println)
val maxout = graph.outDegrees.join(airports).sortBy(_._2._1, ascending = false).take(3)
maxout.foreach(println)
PageRank
另一个 GraphX 运算符是PageRank,它基于谷歌的 PageRank 算法。
PageRank 衡量每个顶点在图中的重要性,它是通过判断哪个顶点有最多的边。在我们的例子里,我们使用 PageRank 去决定哪个航班是最重要的,衡量的方式是计算哪个航班与其他航班有最多的连接。
我们需要指定一个容忍误差,用来衡量收敛
13. 使用 PageRank 判断哪个是最重要的航班?
val ranks = graph.pageRank(0.1).vertices
val temp = ranks.join(airports)
temp.take(1)
val temp2 = temp.sortBy(_._2._1, false)
temp2.take(2)
val impAirports = temp2.map(_._2._2)
impAirports.take(4)
Pregel
很多重要的图形算法是迭代算法,因为顶点的属性依赖于它邻居们的属性,而邻居的属性又依赖它们邻居的属性。 Pregel 是一个迭代图处理模型,由谷歌开发,它使用顶点之间传递的消息进行一系列的迭代。GraphX 实现了类似 Pregel 块同步消息传递 API。
使用 GraphX 实现的 Pregel ,顶点只能发送消息给相邻的顶点。
Pregel 运算符会执行一些列的超级步骤。在每一个超级步骤:
· 顶点接收前面一个超级步骤的入站消息和
· 计算每个顶点属性的新值
· 发送消息给下一个超级步骤的相邻顶点
当没有信息保留时,Pregel 操作符会结束迭代,并返回最终的图。

下面的代码使用 Pregel 用下列公式计算最便宜的机票。
50 + distance / 20

想要学习更多?
MapR announces Free Complete Apache Spark Training and Developer Certification
Get Certified on Spark with MapR Spark Certification
MapR Certified Spark Developer Study Guide
Programming Guide -Apache Spark Developer Cheat Sheet
网友评论