[HBase] How to Connect to HBase Correctly

Author: LZhan | Published: 2019-09-27 10:14

Related links: https://www.cnblogs.com/xing901022/p/8486290.html
http://hbase.group/article/93

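1. Common mistakes:

<1> Implementing your own pool of Connection objects and taking one from the pool for each use;
<2> Using one Connection object per thread;
<3> Creating a temporary Connection object for every HBase access and calling close on it afterwards.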

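These are habits carried over from JDBC connections to MySQL and similar databases: we are treating the Connection object as if it were a connection to a single-node database.
HBase, however, is a distributed database. The client has to connect to several service roles (ZooKeeper, HMaster, RegionServers) running on multiple servers, so a Connection object in the HBase client does not simply correspond to one socket connection.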

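Here is a piece of code from work (the original screenshot, image.png, is not reproduced here):

When the DataFrame is repartitioned, an HbaseServer is created in each partition. Is that actually the right thing to do?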

[Analysis]
HbaseServer is a class written by a senior colleague at the company. It wraps the commonly used HBase APIs. The code is as follows:

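// Imports needed by the class. HBytes, HListUtil, Putable and StringSetFilter appear
// to be in-house helpers that are not shown here; StringUtils and Pair come from
// libraries whose exact packages the original code does not show.
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseServer implements Closeable {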
    protected Connection connection;
    protected Admin admin;
    protected Table table;
    protected TableName tableName;

    public HbaseServer(String tableName, Configuration config) throws IOException {
        this.connection = null;
        this.admin = null;
        this.table = null;
        if (config == null) {
            config = HBaseConfiguration.create();
            config.set("hbase.zookeeper.property.clientPort", "2181");
            config.set("hbase.zookeeper.quorum", "hnode2,hnode4,hnode5");
        }

        config.set("hbase.client.write.buffer", "6291456");
        this.connection = ConnectionFactory.createConnection(config);
        this.admin = this.connection.getAdmin();
        if (tableName != null) {
            this.setTableName(tableName);
        }

    }

    public HbaseServer(String tableName) throws IOException {
        this(tableName, (Configuration)null);
    }

    public HbaseServer() throws IOException {
        this((String)null, (Configuration)null);
    }

    public HbaseServer(Configuration config) throws IOException {
        this((String)null, config);
    }

    public HbaseServer createTable(String tableName) throws IOException {
        if (this.admin.tableExists(TableName.valueOf(tableName))) {
            throw new IOException("Table create failed: table already exists!");
        } else {
            HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
            this.admin.createTable(tableDesc);
            this.setTableName(tableName);
            return this;
        }
    }

    public HbaseServer createTable(String tableName, String[] familys, Integer version) throws IOException {
        TableName tName = TableName.valueOf(tableName);
        if (this.admin.tableExists(tName)) {
            throw new IOException("Table create failed: table already exists!");
        } else {
            HTableDescriptor tableDesc = new HTableDescriptor(tName);

            for(int i = 0; i < familys.length; ++i) {
                HColumnDescriptor cd = new HColumnDescriptor(familys[i]);
                if (version != null) {
                    cd.setMaxVersions(version);
                }

                tableDesc.addFamily(cd);
            }

            this.admin.createTable(tableDesc);
            this.admin.flush(tName);
            this.setTableName(tableName);
            return this;
        }
    }

    public void flush(String tableName) throws IOException {
        TableName tName = TableName.valueOf(tableName);
        // Flush the named table, matching the method name and the no-argument flush().
        this.admin.flush(tName);
    }

    public boolean isTableExist() throws IOException {
        return this.admin.tableExists(this.tableName);
    }

    public boolean isTableExist(String tableName) throws IOException {
        return this.admin.tableExists(TableName.valueOf(tableName));
    }

    public boolean isRowExist(String rowKey) throws IOException {
        Get get = new Get(Bytes.toBytes(rowKey));
        Result rs = this.getTable().get(get);
        return !rs.isEmpty();
    }

    public void addColumnFamily(String columnFamily) throws IOException {
        if (!this.isFamilyExist(columnFamily)) {
            this.admin.disableTable(this.tableName);
            HColumnDescriptor column = new HColumnDescriptor(columnFamily);
            this.admin.addColumn(this.tableName, column);
            this.admin.enableTable(this.tableName);
        }

    }

    public HbaseServer setTableName(String tableName) throws IOException {
        if (tableName == null) {
            throw new RuntimeException("tableName can not be null");
        } else {
            if (this.table != null) {
                this.table.close();
                this.table = null;
                this.tableName = null;
            }

            this.tableName = TableName.valueOf(tableName);
            this.table = this.connection.getTable(this.tableName);
            return this;
        }
    }

    public void deleteTable() throws IOException, InterruptedException {
        this.admin.disableTable(this.tableName);
        this.admin.deleteTable(this.tableName);
    }

    public void deleteTable(String tableName) throws IOException, InterruptedException {
        TableName tName = TableName.valueOf(tableName);
        if (this.isTableExist(tableName)) {
            this.admin.disableTable(tName);
            this.admin.deleteTable(tName);
        }

    }

    public void delete(String rowKey) throws IOException {
        Delete del = new Delete(Bytes.toBytes(rowKey));
        this.getTable().delete(del);
    }

    public void delete(List<Delete> list) throws IOException {
        this.getTable().delete(list);
    }

    public boolean isFamilyExist(String family) throws IOException {
        return this.admin.getTableDescriptor(this.tableName).hasFamily(Bytes.toBytes(family));
    }

    public void deleteTableFamilyData(String family) throws IOException {
        if (this.isFamilyExist(family)) {
            this.admin.disableTable(this.tableName);
            this.admin.deleteColumn(this.tableName, Bytes.toBytes(family));
            this.admin.addColumn(this.tableName, new HColumnDescriptor(family));
            this.admin.enableTable(this.tableName);
        }

    }

    public void deleteFamily(String rowKey, String family) throws IOException {
        if (this.isFamilyExist(family)) {
            Delete del = new Delete(Bytes.toBytes(rowKey));
            del.addFamily(Bytes.toBytes(family));
            this.getTable().delete(del);
        }

    }

    public void deleteColumn(String rowKey, String family, String column) throws IOException {
        Delete del = new Delete(Bytes.toBytes(rowKey));
        del.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
        this.getTable().delete(del);
    }

    public List<HRegionInfo> getRegionInfo() throws IOException {
        return this.admin.getTableRegions(this.tableName);
    }

    public <T> List<Pair<T, T>> getRegionRowkeyPair(Class<T> kls) throws IOException {
        Iterator<HRegionInfo> ir = this.getRegionInfo().iterator();
        HRegionInfo regionInfo = null;
        ArrayList rowkeyPair = new ArrayList();

        while(ir.hasNext()) {
            regionInfo = (HRegionInfo)ir.next();
            T startRowkey = HBytes.getValue(regionInfo.getStartKey(), kls);
            T endRowkey = HBytes.getValue(regionInfo.getEndKey(), kls);
            rowkeyPair.add(new Pair(startRowkey, endRowkey));
        }

        return rowkeyPair;
    }

    public void flush() throws IOException, InterruptedException {
        if (this.admin.tableExists(this.tableName)) {
            this.admin.flush(this.tableName);
        }

    }

    protected Table getTable() {
        if (this.table == null) {
            throw new RuntimeException("no table specified");
        } else {
            return this.table;
        }
    }

    public void close() throws IOException {
        if (this.table != null) {
            this.table.close();
        }

        if (this.admin != null) {
            this.admin.close();
        }

        if (this.connection != null) {
            this.connection.close();
        }

    }

    public static void saveToHbase(String tableName, List<Putable> putter) throws Exception {
        // The constructor opens a new Connection for this call; try-with-resources
        // closes the table, admin and connection when the writes are done.
        try (HbaseServer server = new HbaseServer(tableName)) {
            for (Putable putable : putter) {
                putable.put(server);
            }
        }
    }

    public void put(String rowkey, String columnFamily, String qualifier, Object cell) throws IOException {
        Put put = (new Put(Bytes.toBytes(rowkey))).addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), HBytes.toHBytes(cell));
        this.put(put);
    }

    public void put(String rowkey, String columnFamily, String qualifier, long timestamp, Object cell) throws IOException {
        Put put = (new Put(Bytes.toBytes(rowkey))).addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), timestamp, HBytes.toHBytes(cell));
        this.put(put);
    }

    public void put(String rowkey, String columnFamily, String qualifier, long timestamp, Set<String> cell) throws IOException {
        Put put = (new Put(Bytes.toBytes(rowkey))).addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), timestamp, HBytes.toHBytes(StringUtils.join(cell, StringSetFilter.SPILTER)));
        this.put(put);
    }

    public void put(Put put) throws IOException {
        this.getTable().put(put);
    }

    public void put(List<Put> puts) throws IOException {
        this.getTable().put(puts);
    }

    public <T> List<T> getValueSortByFrequncy(String rowkey, String columnFamily, String qualifier, Class<T> kls) throws IOException {
        List<T> values = this.getAllVersions(rowkey, columnFamily, qualifier, kls);
        return values != null && values.size() > 0 ? HListUtil.sortByFrequcy(values) : null;
    }

    public <T> List<T> getValueSortByFrequncy(Result result, String columnFamily, String qualifier, Class<T> kls) throws IOException {
        List<T> values = this.getAllVersions(result, columnFamily, qualifier, kls);
        return values != null && values.size() > 0 ? HListUtil.sortByFrequcy(values) : null;
    }

    public <T> T getTopFrequncyValue(String rowkey, String columnFamily, String qualifier, Class<T> kls) throws IOException {
        List<T> result = this.getValueSortByFrequncy(rowkey, columnFamily, qualifier, kls);
        return result != null && result.size() >= 1 ? result.get(0) : null;
    }

    public <T> T getTopFrequncyValue(Result result, String columnFamily, String qualifier, Class<T> kls) throws IOException {
        List<T> r = this.getValueSortByFrequncy(result, columnFamily, qualifier, kls);
        return r != null && r.size() >= 1 ? r.get(0) : null;
    }

    public List<Cell> getAllVersions(String rowkey, String columnFamily, String qualifier) throws IOException {
        Get get = (new Get(Bytes.toBytes(rowkey))).addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)).setMaxVersions();
        Result result = this.getTable().get(get);
        return result.getColumnCells(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
    }

    public Cell getLastest(String rowkey, String columnFamily, String qualifier) throws IOException {
        Get get = (new Get(Bytes.toBytes(rowkey))).addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
        Result result = this.getTable().get(get);
        return this.getLastest(result, columnFamily, qualifier);
    }

    public Cell getLastest(Result result, String columnFamily, String qualifier) throws IOException {
        return result != null && !result.isEmpty() ? result.getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)) : null;
    }

    public <T> T getLastest(String rowkey, String columnFamily, String qualifier, Class<T> kls) throws IOException {
        Cell cell = this.getLastest(rowkey, columnFamily, qualifier);
        return cell == null ? null : HBytes.getValue(CellUtil.cloneValue(cell), kls);
    }

    public <T> T getLastest(Result result, String columnFamily, String qualifier, Class<T> kls) throws IOException {
        Cell cell = this.getLastest(result, columnFamily, qualifier);
        return cell == null ? null : HBytes.getValue(CellUtil.cloneValue(cell), kls);
    }

    public <T> List<T> getAllVersions(String rowkey, String columnFamily, String qualifier, Class<T> kls) throws IOException {
        List<Cell> cells = this.getAllVersions(rowkey, columnFamily, qualifier);
        List<T> r = new ArrayList();
        Iterator var7 = cells.iterator();

        while(var7.hasNext()) {
            Cell cell = (Cell)var7.next();
            r.add(HBytes.getValue(CellUtil.cloneValue(cell), kls));
        }

        return r;
    }

    public <T> List<T> getAllVersions(Result result, String columnFamily, String qualifier, Class<T> kls) throws IOException {
        List<T> r = new ArrayList();
        if (result != null && !result.isEmpty()) {
            List<Cell> cells = result.getColumnCells(HBytes.toBytes(columnFamily), HBytes.toBytes(qualifier));
            Iterator var7 = cells.iterator();

            while(var7.hasNext()) {
                Cell cell = (Cell)var7.next();
                r.add(HBytes.getValue(CellUtil.cloneValue(cell), kls));
            }

            return r;
        } else {
            return r;
        }
    }

    public Result[] get(List<Get> gets) throws IOException {
        return this.getTable().get(gets);
    }

    public Result get(Get get) throws IOException {
        return this.getTable().get(get);
    }

    public Result[] get(List<String> rowkeys, String columnFamily, String qualifier) throws IOException {
        return this.get(rowkeys, columnFamily, Arrays.asList(qualifier));
    }

    public Result[] get(List<String> rowkeys, String columnFamily, List<String> qualifiers) throws IOException {
        List<Get> gets = new ArrayList();
        Iterator var5 = rowkeys.iterator();

        while(var5.hasNext()) {
            String rowkey = (String)var5.next();
            Get get = (new Get(Bytes.toBytes(rowkey))).setMaxVersions();
            Iterator var8 = qualifiers.iterator();

            while(var8.hasNext()) {
                String qualifier = (String)var8.next();
                get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
            }

            gets.add(get);
        }

        return this.get((List)gets);
    }

    public Result get(String rowkey, String columnFamily, String qualifier) throws IOException {
        Set<String> qulifiers = new HashSet();
        if (qualifier != null) {
            qulifiers.add(qualifier);
        }

        return this.get((String)rowkey, columnFamily, (Set)qulifiers);
    }

    public Result get(String rowkey, String columnFamily, Set<String> qualifiers) throws IOException {
        Get get = (new Get(Bytes.toBytes(rowkey))).setMaxVersions();
        if (columnFamily != null) {
            if (qualifiers != null && !qualifiers.isEmpty()) {
                Iterator var5 = qualifiers.iterator();

                while(var5.hasNext()) {
                    String qualifier = (String)var5.next();
                    get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
                }
            } else {
                get.addFamily(Bytes.toBytes(columnFamily));
            }
        }

        return this.getTable().get(get);
    }

    public Result get(String rowkey, String columnFamily) throws IOException {
        return this.get(rowkey, columnFamily, (String)null);
    }

    public Result get(String rowkey, int versions) throws IOException {
        Get get = new Get(Bytes.toBytes(rowkey));
        if (versions > 0) {
            get.setMaxVersions(versions);
        } else {
            get.setMaxVersions();
        }

        return this.getTable().get(get);
    }

    public Result get(String rowkey) throws IOException {
        return this.get(rowkey, -1);
    }

    public ResultScanner scan(Scan scan) throws IOException {
        return this.getTable().getScanner(scan);
    }

    public static Map<String, List<String>> filterQualifier(Result result, String... qualifiersPrefix) {
        Map<String, List<String>> mapedQualifiers = new HashMap();
        if (qualifiersPrefix != null && qualifiersPrefix.length != 0) {
            if (result != null && !result.isEmpty()) {
                List<Cell> cells = result.listCells();
                if (cells != null) {
                    Iterator var4 = cells.iterator();

                    while(var4.hasNext()) {
                        Cell cell = (Cell)var4.next();
                        String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                        String[] var7 = qualifiersPrefix;
                        int var8 = qualifiersPrefix.length;

                        for(int var9 = 0; var9 < var8; ++var9) {
                            String prefix = var7[var9];
                            if (qualifier.startsWith(prefix)) {
                                List<String> q = (List)mapedQualifiers.get(prefix);
                                if (q == null) {
                                    q = new ArrayList();
                                }

                                ((List)q).add(qualifier);
                                mapedQualifiers.put(prefix, q);
                            }
                        }
                    }
                }
            }

            return mapedQualifiers;
        } else {
            return mapedQualifiers;
        }
    }
}

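In effect, this creates one Connection per partition. A partition is always processed inside a single executor, so creating the Connection there works.
By contrast, letting several partitions share one Connection can go wrong if each partition closes the Connection when it finishes, as HbaseServer.close() does. Suppose partition A has little data and partition B has a lot: A may close the connection after it has written its rows to HBase, while B still has rows left to write.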
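
The per-partition pattern described above can be sketched in plain Java. This is only an illustration under stated assumptions: RecordingSink is a hypothetical stand-in for a wrapper such as HbaseServer (it records rows in memory instead of talking to HBase), and PartitionWriter.writePartition is a made-up helper name. What it tries to show is ownership: each partition opens its own resource and is the only one that closes it, so a partition that finishes early cannot cut off another.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical stand-in for a per-partition wrapper such as HbaseServer. */
final class RecordingSink implements AutoCloseable {
    final List<String> written = new ArrayList<>();
    private boolean closed = false;

    /** Records one row; fails if the sink has already been closed. */
    void put(String row) {
        if (closed) {
            throw new IllegalStateException("sink already closed");
        }
        written.add(row);
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true;
    }
}

final class PartitionWriter {
    /**
     * Opens a sink for this partition only, writes every row through it,
     * and closes it when the partition is done. Returns the row count.
     */
    static int writePartition(Iterator<String> rows, Supplier<RecordingSink> open) {
        int count = 0;
        try (RecordingSink sink = open.get()) {
            while (rows.hasNext()) {
                sink.put(rows.next());
                count++;
            }
        }
        return count;
    }
}
```

In a Spark job the body of writePartition would sit inside foreachPartition, with the supplier replaced by something like new HbaseServer(tableName). Because no sink is shared, closing A's sink has no effect on B.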

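A pitfall to watch for: if each partition obtains its Connection through a method like the one below, things go wrong. Once partition A has created the connection, partition B does not create another one and reuses A's, so closing it in A breaks B.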

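  // `conf` and `connection` are assumed to be mutable fields of the enclosing object,
  // so every partition running in the same executor JVM sees the same values.
  def getConnection(): Connection = {

    if (conf == null) {
      // Create the configuration first; calling set on a null conf would throw.
      conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", "172.16.26.6:2181,172.16.26.10:2181,172.16.26.13:2181")
    }

    // Reuse the existing connection unless it is missing or already closed.
    if (connection == null || connection.isClosed()) {
      try {
        connection = ConnectionFactory.createConnection(conf)
      } catch {
        case e: Exception => e.printStackTrace()
      }
    }
    connection
  }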

2. The correct way to connect to HBase

That said, each case still has to be judged on its own merits.

// All threads in the process share a single Connection object
connection = ConnectionFactory.createConnection(config);
        ...
// Each thread uses its own Table object
Table table = connection.getTable(TableName.valueOf("test"));
try {
    ...
} finally {
    table.close();
}
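
One way to arrange "create once, share everywhere, close once" is a small holder class. The sketch below is a hedged illustration, not HBase API: SharedResource is a hypothetical name, and it is written against AutoCloseable so that it runs without HBase on the classpath. In a real job the type parameter would be the HBase Connection and the factory would call ConnectionFactory.createConnection(config).

```java
import java.util.concurrent.Callable;

/** Hypothetical holder: creates one instance on first use and hands it to every caller. */
final class SharedResource<T extends AutoCloseable> {
    private final Callable<T> factory;
    private T instance; // guarded by the monitor of "this"

    SharedResource(Callable<T> factory) {
        this.factory = factory;
    }

    /** Returns the shared instance, creating it on the first call. */
    synchronized T get() {
        if (instance == null) {
            try {
                instance = factory.call();
            } catch (Exception e) {
                throw new IllegalStateException("could not create shared resource", e);
            }
        }
        return instance;
    }

    /** Closes the shared instance. Meant to be called once at shutdown, not by workers. */
    synchronized void shutdown() {
        if (instance != null) {
            try {
                instance.close();
            } catch (Exception e) {
                throw new IllegalStateException("could not close shared resource", e);
            } finally {
                instance = null;
            }
        }
    }
}
```

Worker threads would call get(), open their own Table from the returned Connection, and close only that Table. shutdown() belongs in one place, for example a JVM shutdown hook, so that a worker that finishes early cannot close the connection under the others.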
