说明
对于 commitlog、 consumequeue、 index 三类大文件进行磁盘读写操作,均是通过 MappedFile 类来完成。
MappedFile与MappedFileQueue关系如下

主要的方法,作用如下
init:初始化文件名,大小,以及writeBuffer
cleanup:实现父类函数,用于处理directByteBuffer相关的资源回收
destroy:关闭FileChannel,删除文件
appendMessagesInner:追加消息,调用AppendMessageCallback#doAppend完成
flush,isAbleToFlush: flush日志
commit,commit0,isAbleToCommit: commit日志
selectMappedBuffer:完成ByteBuffer随机读取
下面对源码进行分析
属性
public static final int OS_PAGE_SIZE = 1024 * 4;
protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);//占用内存大小
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);//文件数
protected final AtomicInteger wrotePosition = new AtomicInteger(0);//写到的位置,当值==fileSize代表文件写满了
//ADD BY ChenYang
/**
* 当writeBuffer为空时,就不会有真正的commit行为
* wrotePosition意义上就代表了committedPosition,真正的committedPosition值只会为初始值0,参考commit函数
*/
protected final AtomicInteger committedPosition = new AtomicInteger(0);
//刷盘刷到的位置
private final AtomicInteger flushedPosition = new AtomicInteger(0);
protected int fileSize;//mappedFile文件大小,参照MessageStoreConfig.mapedFileSizeCommitLog,默认1G
protected FileChannel fileChannel;
/**
* Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
*/
protected ByteBuffer writeBuffer = null;//从transientStorePool的池中拿到ByteBuffer,可能为null
protected TransientStorePool transientStorePool = null;//池对象,init函数中赋值
private String fileName;
private long fileFromOffset;//fileFromOffset就是文件名称,一般为20位数字,代表这个文件开始时的offset
private File file;
private MappedByteBuffer mappedByteBuffer;//内存映射文件
private volatile long storeTimestamp = 0;
private boolean firstCreateInQueue = false;
函数
构造,初始化函数
public MappedFile() {
}
public MappedFile(final String fileName, final int fileSize) throws IOException {
init(fileName, fileSize);
}
public MappedFile(final String fileName, final int fileSize,
final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize, transientStorePool);
}
/**
* 初始化文件名,大小
* 设置transientStorePool
* 设置writeBuffer,从transientStorePool借一个出来,可能为null
*/
public void init(final String fileName, final int fileSize,
final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize);
this.writeBuffer = transientStorePool.borrowBuffer();//从复用池中获取ByteBuffer,可能为null
this.transientStorePool = transientStorePool;
}
/**
* 初始化文件名,大小
* 设置fileFromOffset代表文件对应的偏移量
* 得到fileChannel,mappedByteBuffer 得到IO相关对象
* 计数TOTAL_MAPPED_VIRTUAL_MEMORY,TOTAL_MAPPED_FILES更新
*/
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
this.fileFromOffset = Long.parseLong(this.file.getName());//fileFromOffset就是文件名称,一般为20位数字
boolean ok = false;
ensureDirOK(this.file.getParent());//确保父目录是存在的
try {
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);//内存映射文件MappedByteBuffer对象
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("create file channel " + this.fileName + " Failed. ", e);
throw e;
} catch (IOException e) {
log.error("map file " + this.fileName + " Failed. ", e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}
//看指定目录是否存在,否则创建一个
public static void ensureDirOK(final String dirName) {
if (dirName != null) {
File f = new File(dirName);
if (!f.exists()) {
boolean result = f.mkdirs();
log.info(dirName + " mkdir " + (result ? "OK" : "Failed"));
}
}
}
clean相关函数
/**
* 清理,监视是否可用,是否已经清理过,条件适当进行清理
* 更新内存记录以及文件数记录
* @param currentRef
* @return
*/
@Override
public boolean cleanup(final long currentRef) {
if (this.isAvailable()) {//cleanup时,不允许该mappedFile可用
log.error("this file[REF:" + currentRef + "] " + this.fileName
+ " have not shutdown, stop unmapping.");
return false;
}
if (this.isCleanupOver()) {//如果已经clean完了就不用再clean了
log.error("this file[REF:" + currentRef + "] " + this.fileName
+ " have cleanup, do not do it again.");
return true;
}
clean(this.mappedByteBuffer);//清理mappedByteBuffer
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(this.fileSize * (-1));//占用内存减少
TOTAL_MAPPED_FILES.decrementAndGet();//文件数减少
log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK");
return true;
}
/**
* 清理mappedByteBuffer
* 如果是directByteBuffer且容量不为0
* 则获取attachment转成directByteBuffer,调用其clearner.clean方法
*/
public static void clean(final ByteBuffer buffer) {
if (buffer == null || !buffer.isDirect() || buffer.capacity() == 0)
return;
//下面的逻辑一定是directByteBuffer的
/**
* 下面这段是
* 嵌套拿到directByteBuffer的最内部的attachment,强制转换成ByteBuffer对象(实际运行应该会是directByteBuffer)
* 然后获得directByteBuffer的Cleaner对象 cleaner
* 然后调用cleaner.clean方法,进行深度的释放资源
*/
invoke(invoke(viewed(buffer), "cleaner"), "clean");
}
/**
* 嵌套调用,获取最深层的attachment 或者 viewedBuffer方法
* 转化为ByteBuffer对象
*/
private static ByteBuffer viewed(ByteBuffer buffer) {
String methodName = "viewedBuffer";
Method[] methods = buffer.getClass().getMethods();
for (int i = 0; i < methods.length; i++) {
if (methods[i].getName().equals("attachment")) {
methodName = "attachment";
break;
}
}
ByteBuffer viewedBuffer = (ByteBuffer) invoke(buffer, methodName);
//执行DirectByteBuffer.attachment()方法
if (viewedBuffer == null)
return buffer;
else
return viewed(viewedBuffer);
}
//java反射,执行某个object的某方法,把所有访问级别都设置为Accessible再执行
private static Object invoke(final Object target, final String methodName, final Class<?>... args) {
return AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
try {
Method method = method(target, methodName, args);
method.setAccessible(true);
return method.invoke(target);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
});
}
//根据object,参数类型列表以及函数名得到Method对象
private static Method method(Object target, String methodName, Class<?>[] args)
throws NoSuchMethodException {
try {
return target.getClass().getMethod(methodName, args);
} catch (NoSuchMethodException e) {
return target.getClass().getDeclaredMethod(methodName, args);
}
}
/**
* 参数intervalForcibly代表距离上一次shutdown至少要这么长时间
* 调用shutdown
* 如果引用清理干净了删除文件,打log
* 否则false
*/
public boolean destroy(final long intervalForcibly) {
this.shutdown(intervalForcibly);
//如果引用清理干净了删除文件,打log
if (this.isCleanupOver()) {
try {
this.fileChannel.close();
log.info("close file channel " + this.fileName + " OK");
long beginTime = System.currentTimeMillis();
boolean result = this.file.delete();
log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
+ (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
+ this.getFlushedPosition() + ", "
+ UtilAll.computeEclipseTimeMilliseconds(beginTime));
} catch (Exception e) {
log.warn("close file channel " + this.fileName + " Failed. ", e);
}
return true;
} else {
log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
+ " Failed. cleanupOver: " + this.cleanupOver);
}
return false;
}
追加消息相关函数
追加MessageExt消息
//插入MessageExtBrokerInner消息
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
return appendMessagesInner(msg, cb);
}
//插入MessageExtBatch消息
public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb) {
return appendMessagesInner(messageExtBatch, cb);
}
/**
* append消息,处理MessageExt消息
* 只要文件还有剩余空间,调用AppendMessageCallback#doAppend方法完成底层append逻辑
* 更新写的位置以及存储时间
* 返回AppendMessageResult
*/
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
assert messageExt != null;
assert cb != null;
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize) {//文件还有剩余空间
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);//标记当前消息的position
AppendMessageResult result = null;
if (messageExt instanceof MessageExtBrokerInner) {//追加MessageExtBrokerInner消息
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
} else if (messageExt instanceof MessageExtBatch) {//追加MessageExtBatch消息
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
} else {
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
this.wrotePosition.addAndGet(result.getWroteBytes());//更新写的位置
this.storeTimestamp = result.getStoreTimestamp();//更新存储时间
return result;
}
log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
追加byte[]内容
/**
* 空间足够的话,将data[]直接写入fileChannel
* 更新wrotePosition
*/
public boolean appendMessage(final byte[] data) {
int currentPos = this.wrotePosition.get();
if ((currentPos + data.length) <= this.fileSize) {//空间足够
try {
this.fileChannel.position(currentPos);
this.fileChannel.write(ByteBuffer.wrap(data));
} catch (Throwable e) {
log.error("Error occurred when append message to mappedFile.", e);
}
this.wrotePosition.addAndGet(data.length);
return true;
}
return false;
}
/**
* 空间足够的话,将data[]的一部分写入fileChannel
* Content of data from offset to offset + length will be wrote to file.
*
* @param offset The offset of the subarray to be used.
* @param length The length of the subarray to be used.
*/
public boolean appendMessage(final byte[] data, final int offset, final int length) {
int currentPos = this.wrotePosition.get();
if ((currentPos + length) <= this.fileSize) {
try {
this.fileChannel.position(currentPos);
this.fileChannel.write(ByteBuffer.wrap(data, offset, length));
} catch (Throwable e) {
log.error("Error occurred when append message to mappedFile.", e);
}
this.wrotePosition.addAndGet(length);
return true;
}
return false;
}
flush 与 commit相关
/**
* @return The current flushed position
*/
public int flush(final int flushLeastPages) {
if (this.isAbleToFlush(flushLeastPages)) {//是否可以刷
if (this.hold()) {//引用+1
int value = getReadPosition();//获得可以读到的最大位置
try {
//We only append data to fileChannel or mappedByteBuffer, never both.
//从缓存刷到磁盘
if (writeBuffer != null || this.fileChannel.position() != 0) {
this.fileChannel.force(false);
} else {
this.mappedByteBuffer.force();
}
} catch (Throwable e) {
log.error("Error occurred when force data to disk.", e);
}
this.flushedPosition.set(value);//更新刷到的位置
this.release();//释放引用
} else {
log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
this.flushedPosition.set(getReadPosition());
}
}
return this.getFlushedPosition();
}
/**
* 提交:writeBuffer写入FileChannel
* 1.当writeBuffer为null,直接返回
* 2.否则将writeBuffer中[lastCommittedPosition,writePos)的部分 写入fileChannel
* 3.然后对writeBuffer归还给transientStorePool,返回committedPosition
*/
public int commit(final int commitLeastPages) {
if (writeBuffer == null) {
//no need to commit data to file channel, so just regard wrotePosition as committedPosition.
return this.wrotePosition.get();
}
if (this.isAbleToCommit(commitLeastPages)) {//是否可以提交
if (this.hold()) {//引用+1
commit0(commitLeastPages);//进行提交
this.release();//释放已用
} else {
log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
}
}
// All dirty data has been committed to FileChannel.
if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
this.transientStorePool.returnBuffer(writeBuffer);//还给复用池
this.writeBuffer = null;
}
return this.committedPosition.get();
}
/**
* commit实现,持久化到日志
* 将writeBuffer中[lastCommittedPosition,writePos)的部分 写入fileChannel
* 更新committedPosition
* 参数 commitLeastPages 没用
*/
protected void commit0(final int commitLeastPages) {
int writePos = this.wrotePosition.get();
int lastCommittedPosition = this.committedPosition.get();
if (writePos - this.committedPosition.get() > 0) {
try {
ByteBuffer byteBuffer = writeBuffer.slice();
byteBuffer.position(lastCommittedPosition);
byteBuffer.limit(writePos);
this.fileChannel.position(lastCommittedPosition);
this.fileChannel.write(byteBuffer);//持久化
this.committedPosition.set(writePos);
} catch (Throwable e) {
log.error("Error occurred when commit data to FileChannel.", e);
}
}
}
/**
* 是否能够flush。满足如下条件任意条件:
* 1. 文件已经写满
* 2. flushLeastPages > 0 && 未flush部分超过flushLeastPages
* 3. flushLeastPages = 0 && 有新写入部分
*
* @param flushLeastPages flush最小分页
* @return 是否能够写入
*/
private boolean isAbleToFlush(final int flushLeastPages) {
int flush = this.flushedPosition.get();
int write = getReadPosition();//最后有效的写到的位置
if (this.isFull()) {
return true;
}
if (flushLeastPages > 0) {
return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
}
return write > flush;
}
/**
* 是否能够commit。满足如下条件任意条件:
* 1. 映射文件已经写满
* 2. commitLeastPages > 0 && 未commit部分超过commitLeastPages
* 3. commitLeastPages = 0 && 有新写入部分
*
* @param commitLeastPages commit的最小页数
*/
protected boolean isAbleToCommit(final int commitLeastPages) {
int flush = this.committedPosition.get();
int write = this.wrotePosition.get();
if (this.isFull()) {
return true;
}
if (commitLeastPages > 0) {
return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
}
return write > flush;
}
//如果写的位置和要求文件大小一样,则写满了
public boolean isFull() {
return this.fileSize == this.wrotePosition.get();
}
/**
* @return The max position which have valid data
* 返回的是可以读到的最大位置(就是实际写到的位置)
* 函数名字容易让人误解
*/
public int getReadPosition() {
return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}
随机读取相关函数
//返回从pos到 pos + size的内存映射
public SelectMappedBufferResult selectMappedBuffer(int pos, int size) {
int readPosition = getReadPosition();//获取当前有效数据的最大位置
if ((pos + size) <= readPosition) {//返回的数据 必须是有效的
if (this.hold()) {//引用+1
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();//切片,复制一个byteBuffer(与原byteBuffer共享数据, 只是指针位置独立)
byteBuffer.position(pos);
ByteBuffer byteBufferNew = byteBuffer.slice();
byteBufferNew.limit(size);//设置结束位置,byteBufferNew的[0,size) 与 原有的mappedByteBuffer的[pos,pos+size)相同
return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);//绝对offset,
} else {
log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
+ this.fileFromOffset);
}
} else {
log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
+ ", fileFromOffset: " + this.fileFromOffset);
}
return null;
}
//返回从pos到最大有效位置的所有数据
public SelectMappedBufferResult selectMappedBuffer(int pos) {
int readPosition = getReadPosition();
if (pos < readPosition && pos >= 0) {//返回的数据必须是有效的
if (this.hold()) {//引用+1
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
byteBuffer.position(pos);
int size = readPosition - pos;//剩下的都返回
ByteBuffer byteBufferNew = byteBuffer.slice();
byteBufferNew.limit(size);
return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
}
}
return null;
}
其他函数
get set相关函数
warmMappedFile,用于预热mappedByteBuffer的IO的
还有mlock和munlock用LibC库的,没有深入研究
思考
flush和commit的关系
注意两个是都会执行的,先commit再flush,不会只执行一个函数
对应CommitLog里面的不同的内部类
flush对应FlushRealTimeService
commit对应CommitRealTimeService
refer中的博客是这样介绍的

我觉得上图的表格有点不准确,应该是这样
步骤1:appendMessagesInner | 步骤2:commit | 步骤3:flush | |
---|---|---|---|
方式1:writeBuffer为null | 写入writeBuffer | writeBuffer写入channel | fileChannel.force |
方式2:writeBuffer非null | 写入mappedByteBuffer | 基本没干事情 | mappedByteBuffer.force() |
wrotePosition和committedPosition关系是什么
当writeBuffer为空时,就不会有真正的commit行为
此时,wrotePosition意义上就代表了committedPosition,真正的committedPosition值只会为初始值0,参考commit函数
isAbleToFlush和isAbleToCommit函数的区别
区别上,两者的int write获取方式不容
int write = getReadPosition();//前者
int write = this.wrotePosition.get();//后者
后者被调用的时候,writeBuffer不为null,因此不能调用getReadPosition,否则write和flush的值会一直一样
writeBuffer什么时候用,mappedByteBuffer什么时候用
见flush函数和appendMessagesInner函数
writeBuffer能用就用writeBuffer,否则就用mappedByteBuffer
fileSize就一定是文件大小吗
类中这个属性只是一个限制,不代表创建一个mappedFile,它立刻占有这么大空间
只不过随着数据写入,占有空间最大允许达到fileSize这个值
问题
flush函数hold失败也会设置新的flushedPosition
为什么,这和hold成功最终的返回值不就一样了吗?
init函数中,只要FileChannel或者mappetByteBuffer写入就能完成文件的创建吗
一般写法可能就是file.create()了,这里我不是很清楚IO操作,感觉上是FileChannel或者MappedByteBuffer写入就能完成文件创建
destroy函数为什么要intervalForcibly配置
这个干吗用的,为什么两次destroy之间还有相隔这么久才有效,有什么好处吗
refer:
https://github.com/YunaiV/Blog/blob/master/RocketMQ/1004-RocketMQ%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%EF%BC%9AMessage%E5%AD%98%E5%82%A8.md
http://blog.csdn.net/KilluaZoldyck/article/details/76775397 mappedFile作用
http://www.jianshu.com/p/f9052d9b637c mappedFile与mappedFileQueue
http://www.jianshu.com/p/6494e33c9b1f rocketmq中内存映射与零拷贝
网友评论