
Reading and Writing HBase with PySpark

Author: EricLee_1900 | Published 2020-03-27 12:21

Install PySpark

pip install pyspark==2.3.2 --default-timeout=100

Comment out the Spark-related environment variables in ~/.bash_profile, or clear them (export SPARK_HOME= ; likewise drop Spark's bin directory from PATH) so the pip-installed PySpark is picked up, then:

source ~/.bash_profile

Restart the terminal.
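To confirm the shell now picks up the pip-installed package, a quick check (run in a plain python3 session):

import pyspark
print (pyspark.__version__)   # expect 2.3.2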

Initial setup: copy the HBase jars the job depends on into spark/jars/

'''

function: pyspark - spark-2.3.2 - write data to hbase-1.4.9_stable

prepare before running:

1. copy hbase-1.4.9/lib/hbase-*.jar to spark-2.3.2/jars/

2. copy hbase-1.4.9/lib/htrace-*.jar to spark-2.3.2/jars/

3. copy hbase-1.4.9/lib/metrics-core-*.jar to spark-2.3.2/jars/

4. download spark-examples-*.jar and copy it to spark-2.3.2/jars/

HBase downloads: https://archive.apache.org/dist/hbase/ (pick the matching stable release)

spark-examples-*.jar download: https://mvnrepository.com/artifact/org.apache.spark/spark-examples_2.11/1.6.0-typesafe-001

'''
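A quick sanity check that the jars landed where Spark will see them (a minimal sketch; for a pip install the jars live under the pyspark package itself):

import glob
import os
import pyspark

# pip-installed PySpark keeps its jars in <package dir>/jars
spark_jars = os.path.join(os.path.dirname(pyspark.__file__), 'jars')
for pattern in ('hbase-*.jar', 'htrace-*.jar', 'metrics-core-*.jar', 'spark-examples*.jar'):
    found = glob.glob(os.path.join(spark_jars, pattern))
    print (pattern, '->', 'OK' if found else 'MISSING')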

Usage: for testing and production deploys, package the environment with Anaconda, then copy the dependency jars into the packaged */pyspark/jars/ directory.

At runtime the job uses the Anaconda-packaged Python 3.6.5 environment.

When reading from or writing to the remote test cluster from a local machine, the client cannot fetch the table's meta information: it fails to resolve the IP of the RegionServer node hosting hbase:meta. Add the cluster hosts to the local /etc/hosts:

sudo vim /etc/hosts

##

# Host Database

#

# localhost is used to configure the loopback interface

# when the system is booting. Do not change this entry.

##

127.0.0.1 localhost

255.255.255.255 broadcasthost

::1 localhost

10.10.126.40 ops-hadoop00.iyunxiao.com

10.10.126.41 ops-hadoop01.iyunxiao.com

10.10.126.42 ops-hadoop02.iyunxiao.com

10.10.126.43 ops-hadoop03.iyunxiao.com

10.10.126.44 ops-hadoop04.iyunxiao.com

10.10.126.45 ops-hadoop05.iyunxiao.com

10.10.126.46 ops-hadoop06.iyunxiao.com

10.10.126.47 ops-hadoop07.iyunxiao.com
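With the hosts file updated, a quick check that the RegionServer hostnames now resolve (the hostnames below are this cluster's; substitute your own):

import socket

for h in ['ops-hadoop0%d.iyunxiao.com' % i for i in range(8)]:
    try:
        print (h, '->', socket.gethostbyname(h))
    except socket.gaierror:
        print (h, '-> not resolvable, check /etc/hosts')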

Reading from HBase with PySpark

# -*- coding: utf-8 -*-

import os
from datetime import datetime

from pyspark import SparkContext, SparkConf

'''

    1. function: pyspark - spark-2.3.2 - read data from hbase-1.4.9_stable

'''

print ('start time :', datetime.now())

os.environ['PYSPARK_PYTHON']="/Library/Frameworks/Python.framework/Versions/3.6/bin/python3"

#os.environ['PYSPARK_PYTHON']="/Users/lixujian/Documents/software/py3env/bin/python3"

conf = SparkConf()

conf.setMaster("local[*]")

conf.setAppName('zsk app')

conf.set("spark.driver.host", "localhost")

sc = SparkContext(conf = conf)

print ('&'*140)

print ('SparkContext create well done ...')

#host = 'localhost'

#host = '172.16.1.115'

#host = '10.10.126.40'

host = '10.10.2.2,10.10.2.3,10.10.2.4'

port = '2181'

table_name = 'lxj_test'

hbase_conf = {

        "hbase.zookeeper.quorum": host,

        "hbase.zookeeper.property.clientPort": port,

        "hbase.mapreduce.inputtable": table_name,

# "hbase.mapreduce.scan.columns": "cf:age",

# "hbase.mapreduce.scan.columns": "cf:name",

# "hbase.mapreduce.scan.row.start":

# "hbase.mapreduce.scan.row.stop":

# "hbase.mapreduce.scan.column.family":

# "hbase.mapreduce.scan.columns":

# "hbase.mapreduce.scan.timestamp":

# "hbase.mapreduce.scan.timerange.start":

# "hbase.mapreduce.scan.timerange.end":

# "hbase.mapreduce.scan.maxversions":

# "hbase.mapreduce.scan.cacheblocks":

# "hbase.mapreduce.scan.cachedrows":

        "hbase.mapreduce.scan.batchsize": '1000',

    }

keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"

valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

print ('*' * 140)

# newAPIHadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)

hbase_rdd = sc.newAPIHadoopRDD(

    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",

    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",

    "org.apache.hadoop.hbase.client.Result",

    keyConverter = keyConv,

    valueConverter = valueConv,

    conf = hbase_conf,

    batchSize = 1000,

    )

print (' connection hbase pass ...')

print ('#' * 140)

#count = hbase_rdd.count()

#hbase_rdd.cache()

#print ('count =', count)

# take(1) avoids collecting the whole table onto the driver just to look at one row
for k, v in hbase_rdd.take(1):
    print ('k :', k)
    print ('v :', v)

print ('End time :', datetime.now())

print ('All is well done ...')
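Each value returned by HBaseResultToStringConverter is a string holding one small JSON document per cell, newline-separated (this matches the converter in the spark-examples jar listed above; if your jar version differs, print one raw value first to confirm). A minimal sketch of parsing the pairs into per-cell dicts:

import json

# split the multi-cell string into one record per cell, then parse the JSON
cells = hbase_rdd.flatMapValues(lambda v: v.split('\n')).mapValues(json.loads)
for k, cell in cells.take(3):
    print (k, cell['qualifier'], cell['value'])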

Writing to HBase with PySpark

# -*- coding: utf-8 -*-

import os
from datetime import datetime

from pyspark import SparkContext, SparkConf

'''

    function: pyspark - spark-2.3.2 - write data to hbase-1.4.9_stable

'''

print ('$'*140)

print ('start time :', datetime.now())

os.environ['PYSPARK_PYTHON']="/Library/Frameworks/Python.framework/Versions/3.6/bin/python3"

conf = SparkConf()

conf.setMaster("local[*]")

conf.setAppName('zsk app')

conf.set("spark.driver.host", "localhost")

sc = SparkContext(conf = conf)

print ('create SparkContext well done ...')

#host = '172.16.1.115'

#host = 'localhost,'

host = '10.10.2.2,10.10.2.3,10.10.2.4'

port = '2181'

table_name = 'lxj_test'

print ('*' * 140)

keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"

valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

hbase_conf={

    "hbase.zookeeper.quorum": host,

    "hbase.zookeeper.property.clientPort": port,

    "hbase.mapred.outputtable": table_name,

    "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",

    "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",

    "mapreduce.job.output.value.class": "org.apache.hadoop.hbase.client.Result",

# "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable",

    }

print ('start write data to hbase ...')

# each record must be (rowkey, [rowkey, column_family, qualifier, value])
# e.g. ('rowkey002', ['rowkey002', 'cf', 'name', 'eric'])
rawData = ['rowkey013,cf,age,18', 'rowkey014,cf,age,20']
#sc.parallelize(rawData).map(lambda x: (x.split(',')[0], x.split(','))).saveAsHadoopDataset(  # old-API alternative
sc.parallelize(rawData).map(lambda x: (x.split(',')[0], x.split(','))).saveAsNewAPIHadoopDataset(

        keyConverter = keyConv,

        valueConverter = valueConv,

        conf = hbase_conf,

        )

#print ('start save data as pickle file ...')

#file_name = 'pickle_lixujian'

#sc.parallelize(range(10)).saveAsPickleFile(file_name, 5)

print ('End time :', datetime.now())

print ('All is well done ...')
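To write several columns under one rowkey, emit one (rowkey, [rowkey, cf, qualifier, value]) record per cell. A minimal sketch reusing the keyConv, valueConv and hbase_conf defined above (rowkey015 and its columns are made-up sample data):

rows = {'rowkey015': {'age': '22', 'name': 'eric'}}
# one output record per (rowkey, column) pair
records = [(rk, [rk, 'cf', col, val])
           for rk, cols in rows.items()
           for col, val in cols.items()]
sc.parallelize(records).saveAsNewAPIHadoopDataset(
        keyConverter = keyConv,
        valueConverter = valueConv,
        conf = hbase_conf,
        )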
