美文网首页
2018-06-04 初识spark

2018-06-04 初识spark

作者: 江江江123 | 来源:发表于2018-12-27 10:03 被阅读0次

什么是spark:
用户大数据计算的引擎
特点:非常快 原因:内存迭代运算
易用
通用
不能做什么?
不能做数据存储,依赖于hbase,hdfs

入门第一步 wordcount

object WordCount{
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ScalaWordCount")
    //描述下mast spark://必带,集群master的ip:hadoop1, 端口号7077 可以在浏览器ip:8080查看界面,当然得是已经正常启动的spark  
    conf.set("spark.master","spark://hadoop1:7077")
    //非常重要的一个对象SparkContext
    val sc = new SparkContext(conf)
//hdfs 
    val textFile = sc.textFile("hdfs://hadoop1:9000/user/test.txt")
    val counts = textFile.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.saveAsTextFile("hdfs://hadoop1:9000/user/testResult.txt")
  }
}

入门第二步:读写hive

object SparkHive {
//配置本地启动
  var conf = new SparkConf().setAppName("HiveApp").setMaster("local").setJars(List("F:\\ideaWorkSpace\\spark\\target\\scalaDemo.jar"))
  System.setProperty("hadoop.home.dir", "E:/hadoops")
  val sc = new SparkContext(conf);
  val sqlContext = new HiveContext(sc);
  
//加载数据
  def loadDate(filePath:String,tableName:String):Unit={
    sqlContext.sql("load data local inpath '"+filePath+"' into table "+tableName);
  }
//查询数据
  def getPeopleByName(name: String):Person={
    val row = sqlContext.sql("select * from people t where t.name='"+name+"'").collect().apply(0);
    return new Person(row.getAs[String](0),row.getAs[Int](1));
  }
//关闭连接
  def destory():Unit={
    conf = null;
    sc.stop();
  }
  def main(args: Array[String]): Unit = {
//测试
    var filePath = "D:/a.txt";
    val tableName = "people";
    sqlContext.sql("drop table if exists people");
    sqlContext.sql("create table people (name string,account int)  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE");
    sqlContext.sql("show tables").show();
    loadDate(filePath,tableName);
    sqlContext.sql("select * from people").collect().foreach(println)
    getPeopleByName("1");
    sc.stop()
  }
}
case class Person (name: String, var account: Int){

}

入门第三步:读写hbse

object SparkHbse {
  System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
  private val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local");
  private val sc = new SparkContext(sparkConf);
  val conf = HBaseConfiguration.create()
  //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
  conf.set("hbase.zookeeper.quorum","hadoop1,hadoop2,hadoop3")
  //设置zookeeper连接端口,默认2181
  conf.set("hbase.zookeeper.property.clientPort", "2181")

  def main(args: Array[String]): Unit = {
    //测试 新增,修改,查询
    val people1 = new Person("1",1000);
   /* val people2 = new Person("2",800);
    val persons =  List(people1,people2);*/
   /* SparkHbse.putRdd(persons,"table1")*/
    var hBaseRDD=SparkHbse.getTableRdd("table1");
    hBaseRDD = hBaseRDD.filter(peopleRdd => people1.name.equals(Bytes.toString(peopleRdd._2.getRow)));
    hBaseRDD.foreach{case (_,result) =>{
      //获取行键
      val key = Bytes.toString(result.getRow)
      println(key +":"+ people1.name +"=="+key.equals(people1.name))
      //通过列族和列名获取列
      val cid = Bytes.toInt(result.getValue("cf".getBytes,"cid".getBytes))
      println("Row key:"+key+" cid:"+cid)
    }}
    sc.stop();
  }
  def getTableRdd(tableName: String):RDD[(ImmutableBytesWritable,org.apache.hadoop.hbase.client.Result)]={
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    val admin = new HBaseAdmin(conf)
    //读取数据并转化成rdd
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    admin.close();
    return  hBaseRDD;
  }
  def putRdd(persons: List[Person],tableName: String):Unit={
    val jobConf = new JobConf(conf);
    jobConf.setOutputFormat(classOf[TableOutputFormat]);
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName);
    val indataRdd = sc.makeRDD(persons);
    val rdd = indataRdd.map{person=>{
      val put = new Put(Bytes.toBytes(person.name));
      put.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("cid"),Bytes.toBytes(person.account));
      (new ImmutableBytesWritable,put);
    }}
    rdd.saveAsHadoopDataset(jobConf);
  }
  def destory():Unit = {
    sc.stop()
  }
}

入门第四步:spark stream 读取kafka数据

object LogStream {
  def main(args: Array[String]): Unit = {
//设置本地启动
    val sparkConf = new SparkConf().setAppName("logStream").setMaster("local");
    println("start")
//线程必须大于0
    val numThreads = 1;
  //组id若未在kafka种设置可随意添加
    val groupId = "groupid"
//设置每6秒执行一次
    val ssc = new StreamingContext(sparkConf,Seconds(6))
    val topics = Set("log-flume");
    val brokers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    val kafkaParam = Map[String,String]("metadata.broker.list" -> brokers,
      "serializer.class" -> "kafka.serializer.StringEncoder",
      "group.id" -> groupId,
//这个参数表示每次从头开始获取。。如果获取实时数据可不添加,此处用于测试
      "auto.offset.reset" -> OffsetRequest.SmallestTimeString);
    //获取所有数据
    val message = KafkaUtils.createDirectStream[String,String,StringDecoder
      ,StringDecoder](ssc,kafkaParam,topics).map(_._2);
    message.foreachRDD(lines=>{
        println(lines);
      })

    })
    ssc.start();
    ssc.awaitTermination();
  }
}

相关文章

  • 2018-06-04 初识spark

    什么是spark:用户大数据计算的引擎特点:非常快 原因:内存迭代运算易用通用不能做什么?不能做数据存储,依赖于h...

  • spark学习一初识、编译、安装

    spark简介: Spark学习之路 (一)Spark初识 - 扎心了,老铁 - 博客园 1、下载 地址:http...

  • 阿里巴巴资深架构师熬几个通宵肛出来的Spark+Hadoop+中

    Spark大数据分析实战 1、Spark简介 初识Spark Sp ark生态系统BDAS Sp ark架构与运行...

  • 阿里巴巴资深架构师熬几个通宵肛出来的Spark+Hadoop+中

    Spark大数据分析实战 1、Spark简介 初识Spark Sp ark生态系统BDAS Sp ark架构与运行...

  • 初识spark

    大数据及分析环境 bigdata主要体现在:量,速度,多样性数据量,数据流速度(实时,批量,串流),数据多样性(结...

  • Spark初识

    科普 首先推荐一篇文章与 Hadoop 对比,如何看待 Spark 技术里面有很多优秀的回答者,引用用心阁大神的回...

  • 初识spark

    一、官网介绍 1、什么是Spark 官网地址:http://spark.apache.org/ Apache Sp...

  • spark初识篇

    spark初识篇 Spark HA的情况下,zookeeper中存放了哪些数据信息: Application Dr...

  • 【Spark学习笔记】初识spark

    1.Spark简介 快速且通用的集群计算平台 1.1.快速性: Spark扩充了流行的mapreduce计算模型 ...

  • 初识Apache Spark

    第一次接触Spark,自己整理了(从网络,书籍,同事那里)一些Spark的相关内容当做笔记。路过的朋友仅供参考,不...

网友评论

      本文标题:2018-06-04 初识spark

      本文链接:https://www.haomeiwen.com/subject/fcsysftx.html