import happybase
import pandas as pd
def get_hbase_pool(host, size=5):
    """Build a happybase connection pool for the given HBase host.

    :param host: HBase host address
    :param size: number of pooled connections; coerced with ``int()``
    :return: a ``happybase.ConnectionPool``
    """
    # Callers from R pass ``size`` as a non-int value, so normalise it first.
    pool_size = int(size)
    return happybase.ConnectionPool(size=pool_size, host=host)
def read_hbase_data(pool, table_name, col_filter, col, row_prefix=None):
    """Scan an HBase table via a connection pool and return the rows as a DataFrame.

    Every requested qualifier is read from column family ``f``. The scan
    yields HBase rows, and each one becomes one DataFrame row.

    :param pool: happybase connection pool (e.g. from ``get_hbase_pool``)
    :param table_name: name of the HBase table to scan
    :param col_filter: HBase filter string passed to ``Table.scan(filter=...)``,
        or ``None`` for no filtering
    :param col: qualifiers to read from family ``f``. They are also used, in
        the same order, as the DataFrame column names. A bare string is
        treated as a single column name.
    :param row_prefix: optional row-key prefix (``str`` or ``bytes``).
        ``None`` scans the whole table.
    :return: DataFrame with columns ``col``. Cells are UTF-8-decoded strings,
        or missing (``None``) where a row lacks that cell. The index is
        0..n-1. If the scan raises, the error is printed and the rows read
        before the failure are returned.
    """
    if isinstance(col, str):
        # Avoid iterating a single name character by character.
        col = [col]
    col = list(col)
    # happybase expects column names as bytes of the form b'f:<qualifier>'.
    read_col = ['f:{}'.format(c).encode() for c in col]
    if isinstance(row_prefix, str):
        row_prefix = row_prefix.encode()

    rows = []
    with pool.connection() as connection:
        try:
            tab = connection.table(table_name)
            for _key, value in tab.scan(row_prefix=row_prefix, columns=read_col,
                                        filter=col_filter):
                # Look cells up by name so that dict ordering or a missing
                # cell cannot shift values into the wrong column.
                cells = [value.get(c) for c in read_col]
                rows.append([None if v is None else str(v, 'utf-8') for v in cells])
        except Exception as e:
            # Best-effort: report the error and keep what was read so far.
            # Closing on failure avoids returning a connection in an unknown
            # state. The pool calls open() again when the connection is next
            # checked out. On success it stays open so the pool can reuse it.
            connection.close()
            print('Error:', e)
    return pd.DataFrame(rows, columns=col)
def read_hbase_data_nopool(host, table_name, col_filter, col, row_prefix=None):
    """Scan an HBase table over a dedicated connection and return a DataFrame.

    This opens its own connection to ``host`` and always closes it before
    returning. Every requested qualifier is read from column family ``f``.

    :param host: HBase host address
    :param table_name: name of the HBase table to scan
    :param col_filter: HBase filter string passed to ``Table.scan(filter=...)``,
        or ``None`` for no filtering
    :param col: qualifiers to read from family ``f``. They are also used, in
        the same order, as the DataFrame column names. A bare string is
        treated as a single column name.
    :param row_prefix: optional row-key prefix (``str`` or ``bytes``).
        ``None`` scans the whole table.
    :return: DataFrame with columns ``col``. Cells are UTF-8-decoded strings,
        or missing (``None``) where a row lacks that cell. The index is
        0..n-1. If the scan raises, the error is printed and the rows read
        before the failure are returned.
    :raises: whatever ``Connection.open()`` raises if the host is unreachable
    """
    if isinstance(col, str):
        # Avoid iterating a single name character by character.
        col = [col]
    col = list(col)
    # happybase expects column names as bytes of the form b'f:<qualifier>'.
    read_col = ['f:{}'.format(c).encode() for c in col]
    if isinstance(row_prefix, str):
        row_prefix = row_prefix.encode()

    rows = []
    connection = happybase.Connection(host, autoconnect=False)
    connection.open()
    try:
        tab = connection.table(table_name)
        for _key, value in tab.scan(row_prefix=row_prefix, columns=read_col,
                                    filter=col_filter):
            # Look cells up by name so that dict ordering or a missing cell
            # cannot shift values into the wrong column.
            cells = [value.get(c) for c in read_col]
            rows.append([None if v is None else str(v, 'utf-8') for v in cells])
    except Exception as e:
        # Best-effort: report the error and keep what was read so far.
        print('Error:', e)
    finally:
        connection.close()
    return pd.DataFrame(rows, columns=col)
if __name__ == "__main__":
    # Placeholder connection settings; replace with real values before running.
    host = 'ip'
    table_name = '表名'
    # Qualifiers read from family 'f', also used as the DataFrame column names.
    col = ['id', 'x', 'y']
    # Keep only rows whose f:x cell equals 'a'.
    col_filter = "SingleColumnValueFilter('f', 'x', =, 'binary:a')"
    row_prefix = None  # optional row-key prefix; None means no prefix restriction

    pool = get_hbase_pool(host, size=5)
    result = read_hbase_data(
        pool=pool,
        table_name=table_name,
        col_filter=col_filter,
        col=col,
        row_prefix=row_prefix,
    )
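# (stray "网友评论" / "reader comments" footer text from the web page this snippet was copied from; not part of the program)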