1. Pregel开发时, 图计算领域面对的问题
- 分布式图算法大部分是高度定制化的, 如果需要新的图算法, 就需要从头开始开发, 实际上分布式图算法自打有MPP就开始做了, 只不过通用的比较少.
- MapReduce可以解决部分图挖掘问题, 但是在MR的模型上直接跑Graph算法那, 性能和可用性都大打折扣, 达不到谷歌家的需求
- 单机版图算法倒是不错, 可惜neo4j这种东西面对谷歌的数据量就抓瞎.并没有什么卵用
- 一些理论上的分布式图计算引擎, 比如CGMGraph, Parallel BGraph没法做fault tolerance. 这和分布式系统假设所有点都可以挂, 所有网络都可以跑不通有矛盾. 并不能部署在工业界.
谷歌一气之下, 在Valiant’s Bulk Synchronous Parallel Model的技术上做出了Pregel. 从算法理论到工业实现用了20年, 和cherry lock有一拼.
Leslie G. Valiant, A Bridging Model for Parallel
Computation. Comm. ACM 33(8), 1990, 103–111
2. 抽象模型概述
2.1 Supserstep
Pregel计算由一系列的被称之为superstep的步骤组成, 它的输入是一个有向图, 所有的vertex必须是唯一的
在superstep的第N步, 会收取N-1步的信息, 计算, 更新状态, 把信息按照边向后发送, 等待第N+1步读取.
2.2 计算模型的状态机
superstep何时终止, 取决于所有节点的投票vote to halt的结果.
当所有的点都不再收取信息时, 就会终止.
寻找图中最大的点
在论文中介绍了一个非常简单的例子如上, 我们要想办法在一个图中找到value最大的vertex.
可以从图中看到, 在每个supersetp
- 每个点比保存一个max(local_value, message_value)
- 如果没有变化, 则vote to halt
- 如果有变化则把max value沿着边发送
- 重复以上步骤, 直到所有verte vote to halt(变灰)
这样我们就可以找到一个图里的最大值, 而且可以看到这个过程可以是一串的map reduce过程, 在Map中所有点接收信息并比较, 在Reduce中产生vetex与vetex之间的信息转移.
3 Pregel程序的执行过程
一下过程基于google自己的底层框架, 换到spark+hbase上一样说的通
- 首先启动一个
master点, 可以直接理解为spark里的driver, 用于管理整个程序的进度. 所有的woker需要把自己注册到这个master上. -
master负责把图划分为多个partition, 并指定每个worker需要负责的部分.worker需要对自己维护的这一部分graph执行用户写的computer()操作, 并维护这一部分图的状态. 每个worker都知道其它的任何一个worker维护的是哪部分graph, 这个问master就好了.
1.master开始分发用户的输入, 如果worker观察到用户的input与自己维护的graph有关, 则立即对这部分graph进行修改和操作, 否则就把数据送到它该去的地方. 分发过程结束后, 这个分布式的图已经和用户的input融合, 所有的vertex处于active状态 - 开始执行superstep, 回到vote to halt的状态到
master那里去 -
master观察到所有的vertex都halt了, 运行结束
4. Pregel下的图算法
4.1 PageRank
class PageRankVertex
: public Vertex<double, void, double> {
public:
// 假设一共有v个节点, 任何一个点初始的权重都是 1/v
virtual void Compute(MessageIterator* msgs) {
if (superstep() >= 1) {
double sum = 0;
for (; !msgs->Done(); msgs->Next())
sum += msgs->Value();
// 节点的权重调整如下, 引用它的网站越多, 它的权重越高, 引用它的网站约重要, 它的权重越高
*MutableValue() = 0.15 / NumVertices() + 0.85 * sum;
}
// 我们只进行30层运算
if (superstep() < 30) {
// 把这个vertex, 也就是网站的权重向所有它引用的网站进行广播
const int64 n = GetOutEdgeIterator().size();
SendMessageToAllNeighbors(GetValue() / n);
} else {
VoteToHalt();
}
}
};
4.2 Shortest Path
寻找source vertex和图中任何一个其它点之间的最短路径
一开始source vertex的距离是0, 其它任何一个点都是INF(无限), 然后source vertex开始广播
之后每个superstep, 每个接收从邻居那里来的message, 如果邻居记录的值加上边长比本地值小, 则修改记录. 否则就vote for halt.
重复以上步骤直到所有的点都不动了.
class ShortestPathVertex
:public Vertex<int, int, int> {
void Compute(MessageIterator* msgs) {
int mindist = IsSource(vertex_id()) ? 0 : INF;
for (; !msgs->Done(); msgs->Next())
mindist = min(mindist, msgs->Value());
if (mindist < GetValue()) {
*MutableValue() = mindist;
OutEdgeIterator iter = GetOutEdgeIterator();
for (; !iter.Done(); iter.Next())
SendMessageTo(iter.Target(),
mindist + iter.GetValue());
}
VoteToHalt();
}
};
class MinIntCombiner : public Combiner<int> {
virtual void Combine(MessageIterator* msgs) {
int mindist = INF;
for (; !msgs->Done(); msgs->Next())
mindist = min(mindist, msgs->Value());
Output("combined_source", mindist);
}
};
5. 待解决问题
- 如果存在supernode, 会导致某一部分图的计算量要远远大于其它部分, 而且这部分压力会集中在一个机器上
- 因为每个superstep结束后, 才能进行下一个, 所以计算速度取决于最慢的那台机器, 这对异构系统不友好. 在新老机器混杂的集群里, 需要非常强大的资源管理系统协调. 裸上一个yarn的默认策略绝对会跪.













网友评论