RocketMQ的刷盘策略以及实现同步刷盘和异步刷盘的实例代码

这篇文章主要介绍“RocketMQ的刷盘策略以及实现同步刷盘和异步刷盘的实例代码”,在日常操作中,相信很多人在RocketMQ的刷盘策略以及实现同步刷盘和异步刷盘的实例代码问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”RocketMQ的刷盘策略以及实现同步刷盘和异步刷盘的实例代码”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

我们提供的服务有:网站制作、成都网站设计、微信公众号开发、网站优化、网站认证、临城ssl等。为上1000家企事业单位解决了网站和推广的问题。提供周到的售前咨询和贴心的售后服务,是有科学管理、有技术的临城网站制作公司

1、刷盘策略

RocketMQ提供了两种刷盘策略同步刷盘、异步刷盘

同步刷盘:在消息到达MQ后,RocketMQ需要将数据持久化,同步刷盘是指数据到达内存之后,必须刷到commitlog日志之后才算成功,然后返回producer数据已经发送成功。

异步刷盘:,同步刷盘是指数据到达内存之后,返回producer说数据已经发送成功。,然后再写入commitlog日志。

复制方式优点缺点适应场景
同步刷盘保证了消息不丢失吞吐率相对于异步刷盘要低消息可靠性要求较高的场景
异步刷盘系统的吞吐量提高系统断电等异常时会有部分丢失对应吞吐量要求较高的场景

下面我们从源码的角度分析其实现的逻辑

RocketMQ的刷盘策略以及实现同步刷盘和异步刷盘的实例代码

2、同步刷盘

CommitLog.putMessage()方法中的刷盘的核心方法handleDiskFlush()

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush  同步刷盘
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        //客户端确认要等待刷盘成功
        if (messageExt.isWaitStoreMsgOK()) {
        	//封装刷盘请求对象 nextoffset : 当前内存写的位置 + 本次要写入的字节数
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            //添加刷盘请求(后台定时任务进行刷盘,每隔10毫秒批量刷盘。10毫秒中如果有多个请求,则多个请求一块刷盘)
            service.putRequest(request);
            //等待刷盘请求结果(最长等待5秒钟,刷盘成功后马上可以获取结果。)
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }else {// Asynchronous flush 异步刷盘
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        	//唤醒FlushRealTimeService服务线程
            flushCommitLogService.wakeup();
        } else {
        	//唤醒CommitRealTimeService服务线程
            commitLogService.wakeup();
        }
    }
}

查看同步刷盘的核心类GroupCommitService中的核心属性

private volatile List requestsWrite = new ArrayList(); private volatile List requestsRead = new ArrayList(); requestsWrite : 写队列,主要用于向该线程添加刷盘任务 requestsRead : 读队列,主要用于执行特定的刷盘任务,这是是GroupCommitService 设计的一个亮点,把读写分离,每处理完requestsRead中的任务,就交换这两个队列。

我们查看其run()方法

public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
        	//等待通知,如果数据过来,提前结束等待执行onWaitEnd()方法交换读写swapRequests()
        	//刷盘请求的requestsWrite->requestsRead
            this.waitForRunning(10);
            //执行刷盘
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    //省略代码...
}

waitForRunning方法中执行了swapRequests()方法

private void swapRequests() {
    List tmp = this.requestsWrite;
    this.requestsWrite = this.requestsRead;
    this.requestsRead = tmp;
}

GroupCommitService接收到的刷盘请求通过putRequest()方法加入到requestsWrite集合中,swapRequests()方法将requestsWrite请求集合交换到requestsRead集合中供刷盘使用,我们重点查看doCommit()方法

private void doCommit() {
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
        	//循环每一个刷盘请求
            for (GroupCommitRequest req : this.requestsRead) {
                // There may be a message in the next file, so a maximum of
                // two times the flush
                boolean flushOK = false;
                for (int i = 0; i < 2 && !flushOK; i++) {
                	//判断是否已经刷盘过了,刷盘的位置和当前消息下次刷盘需要的位置比较
                    flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                    if (!flushOK) {
                    	//0代码立刻刷盘,不管缓存中消息有多少
                        CommitLog.this.mappedFileQueue.flush(0);
                    }
                }
                //返回刷盘的结果
                req.wakeupCustomer(flushOK);
            }
            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            //设置刷盘的时间点
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            //清空requestsRead对象
            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }
}

mappedFileQueue.flush(0)立刻刷盘

public boolean flush(final int flushLeastPages) {
    boolean result = true;
    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        long tmpTimeStamp = mappedFile.getStoreTimestamp();
        //刷盘,返回刷写到磁盘指针
        int offset = mappedFile.flush(flushLeastPages);
        //计算当前的刷盘指针,之前的所有数据已经持久化到磁盘中
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.flushedWhere;
        this.flushedWhere = where;
        if (0 == flushLeastPages) {
            this.storeTimestamp = tmpTimeStamp;
        }
    }
    return result;
}

mappedFile.flush(0);保证立刻刷盘后面异步刷盘时也会调用mappedFile.flush()方法

3、异步刷盘

if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
	//唤醒FlushRealTimeService服务线程
    flushCommitLogService.wakeup();
} else {
	//唤醒CommitRealTimeService服务线程
    commitLogService.wakeup();
}

我们发现异步刷盘的时候有两种方式,一种是堆外内存池开启时启动CommitRealTimeService服务线程,另一个是默认执行的FlushRealTimeService服务线程进行刷盘操作,关于TransientStorePoolEnable在《RocketMQ内存映射》章节中的**“创建映射文件MappedFile”**中有介绍

RocketMQ的刷盘策略以及实现同步刷盘和异步刷盘的实例代码

图3-1

1、FlushRealTimeService

查看其run()方法

public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
    	// 每次刷盘的间隔时间,默认 200ms
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
        // 每次commit最少的页数 默认4页
        int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
        // 如果上次刷新的时间+该值 小于当前时间,则改变flushPhysicQueueLeastPages =0 默认为200
        int commitDataThoroughInterval =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

        long begin = System.currentTimeMillis();
        //距离上一次刷盘时间超过200ms则立刻刷盘,commit最少的页数置为0
        if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
            this.lastCommitTimestamp = begin;
            commitDataLeastPages = 0;
        }
        try {
        	//刷盘
            boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
            long end = System.currentTimeMillis();
            if (!result) {
                this.lastCommitTimestamp = end; // result = false means some data committed.
                //now wake up flush thread.
                flushCommitLogService.wakeup();
            }

            if (end - begin > 500) {
                log.info("Commit data to file costs {} ms", end - begin);
            }
            this.waitForRunning(interval);
        } catch (Throwable e) {
            CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
        }
    }

    boolean result = false;
    for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
        result = CommitLog.this.mappedFileQueue.commit(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }
    CommitLog.log.info(this.getServiceName() + " service end");
    }
}

这种方式和同步刷盘一样就是mappedFileQueue.commit(commitDataLeastPages)参数有限制,数据达到一定量的时候才进行刷盘操作提高数据的刷盘性能。

2、CommitRealTimeService

查看其run()方法

public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
    	// 每次刷盘的间隔时间,默认 200ms
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
        // 每次commit最少的页数 默认4页
        int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
        // 如果上次刷新的时间+该值 小于当前时间,则改变flushPhysicQueueLeastPages =0 默认为200
        int commitDataThoroughInterval =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

        long begin = System.currentTimeMillis();
        //距离上一次刷盘时间超过200ms则立刻刷盘,commit最少的页数置为0
        if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
            this.lastCommitTimestamp = begin;
            commitDataLeastPages = 0;
        }
        try {
        	//刷盘
            boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
            long end = System.currentTimeMillis();
            //返回的是false说明数据已经commit到了fileChannel中
            if (!result) {
                this.lastCommitTimestamp = end; // result = false means some data committed.
                //now wake up flush thread.
                flushCommitLogService.wakeup();
            }
            if (end - begin > 500) {
                log.info("Commit data to file costs {} ms", end - begin);
            }
            this.waitForRunning(interval);
        } catch (Throwable e) {
            CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
        }
    }
    boolean result = false;
    for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
        result = CommitLog.this.mappedFileQueue.commit(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }
    CommitLog.log.info(this.getServiceName() + " service end");
    }
}

我们发现其刷盘方法不一样mappedFileQueue.commit()调用MappedFile.commit()方法

public int commit(final int commitLeastPages) {
    if (writeBuffer == null) {
        //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
        return this.wrotePosition.get();
    }
    //如果提交的数据不满commitLeastPages则不执行本次的提交,待下一次提交
    if (this.isAbleToCommit(commitLeastPages)) {
        if (this.hold()) {
            commit0(commitLeastPages);
            this.release();
        } else {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
        }
    }

    // All dirty data has been committed to FileChannel.
    if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }

    return this.committedPosition.get();
}

查看其核心刷盘方法

protected void commit0(final int commitLeastPages) {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();
    if (writePos - this.committedPosition.get() > 0) {
        try {
        	//创建writeBuffer的共享缓存区
            ByteBuffer byteBuffer = writeBuffer.slice();
            //将指针回退到上一次提交的位置
            byteBuffer.position(lastCommittedPosition);
            //设置limit为writePos
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            //将committedPosition指针到wrotePosition的数据复制(写入)到fileChannel中
            this.fileChannel.write(byteBuffer);
            //更新committedPosition指针为writePos
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}

commit0()只是将缓存数据加入到fileChannel中,我们在CommitRealTimeService.run()方法中看到唤醒flushCommitLogService线程需要将fileChannel中的数据flush到磁盘中,我们发现两种方式都需要走flushCommitLogService.run()方法最后都执行MappedFile.flush(int)

public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            int value = getReadPosition();
            try {
                //We only append data to fileChannel or mappedByteBuffer, never both.
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                } else {
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }
            //设置刷盘后的指针
            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}

两种缓存方式走的刷盘逻辑也不同,可以查看**“图3-1”**两种方式的处理流程图

我们还发现一个方法isAbleToFlush()判断是否需要刷盘

private boolean isAbleToFlush(final int flushLeastPages) {
    int flush = this.flushedPosition.get();
    int write = getReadPosition();
    if (this.isFull()) {
        return true;
    }
    if (flushLeastPages > 0) {
        return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
    }
    return write > flush;
}

同步刷盘时flushLeastPages=0立刻刷盘

异步刷盘时flushLeastPages=4 ,默认是4,需要刷盘的数据达到PageCache的页数4倍时才会刷盘,或者距上一次刷盘时间>=200ms则设置flushLeastPages=0立刻刷盘

同步刷盘时无论消息的大小都立刻刷盘,线程阻塞等待刷盘结果

异步刷盘有两种方式但是其逻辑都是需要刷盘的数据OS_PAGE_SIZE的4倍即(1024 * 4)*4=16k或者距上一次刷盘时间>=200ms时才刷盘,提高数据的刷盘性能

到此,关于“RocketMQ的刷盘策略以及实现同步刷盘和异步刷盘的实例代码”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!


分享文章:RocketMQ的刷盘策略以及实现同步刷盘和异步刷盘的实例代码
分享URL:http://csdahua.cn/article/jjpidg.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流