目录
- 前言
- 刷盘相关类介绍
- Broker刷盘源码分析
- CommitLog构造&属性赋值
- TransientStorePoolEnabled介绍
- 消息保存源码分析
- 消息刷盘入口方法源码分析
- 总结
前言
我们在学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个刷盘策略
- 同步刷盘
同步刷盘即Broker消息已经被持久化到硬盘后才会向客户端返回成功。同步刷盘的优点是能保证消息不丢失,但是这是以牺牲写入性能为代价的。
- 异步刷盘
异步刷盘是指Broker将信息存储到pagecache后就立即向客户端返回成功,然后会有一个异步线程定时将内存中的数据写入磁盘,默认时间间隔为500ms。
Broker中的刷盘策略是通过Broker配置文件中flushDiskType
进行配置,可以配置ASYNC_FLUSH
(异步刷盘)和SYNC_FLUSH
(同步刷盘),默认配置是ASYNC_FLUSH
。
Broker的刷盘采用基于JDK NIO技术,消息首先会存储到内存中,然后再根据不同的刷盘策略在不同时间刷盘,如果有不了解的小伙伴可以参考这篇文章《【NIO实战】深入理解FileChannel》
刷盘相关类介绍
CommitLog中的内部类FlushCommitLogService及其子类CommitRealTimeService、GroupCommitService、FlushRealTimeService分别是用于不同场景下用于刷盘的刷盘行为,他们会单独或者配合起来使用。具体类图如下所示。
如果是同步刷盘会使用GroupCommitService。如果是异步刷盘,并且关闭了堆外缓存(TransientStorePool),则采用FlushRealTimeService刷盘。如果是异步刷盘,并且开启了堆外缓存,则会使用FlushRealTimeService与CommitRealTimeService配合刷盘。
默认的输盘策略是异步且关闭堆外缓存,因此默认是采用FlushRealTime编程客栈Service进行刷盘
Broker刷盘源码分析
消息刷盘相关逻辑都是围绕在CommitLog,因此要想知道消息时如何刷盘的关键是研究CommitLog
CommitLog构造&属性赋值
CommitLog中与刷盘相关的属性有flushCommitLogService、commitLogService。如果是同步刷盘则在构造函数中会给flushCommitLogService赋值GroupCommitService,如果是异步刷盘则给flushCommitLogService赋值FlushRealTimeService。commitLogService的值是CommitRealTimeService,从上面我们可以很明显的看出它只有在异步且开启TransientStorePoolEnabled时才会被使用。
public class CommitLog { // 如果是同步刷盘,则是GroupCommitService。如果是异步刷盘则是FlushRealTimeService // 默认是异步刷盘,因此是CommitLog$FlushRealTimeService private final FlushCommitLogService flushCommitLogService; // 开启TransientStorePoolEnable时使用CommitRealTimeService private final FlushCommitLogService commitLogService; // 构造函数 public CommitLog(final DefaultMessageStore defaultMessageStore) { // 默认是异步刷盘,因此这里是false if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { this.flushCommitLogService = new GroupCommitService(); } else { this.flushCommitLogService = new FlushRealTimeService(); } this.commitLogService = new CommitRealTimeService(); // 消息回调 this.appendMessageCallback = new DefaultAppendMessageCallback(); flushDiskWatcher = new FlushDiskWatcher(); } }
TransientStorePoolEnabled介绍
transientStorePoolEnabled配置的默认值为false,开启transientStorePoolEnabled需要手动开启。如果开启transientStorePoolEnabled会开启堆外内存存储池,Broker在启动时会申请5个与CommitLog大小(1GB)相同的堆外内存交给TransientStorePool,创建MappedFile时会向TransientStorePool“借”一个堆外内存ByteBuffer,保存消息时会先将消息保存到堆外内存ByteBuffer中,然后在commit到MappedFile的FileChannel,最后再flush到硬盘中。TransientStorePool属性和一些核心方法源码如下,堆外内存ByteBuffer都是由它来管理。
// org.apache.rocketmq.store.TransientStorePool public class TransientStorePool { // 存储池大小,默认是5 private final int poolSize; // CommitLog MappedFile文件大小,默认1GB private final int fileSize; // 默认存5个ByteBuffer private final Deque<ByteBuffer> availableBuffers; // 消息存储配置 private final MessageStoreConfig storeConfig; // TransientStorePouSrPkbawol初始化 public void init() { // 默认是5 for (int i = 0; i < poolSize; i++) { // 分配1GB的直接内存 ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize); final long address = ((DirectBuffer) byteBuffer).address(); Pointer pointer = new Pointer(address); LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize)); 开发者_JS开发 // 生成的缓存保存到队列中 availableBuffers.offer(byteBuffer); } } // 归还缓冲 p编程客栈ublic void returnBuffer(ByteBuffer byteBuffer) { // 修改position和limit,"清空"缓冲 byteBuffer.position(0); byteBuffer.limit(fileSize); // 缓冲入队 this.availableBuffers.offerFirst(byteBuffer); } // 向TransientStorePool借缓冲 public ByteBuffer borrowBuffer() { // 缓冲出队 ByteBuffer buffer = availableBuffers.pollFirst(); return buffer; } }
消息保存源码分析
前面文章《【RocketMQ | 源码分析】Broker是如何保存消息的? 》我们虽然介绍了消息的保存过程,但是开启或者关闭TransientStorePoolEnabled时,消息保存的细节是不同的,我们再打开消息保存MappedFile的源码如下,下面代码中如果writeBuffer不空,则会将消息先追加到writeBuffer,否者直接写入到MappedFile的内存映射文件中。
// org.apache.rocketmq.store.MappedFile#appendMessagesInner public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb, PutMessageContext putMessageContext) { // 如果写文件位置小于文件size if (currentPos < this.fileSize) { // 如果writeBuffer不空,则获取writeBuffer的浅拷贝,否则获取MappedFile的内存映射(MappedByteBuffer)的浅拷贝 ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); byteBuffer.position(currentPos); AppendMessageResult result; // 如果是单条消息 if (messageExt instanceof MessageExtBrokerInner) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos/*文件长度-当前写位置,可以写的长度*/, (MessageExtBrokerInner) messageExt, putMessageContext); } // ...如果是批量消息 return result; } }
那么什么情况下MappedFile中的writeBuffer为空,什么情况下writeBuffer不为空呢?我们可以先来了解MappedFile是如何创建的,MappedFile是由AllocateMappedFileService创建的,具体源码如下,如果开启了TransientStorePoolEnabled,则在创建MappedFile时会向TransientStorePool“借”一个ByteBuffer,如果没有开启TransientStorePoolEnabled,MappedFile中的writeBuffer是空,在保存数据时会将数据直接保存到MappedFile的直接内存映射(MappedByteBuffer)中。
private boolean mmapOperation() { // ... if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { try { mappedFile = ServiceLoader.load(MappedFile.class).iterator().next(); // 初始化mappedFile会向TransientStorePool"借"一个writeBuffer mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool()); } catch (RuntimeException e) { mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool()); } } else { // 创建MappedFile,没有writeBuffer mappedFile = new MappedFile(req.getFilePath(), req.getFileSize()); } // ... }
由上可知,消息保存如下图所示
消息刷盘入口方法源码分析
消息保存和刷盘的入口方法CommitLog#asyncPutMessage
,消息保存到mappedFile的缓存后,最后会调用submitFlushRequest方法提交刷盘请求,Broker会根据刷盘策略进行刷盘。
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) { //... 保存消息 result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); // ... // 提交刷盘请求 CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg); // 提交复制请求 CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg); // 合并提交刷盘请求和提交复制请求结果 return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> { if (flushStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(flushStatus); } if (replicaStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(replicaStatus); } return putMessageResult; javascript }); }
提交了刷盘请求后,根据刷盘策略,是否开启堆外缓存,推送消息中是否要等待消息保存有如下四种刷盘方式
- 异步刷盘(关闭TransientStorePoolEnabled)
异步刷盘(关闭TransientStorePoolEnabled)是默认的刷盘方案,这个刷盘方案先会**异步唤醒(wakeup)**FlushRealTimeService,然后直接返回消息保存成功。由于关闭了TransientStorePoolEnabled,消息是保存到MappedFile中的内存映射文件MappedByteBuffer,FlushRealTimeService将定时MappedByteBuffer刷到磁盘。
- 异步刷盘(开启TransientStorePoolEnabled)
异步刷盘(开启TransientStorePoolEnabled)会先**异步唤醒(wakeup)**CommitRealTimeService,然后直接返回消息保存成功。由于开启了TransientStorePoolEnabled,消息会保存到MappedFile中的内存映射文件ByteBuffer,CommitRealTimeService定时将ByteBuffer中的数据刷到FileChannel中。
- 同步刷盘(等待消息保存)
同步刷盘(等待消息保存)会先创建一个刷盘请求(GroupCommitRequest),然后向GroupCommitService提交刷盘请求,最后等待刷盘结果并返回
- 同步刷盘(不等待消息保存)
同步刷盘(不等待消息保存)也是通过GroupCommitService刷盘,与等待消息保存不同的是不等待的方式异步唤醒(wakeup)GroupCommitService后,直接返回消息保存成功。
四种刷盘方式源码如下所示
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) { // 同步刷盘 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { // 获取同步刷盘Service final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; if (messageExt.isWaitStoreMsgOK()) { // 创建GroupCommitRequest 刷盘偏移量nextOffset = 当前写入偏移量 + 当前消息写入大小 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); // 向刷盘监视器(flushDistWatch)提交刷盘请求 flushDiskWatcher.add(request); // 提交刷盘请求,并且唤醒同步刷盘线程 service.putRequest(request); return request.future(); } else { // 同步刷盘,但是不需要等待刷盘结果,那么唤醒同步刷盘线程 service.wakeup(); return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } } // 异步刷盘 else { // 是否启动了堆外缓存 if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { // 如果没有启动堆外缓存,则唤醒异步刷盘服务 flushRealTimeService flushCommitLogService.wakeup(); } else { // 如果启动了堆外缓存,则唤醒异步转存服务CommitRealTimeService commitLogService.wakeup(); } return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } }
将上面四种场景及调用关系如下图所示
总结javascript
本篇文章介绍了TransientStorePool机制以及开启和管理队消息保存的影响,我们还介绍了RocketMQ中四种刷盘策略
- 同步刷盘-等待消息保存到磁盘
- 同步刷盘-不等待消息保存到磁盘上
- 异步刷盘-开启堆外缓存
- 异步刷盘-不开启堆外缓存
以上就是RocketMQ Broker消息如何刷盘源码解析的详细内容,更多关于RocketMQ Broker消息刷盘的资料请关注我们其它相关文章!
精彩评论