美文网首页
Spark分组取TopN

Spark分组取TopN

作者: 阿坤的博客 | 来源:发表于2018-09-28 13:58 被阅读156次

本文记录了利用Scala和Java两种语言来实现先分组,然后取每个分组的TopN。

1.文本内容

class1 90
class2 56
class1 87
class1 76
class2 88
class1 95
class1 74
class2 87
class2 67
class2 77

班级名 空格 分数

2.scala实现分组TopN

object ScalaGroupTop3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ScalaGroupTop3")
      .setMaster("local[1]")

    val sc = new SparkContext(conf)

    sc.textFile("D:\\workspaces\\idea\\hadoop\\spark\\data\\score.txt")
      .map(line => {
        val datas = line.split(" ")
        (datas(0), datas(1))
      })
      .groupByKey()
      .map(group => (group._1, group._2.toList.sortWith(_ > _).take(3)))
      .sortByKey()
      .foreach(group => {
        println(group._1)
        group._2.foreach(println)
      })

    sc.stop()
  }
}

计算结果:

class1
95
90
87
class2
88
87
77

3.java实现分组TopN

public class GroupTop3 {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("Top3")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile("D:\\workspaces\\idea\\hadoop\\spark\\data\\score.txt");

        JavaPairRDD<String, Integer> pairs = lines.mapToPair(
                new PairFunction<String, String, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(String line) {
                        String[] lineSplited = line.split(" ");
                        return new Tuple2<>(lineSplited[0], Integer.valueOf(lineSplited[1]));
                    }
                });

        JavaPairRDD<String, Iterable<Integer>> groupedPairs = pairs.groupByKey();

        JavaPairRDD<String, Iterable<Integer>> top3Score = groupedPairs.mapToPair(
                new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> classScores) {
                        Integer[] top3 = new Integer[3];

                        String className = classScores._1;
                        Iterator<Integer> scores = classScores._2.iterator();

                        while (scores.hasNext()) {
                            Integer score = scores.next();
                            for (int i = 0; i < 3; i++) {
                                if (top3[i] == null) {
                                    top3[i] = score;
                                    break;
                                } else if (score > top3[i]) {
                                    for (int j = 2; j > i; j--) {
                                        top3[j] = top3[j - 1];
                                    }
                                    top3[i] = score;
                                    break;
                                }
                            }
                        }
                        return new Tuple2<>(className, Arrays.asList(top3));
                    }

                });

        top3Score.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
                System.out.println("class: " + t._1);
                Iterator<Integer> scoreIterator = t._2.iterator();
                while (scoreIterator.hasNext()) {
                    Integer score = scoreIterator.next();
                    System.out.println(score);
                }
                System.out.println("=======================================");
            }
        });

        sc.close();
    }
}

计算结果:

class: class1
95
90
87
=======================================
class: class2
88
87
77
=======================================

相关文章

  • Spark分组取TopN

    本文记录了利用Scala和Java两种语言来实现先分组,然后取每个分组的TopN。 1.文本内容 班级名 空格 分...

  • spark分组取topN

    row_number() over(partition by cooperate_id order by day ...

  • Spark 分组TopN

  • 2020-11-27-Spark-6(Spark-Core)

    spark练习题处理数据上的分组和业务需求上的分组 1.案例topN(要点使用模式匹配重新分组) 2.基础练习题(...

  • Hive分组取TOPN数据

    1、ROW_NUMBER,RANK(),DENSE_RANK() 语法格式:row_number() OVER (...

  • hive分组取随机数

    hive取随机的数据,可以使用rand()函数,用rand()对数据排序,取topN如果要用到分组取随机数,比如每...

  • Spark-窗口函数实现原理及各种写法

    平时使用窗口函数最多的情况就是-根据某个字段分组,取组内的TopN(也可能是随机取N条),在没接触窗口函数之前,使...

  • Hive分组TopN

    People表明细如下: 需求:按照性别分组,求分组后年龄最大的两个年龄的人员信息 学生技能表如下 需求:按照技能...

  • mongodb 分组 topN

    来源:https://groups.google.com/forum/#!topic/mongodb-user/R...

  • Spark topN排序

    源数据 排序后 实现方法

网友评论

      本文标题:Spark分组取TopN

      本文链接:https://www.haomeiwen.com/subject/qtbsoftx.html