相关链接:https://www.cnblogs.com/xing901022/p/8486290.html
http://hbase.group/article/93
1.常用的错误方法:
<1> 自己实现一个Connection对象的资源池,每次使用都从资源池中取出一个Connection对象;
<2> 每个线程一个Connection对象。
<3> 每次访问HBase的时候临时创建一个Connection对象,使用完之后调用close关闭连接。
这些做法是我们通过JDBC连接MySQL等数据库时常用的;显然,我们是把HBase的Connection对象当成了单机数据库里面的连接对象来用了。
然而作为分布式数据库,HBase客户端需要和多个服务器中的不同服务角色建立连接,所以HBase客户端中的Connection对象并不是简单对应一个socket连接。
截取一张工作中的代码:
image.png
对DataFrame重分区的时候,在每个分区建一个HbaseServer ====》这样的做法到底对不对呢??
【解析:】
这里的HbaseServer是公司前辈封装的一个类,里面都是Hbase常用的API,代码如下
/**
 * Convenience wrapper around the HBase client API bound to (at most) one "current" table.
 *
 * <p>Every instance opens its own {@link Connection} and {@link Admin} in the constructor and
 * releases them, together with the current {@link Table}, in {@link #close()}. Callers are
 * therefore expected to close every instance they create.
 *
 * <p>Row-level operations ({@code put}, {@code get}, {@code delete}, {@code scan}) act on the
 * current table and throw a {@link RuntimeException} when no table has been selected.
 */
public class HbaseServer implements Closeable {
    protected Connection connection;
    protected Admin admin;
    protected Table table;
    protected TableName tableName;

    /**
     * Opens a connection and, optionally, selects the current table.
     *
     * <p>NOTE: {@code hbase.client.write.buffer} is always set to 6291456 bytes (6 MiB) on the
     * configuration that is used, so a caller-supplied {@code config} object is modified.
     *
     * @param tableName table to select as the current table, or {@code null} to select none
     * @param config    HBase configuration; when {@code null} a default configuration pointing at
     *                  the ZooKeeper quorum {@code hnode2,hnode4,hnode5} on port 2181 is created
     * @throws IOException if the connection, the admin handle or the table cannot be obtained
     */
    public HbaseServer(String tableName, Configuration config) throws IOException {
        if (config == null) {
            config = HBaseConfiguration.create();
            config.set("hbase.zookeeper.property.clientPort", "2181");
            config.set("hbase.zookeeper.quorum", "hnode2,hnode4,hnode5");
        }
        config.set("hbase.client.write.buffer", "6291456");
        try {
            this.connection = ConnectionFactory.createConnection(config);
            this.admin = this.connection.getAdmin();
            if (tableName != null) {
                this.setTableName(tableName);
            }
        } catch (IOException | RuntimeException e) {
            // Do not leak a half-initialised connection/admin when construction fails.
            try {
                this.releaseResources();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /** Opens a connection with the default configuration and selects {@code tableName}. */
    public HbaseServer(String tableName) throws IOException {
        this(tableName, (Configuration) null);
    }

    /** Opens a connection with the default configuration and no current table. */
    public HbaseServer() throws IOException {
        this((String) null, (Configuration) null);
    }

    /** Opens a connection with the given configuration and no current table. */
    public HbaseServer(Configuration config) throws IOException {
        this((String) null, config);
    }

    /**
     * Creates a table without any column family and makes it the current table.
     *
     * @param tableName name of the table to create
     * @return this instance, for chaining
     * @throws IOException if the table already exists or creation fails
     */
    public HbaseServer createTable(String tableName) throws IOException {
        TableName tName = TableName.valueOf(tableName);
        if (this.admin.tableExists(tName)) {
            throw new IOException("Table create failed: table already exists!");
        }
        this.admin.createTable(new HTableDescriptor(tName));
        this.setTableName(tableName);
        return this;
    }

    /**
     * Creates a table with the given column families, flushes it and makes it the current table.
     *
     * @param tableName name of the table to create
     * @param familys   column family names
     * @param version   maximum number of versions per family, or {@code null} for the HBase default
     * @return this instance, for chaining
     * @throws IOException if the table already exists or creation fails
     */
    public HbaseServer createTable(String tableName, String[] familys, Integer version) throws IOException {
        TableName tName = TableName.valueOf(tableName);
        if (this.admin.tableExists(tName)) {
            throw new IOException("Table create failed: table already exists!");
        }
        HTableDescriptor tableDesc = new HTableDescriptor(tName);
        for (String family : familys) {
            HColumnDescriptor cd = new HColumnDescriptor(family);
            if (version != null) {
                cd.setMaxVersions(version);
            }
            tableDesc.addFamily(cd);
        }
        this.admin.createTable(tableDesc);
        this.admin.flush(tName);
        this.setTableName(tableName);
        return this;
    }

    /**
     * Flushes the named table.
     *
     * <p>Fixed: this method previously called {@code admin.split(...)}, which splits the table's
     * regions instead of flushing it.
     *
     * @param tableName name of the table to flush
     * @throws IOException if the flush request fails
     */
    public void flush(String tableName) throws IOException {
        this.admin.flush(TableName.valueOf(tableName));
    }

    /** Returns whether the current table exists. */
    public boolean isTableExist() throws IOException {
        return this.admin.tableExists(this.tableName);
    }

    /** Returns whether the named table exists. */
    public boolean isTableExist(String tableName) throws IOException {
        return this.admin.tableExists(TableName.valueOf(tableName));
    }

    /**
     * Returns whether the current table contains the given row.
     *
     * <p>Uses {@code Table.exists(Get)} so that no cell data has to be shipped to the client.
     */
    public boolean isRowExist(String rowKey) throws IOException {
        return this.getTable().exists(new Get(Bytes.toBytes(rowKey)));
    }

    /**
     * Adds a column family to the current table if it does not exist yet.
     *
     * <p>The table is disabled for the duration of the change and re-enabled afterwards, even
     * when adding the family fails.
     */
    public void addColumnFamily(String columnFamily) throws IOException {
        if (this.isFamilyExist(columnFamily)) {
            return;
        }
        this.admin.disableTable(this.tableName);
        try {
            this.admin.addColumn(this.tableName, new HColumnDescriptor(columnFamily));
        } finally {
            this.admin.enableTable(this.tableName);
        }
    }

    /**
     * Selects the current table, closing the previously selected one first.
     *
     * @param tableName name of the table to select; must not be {@code null}
     * @return this instance, for chaining
     * @throws RuntimeException if {@code tableName} is {@code null}
     * @throws IOException      if closing the old table or opening the new one fails
     */
    public HbaseServer setTableName(String tableName) throws IOException {
        if (tableName == null) {
            throw new RuntimeException("tableName can not be null");
        }
        if (this.table != null) {
            this.table.close();
            this.table = null;
            this.tableName = null;
        }
        this.tableName = TableName.valueOf(tableName);
        this.table = this.connection.getTable(this.tableName);
        return this;
    }

    /** Disables and deletes the current table. */
    public void deleteTable() throws IOException, InterruptedException {
        this.admin.disableTable(this.tableName);
        this.admin.deleteTable(this.tableName);
    }

    /** Disables and deletes the named table; does nothing when it does not exist. */
    public void deleteTable(String tableName) throws IOException, InterruptedException {
        if (this.isTableExist(tableName)) {
            TableName tName = TableName.valueOf(tableName);
            this.admin.disableTable(tName);
            this.admin.deleteTable(tName);
        }
    }

    /** Deletes the whole row {@code rowKey} from the current table. */
    public void delete(String rowKey) throws IOException {
        this.getTable().delete(new Delete(Bytes.toBytes(rowKey)));
    }

    /** Applies a batch of deletes to the current table. */
    public void delete(List<Delete> list) throws IOException {
        this.getTable().delete(list);
    }

    /** Returns whether the current table has the given column family. */
    public boolean isFamilyExist(String family) throws IOException {
        return this.admin.getTableDescriptor(this.tableName).hasFamily(Bytes.toBytes(family));
    }

    /**
     * Removes all data of a column family by dropping the family and re-creating it with default
     * settings. Does nothing when the family does not exist.
     *
     * <p>The table is disabled for the duration of the change and re-enabled afterwards, even
     * when one of the schema changes fails.
     */
    public void deleteTableFamilyData(String family) throws IOException {
        if (!this.isFamilyExist(family)) {
            return;
        }
        this.admin.disableTable(this.tableName);
        try {
            this.admin.deleteColumn(this.tableName, Bytes.toBytes(family));
            this.admin.addColumn(this.tableName, new HColumnDescriptor(family));
        } finally {
            this.admin.enableTable(this.tableName);
        }
    }

    /** Deletes one column family of one row, provided the family exists on the current table. */
    public void deleteFamily(String rowKey, String family) throws IOException {
        if (this.isFamilyExist(family)) {
            Delete del = new Delete(Bytes.toBytes(rowKey));
            del.addFamily(Bytes.toBytes(family));
            this.getTable().delete(del);
        }
    }

    /** Deletes one column ({@code family:column}) of one row from the current table. */
    public void deleteColumn(String rowKey, String family, String column) throws IOException {
        Delete del = new Delete(Bytes.toBytes(rowKey));
        del.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
        this.getTable().delete(del);
    }

    /** Returns the regions of the current table. */
    public List<HRegionInfo> getRegionInfo() throws IOException {
        return this.admin.getTableRegions(this.tableName);
    }

    /**
     * Returns the (start key, end key) pair of every region of the current table, with the raw
     * key bytes converted to {@code kls} by {@code HBytes.getValue}.
     *
     * @param kls target type of the row keys
     * @return one pair per region, in the order reported by {@link #getRegionInfo()}
     */
    public <T> List<Pair<T, T>> getRegionRowkeyPair(Class<T> kls) throws IOException {
        List<Pair<T, T>> rowkeyPairs = new ArrayList<>();
        for (HRegionInfo regionInfo : this.getRegionInfo()) {
            T startRowkey = HBytes.getValue(regionInfo.getStartKey(), kls);
            T endRowkey = HBytes.getValue(regionInfo.getEndKey(), kls);
            rowkeyPairs.add(new Pair<T, T>(startRowkey, endRowkey));
        }
        return rowkeyPairs;
    }

    /** Flushes the current table if it exists. */
    public void flush() throws IOException, InterruptedException {
        if (this.admin.tableExists(this.tableName)) {
            this.admin.flush(this.tableName);
        }
    }

    /**
     * Returns the current table.
     *
     * @throws RuntimeException if no table has been selected
     */
    protected Table getTable() {
        if (this.table == null) {
            throw new RuntimeException("no table specified");
        }
        return this.table;
    }

    /**
     * Closes the current table, the admin handle and the connection.
     *
     * <p>Each resource is closed even when closing an earlier one throws.
     */
    @Override
    public void close() throws IOException {
        this.releaseResources();
    }

    /** Closes table, admin and connection in that order, never skipping a later resource. */
    private void releaseResources() throws IOException {
        try {
            if (this.table != null) {
                this.table.close();
            }
        } finally {
            try {
                if (this.admin != null) {
                    this.admin.close();
                }
            } finally {
                if (this.connection != null) {
                    this.connection.close();
                }
            }
        }
    }

    /**
     * Opens a temporary {@code HbaseServer} for {@code tableName}, lets every {@code Putable}
     * write itself through it, and closes the server again.
     *
     * @param tableName table to write to
     * @param putter    objects to persist; each one receives the server via {@code put(server)}
     */
    public static void saveToHbase(String tableName, List<Putable> putter) throws Exception {
        try (HbaseServer server = new HbaseServer(tableName)) {
            for (Putable putable : putter) {
                putable.put(server);
            }
        }
    }

    /** Writes one cell; the value is serialised with {@code HBytes.toHBytes}. */
    public void put(String rowkey, String columnFamily, String qualifier, Object cell) throws IOException {
        Put put = new Put(Bytes.toBytes(rowkey));
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), HBytes.toHBytes(cell));
        this.put(put);
    }

    /** Writes one cell with an explicit timestamp; the value is serialised with {@code HBytes.toHBytes}. */
    public void put(String rowkey, String columnFamily, String qualifier, long timestamp, Object cell) throws IOException {
        Put put = new Put(Bytes.toBytes(rowkey));
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), timestamp, HBytes.toHBytes(cell));
        this.put(put);
    }

    /**
     * Writes a set of strings as one cell with an explicit timestamp. The elements are joined
     * with {@code StringSetFilter.SPILTER} before being serialised.
     */
    public void put(String rowkey, String columnFamily, String qualifier, long timestamp, Set<String> cell) throws IOException {
        String joined = StringUtils.join(cell, StringSetFilter.SPILTER);
        Put put = new Put(Bytes.toBytes(rowkey));
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), timestamp, HBytes.toHBytes(joined));
        this.put(put);
    }

    /** Applies a single {@link Put} to the current table. */
    public void put(Put put) throws IOException {
        this.getTable().put(put);
    }

    /** Applies a batch of {@link Put}s to the current table. */
    public void put(List<Put> puts) throws IOException {
        this.getTable().put(puts);
    }

    /**
     * Reads all versions of a cell and returns them ordered by {@code HListUtil.sortByFrequcy}.
     *
     * @return the sorted values, or {@code null} when the cell has no versions
     */
    public <T> List<T> getValueSortByFrequncy(String rowkey, String columnFamily, String qualifier, Class<T> kls) throws IOException {
        List<T> values = this.getAllVersions(rowkey, columnFamily, qualifier, kls);
        if (values == null || values.isEmpty()) {
            return null;
        }
        return HListUtil.sortByFrequcy(values);
    }

    /**
     * Same as {@link #getValueSortByFrequncy(String, String, String, Class)} but works on an
     * already fetched {@link Result}.
     *
     * @return the sorted values, or {@code null} when the cell has no versions
     */
    public <T> List<T> getValueSortByFrequncy(Result result, String columnFamily, String qualifier, Class<T> kls) throws IOException {
        List<T> values = this.getAllVersions(result, columnFamily, qualifier, kls);
        if (values == null || values.isEmpty()) {
            return null;
        }
        return HListUtil.sortByFrequcy(values);
    }

    /** Returns the first element of the frequency-sorted versions of a cell, or {@code null} if there are none. */
    public <T> T getTopFrequncyValue(String rowkey, String columnFamily, String qualifier, Class<T> kls) throws IOException {
        List<T> sorted = this.getValueSortByFrequncy(rowkey, columnFamily, qualifier, kls);
        if (sorted == null || sorted.isEmpty()) {
            return null;
        }
        return sorted.get(0);
    }

    /** Returns the first element of the frequency-sorted versions in {@code result}, or {@code null} if there are none. */
    public <T> T getTopFrequncyValue(Result result, String columnFamily, String qualifier, Class<T> kls) throws IOException {
        List<T> sorted = this.getValueSortByFrequncy(result, columnFamily, qualifier, kls);
        if (sorted == null || sorted.isEmpty()) {
            return null;
        }
        return sorted.get(0);
    }

    /** Fetches all stored versions of one cell from the current table. */
    public List<Cell> getAllVersions(String rowkey, String columnFamily, String qualifier) throws IOException {
        byte[] family = Bytes.toBytes(columnFamily);
        byte[] column = Bytes.toBytes(qualifier);
        Get get = new Get(Bytes.toBytes(rowkey)).addColumn(family, column).setMaxVersions();
        Result result = this.getTable().get(get);
        return result.getColumnCells(family, column);
    }

    /** Fetches the latest version of one cell from the current table, or {@code null} if absent. */
    public Cell getLastest(String rowkey, String columnFamily, String qualifier) throws IOException {
        Get get = new Get(Bytes.toBytes(rowkey)).addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
        Result result = this.getTable().get(get);
        return this.getLastest(result, columnFamily, qualifier);
    }

    /** Returns the latest version of one cell in {@code result}, or {@code null} when the result is null or empty. */
    public Cell getLastest(Result result, String columnFamily, String qualifier) throws IOException {
        if (result == null || result.isEmpty()) {
            return null;
        }
        return result.getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
    }

    /** Fetches the latest version of one cell and converts its value to {@code kls}; {@code null} if absent. */
    public <T> T getLastest(String rowkey, String columnFamily, String qualifier, Class<T> kls) throws IOException {
        Cell cell = this.getLastest(rowkey, columnFamily, qualifier);
        return cell == null ? null : HBytes.getValue(CellUtil.cloneValue(cell), kls);
    }

    /** Converts the latest version of one cell in {@code result} to {@code kls}; {@code null} if absent. */
    public <T> T getLastest(Result result, String columnFamily, String qualifier, Class<T> kls) throws IOException {
        Cell cell = this.getLastest(result, columnFamily, qualifier);
        return cell == null ? null : HBytes.getValue(CellUtil.cloneValue(cell), kls);
    }

    /** Fetches all versions of one cell and converts each value to {@code kls}. */
    public <T> List<T> getAllVersions(String rowkey, String columnFamily, String qualifier, Class<T> kls) throws IOException {
        List<T> values = new ArrayList<>();
        for (Cell cell : this.getAllVersions(rowkey, columnFamily, qualifier)) {
            values.add(HBytes.getValue(CellUtil.cloneValue(cell), kls));
        }
        return values;
    }

    /**
     * Converts all versions of one cell in {@code result} to {@code kls}.
     *
     * @return the converted values; an empty list when the result is null or empty
     */
    public <T> List<T> getAllVersions(Result result, String columnFamily, String qualifier, Class<T> kls) throws IOException {
        List<T> values = new ArrayList<>();
        if (result == null || result.isEmpty()) {
            return values;
        }
        List<Cell> cells = result.getColumnCells(HBytes.toBytes(columnFamily), HBytes.toBytes(qualifier));
        for (Cell cell : cells) {
            values.add(HBytes.getValue(CellUtil.cloneValue(cell), kls));
        }
        return values;
    }

    /** Executes a batch of {@link Get}s against the current table. */
    public Result[] get(List<Get> gets) throws IOException {
        return this.getTable().get(gets);
    }

    /** Executes a single {@link Get} against the current table. */
    public Result get(Get get) throws IOException {
        return this.getTable().get(get);
    }

    /** Fetches all versions of one column for several rows. */
    public Result[] get(List<String> rowkeys, String columnFamily, String qualifier) throws IOException {
        return this.get(rowkeys, columnFamily, Arrays.asList(qualifier));
    }

    /** Fetches all versions of the given columns of one family for several rows. */
    public Result[] get(List<String> rowkeys, String columnFamily, List<String> qualifiers) throws IOException {
        byte[] family = Bytes.toBytes(columnFamily);
        List<Get> gets = new ArrayList<>();
        for (String rowkey : rowkeys) {
            Get get = new Get(Bytes.toBytes(rowkey)).setMaxVersions();
            for (String qualifier : qualifiers) {
                get.addColumn(family, Bytes.toBytes(qualifier));
            }
            gets.add(get);
        }
        return this.get(gets);
    }

    /**
     * Fetches all versions of one column of a row; a {@code null} qualifier selects the whole
     * column family.
     */
    public Result get(String rowkey, String columnFamily, String qualifier) throws IOException {
        Set<String> qualifiers = new HashSet<>();
        if (qualifier != null) {
            qualifiers.add(qualifier);
        }
        return this.get(rowkey, columnFamily, qualifiers);
    }

    /**
     * Fetches all versions of a row, optionally restricted to a family and to columns of it.
     *
     * @param rowkey       row to read
     * @param columnFamily family to restrict to, or {@code null} for the whole row
     * @param qualifiers   columns to restrict to; {@code null} or empty selects the whole family
     */
    public Result get(String rowkey, String columnFamily, Set<String> qualifiers) throws IOException {
        Get get = new Get(Bytes.toBytes(rowkey)).setMaxVersions();
        if (columnFamily != null) {
            byte[] family = Bytes.toBytes(columnFamily);
            if (qualifiers == null || qualifiers.isEmpty()) {
                get.addFamily(family);
            } else {
                for (String qualifier : qualifiers) {
                    get.addColumn(family, Bytes.toBytes(qualifier));
                }
            }
        }
        return this.getTable().get(get);
    }

    /** Fetches all versions of one column family of a row. */
    public Result get(String rowkey, String columnFamily) throws IOException {
        return this.get(rowkey, columnFamily, (String) null);
    }

    /**
     * Fetches a whole row.
     *
     * @param versions maximum number of versions per cell; a value of zero or less requests all versions
     */
    public Result get(String rowkey, int versions) throws IOException {
        Get get = new Get(Bytes.toBytes(rowkey));
        if (versions > 0) {
            get.setMaxVersions(versions);
        } else {
            get.setMaxVersions();
        }
        return this.getTable().get(get);
    }

    /** Fetches a whole row with all versions. */
    public Result get(String rowkey) throws IOException {
        return this.get(rowkey, -1);
    }

    /** Opens a scanner on the current table; the caller is responsible for closing it. */
    public ResultScanner scan(Scan scan) throws IOException {
        return this.getTable().getScanner(scan);
    }

    /**
     * Groups the column qualifiers found in {@code result} by the prefixes they start with.
     *
     * @param result           row to inspect; may be {@code null}
     * @param qualifiersPrefix prefixes to match; a qualifier matching several prefixes is listed
     *                         under each of them
     * @return map from prefix to the matching qualifiers; prefixes without a match have no entry,
     *         and the map is empty when there is nothing to inspect
     */
    public static Map<String, List<String>> filterQualifier(Result result, String... qualifiersPrefix) {
        Map<String, List<String>> qualifiersByPrefix = new HashMap<>();
        if (qualifiersPrefix == null || qualifiersPrefix.length == 0) {
            return qualifiersByPrefix;
        }
        if (result == null || result.isEmpty()) {
            return qualifiersByPrefix;
        }
        List<Cell> cells = result.listCells();
        if (cells == null) {
            return qualifiersByPrefix;
        }
        for (Cell cell : cells) {
            String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
            for (String prefix : qualifiersPrefix) {
                if (!qualifier.startsWith(prefix)) {
                    continue;
                }
                List<String> matches = qualifiersByPrefix.get(prefix);
                if (matches == null) {
                    matches = new ArrayList<>();
                    qualifiersByPrefix.put(prefix, matches);
                }
                matches.add(qualifier);
            }
        }
        return qualifiersByPrefix;
    }
}
其实就是在每一个partition上创建了一个Connection,1个partition肯定是在1个executor上的,所以这里创建Connection是没问题的。
相反的,如果多个partition共用1个Connection,反而是会出问题的,如果Partition A中数据较少,而Partition B中数据较多,A插入数据到Hbase中后,可能就会关闭连接,导致B还有数据没有插完。
注意错误:如果像下面的代码这样,通过一个共享的getConnection()方法在每个partition上获取Connection,就会出问题:partition A创建了一个connection之后,partition B调用时发现connection已存在且未关闭,就不会再创建,而是直接复用同一个对象。
/**
 * Returns the shared HBase connection, creating it when it does not exist yet or has been
 * closed.
 *
 * Fixed: the original guard was `conf == null`, so the only time `conf.set` ran was on a null
 * reference, which throws a NullPointerException. The quorum is now applied when `conf` is
 * available.
 *
 * The result may be null when `conf` is null or when creating the connection failed; creation
 * failures are only printed, not rethrown.
 */
def getConnection(): Connection = {
  if (conf != null) {
    conf.set("hbase.zookeeper.quorum", "172.16.26.6:2181,172.16.26.10:2181,172.16.26.13:2181")
    // Re-create only when there is no usable connection; otherwise the existing one is reused.
    if (connection == null || connection.isClosed()) {
      try {
        connection = ConnectionFactory.createConnection(conf)
      } catch {
        case e: Exception => e.printStackTrace()
      }
    }
  }
  connection
}
2.连接Hbase的正确做法
但也要具体问题具体分析
// One Connection object is created once and shared.
// NOTE(review): the original note says it is shared by "all processes"; presumably this means
// all threads inside one process - confirm against the HBase client documentation.
connection=ConnectionFactory.createConnection(config);
...
// Each thread obtains its own Table object from the shared connection.
Table table = connection.getTable(TableName.valueOf("test"));
try {
...
} finally {
// Only the Table is closed here; the shared connection is not closed in this snippet.
table.close();
}










网友评论