第8章 Hbase实战之谷粒微博
8.1 需求分析
- 微博内容的浏览,数据库表设计
- 用户社交体现:关注用户,取关用户
- 拉取关注的人的微博内容
8.2 代码实现
8.2.1 代码设计总览: - 创建命名空间以及表名的定义
- 创建微博内容表
- 创建用户关系表
- 创建用户微博内容接收邮件表
- 发布微博内容
- 添加关注用户
- 移除(取关)用户
- 获取关注的人的微博内容
- 测试
8.2.2 创建命名空间以及表名的定义
//获取配置conf
private Configuration conf = HBaseConfiguration.create();
//微博内容表的表名
private static final byte[] TABLE_CONTENT = Bytes.toBytes("weibo:content");
//用户关系表的表名
private static final byte[] TABLE_RELATIONS = Bytes.toBytes("weibo:relations");
//微博收件箱表的表名
private static final byte[] TABLE_RECEIVE_CONTENT_EMAIL = Bytes.toBytes("weibo:receive_content_email");
public void initNamespace(){
HBaseAdmin admin = null;
try {
admin = new HBaseAdmin(conf);
//命名空间类似于关系型数据库中的schema,可以想象成文件夹
NamespaceDescriptor weibo = NamespaceDescriptor
.create("weibo")
.addConfiguration("creator", "Jinji")
.addConfiguration("create_time", System.currentTimeMillis() + "")
.build();
admin.createNamespace(weibo);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
8.2.3 创建微博内容表
表结构:
方法名 creatTableeContent
Table Name weibo:content
RowKey 用户ID_时间戳
ColumnFamily info
ColumnLabel 标题,内容,图片
Version 1个版本
代码:
/**
-
创建微博内容表
-
Table Name:weibo:content
-
RowKey:用户ID_时间戳
-
ColumnFamily:info
-
ColumnLabel:标题 内容 图片URL
-
Version:1个版本
*/
public void createTableContent(){
HBaseAdmin admin = null;
try {
admin = new HBaseAdmin(conf);
//创建表表述
HTableDescriptor content = new HTableDescriptor(TableName.valueOf(TABLE_CONTENT));
//创建列族描述
HColumnDescriptor info = new HColumnDescriptor(Bytes.toBytes("info"));
//设置块缓存
info.setBlockCacheEnabled(true);
//设置块缓存大小
info.setBlocksize(2097152);
//设置压缩方式
// info.setCompressionType(Algorithm.SNAPPY);
//设置版本确界
info.setMaxVersions(1);
info.setMinVersions(1);content.addFamily(info); admin.createTable(content);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
8.2.4 创建用户关系表
表结构:
方法名 createTableRelations
Table Name weibo:relations
RowKey 用户ID
ColumnFamily attends、fans
ColumnLabel 关注用户ID,粉丝用户ID
ColumnValue 用户ID
Version 1个版本
代码:
/** -
用户关系表
-
Table Name:weibo:relations
-
RowKey:用户ID
-
ColumnFamily:attends,fans
-
ColumnLabel:关注用户ID,粉丝用户ID
-
ColumnValue:用户ID
-
Version:1个版本
*/
public void createTableRelations(){
HBaseAdmin admin = null;
try {
admin = new HBaseAdmin(conf);
HTableDescriptor relations = new HTableDescriptor(TableName.valueOf(TABLE_RELATIONS));//关注的人的列族 HColumnDescriptor attends = new HColumnDescriptor(Bytes.toBytes("attends")); //设置块缓存 attends.setBlockCacheEnabled(true); //设置块缓存大小 attends.setBlocksize(2097152); //设置压缩方式
// info.setCompressionType(Algorithm.SNAPPY);
//设置版本确界
attends.setMaxVersions(1);
attends.setMinVersions(1);
//粉丝列族
HColumnDescriptor fans = new HColumnDescriptor(Bytes.toBytes("fans"));
fans.setBlockCacheEnabled(true);
fans.setBlocksize(2097152);
fans.setMaxVersions(1);
fans.setMinVersions(1);
relations.addFamily(attends);
relations.addFamily(fans);
admin.createTable(relations);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
8.2.5 创建微博收件箱表
表结构:
方法名 createTableReceiveContentEmails
Table Name weibo:receive_content_email
RowKey 用户ID
ColumnFamily info
ColumnLabel 用户ID
ColumnValue 取微博内容的RowKey
Version 1000
代码:
/**
-
创建微博收件箱表
-
Table Name: weibo:receive_content_email
-
RowKey:用户ID
-
ColumnFamily:info
-
ColumnLabel:用户ID-发布微博的人的用户ID
-
ColumnValue:关注的人的微博的RowKey
-
Version:1000
*/
public void createTableReceiveContentEmail(){
HBaseAdmin admin = null;
try {
admin = new HBaseAdmin(conf);
HTableDescriptor receive_content_email = new HTableDescriptor(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL));
HColumnDescriptor info = new HColumnDescriptor(Bytes.toBytes("info"));info.setBlockCacheEnabled(true); info.setBlocksize(2097152); info.setMaxVersions(1000); info.setMinVersions(1000); receive_content_email.addFamily(info);; admin.createTable(receive_content_email);
} catch (MasterNotRunningException e) {
e.printStackTrace();
} catch (ZooKeeperConnectionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != admin){
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
8.2.6 发布微博内容
a、微博内容表中添加1条数据
b、微博收件箱表对所有粉丝用户添加数据
代码:Message.java
package com.atguigu.weibo;
public class Message {
private String uid;
private String timestamp;
private String content;
public String getUid() {
return uid;
}
public void setUid(String uid) {
this.uid = uid;
}
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
@Override
public String toString() {
return "Message [uid=" + uid + ", timestamp=" + timestamp + ", content=" + content + "]";
}
}
代码:public void publishContent(String uid, String content)
/**
-
发布微博
-
a、微博内容表中数据+1
-
b、向微博收件箱表中加入微博的Rowkey
*/
public void publishContent(String uid, String content){
HConnection connection = null;
try {
connection = HConnectionManager.createConnection(conf);
//a、微博内容表中添加1条数据,首先获取微博内容表描述
HTableInterface contentTBL = connection.getTable(TableName.valueOf(TABLE_CONTENT));
//组装Rowkey
long timestamp = System.currentTimeMillis();
String rowKey = uid + "_" + timestamp;Put put = new Put(Bytes.toBytes(rowKey)); put.add(Bytes.toBytes("info"), Bytes.toBytes("content"), timestamp, Bytes.toBytes(content)); contentTBL.put(put); //b、向微博收件箱表中加入发布的Rowkey //b.1、查询用户关系表,得到当前用户有哪些粉丝 HTableInterface relationsTBL = connection.getTable(TableName.valueOf(TABLE_RELATIONS)); //b.2、取出目标数据 Get get = new Get(Bytes.toBytes(uid)); get.addFamily(Bytes.toBytes("fans")); Result result = relationsTBL.get(get); List<byte[]> fans = new ArrayList<byte[]>(); //遍历取出当前发布微博的用户的所有粉丝数据 for(Cell cell : result.rawCells()){ fans.add(CellUtil.cloneQualifier(cell)); } //如果该用户没有粉丝,则直接return if(fans.size() <= 0) return; //开始操作收件箱表 HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL)); List<Put> puts = new ArrayList<Put>(); for(byte[] fan : fans){ Put fanPut = new Put(fan); fanPut.add(Bytes.toBytes("info"), Bytes.toBytes(uid), timestamp, Bytes.toBytes(rowKey)); puts.add(fanPut); } recTBL.put(puts);
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != connection){
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
8.2.7 添加关注用户
a、在微博用户关系表中,对当前主动操作的用户添加新关注的好友
b、在微博用户关系表中,对被关注的用户添加新的粉丝
c、微博收件箱表中添加所关注的用户发布的微博
代码实现:public void addAttends(String uid, String... attends)
/** -
关注用户逻辑
-
a、在微博用户关系表中,对当前主动操作的用户添加新的关注的好友
-
b、在微博用户关系表中,对被关注的用户添加粉丝(当前操作的用户)
-
c、当前操作用户的微博收件箱添加所关注的用户发布的微博rowkey
*/
public void addAttends(String uid, String... attends){
//参数过滤
if(attends == null || attends.length <= 0 || uid == null || uid.length() <= 0){
return;
}
HConnection connection = null;
try {
connection = HConnectionManager.createConnection(conf);
//用户关系表操作对象(连接到用户关系表)
HTableInterface relationsTBL = connection.getTable(TableName.valueOf(TABLE_RELATIONS));
List<Put> puts = new ArrayList<Put>();
//a、在微博用户关系表中,添加新关注的好友
Put attendPut = new Put(Bytes.toBytes(uid));
for(String attend : attends){
//为当前用户添加关注的人
attendPut.add(Bytes.toBytes("attends"), Bytes.toBytes(attend), Bytes.toBytes(attend));
//b、为被关注的人,添加粉丝
Put fansPut = new Put(Bytes.toBytes(attend));
fansPut.add(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid));
//将所有关注的人一个一个的添加到puts(List)集合中
puts.add(fansPut);
}
puts.add(attendPut);
relationsTBL.put(puts);//c.1、微博收件箱添加关注的用户发布的微博内容(content)的rowkey HTableInterface contentTBL = connection.getTable(TableName.valueOf(TABLE_CONTENT)); Scan scan = new Scan(); //用于存放取出来的关注的人所发布的微博的rowkey List<byte[]> rowkeys = new ArrayList<byte[]>(); for(String attend : attends){ //过滤扫描rowkey,即:前置位匹配被关注的人的uid_ RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(attend + "_")); //为扫描对象指定过滤规则 scan.setFilter(filter); //通过扫描对象得到scanner ResultScanner result = contentTBL.getScanner(scan); //迭代器遍历扫描出来的结果集 Iterator<Result> iterator = result.iterator(); while(iterator.hasNext()){ //取出每一个符合扫描结果的那一行数据 Result r = iterator.next(); for(Cell cell : r.rawCells()){ //将得到的rowkey放置于集合容器中 rowkeys.add(CellUtil.cloneRow(cell)); } } } //c.2、将取出的微博rowkey放置于当前操作用户的收件箱中 if(rowkeys.size() <= 0) return; //得到微博收件箱表的操作对象 HTableInterface recTBL = connection.getTable(TableName.valueOf(TABLE_RECEIVE_CONTENT_EMAIL)); //用于存放多个关注的用户的发布的多条微博rowkey信息 List<Put> recPuts = new ArrayList<Put>(); for(byte[] rk : rowkeys){ Put put = new Put(Bytes.toBytes(uid)); //uid_timestamp String rowKey = Bytes.toString(rk); //借取uid String attendUID = rowKey.substring(0, rowKey.indexOf("_")); long timestamp = Long.parseLong(rowKey.substring(rowKey.indexOf("_") + 1)); //将微博rowkey添加到指定单元格中 put.add(Bytes.toBytes("info"), Bytes.toBytes(attendUID), timestamp, rk); recPuts.add(put); } recTBL.put(recPuts);
} catch (IOException e) {
e.printStackTrace();
}finally{
if(null != connection){
try {
connection.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
本教程由尚硅谷教育大数据研究院出品,如需转载请注明来源,欢迎大家关注尚硅谷公众号(atguigu)了解更多。
网友评论