介绍
背景
- 普通的NN结构一般都是主备NN,当然其它的还有如联邦结构等,其实解决方案的核心还是为了给NN减负。为了确保 active NN挂掉后能快速的将主动权切换到standby NN,那么引入奇数JN集群来做保障,具体就是NN在每次写完磁盘后还得写JN。由于NN是一个支持百万级高并发的系统,因此NN是如何实现快速的刷写磁盘,值得一看。
- 原理性的东西就不多介绍了,请参考上面两位大佬的文章,此处不再重复造轮子
代码
- 核心的代码也就40多行真的非常的精彩
- 核心代码可看hadoop(2.7.2)源码FSEditLog类logSync方法
import java.util.LinkedList;
public class NNEditLog {
//事务ID:每个刷新的日志都有唯一的id
private long txid = 0L;
private DoubleBuffer editLogBuffer = new DoubleBuffer();
//是否正在刷写磁盘
private volatile Boolean isSyncRunning = false;
//是否正在等待同步
private volatile Boolean isWaitSync = false;
//正在等待同步中最大的线程ID
private volatile Long syncMaxTxid = 0L;
//每个线程都对应自己的一个副本
private ThreadLocal<Long> localTxid = new ThreadLocal<Long>();
public void logEdit(String content) {
/**
* 1.加锁的目的就是为了事务ID的唯一,而且是递增
* 2.重要逻辑:若当前有线程 1,2,3,4,5进入到此方法
* 此时1先获取到锁,调起了logSync()方法
* 但是由于线程1 write(log)后就释放锁了(此过程很快)
* 那么其实在logSync()执行之前,2,3,4,5添加进了editLogBuffer
* 所以buffer 1 其实此时有 1.2.3.4.5 五个日志了
* 由于线程1调用了同步,因此 buffer2中也有1.2.3.4.5,其中syncMaxTxid=5
*/
synchronized (this) {
txid++;
localTxid.set(txid);
EditLog log = new EditLog(txid, content);
editLogBuffer.write(log);
}
logSync();
}
private void logSync() {
synchronized (this) {
//是否有人正在把数据同步到磁盘上面
if (isSyncRunning) {
long txid = localTxid.get();
/**
* 可能此时是线程2进来了,但此时syncMaxTxid是5
* 表示自己已经在同步list中了
*/
if (txid <= syncMaxTxid) {
return;
}
if (isWaitSync) {
return;
}
isWaitSync = true;
while (isSyncRunning) {
try {
wait(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
isWaitSync = false;
}
editLogBuffer.setReadyToSync();
//此处editLogBuffer.syncBuffer.size()不是1
if (editLogBuffer.syncBuffer.size() > 0) {
syncMaxTxid = editLogBuffer.getSyncMaxTxid();
}
isSyncRunning = true;
}
editLogBuffer.flush();
synchronized (this) {
isSyncRunning = false;
//释放锁
notify();
}
}
class EditLog {
long txid;
String content;
//构造函数
public EditLog(long txid, String content) {
this.txid = txid;
this.content = content;
}
//为了测试方便
@Override
public String toString() {
return "EditLog{" +
"txid=" + txid +
", content='" + content + '\'' +
'}';
}
}
/**
* 双缓存方案
*/
class DoubleBuffer {
//内存1
LinkedList<EditLog> currentBuffer = new LinkedList<EditLog>();
//内存2
LinkedList<EditLog> syncBuffer = new LinkedList<EditLog>();
/**
* 把数据写到当前内存1
* @param log
*/
public void write(EditLog log) {
currentBuffer.add(log);
}
/**
* 交换内存
*/
public void setReadyToSync() {
LinkedList<EditLog> tmp = currentBuffer;
currentBuffer = syncBuffer;
syncBuffer = tmp;
}
/**
* 获取内存2里面的日志的最大的事务编号
* linklist是有序的,获取最后一个的id即为最大
*/
public Long getSyncMaxTxid() {
return syncBuffer.getLast().txid;
}
//刷写磁盘
public void flush() {
for (EditLog log : syncBuffer) {
//把数据写到磁盘上
System.out.println("存入磁盘日志信息:" + log);
}
//把内存2里面的数据要清空
syncBuffer.clear();
}
}
public static void main(String[] args) {
NNEditLog fs=new NNEditLog();
for (int i = 0; i < 1000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < 100; j++) {
fs.logEdit("namenode edit log ");
}
}
}).start();
}
}
}
网友评论