美文网首页
graphX 实现LPA算法,Java 版本

graphX 实现LPA算法,Java 版本

作者: NazgulSun | 来源:发表于2021-08-13 12:41 被阅读0次

point

  • InitMsg:以各个节点自身的 id 作为初始标签
  • sendMsg:沿着 outEdge 发送消息,source 和 target 互相把自己的标签发给对方
  • merge:统计从邻居节点收到的标签,把每个标签出现的次数累加后存在 map 里面
  • vproj:更新节点的属性,从收到的消息里面取出现次数最多的标签
        SparkConf conf = new SparkConf().setAppName("Graphx Learning");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        ClassTag<Long> LongTag = scala.reflect.ClassTag$.MODULE$.apply(Long.class);
        ClassTag<HashMap<Object,Long>> mapTag = scala.reflect.ClassTag$.MODULE$.apply(Map.class);

        List<String> edges = Lists.newArrayList("1 2", "1 3", "2 4", "3 4", "3 5", "4 5", "5 6", "6 7",
                "6 9", "7 11", "7 8", "9 8", "9 13", "8 10", "10 13",
                "13 12", "10 11", "11 12");



        JavaRDD<Edge<Long>> rddEdges = sc.parallelize(edges).map(x->
                 new Edge(Long.parseLong(x.split(" ")[0]),
                         Long.parseLong(x.split(" ")[1]),1L));

        Graph<Long,Long> graph = Graph.fromEdges(rddEdges.rdd(),0L,
                StorageLevel.MEMORY_ONLY(), StorageLevel.MEMORY_ONLY(),LongTag,LongTag);

            Object obj = reflexivity();
            Predef.$eq$colon$eq<Long,Long> tpEqus = ( Predef.$eq$colon$eq<Long,Long>) obj;
        Graph<Long, Long> ready = graph.mapVertices(new SimpleInitGraph(),LongTag,tpEqus);

        Graph<Long,Long> LPA = ready.ops().pregel(new HashMap<Object,Long>(),
                10, EdgeDirection.Either(),
                new VProj(), new SendMsg(), new ReduceMsg(),mapTag );
        LPA.vertices().toJavaRDD().foreach(x->{
            System.out.println(x._1+"\t"+x._2);
        });
    }


    /**
     * Obtains a Scala type-equality evidence object ({@code =:=}) relating {@code T} to itself.
     *
     * <p>The value comes straight from {@code Predef.=:=.tpEquals()}; this wrapper only gives
     * Java callers a generic, statically typed way to reach it.
     *
     * @param <T> the type on both sides of the equality evidence
     * @return the evidence instance returned by {@code tpEquals()}
     */
    public static <T> Predef.$eq$colon$eq<T, T> reflexivity() {
        final Predef.$eq$colon$eq<T, T> evidence = Predef.$eq$colon$eq$.MODULE$.tpEquals();
        return evidence;
    }


    /**
     * Vertex-mapping function that turns every vertex's own id into its attribute,
     * i.e. each vertex starts out labelled with its own id.
     */
    static class SimpleInitGraph extends AbstractFunction2<Object,Long,Long> implements Serializable {

        /**
         * @param v1 the vertex id; it is cast to {@code Long}
         * @param v2 the vertex's current attribute; not used
         * @return {@code v1} as a {@code Long}
         */
        @Override
        public Long apply(Object v1, Long v2) {
            final Long ownId = (Long) v1;
            return ownId;
        }
    }

    /**
     * Vertex program for label propagation: chooses a vertex's new label from the
     * label counts it received.
     */
    static class VProj extends AbstractFunction3<Object,Long,HashMap<Object,Long>,Long> implements Serializable {

        /**
         * @param v1 the vertex id; not used
         * @param v2 the vertex's current label
         * @param v3 received label counts (label -> number of occurrences); may be null or empty
         * @return {@code v2} when no counts were received, otherwise the label with the
         *         highest count
         */
        @Override
        public Long apply(Object v1, Long v2, HashMap<Object,Long> v3) {
            // Nothing received: keep the current label.
            if (v3 == null || v3.isEmpty()) {
                return v2;
            }
            // LPA: pick the label with the highest count.
            // For overlapping-community detection, several top labels (ranked by count)
            // could be kept here instead, which would turn this into SLPA.
            //
            // A single pass is enough to find the maximum. Only a strictly greater count
            // replaces the current best, so on ties the first entry in map iteration
            // order wins.
            Map.Entry<Object, Long> best = null;
            for (Map.Entry<Object, Long> candidate : v3.entrySet()) {
                if (best == null || candidate.getValue().compareTo(best.getValue()) > 0) {
                    best = candidate;
                }
            }
            return (Long) best.getKey();
        }
    }

    /**
     * Message-sending function for label propagation: for one edge triplet, each
     * endpoint is sent the other endpoint's current label with a count of 1.
     */
    static class SendMsg extends AbstractFunction1<EdgeTriplet<Long,Long>, Iterator<Tuple2<Object,HashMap<Object,Long>>>>
            implements Serializable{

        /**
         * @param v1 the edge triplet whose endpoints exchange labels
         * @return an iterator over two (target vertex id, label-count map) messages:
         *         first the one addressed to the source vertex, then the one addressed
         *         to the destination vertex
         */
        @Override
        public Iterator<Tuple2<Object, HashMap<Object, Long>>> apply(EdgeTriplet<Long, Long> v1) {
            // The original used `res` without declaring it; a fresh local list per call
            // keeps the messages of different edges from mixing.
            List<Tuple2<Object, HashMap<Object, Long>>> res = new ArrayList<>(2);

            // Destination's label goes to the source vertex.
            HashMap<Object,Long> msg = new HashMap<>();
            msg.put(v1.dstAttr(),1L);
            res.add(new Tuple2<Object, HashMap<Object, Long>>((Object)v1.srcId(), msg));

            // Source's label goes to the destination vertex.
            HashMap<Object,Long> msg_rev = new HashMap<>();
            msg_rev.put(v1.srcAttr(),1L);
            res.add(new Tuple2<Object, HashMap<Object, Long>>((Object)v1.dstId(), msg_rev));

            // Convert the Java iterator to the Scala iterator type this function returns.
            return JavaConverters.asScalaIteratorConverter(res.iterator()).asScala();
        }
    }

    /**
     * Message-combining function: merges two label-count maps into a new map by
     * summing the counts of labels that appear in both.
     */
    static class ReduceMsg extends AbstractFunction2<HashMap<Object,Long>,HashMap<Object,Long>,HashMap<Object,Long>>
                            implements Serializable {

        /**
         * @param v1 first label-count map; read but not modified
         * @param v2 second label-count map; read but not modified
         * @return a new map holding, for every label in either input, the sum of its counts
         */
        @Override
        public HashMap<Object, Long> apply(HashMap<Object, Long> v1, HashMap<Object, Long> v2) {
            final HashMap<Object, Long> combined = new HashMap<>();
            // v1 is folded in before v2, so keys are inserted in the same order as before.
            accumulate(combined, v1);
            accumulate(combined, v2);
            return combined;
        }

        /** Adds every count in {@code source} onto the matching entry of {@code target}. */
        private static void accumulate(HashMap<Object, Long> target, HashMap<Object, Long> source) {
            for (Map.Entry<Object, Long> entry : source.entrySet()) {
                target.merge(entry.getKey(), entry.getValue(), Long::sum);
            }
        }
    }

相关文章

网友评论

      本文标题:graphX 实现LPA算法,Java 版本

      本文链接:https://www.haomeiwen.com/subject/ilvdbltx.html