开发者

RocketMQ Broker消息如何刷盘源码解析

开发者 https://www.devze.com 2023-05-10 10:24 出处:网络 作者: 林师傅
目录前言刷盘相关类介绍Broker刷盘源码分析CommitLog构造&属性赋值TransientStorePoolEnabled介绍消息保存源码分析消息刷盘入口方法源码分析总结前言
目录
  • 前言
  • 刷盘相关类介绍
  • Broker刷盘源码分析
    • CommitLog构造&属性赋值
    • TransientStorePoolEnabled介绍
    • 消息保存源码分析
    • 消息刷盘入口方法源码分析
  • 总结

    前言

    我们在学习RocketMQ的时候,我们知道RocketMQ的刷盘策略有两个刷盘策略

    • 同步刷盘

    同步刷盘即Broker消息已经被持久化到硬盘后才会向客户端返回成功。同步刷盘的优点是能保证消息不丢失,但是这是以牺牲写入性能为代价的。

    • 异步刷盘

    异步刷盘是指Broker将信息存储到pagecache后就立即向客户端返回成功,然后会有一个异步线程定时将内存中的数据写入磁盘,默认时间间隔为500ms。

    Broker中的刷盘策略是通过Broker配置文件中flushDiskType进行配置,可以配置ASYNC_FLUSH(异步刷盘)和SYNC_FLUSH(同步刷盘),默认配置是ASYNC_FLUSH

    Broker的刷盘采用基于JDK NIO技术,消息首先会存储到内存中,然后再根据不同的刷盘策略在不同时间刷盘,如果有不了解的小伙伴可以参考这篇文章《【NIO实战】深入理解FileChannel》

    刷盘相关类介绍

    CommitLog中的内部类FlushCommitLogService及其子类CommitRealTimeService、GroupCommitService、FlushRealTimeService分别是用于不同场景下用于刷盘的刷盘行为,他们会单独或者配合起来使用。具体类图如下所示。

    RocketMQ Broker消息如何刷盘源码解析

    如果是同步刷盘会使用GroupCommitService。如果是异步刷盘,并且关闭了堆外缓存(TransientStorePool),则采用FlushRealTimeService刷盘。如果是异步刷盘,并且开启了堆外缓存,则会使用FlushRealTimeService与CommitRealTimeService配合刷盘。

    默认的输盘策略是异步关闭堆外缓存,因此默认是采用FlushRealTime编程客栈Service进行刷盘

    RocketMQ Broker消息如何刷盘源码解析

    Broker刷盘源码分析

    消息刷盘相关逻辑都是围绕在CommitLog,因此要想知道消息时如何刷盘的关键是研究CommitLog

    CommitLog构造&属性赋值

    CommitLog中与刷盘相关的属性有flushCommitLogService、commitLogService。如果是同步刷盘则在构造函数中会给flushCommitLogService赋值GroupCommitService,如果是异步刷盘则给flushCommitLogService赋值FlushRealTimeService。commitLogService的值是CommitRealTimeService,从上面我们可以很明显的看出它只有在异步且开启TransientStorePoolEnabled时才会被使用。

    public class CommitLog {
      // 如果是同步刷盘,则是GroupCommitService。如果是异步刷盘则是FlushRealTimeService
      // 默认是异步刷盘,因此是CommitLog$FlushRealTimeService
      private final FlushCommitLogService flushCommitLogService;
      // 开启TransientStorePoolEnable时使用CommitRealTimeService
      private final FlushCommitLogService commitLogService;
    	// 构造函数
      public CommitLog(final DefaultMessageStore defaultMessageStore) {
          // 默认是异步刷盘,因此这里是false
          if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
              this.flushCommitLogService = new GroupCommitService();
          } else {
              this.flushCommitLogService = new FlushRealTimeService();
          }
          this.commitLogService = new CommitRealTimeService();
          // 消息回调
          this.appendMessageCallback = new DefaultAppendMessageCallback();
          flushDiskWatcher = new FlushDiskWatcher();
      }
    }
    

    TransientStorePoolEnabled介绍

    transientStorePoolEnabled配置的默认值为false,开启transientStorePoolEnabled需要手动开启。如果开启transientStorePoolEnabled会开启堆外内存存储池,Broker在启动时会申请5个与CommitLog大小(1GB)相同的堆外内存交给TransientStorePool,创建MappedFile时会向TransientStorePool“借”一个堆外内存ByteBuffer,保存消息时会先将消息保存到堆外内存ByteBuffer中,然后在commit到MappedFile的FileChannel,最后再flush到硬盘中。TransientStorePool属性和一些核心方法源码如下,堆外内存ByteBuffer都是由它来管理。

    // org.apache.rocketmq.store.TransientStorePool
    public class TransientStorePool {
        // 存储池大小,默认是5
        private final int poolSize;
        // CommitLog MappedFile文件大小,默认1GB
        private final int fileSize;
        // 默认存5个ByteBuffer
        private final Deque<ByteBuffer> availableBuffers;
        // 消息存储配置
        private final MessageStoreConfig storeConfig;
    		// TransientStorePouSrPkbawol初始化
        public void init() {
            // 默认是5
            for (int i = 0; i < poolSize; i++) {
                // 分配1GB的直接内存
                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
                final long address = ((DirectBuffer) byteBuffer).address();
                Pointer pointer = new Pointer(address);
                LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
          开发者_JS开发      // 生成的缓存保存到队列中
                availableBuffers.offer(byteBuffer);
            }
        }
        // 归还缓冲
        p编程客栈ublic void returnBuffer(ByteBuffer byteBuffer) {
            // 修改position和limit,"清空"缓冲
            byteBuffer.position(0);
            byteBuffer.limit(fileSize);
          	// 缓冲入队
            this.availableBuffers.offerFirst(byteBuffer);
        }
        // 向TransientStorePool借缓冲
        public ByteBuffer borrowBuffer() {
          	// 缓冲出队
            ByteBuffer buffer = availableBuffers.pollFirst();
            return buffer;
        }
    }
    

    消息保存源码分析

    前面文章《【RocketMQ | 源码分析】Broker是如何保存消息的? 》我们虽然介绍了消息的保存过程,但是开启或者关闭TransientStorePoolEnabled时,消息保存的细节是不同的,我们再打开消息保存MappedFile的源码如下,下面代码中如果writeBuffer不空,则会将消息先追加到writeBuffer,否者直接写入到MappedFile的内存映射文件中。

    // org.apache.rocketmq.store.MappedFile#appendMessagesInner
    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
            PutMessageContext putMessageContext) {
        // 如果写文件位置小于文件size
        if (currentPos < this.fileSize) {
            // 如果writeBuffer不空,则获取writeBuffer的浅拷贝,否则获取MappedFile的内存映射(MappedByteBuffer)的浅拷贝
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);
            AppendMessageResult result;
            // 如果是单条消息
            if (messageExt instanceof MessageExtBrokerInner) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos/*文件长度-当前写位置,可以写的长度*/,
                        (MessageExtBrokerInner) messageExt, putMessageContext);
            } // ...如果是批量消息
            return result;
        }
    }
    

    那么什么情况下MappedFile中的writeBuffer为空,什么情况下writeBuffer不为空呢?我们可以先来了解MappedFile是如何创建的,MappedFile是由AllocateMappedFileService创建的,具体源码如下,如果开启了TransientStorePoolEnabled,则在创建MappedFile时会向TransientStorePool“借”一个ByteBuffer,如果没有开启TransientStorePoolEnabled,MappedFile中的writeBuffer是空,在保存数据时会将数据直接保存到MappedFile的直接内存映射(MappedByteBuffer)中。

    private boolean mmapOperation() {
      // ...
      if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
          try {
              mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
            	// 初始化mappedFile会向TransientStorePool"借"一个writeBuffer
              mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
          } catch (RuntimeException e) {
              mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
          }
      } else {
        	// 创建MappedFile,没有writeBuffer
          mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
      }
      // ...
    }
    

    由上可知,消息保存如下图所示

    RocketMQ Broker消息如何刷盘源码解析

    消息刷盘入口方法源码分析

    消息保存和刷盘的入口方法CommitLog#asyncPutMessage,消息保存到mappedFile的缓存后,最后会调用submitFlushRequest方法提交刷盘请求,Broker会根据刷盘策略进行刷盘。

    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        //... 保存消息
        result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
        // ...
        // 提交刷盘请求
        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
        // 提交复制请求
        CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
        // 合并提交刷盘请求和提交复制请求结果
        return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
            if (flushStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(flushStatus);
            }
            if (replicaStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(replicaStatus);
            }
            return putMessageResult;
       javascript });
    }
    

    提交了刷盘请求后,根据刷盘策略,是否开启堆外缓存,推送消息中是否要等待消息保存有如下四种刷盘方式

    • 异步刷盘(关闭TransientStorePoolEnabled)

    异步刷盘(关闭TransientStorePoolEnabled)是默认的刷盘方案,这个刷盘方案先会**异步唤醒(wakeup)**FlushRealTimeService,然后直接返回消息保存成功。由于关闭了TransientStorePoolEnabled,消息是保存到MappedFile中的内存映射文件MappedByteBuffer,FlushRealTimeService将定时MappedByteBuffer刷到磁盘。

    • 异步刷盘(开启TransientStorePoolEnabled)

    异步刷盘(开启TransientStorePoolEnabled)会先**异步唤醒(wakeup)**CommitRealTimeService,然后直接返回消息保存成功。由于开启了TransientStorePoolEnabled,消息会保存到MappedFile中的内存映射文件ByteBuffer,CommitRealTimeService定时将ByteBuffer中的数据刷到FileChannel中。

    • 同步刷盘(等待消息保存)

    同步刷盘(等待消息保存)会先创建一个刷盘请求(GroupCommitRequest),然后向GroupCommitService提交刷盘请求,最后等待刷盘结果并返回

    • 同步刷盘(不等待消息保存)

    同步刷盘(不等待消息保存)也是通过GroupCommitService刷盘,与等待消息保存不同的是不等待的方式异步唤醒(wakeup)GroupCommitService后,直接返回消息保存成功。

    四种刷盘方式源码如下所示

    public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
        // 同步刷盘
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            // 获取同步刷盘Service
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (messageExt.isWaitStoreMsgOK()) {
                // 创建GroupCommitRequest 刷盘偏移量nextOffset = 当前写入偏移量 + 当前消息写入大小
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                        this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                // 向刷盘监视器(flushDistWatch)提交刷盘请求
                flushDiskWatcher.add(request);
                // 提交刷盘请求,并且唤醒同步刷盘线程
                service.putRequest(request);
                return request.future();
            } else {
                // 同步刷盘,但是不需要等待刷盘结果,那么唤醒同步刷盘线程
                service.wakeup();
                return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
            }
        }
        // 异步刷盘
        else {
            // 是否启动了堆外缓存
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                // 如果没有启动堆外缓存,则唤醒异步刷盘服务 flushRealTimeService
                flushCommitLogService.wakeup();
            } else  {
                // 如果启动了堆外缓存,则唤醒异步转存服务CommitRealTimeService
                commitLogService.wakeup();
            }
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
    

    将上面四种场景及调用关系如下图所示

    RocketMQ Broker消息如何刷盘源码解析

    总结javascript

    本篇文章介绍了TransientStorePool机制以及开启和管理队消息保存的影响,我们还介绍了RocketMQ中四种刷盘策略

    • 同步刷盘-等待消息保存到磁盘
    • 同步刷盘-不等待消息保存到磁盘上
    • 异步刷盘-开启堆外缓存
    • 异步刷盘-不开启堆外缓存

    以上就是RocketMQ Broker消息如何刷盘源码解析的详细内容,更多关于RocketMQ Broker消息刷盘的资料请关注我们其它相关文章!

    0

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    关注公众号