介数反应的节点/边在整个网络中的作用和影响力。
参考资料
论文 《A Faster Algorithm for Betweenness Centrality》
博客 https://blog.csdn.net/BetaRun/article/details/51168259
Python的networkx实现介数中心性的源码: https://github.com/networkx/networkx/blob/master/networkx/algorithms/centrality
1. 算法介绍
节点介数中心性:在所有最短路径中经过该节点的路径数目占最短路径总数的占比。
计算图中节点的介数中心性分为两种情况: 有权图上的介数中心性和无权图上的介数中心性。两者的区别在于求最短路径时使用的方法不同,对于无权图采用BFS(宽度优先遍历)求最短路径,对于有权图采用Dijkstra算法求最短路径。
2. 介数中心性公式
节点介数中心性的计算公式如下:
其中
: 经过节点v的s到t的最短路径条数
: 节点s到节点t的所有最短路径条数
为方便计算,将每对顶点的介数计算定义为:
所以上面的公式可以用代替,即
3. 求解思路
求节点v的介数中心性,即计算 ,需要知道节点 v 是否在 s 到 t 的路径上。
(1)求节点 v 是否在 s 到 t 的最短路径上,采用下面公式判断( 表示两点之间的最短路径长度):
所以有下面公式:
(2)根据上面公式可得:
节点 s 到节点 t 的经过 w 的最短路径条数为 ,在图中节点 v 是 w 的前置节点,所以 st 之间经过节点 v 和 w 的最短路径条数计算公式为:
下面分为两种情况:分别是 和
(一) 时
(二) 时,不存在
(3)所以将上面两种情况加起来,得到经过 v 的 s 到所有顶点的最短路径数占 s 到所有顶点的最短路径数的比值。
其中 即 v 是 s 到 w 路径中 w 的前驱节点。
(4)根据上面的求的公式,下面给出论文中的求解有向无权图时的算法流程,如下所示。

对于无权图的代码实现基本上跟着上面流程来就OK了。
有权图的介数中心性计算需要将求解最短路径的方法改成采用Dijkstra方法,即改动第一个while循环内的代码。
4. 实现代码
import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.sql.SparkSession
import scala.collection.mutable
import scala.collection.mutable.{HashMap, ListBuffer, Queue, Stack}
/**
* User: nicole
* Time: 2018/12/5 17:48
*/
/**
* 在Graphx上实现求解点介数
* 时间复杂度为O(n3)
* 空间复杂度为O(n2)
* */
object Betweenness {
def main(args: Array[String]): Unit ={
val sc = SparkSession
.builder()
.master("local")
.getOrCreate()
.sparkContext
val graph = loadInitGraph(sc, "src/main/scala/person.txt", " ", -1)
val result = run(sc, graph, 10, false)
result.vertices.collect().foreach(x => println("vid: " + x._1 + " BC: " + x._2))
println()
}
/**
* betweennessCentrality算法入口
*
* @param graph 初始图
* @param k pregel中最大迭代次数
* @param isWeighted 是否是有权图, 默认是false
* true 表示该图是有权图
* false表示该图是无权图
*
* @return Graph[Double, Double] 返回结果图,顶点属性Double为该顶点对应的介数中心性的值
*
* */
def run(sc: SparkContext, graph: Graph[None.type, Double], k: Int, isWeighted: Boolean = false): Graph[Double, Double] ={
val initBCgraph = createBetweenGraph(graph, k)
//有权图计算方法
if(isWeighted){
val vertexBCGraph = initBCgraph.mapVertices((id, attr) => {
val a = betweennessCentralityForWeightedGraph(id, attr)
(id, a)
})
val BCGraph = aggregateBetweennessScores(vertexBCGraph)
BCGraph
}
//无权图计算方法
else {
val vertexBCGraph = initBCgraph.mapVertices((id, attr) => {
(id, betweennessCentralityForUnweightedGraph(id, attr))
})
val BCGraph = aggregateBetweennessScores(vertexBCGraph)
BCGraph
}
}
/**
* betweennessCentralityForUnweightedGraph
*
* 对无权图计算顶点的介数中心性
* 对每个顶点vid计算从该节点出发时其他节点的介数中心性,返回对于顶点vid而言图中节点所计算出来的介数中心性
*
* 对图中所有节点都会计算一次全局的介数中心性,最后进行汇总
*
* @param vid:顶点id
* @param vAttr:顶点对应的属性信息
*
*@return List((Vid, betweennessValue))
*
* */
def betweennessCentralityForUnweightedGraph(vid: VertexId, vAttr: VertexProperty): List[(VertexId, Double)] = {
//无权图的计算方法
println("enter betweennessCentrality for vertex: " + vid)
//对图中每个顶点做如下操作
val S = Stack[VertexId]() //每次访问过的节点入栈
val P = new HashMap[VertexId, ListBuffer[VertexId]]() //存储源顶点到某个顶点中间经过哪些顶点
//如[5,[2,3]],表示源顶点到顶点5的最短路径会经过顶点2,3
val Q = Queue[VertexId]() //BFS遍历时将顶点入队列
val dist = new HashMap[VertexId, Double]()
val sigma = new HashMap[VertexId, Double]()
val delta = new HashMap[VertexId, Double]()
val neighborMap = getNeighborMap(vAttr.vlist, vAttr.elist)
val medBC = new ListBuffer[(VertexId, Double)]()
for(vertex <- vAttr.vlist){
dist.put(vertex, -1)
sigma.put(vertex, 0.0)
delta.put(vertex, 0.0)
P.put(vertex, ListBuffer[VertexId]())
}
//对于当前节点,有特殊对待
dist(vid) = 0.0
sigma(vid) = 1.0
Q.enqueue(vid)
while(Q.nonEmpty){
val v = Q.dequeue()
S.push(v)
for(w <- neighborMap(v)){
if(dist(w) < 0){ //节点w未被访问过
Q.enqueue(w)
dist(w) = dist(v) + 1
}
if(dist(w) == dist(v) + 1){
sigma(w) += sigma(v)
P(w).+= (v)
}
}
}
while(S.nonEmpty){
val w = S.pop()
for(v <- P(w)){
delta(v) += sigma(v) / sigma(w) * (1 + delta(w))
}
if(w != vid)
medBC.append((w, delta(w) / 2)) //一条边会被两个节点各自计算一次,所以需要对求出来的值除以2
}
medBC.toList
}
/**
* betweennessCentralityForWeightedGraph
*
* 有权图求介数中心性
*
* 与无权图求介数中心性的区别在于“存储邻居节点信息”的数据结构不同
* 有权图不是用队列,而是采用scala的优先级队列PriorityQueue,即最小堆
* 利用最小堆维护顶点的邻居节点以及与邻居节点的边权重元组:(vw_dist, v, w)其中v是w的前驱节点,即w是v的往深处走的邻居节点
* 当遍历完顶点v的所有邻居节点后,需要从中选择一个最近的邻居继续进行深度遍历,所以让最小堆根据wv_dist降序排列,
* 每次pop出来的即是最小边对应的顶点信息,也就是每次选出来的邻居节点都是距离最近的邻居。
*
* @param vid:顶点id
* @param vAttr:顶点对应的属性信息
*
*@return List((Vid, betweennessValue))
* */
def betweennessCentralityForWeightedGraph(vid: VertexId, vAttr: VertexProperty): List[(VertexId, Double)] = {
println("enter betweennessCentralityForWeightedGraph function")
//对图中每个顶点做如下操作
val S = Stack[VertexId]() //每次访问过的节点入栈
val P = new HashMap[VertexId, ListBuffer[VertexId]]() //存储源顶点到某个顶点中间经过哪些顶点
//如[5,[2,3]],表示源顶点到顶点5的最短路径会经过顶点2,3
//下面定义一个优先级队列,即最小堆,根据第一个元素进行倒排
val ord = Ordering.by[(Double, VertexId, VertexId), Double](_._1).reverse
val Q = new mutable.PriorityQueue[(Double, VertexId, VertexId)]()(ord) //遍历时将顶点及对应的路径长度入队列
val dist = new HashMap[VertexId, Double]()
val sigma = new HashMap[VertexId, Double]()
val delta = new HashMap[VertexId, Double]()
val neighborMap = getNeighborMap(vAttr.vlist, vAttr.elist)
val medBC = new ListBuffer[(VertexId, Double)]()
for(vertex <- vAttr.vlist){
dist.put(vertex, -1)
sigma.put(vertex, 0.0)
delta.put(vertex, 0.0)
P.put(vertex, ListBuffer[VertexId]())
}
//对于当前节点,有特殊对待
sigma(vid) = 1.0
val seen = new HashMap[VertexId, Double]()
seen(vid) = 0
Q.enqueue((0.0, vid, vid))
//获取两个相邻节点之间的距离
def getDist(v: VertexId, w: VertexId) = {
vAttr.elist
.filter(e => (e._1 == v && e._2 == w) || (e._2 == v && e._1 == w))
.map(x => x._3)
.reduce(_.min(_))
}
while(Q.nonEmpty){
val (d, pred, v) = Q.dequeue()
if(dist(v) > 0){ //节点v已经访问过了
null
}
else{
sigma(v) += sigma(pred)
S.push(v)
dist(v) = d
for(w <- neighborMap(v)){
val vw_dist = d + getDist(v, w)
if(dist(w) < 0 && (!seen.contains(w) || vw_dist < seen(w))){
seen(w) = vw_dist
Q.enqueue((vw_dist, v, w))
sigma(w) = 0.0
P(w) = ListBuffer[VertexId](v)
}
else if(vw_dist == seen(w)){
sigma(w) += sigma(v)
P(w).+= (v)
}
}
}
}
while(S.nonEmpty){
val w = S.pop()
for(v <- P(w)){
delta(v) += sigma(v) / sigma(w) * (1 + delta(w))
}
if(w != vid)
medBC.append((w, delta(w) / 2))
}
medBC.toList
}
/**
* 为每个顶点收集其邻居节点信息
* 尝试过用收集邻居节点的api,但在计算介数中心性内部又需要每个节点都维护所有节点信息和边信息,所以采用对每个节点根据边来计算邻居节点的方式
*
* @param vlist, elist 所有顶点信息和所有边信息
* @return [vid, [ 邻居id, 邻居id ...] ]
* */
def getNeighborMap(vlist: List[VertexId], elist: List[(VertexId, VertexId, Double)]): HashMap[VertexId, List[VertexId]] = {
val neighborList = new HashMap[VertexId, List[VertexId]]()
vlist.map(v => {
val nlist = (elist.filter(e => (e._1 == v || e._2 == v))).map(e => {
if(v == e._1) e._2
else e._1
})
neighborList.+= ((v, nlist.distinct))
})
neighborList
}
/**
* 根据原始数据构建初始图
*
* @param sc
* @param path 原始数据所在的hdfs路径
* @param separator 数据不同字段间的分隔符
* @param weightCol 用于标识哪一列作为权重列,给出列号,从0开始。
* 对权重列的限制: 权重列为-1时,表示没有权重列,即该图是无权图,默认设权重为1.0
* 为非-1的整数时,表示图数据中的第weightCol列为权重列
* 要求:其值小于数据中列的数目,且该权重列对应的数据必须是double数值
*
* @return Graph
* */
def loadInitGraph(sc:SparkContext, path:String, separator: String, weightCol: Int):Graph[None.type,Double] = {
val data = sc.textFile(path)
val edges = data.map(line => {
val items = line.split(separator)
require(items.length > weightCol,
"权重列超过了图数据的字段数,图数据字段数目为 " + items.length + ", 选择的权重列为 " + weightCol)
var weightValue = 0.0
if(weightCol == -1) {
weightValue = 1.0
}
else {
require(isNumic(items(weightCol)), "权重列必须为double数值")
weightValue = items(weightCol).toDouble
}
Edge(items(0).toLong, items(1).toLong, weightValue)
})
Graph.fromEdges(edges, None)
}
/**
* 工具方法,验证权重列中的值可以转为double
* */
def isNumic(str: String): Boolean = {
var result = true
for(s <- str.replaceAll(".", "")){
if(!s.isDigit)
result = false
}
result
}
/**
* 构建BetweennessCentrality图,图中顶点属性维护了图中所有顶点id的列表和所有边(srcId, dstId, attr)的列表
*
* @param initG 原始数据构造的图
* @param k 最大迭代次数
* @return Graph
* */
def createBetweenGraph(initG:Graph[None.type , Double], k: Int):Graph[VertexProperty,Double] = {
val betweenG = initG
.mapTriplets[Double]({x:EdgeTriplet[None.type, Double] => x.attr})
.mapVertices((id, attr) => new VertexProperty)
.cache
//准备进入pregel前的初始化消息、vertexProgram方法、 sendMessage方法、mergeMessage方法
val initMessage = (List[VertexId](), List[(VertexId, VertexId, Double)]())
//将发送过来的邻居节点信息以及当前点与邻居点的边,更新到当前点的属性中
def vertexProgram(id: VertexId, attr: VertexProperty, msgSum: (List[VertexId], List[(VertexId, VertexId, Double)])): VertexProperty ={
val newAttr = new VertexProperty()
newAttr.CB = attr.CB
newAttr.vlist = (msgSum._1 ++ attr.vlist).distinct
newAttr.elist = (msgSum._2 ++ attr.elist).distinct
newAttr
}
//向邻居节点发送自身节点的id和自身与邻居点的边
def sendMessage(edge: EdgeTriplet[VertexProperty, Double]): Iterator[(VertexId, (List[VertexId], List[(VertexId, VertexId, Double)]))] = Iterator(
(edge.dstId, (edge.srcId +: edge.srcAttr.vlist, (edge.srcId, edge.dstId, edge.attr) +: edge.srcAttr.elist)),
(edge.srcId, (edge.dstId +: edge.dstAttr.vlist, (edge.srcId, edge.dstId, edge.attr) +: edge.dstAttr.elist))
)
//合并接受到的多条消息
def mergeMessage(a: (List[VertexId], List[(VertexId, VertexId, Double)]), b: (List[VertexId], List[(VertexId, VertexId, Double)])): (List[VertexId], List[(VertexId, VertexId, Double)]) ={
((a._1 ++ b._1).distinct, (a._2 ++ b._2).distinct)}
Pregel(betweenG, initMessage, k, EdgeDirection.Either)(vertexProgram, sendMessage, mergeMessage)
}
/**
* 将每个节点分别计算出来的BC值进行统计
* */
def aggregateBetweennessScores(BCgraph: Graph[(VertexId, List[(VertexId, Double)]), Double]): Graph[Double, Double] ={
//将图中顶点属性所维护的listBC信息单独提取出来
val BCaggregate = BCgraph.vertices.flatMap{case(v, (id, listBC)) => {
listBC.map{case(w, bc) => (w, bc)}
}}
//对BCaggregate的信息 (w, bc)根据顶点id做汇总
val vertexBC = BCaggregate.reduceByKey(_+_)
val resultG = BCgraph.outerJoinVertices(vertexBC)((vid, oldAttr, vBC) => {
vBC.getOrElse(0.0)
})
resultG
}
}
/**
* 定义顶点的属性类
* CB: 定义初始的betweennessCentrality
* vlist: 每个顶点需维护图中所有的顶点信息
* elist: 每个顶点需维护图中所有的边信息
*
* 维护所有边信息是为了在计算介数中心性的时候可以从每个顶点依次根据邻居节点走下去(空间复杂度很高,O(n2))
* */
class VertexProperty() extends Serializable {
var CB = 0.0
var vlist = List[VertexId]()
var elist = List[(VertexId, VertexId, Double)]()
}
5. 结果示例
输入数据:前两个字段表示顶点id,第三个字段表示边的权重
1 5 1
1 2 3
1 4 5
2 3 2
2 5 4
3 4 1
4 5 2
测试无权图:
vid: 4 BC: 1.0
vid: 1 BC: 0.3333333333333333
vid: 3 BC: 0.3333333333333333
vid: 5 BC: 0.3333333333333333
vid: 2 BC: 1.0
测试有权图
vid: 4 BC: 2.0
vid: 1 BC: 0.5
vid: 3 BC: 1.0
vid: 5 BC: 2.0
vid: 2 BC: 0.0
网友评论