下面这种方式是全表扫描,Spark如果通过RS来访问Hbase数据进行数据分析,对RS会产生很大的压力。不太建议使用下面的方式
先看代码
object SparkSqlReadHbase {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName(SparkSqlReadHbase.getClass.getSimpleName)
.setMaster("local[5]")
val sc = new SparkContext(conf)
val conf1 = HBaseConfiguration.create
conf1.set(TableInputFormat.INPUT_TABLE, "tableName");
val hbaseRDD = sc.newAPIHadoopRDD(conf1, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._
val data = hbaseRDD.map(result => {
val filed1= Bytes.toString(result._2.getValue("cf".getBytes(), "filed1".getBytes()))
val filed2= Bytes.toString(result._2.getValue("cf".getBytes(), "filed2".getBytes()))
val filed3= Bytes.toString(result._2.getValue("cf".getBytes(), "filed3".getBytes()))
val filed4= Bytes.toString(result._2.getValue("cf".getBytes(), "filed3".getBytes()))
caseClass(filed1, filed2, filed3, filed4)
}).toDF().registerTempTable("tmptable")
val sql =
"""
| select *
| from tmptable
""".stripMargin
sqlContext.sql(sql).show()
}
}
在本地测试时返现运行的很慢,后来看到以下日志
18/11/27 16:05:31 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
18/11/27 16:05:31 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
18/11/27 16:05:31 INFO NewHadoopRDD: Input split: HBase table split(table name: ebay_product_info, scan: , start row: , end row: 2525931568352018-11-09, region location: cdh-005)
18/11/27 16:05:31 INFO NewHadoopRDD: Input split: HBase table split(table name: ebay_product_info, scan: , start row: 2525931568352018-11-09, end row: , region location: cdh-005)
由于Hbase表中只有两个region,所以只启动两个Task,此时并行度为二!
那么也就是说Spark读取Hbase的并行度取决于这个表有多少个region。然后根据region的startkey和endkey来获取数据








网友评论