Outline of the Pregel-based label-propagation (LPA) implementation:
- Init: each vertex's own id is used as its initial label.
- sendMsg: messages are sent along every edge in both directions — source to target and target to source.
- merge: the labels received from neighbours are counted and accumulated into a map.
- vprog: the vertex attribute is updated to the label with the highest count among the received messages.
SparkConf conf = new SparkConf().setAppName("Graphx Learning");
conf.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
ClassTag<Long> LongTag = scala.reflect.ClassTag$.MODULE$.apply(Long.class);
ClassTag<HashMap<Object,Long>> mapTag = scala.reflect.ClassTag$.MODULE$.apply(Map.class);
List<String> edges = Lists.newArrayList("1 2", "1 3", "2 4", "3 4", "3 5", "4 5", "5 6", "6 7",
"6 9", "7 11", "7 8", "9 8", "9 13", "8 10", "10 13",
"13 12", "10 11", "11 12");
JavaRDD<Edge<Long>> rddEdges = sc.parallelize(edges).map(x->
new Edge(Long.parseLong(x.split(" ")[0]),
Long.parseLong(x.split(" ")[1]),1L));
Graph<Long,Long> graph = Graph.fromEdges(rddEdges.rdd(),0L,
StorageLevel.MEMORY_ONLY(), StorageLevel.MEMORY_ONLY(),LongTag,LongTag);
Object obj = reflexivity();
Predef.$eq$colon$eq<Long,Long> tpEqus = ( Predef.$eq$colon$eq<Long,Long>) obj;
Graph<Long, Long> ready = graph.mapVertices(new SimpleInitGraph(),LongTag,tpEqus);
Graph<Long,Long> LPA = ready.ops().pregel(new HashMap<Object,Long>(),
10, EdgeDirection.Either(),
new VProj(), new SendMsg(), new ReduceMsg(),mapTag );
LPA.vertices().toJavaRDD().foreach(x->{
System.out.println(x._1+"\t"+x._2);
});
}
static public <T> Predef.$eq$colon$eq<T, T> reflexivity() {
return Predef.$eq$colon$eq$.MODULE$.tpEquals();
}
static class SimpleInitGraph extends AbstractFunction2<Object,Long,Long> implements Serializable {
@Override
public Long apply(Object v1, Long v2) {
return (Long)v1;
}
}
static class VProj extends AbstractFunction3<Object,Long,HashMap<Object,Long>,Long> implements Serializable {
@Override
public Long apply(Object v1, Long v2, HashMap<Object,Long> v3) {
if(MapUtils.isEmpty(v3)) return v2;
//找出标签数最多的节点;LPA
//如果是多社区挖掘;这里可以使用多个 节点;按照标签数目排序等;变成SLPA算法
List<Map.Entry<Object,Long>> list = new ArrayList(v3.entrySet());
Collections.sort(list, (o1, o2) -> (o2.getValue().compareTo(o1.getValue())));
return (Long)list.get(0).getKey();
}
}
static class SendMsg extends AbstractFunction1<EdgeTriplet<Long,Long>, Iterator<Tuple2<Object,HashMap<Object,Long>>>>
implements Serializable{
@Override
public Iterator<Tuple2<Object, HashMap<Object, Long>>> apply(EdgeTriplet<Long, Long> v1) {
HashMap<Object,Long> msg = new HashMap<>();
msg.put(v1.dstAttr(),1L);
res.add(new Tuple2<Object, HashMap<Object, Long>>((Object)v1.srcId(), msg));
HashMap<Object,Long> msg_rev = new HashMap<>();
msg_rev.put(v1.srcAttr(),1L);
res.add(new Tuple2<Object, HashMap<Object, Long>>((Object)v1.dstId(), msg_rev));
return JavaConverters.asScalaIteratorConverter(res.iterator()).asScala();
}
}
static class ReduceMsg extends AbstractFunction2<HashMap<Object,Long>,HashMap<Object,Long>,HashMap<Object,Long>>
implements Serializable {
@Override
public HashMap<Object, Long> apply(HashMap<Object, Long> v1, HashMap<Object, Long> v2) {
HashMap<Object,Long> msg = new HashMap<>();
v1.forEach((k,v)->{
msg.put(k, msg.getOrDefault(k,0L)+v);
});
v2.forEach((k,v)->{
msg.put(k, msg.getOrDefault(k,0L)+v);
});
return msg;
}
}