???关注微信公众号:【芋艿的后端小屋】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右

  • 1、概述
  • 2、CommitLog 结构
  • 3、CommitLog 存储消息
    • CommitLog#putMessage(...)
    • MappedFileQueue#getLastMappedFile(...)
    • MappedFile#appendMessage(...)
    • DefaultAppendMessageCallback#doAppend(...)
    • FlushCommitLogService
      • MappedFile#落盘
      • FlushRealTimeService
      • CommitRealTimeService
      • GroupCommitService
  • 结尾

1、概述

本文接《RocketMQ 源码分析 —— Message 发送与接收》。
主要解析 CommitLog 存储消息部分。

2、CommitLog 结构

CommitLogMappedFileQueueMappedFile 的关系如下:

CommitLog、MappedFileQueue、MappedFile的关系

CommitLog : MappedFileQueue : MappedFile = 1 : 1 : N。

反应到系统文件如下:

Yunai-MacdeMacBook-Pro-2:commitlog yunai$ pwd
/Users/yunai/store/commitlog
Yunai-MacdeMacBook-Pro-2:commitlog yunai$ ls -l
total 10485760
-rw-r--r--  1 yunai  staff  1073741824  4 21 16:27 00000000000000000000
-rw-r--r--  1 yunai  staff  1073741824  4 21 16:29 00000000001073741824
-rw-r--r--  1 yunai  staff  1073741824  4 21 16:32 00000000002147483648
-rw-r--r--  1 yunai  staff  1073741824  4 21 16:33 00000000003221225472
-rw-r--r--  1 yunai  staff  1073741824  4 21 16:32 00000000004294967296复制代码

CommitLogMappedFileQueueMappedFile 的定义如下:

  • MappedFile :00000000000000000000、00000000001073741824、00000000002147483648等文件。
  • MappedFileQueueMappedFile 所在的文件夹,对 MappedFile 进行封装成文件队列,对上层提供可无限使用的文件容量。
    • 每个 MappedFile 统一文件大小。
    • 文件命名方式:fileName[n] = fileName[n - 1] + mappedFileSize。在 CommitLog 里默认为 1GB。
  • CommitLog :针对 MappedFileQueue 的封装使用。

CommitLog 目前存储在 MappedFile 有两种内容类型:

  1. MESSAGE :消息。
  2. BLANK :文件不足以存储消息时的空白占位。

CommitLog 存储在 MappedFile的结构:

MESSAGE[1] MESSAGE[2] ... MESSAGE[n - 1] MESSAGE[n] BLANK

MESSAGECommitLog 存储结构:

第几位 字段 说明 数据类型 字节数
1 MsgLen 消息总长度 Int 4
2 MagicCode MESSAGE_MAGIC_CODE Int 4
3 BodyCRC 消息内容CRC Int 4
4 QueueId 消息队列编号 Int 4
5 Flag flag Int 4
6 QueueOffset 消息队列位置 Long 8
7 PhysicalOffset 物理位置。在 CommitLog 的顺序存储位置。 Long 8
8 SysFlag MessageSysFlag Int 4
9 BornTimestamp 生成消息时间戳 Long 8
10 BornHost 生效消息的地址+端口 Long 8
11 StoreTimestamp 存储消息时间戳 Long 8
12 StoreHost 存储消息的地址+端口 Long 8
13 ReconsumeTimes 重新消费消息次数 Int 4
14 PreparedTransationOffset Long 8
15 BodyLength + Body 内容长度 + 内容 Int + Bytes 4 + bodyLength
16 TopicLength + Topic Topic长度 + Topic Byte + Bytes 1 + topicLength
17 PropertiesLength + Properties 拓展字段长度 + 拓展字段 Short + Bytes 2 + PropertiesLength

BLANKCommitLog 存储结构:

第几位 字段 说明 数据类型 字节数
1 maxBlank 空白长度 Int 4
2 MagicCode BLANK_MAGIC_CODE Int 4

3、CommitLog 存储消息

Broker存储发送消息顺序图

CommitLog#putMessage(...)

  1: public PutMessageResult putMessage(final MessageExtBrokerInner msg) {2:     // Set the storage time3:     msg.setStoreTimestamp(System.currentTimeMillis());4:     // Set the message body BODY CRC (consider the most appropriate setting5:     // on the client)6:     msg.setBodyCRC(UtilAll.crc32(msg.getBody()));7:     // Back to Results8:     AppendMessageResult result = null;9: 10:     StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();11: 12:     String topic = msg.getTopic();13:     int queueId = msg.getQueueId();14: 15:     // 事务相关 TODO 待读:事务相关16:     final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());17:     if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//18:         || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {19:         // Delay Delivery20:         if (msg.getDelayTimeLevel() > 0) {21:             if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {22:                 msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());23:             }24: 25:             topic = ScheduleMessageService.SCHEDULE_TOPIC;26:             queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());27: 28:             // Backup real topic, queueId29:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());30:             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));31:             msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));32: 33:             msg.setTopic(topic);34:             msg.setQueueId(queueId);35:         }36:     }37: 38:     long eclipseTimeInLock = 0;39: 40:     // 获取写入映射文件41:     MappedFile unlockMappedFile = null;42:     MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();43: 44:     // 获取写入锁45:     lockForPutMessage(); //spin...46:     try {47:         long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();48:         this.beginTimeInLock = beginLockTimestamp;49: 50:         // Here settings are stored timestamp, in order to ensure an orderly51:         // global52:         msg.setStoreTimestamp(beginLockTimestamp);53: 54:         // 当不存在映射文件时,进行创建55:         if (null == mappedFile || mappedFile.isFull()) {56:             mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise57:         }58:         if (null == mappedFile) {59:             log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());60:             beginTimeInLock = 0;61:             return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);62:         }63: 64:         // 存储消息65:         result = mappedFile.appendMessage(msg, this.appendMessageCallback);66:         switch (result.getStatus()) {67:             case PUT_OK:68:                 break;69:             case END_OF_FILE: // 当文件尾时,获取新的映射文件,并进行插入70:                 unlockMappedFile = mappedFile;71:                 // Create a new file, re-write the message72:                 mappedFile = this.mappedFileQueue.getLastMappedFile(0);73:                 if (null == mappedFile) {74:                     // XXX: warn and notify me75:                     log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());76:                     beginTimeInLock = 0;77:                     return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);78:                 }79:                 result = mappedFile.appendMessage(msg, this.appendMessageCallback);80:                 break;81:             case MESSAGE_SIZE_EXCEEDED:82:             case PROPERTIES_SIZE_EXCEEDED:83:                 beginTimeInLock = 0;84:                 return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);85:             case UNKNOWN_ERROR:86:                 beginTimeInLock = 0;87:                 return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);88:             default:89:                 beginTimeInLock = 0;90:                 return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);91:         }92: 93:         eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;94:         beginTimeInLock = 0;95:     } finally {96:         // 释放写入锁97:         releasePutMessageLock();98:     }99:
100:     if (eclipseTimeInLock > 500) {
101:         log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
102:     }
103:
104:     //
105:     if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
106:         this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
107:     }
108:
109:     PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
110:
111:     // Statistics
112:     storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
113:     storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
114:
115:     // 进行同步||异步 flush||commit
116:     GroupCommitRequest request = null;
117:     // Synchronization flush
118:     if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
119:         final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
120:         if (msg.isWaitStoreMsgOK()) {
121:             request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
122:             service.putRequest(request);
123:             boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
124:             if (!flushOK) {
125:                 log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
126:                     + " client address: " + msg.getBornHostString());
127:                 putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
128:             }
129:         } else {
130:             service.wakeup();
131:         }
132:     }
133:     // Asynchronous flush
134:     else {
135:         if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
136:             flushCommitLogService.wakeup(); // important:唤醒commitLog线程,进行flush
137:         } else {
138:             commitLogService.wakeup();
139:         }
140:     }
141:
142:     // Synchronous write double 如果是同步Master,同步到从节点 // TODO 待读:数据同步
143:     if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
144:         HAService service = this.defaultMessageStore.getHaService();
145:         if (msg.isWaitStoreMsgOK()) {
146:             // Determine whether to wait
147:             if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
148:                 if (null == request) {
149:                     request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
150:                 }
151:                 service.putRequest(request);
152:
153:                 service.getWaitNotifyObject().wakeupAll();
154:
155:                 boolean flushOK =
156:                     // TODO
157:                     request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
158:                 if (!flushOK) {
159:                     log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
160:                         + msg.getTags() + " client address: " + msg.getBornHostString());
161:                     putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
162:                 }
163:             }
164:             // Slave problem
165:             else {
166:                 // Tell the producer, slave not available
167:                 putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
168:             }
169:         }
170:     }
171:
172:     return putMessageResult;
173: }复制代码
  • 说明 :存储消息,并返回存储结果。
  • 第 2 行 :设置存储时间等。
  • 第 16 至 36 行 :事务消息相关,暂未了解。
  • 第 45 & 97 行 :获取锁与释放锁。
  • 第 52 行 :再次设置存储时间。目前会有多处地方设置存储时间。
  • 第 55 至 62 行 :获取 MappedFile,若不存在或已满,则进行创建。详细解析见:MappedFileQueue#getLastMappedFile(...)。
  • 第 65 行 :插入消息MappedFile,解析解析见:MappedFile#appendMessage(...)。
  • 第 69 至 80 行 :MappedFile 已满,创建新的,再次插入消息
  • 第 116 至 140 行 :消息刷盘,即持久化到文件。上面插入消息实际未存储到硬盘。此处,根据不同的刷盘策略,执行会有不同。详细解析见:FlushCommitLogService。
  • 第 143 至 173 行 :Broker 主从同步。后面的文章会详细解析?。

MappedFileQueue#getLastMappedFile(...)

  1: public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {2:     long createOffset = -1; // 创建文件开始offset。-1时,不创建3:     MappedFile mappedFileLast = getLastMappedFile();4: 5:     if (mappedFileLast == null) { // 一个映射文件都不存在6:         createOffset = startOffset - (startOffset % this.mappedFileSize);7:     }8: 9:     if (mappedFileLast != null && mappedFileLast.isFull()) { // 最后一个文件已满10:         createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;11:     }12: 13:     if (createOffset != -1 && needCreate) { // 创建文件14:         String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);15:         String nextNextFilePath = this.storePath + File.separator16:             + UtilAll.offset2FileName(createOffset + this.mappedFileSize);17:         MappedFile mappedFile = null;18: 19:         if (this.allocateMappedFileService != null) {20:             mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,21:                 nextNextFilePath, this.mappedFileSize);22:         } else {23:             try {24:                 mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);25:             } catch (IOException e) {26:                 log.error("create mappedFile exception", e);27:             }28:         }29: 30:         if (mappedFile != null) {31:             if (this.mappedFiles.isEmpty()) {32:                 mappedFile.setFirstCreateInQueue(true);33:             }34:             this.mappedFiles.add(mappedFile);35:         }36: 37:         return mappedFile;38:     }39: 40:     return mappedFileLast;41: }复制代码
  • 说明 :获取最后一个 MappedFile,若不存在或文件已满,则进行创建。
  • 第 5 至 11 行 :计算当文件不存在或已满时,新创建文件的 createOffset
  • 第 14 行 :计算文件名。从此处我们可
    以得知,MappedFile的文件命名规则:

    fileName[n] = fileName[n - 1] + n * mappedFileSize
    fileName[0] = startOffset - (startOffset % this.mappedFileSize)

    目前 CommitLogstartOffset 为 0。
    此处有个疑问,为什么需要 (startOffset % this.mappedFileSize)。例如:

    | startOffset | mappedFileSize | createOffset |
    | --- | :-- | :-- |
    | 5 | 1 | 5 |
    | 5 | 2 | 4 |
    | 5 | 3 | 3 |
    | 5 | 4 | 4 |
    | 5 | > 5 | 0 |

    如果有知道的同学,麻烦提示下。?
    解答:fileName[0] = startOffset - (startOffset % this.mappedFileSize) 计算出来的是,以 this.mappedFileSize 为每个文件大小时,startOffset 所在文件的开始offset

  • 第 30 至 35 行 :设置 MappedFile是否是第一个创建的文件。该标识用于 ConsumeQueue 对应的 MappedFile ,详见 ConsumeQueue#fillPreBlank

MappedFile#appendMessage(...)

  1: public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {2:     assert msg != null;3:     assert cb != null;4: 5:     int currentPos = this.wrotePosition.get();6: 7:     if (currentPos < this.fileSize) {8:         ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();9:         byteBuffer.position(currentPos);10:         AppendMessageResult result =11:             cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);12:         this.wrotePosition.addAndGet(result.getWroteBytes());13:         this.storeTimestamp = result.getStoreTimestamp();14:         return result;15:     }16: 17:     log.error("MappedFile.appendMessage return null, wrotePosition: " + currentPos + " fileSize: "18:         + this.fileSize);19:     return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);20: }复制代码
  • 说明 :插入消息MappedFile,并返回插入结果。
  • 第 8 行 :获取需要写入的字节缓冲区。为什么会有 writeBuffer != null 的判断后,使用不同的字节缓冲区,见:FlushCommitLogService。
  • 第 9 至 11 行 :设置写入 position,执行写入,更新 wrotePosition(当前写入位置,下次开始写入开始位置)。

DefaultAppendMessageCallback#doAppend(...)

  1: class DefaultAppendMessageCallback implements AppendMessageCallback {2:     // File at the end of the minimum fixed length empty3:     private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;4:     /**5:      * 存储在内存中的消息编号字节Buffer6:      */7:     private final ByteBuffer msgIdMemory;8:     /**9:      * Store the message content10:      * 存储在内存中的消息字节Buffer11:      * 当消息传递到{@link #doAppend(long, ByteBuffer, int, MessageExtBrokerInner)}方法时,最终写到该参数12:      */13:     private final ByteBuffer msgStoreItemMemory;14:     /**15:      * The maximum length of the message16:      * 消息最大长度17:      */18:     private final int maxMessageSize;19:     /**20:      * Build Message Key21:      * {@link #topicQueueTable}的key22:      * 计算方式:topic + "-" + queueId23:      */24:     private final StringBuilder keyBuilder = new StringBuilder();25:     /**26:      * host字节buffer27:      * 用于重复计算host的字节内容28:      */29:     private final ByteBuffer hostHolder = ByteBuffer.allocate(8);30: 31:     DefaultAppendMessageCallback(final int size) {32:         this.msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);33:         this.msgStoreItemMemory = ByteBuffer.allocate(size + END_FILE_MIN_BLANK_LENGTH);34:         this.maxMessageSize = size;35:     }36: 37:     public ByteBuffer getMsgStoreItemMemory() {38:         return msgStoreItemMemory;39:     }40: 41:     public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) {42:         // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>43: 44:         // PHY OFFSET45:         long wroteOffset = fileFromOffset + byteBuffer.position();46: 47:         // 计算commitLog里的msgId48:         this.resetByteBuffer(hostHolder, 8);49:         String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);50: 51:         // Record ConsumeQueue information 获取队列offset52:         keyBuilder.setLength(0);53:         keyBuilder.append(msgInner.getTopic());54:         keyBuilder.append('-');55:         keyBuilder.append(msgInner.getQueueId());56:         String key = keyBuilder.toString();57:         Long queueOffset = CommitLog.this.topicQueueTable.get(key);58:         if (null == queueOffset) {59:             queueOffset = 0L;60:             CommitLog.this.topicQueueTable.put(key, queueOffset);61:         }62: 63:         // Transaction messages that require special handling // TODO 疑问:用途64:         final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());65:         switch (tranType) {66:             // Prepared and Rollback message is not consumed, will not enter the67:             // consumer queue68:             case MessageSysFlag.TRANSACTION_PREPARED_TYPE:69:             case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:70:                 queueOffset = 0L;71:                 break;72:             case MessageSysFlag.TRANSACTION_NOT_TYPE:73:             case MessageSysFlag.TRANSACTION_COMMIT_TYPE:74:             default:75:                 break;76:         }77: 78:         // 计算消息长度79:         final byte[] propertiesData =80:             msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);81:         final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;82:         if (propertiesLength > Short.MAX_VALUE) {83:             log.warn("putMessage message properties length too long. length={}", propertiesData.length);84:             return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);85:         }86:         final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);87:         final int topicLength = topicData.length;88:         final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;89:         final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);90:         // Exceeds the maximum message91:         if (msgLen > this.maxMessageSize) {92:             CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength93:                 + ", maxMessageSize: " + this.maxMessageSize);94:             return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);95:         }96: 97:         // Determines whether there is sufficient(足够) free space98:         if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {99:             this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
100:             // 1 TOTAL_SIZE
101:             this.msgStoreItemMemory.putInt(maxBlank);
102:             // 2 MAGIC_CODE
103:             this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
104:             // 3 The remaining space may be any value
105:             //
106:
107:             // Here the length of the specially set maxBlank
108:             final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
109:             byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
110:             return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
111:                 queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
112:         }
113:
114:         // Initialization of storage space
115:         this.resetByteBuffer(msgStoreItemMemory, msgLen);
116:         // 1 TOTAL_SIZE
117:         this.msgStoreItemMemory.putInt(msgLen);
118:         // 2 MAGIC_CODE
119:         this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
120:         // 3 BODY_CRC
121:         this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
122:         // 4 QUEUE_ID
123:         this.msgStoreItemMemory.putInt(msgInner.getQueueId());
124:         // 5 FLAG
125:         this.msgStoreItemMemory.putInt(msgInner.getFlag());
126:         // 6 QUEUE_OFFSET
127:         this.msgStoreItemMemory.putLong(queueOffset);
128:         // 7 PHYSICAL_OFFSET
129:         this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
130:         // 8 SYS_FLAG
131:         this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
132:         // 9 BORN_TIMESTAMP
133:         this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
134:         // 10 BORN_HOST
135:         this.resetByteBuffer(hostHolder, 8);
136:         this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
137:         // 11 STORE_TIMESTAMP
138:         this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
139:         // 12 STORE_HOST_ADDRESS
140:         this.resetByteBuffer(hostHolder, 8);
141:         this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
142:         //this.msgStoreItemMemory.put(msgInner.getStoreHostBytes());
143:         // 13 RECONSUME_TIMES
144:         this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
145:         // 14 Prepared Transaction Offset
146:         this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
147:         // 15 BODY
148:         this.msgStoreItemMemory.putInt(bodyLength);
149:         if (bodyLength > 0)
150:             this.msgStoreItemMemory.put(msgInner.getBody());
151:         // 16 TOPIC
152:         this.msgStoreItemMemory.put((byte) topicLength);
153:         this.msgStoreItemMemory.put(topicData);
154:         // 17 PROPERTIES
155:         this.msgStoreItemMemory.putShort((short) propertiesLength);
156:         if (propertiesLength > 0)
157:             this.msgStoreItemMemory.put(propertiesData);
158:
159:         final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
160:         // Write messages to the queue buffer
161:         byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
162:
163:         AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
164:             msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
165:
166:         switch (tranType) {
167:             case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
168:             case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
169:                 break;
170:             case MessageSysFlag.TRANSACTION_NOT_TYPE:
171:             case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
172:                 // The next update ConsumeQueue information 更新队列的offset
173:                 CommitLog.this.topicQueueTable.put(key, ++queueOffset);
174:                 break;
175:             default:
176:                 break;
177:         }
178:         return result;
179:     }
180:
181:     /**
182:      * 重置字节缓冲区
183:      *
184:      * @param byteBuffer 字节缓冲区
185:      * @param limit 长度
186:      */
187:     private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) {
188:         byteBuffer.flip();
189:         byteBuffer.limit(limit);
190:     }
191: }复制代码
  • 说明 :插入消息到字节缓冲区。
  • 第 45 行 :计算物理位置。在 CommitLog 的顺序存储位置。
  • 第 47 至 49 行 :计算 CommitLog 里的 offsetMsgId。这里一定要和 msgId 区分开。
计算方式 长度
offsetMsgId Broker存储时生成 Hex(storeHostBytes, wroteOffset) 32
msgId Client发送消息时生成 Hex(进程编号, IP, ClassLoader, startTime, currentTime, 自增序列) 32 《RocketMQ 源码分析 —— Message 基础》
  • 第 51 至 61 行 :获取队列位置(offset)。
  • 第 78 至 95 行 :计算消息总长度。
  • 第 98 至 112 行 :当文件剩余空间不足时,写入 BLANK 占位,返回结果。
  • 第 114 至 161 行 :写入 MESSAGE
  • 第 173 行 :更新队列位置(offset)。

FlushCommitLogService

FlushCommitLogService类图
线程服务 场景 插入消息性能
CommitRealTimeService 异步刷盘 && 开启内存字节缓冲区 第一
FlushRealTimeService 异步刷盘 && 关闭内存字节缓冲区 第二
GroupCommitService 同步刷盘 第三

MappedFile#落盘

方式
方式一 写入内存字节缓冲区(writeBuffer) 从内存字节缓冲区(write buffer)提交(commit)到文件通道(fileChannel) 文件通道(fileChannel)flush
方式二 写入映射文件字节缓冲区(mappedByteBuffer) 映射文件字节缓冲区(mappedByteBuffer)flush
MappedFile的position迁移图

flush相关代码

考虑到写入性能,满足 flushLeastPages * OS_PAGE_SIZE 才进行 flush

  1: /**2:  * flush3:  *4:  * @param flushLeastPages flush最小页数5:  * @return The current flushed position6:  */7: public int flush(final int flushLeastPages) {8:     if (this.isAbleToFlush(flushLeastPages)) {9:         if (this.hold()) {10:             int value = getReadPosition();11: 12:             try {13:                 //We only append data to fileChannel or mappedByteBuffer, never both.14:                 if (writeBuffer != null || this.fileChannel.position() != 0) {15:                     this.fileChannel.force(false);16:                 } else {17:                     this.mappedByteBuffer.force();18:                 }19:             } catch (Throwable e) {20:                 log.error("Error occurred when force data to disk.", e);21:             }22: 23:             this.flushedPosition.set(value);24:             this.release();25:         } else {26:             log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());27:             this.flushedPosition.set(getReadPosition());28:         }29:     }30:     return this.getFlushedPosition();31: }32: 33: /**34:  * 是否能够flush。满足如下条件任意条件:35:  * 1. 映射文件已经写满36:  * 2. flushLeastPages > 0 && 未flush部分超过flushLeastPages37:  * 3. flushLeastPages = 0 && 有新写入部分38:  *39:  * @param flushLeastPages flush最小分页40:  * @return 是否能够写入41:  */42: private boolean isAbleToFlush(final int flushLeastPages) {43:     int flush = this.flushedPosition.get();44:     int write = getReadPosition();45: 46:     if (this.isFull()) {47:         return true;48:     }49: 50:     if (flushLeastPages > 0) {51:         return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;52:     }53: 54:     return write > flush;55: }复制代码

commit相关代码:

考虑到写入性能,满足 commitLeastPages * OS_PAGE_SIZE 才进行 commit

  1: /**2:  * commit3:  * 当{@link #writeBuffer}为null时,直接返回{@link #wrotePosition}4:  *5:  * @param commitLeastPages commit最小页数6:  * @return 当前commit位置7:  */8: public int commit(final int commitLeastPages) {9:     if (writeBuffer == null) {10:         //no need to commit data to file channel, so just regard wrotePosition as committedPosition.11:         return this.wrotePosition.get();12:     }13:     if (this.isAbleToCommit(commitLeastPages)) {14:         if (this.hold()) {15:             commit0(commitLeastPages);16:             this.release();17:         } else {18:             log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());19:         }20:     }21: 22:     // All dirty data has been committed to FileChannel. 写到文件尾时,回收writeBuffer。23:     if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {24:         this.transientStorePool.returnBuffer(writeBuffer);25:         this.writeBuffer = null;26:     }27: 28:     return this.committedPosition.get();29: }30: 31: /**32:  * commit实现,将writeBuffer写入fileChannel。33:  * @param commitLeastPages commit最小页数。用不上该参数34:  */35: protected void commit0(final int commitLeastPages) {36:     int writePos = this.wrotePosition.get();37:     int lastCommittedPosition = this.committedPosition.get();38: 39:     if (writePos - this.committedPosition.get() > 0) {40:         try {41:             // 设置需要写入的byteBuffer42:             ByteBuffer byteBuffer = writeBuffer.slice();43:             byteBuffer.position(lastCommittedPosition);44:             byteBuffer.limit(writePos);45:             // 写入fileChannel46:             this.fileChannel.position(lastCommittedPosition);47:             this.fileChannel.write(byteBuffer);48:             // 设置position49:             this.committedPosition.set(writePos);50:         } catch (Throwable e) {51:             log.error("Error occurred when commit data to FileChannel.", e);52:         }53:     }54: }55: 56: /**57:  * 是否能够commit。满足如下条件任意条件:58:  * 1. 映射文件已经写满59:  * 2. commitLeastPages > 0 && 未commit部分超过commitLeastPages60:  * 3. commitLeastPages = 0 && 有新写入部分61:  *62:  * @param commitLeastPages commit最小分页63:  * @return 是否能够写入64:  */65: protected boolean isAbleToCommit(final int commitLeastPages) {66:     int flush = this.committedPosition.get();67:     int write = this.wrotePosition.get();68: 69:     if (this.isFull()) {70:         return true;71:     }72: 73:     if (commitLeastPages > 0) {74:         return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;75:     }76: 77:     return write > flush;78: }复制代码

FlushRealTimeService

消息插入成功时,异步刷盘时使用。

  1: class FlushRealTimeService extends FlushCommitLogService {2:     /**3:      * 最后flush时间戳4:      */5:     private long lastFlushTimestamp = 0;6:     /**7:      * print计时器。8:      * 满足print次数时,调用{@link #printFlushProgress()}9:      */10:     private long printTimes = 0;11: 12:     public void run() {13:         CommitLog.log.info(this.getServiceName() + " service started");14: 15:         while (!this.isStopped()) {16:             boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();17:             int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();18:             int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();19:             int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();20: 21:             // Print flush progress22:             // 当时间满足flushPhysicQueueThoroughInterval时,即使写入的数量不足flushPhysicQueueLeastPages,也进行flush23:             boolean printFlushProgress = false;24:             long currentTimeMillis = System.currentTimeMillis();25:             if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {26:                 this.lastFlushTimestamp = currentTimeMillis;27:                 flushPhysicQueueLeastPages = 0;28:                 printFlushProgress = (printTimes++ % 10) == 0;29:             }30: 31:             try {32:                 // 等待执行33:                 if (flushCommitLogTimed) {34:                     Thread.sleep(interval);35:                 } else {36:                     this.waitForRunning(interval);37:                 }38: 39:                 if (printFlushProgress) {40:                     this.printFlushProgress();41:                 }42: 43:                 // flush commitLog44:                 long begin = System.currentTimeMillis();45:                 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);46:                 long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();47:                 if (storeTimestamp > 0) {48:                     CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);49:                 }50:                 long past = System.currentTimeMillis() - begin;51:                 if (past > 500) {52:                     log.info("Flush data to disk costs {} ms", past);53:                 }54:             } catch (Throwable e) {55:                 CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);56:                 this.printFlushProgress();57:             }58:         }59: 60:         // Normal shutdown, to ensure that all the flush before exit61:         boolean result = false;62:         for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {63:             result = CommitLog.this.mappedFileQueue.flush(0);64:             CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));65:         }66: 67:         this.printFlushProgress();68: 69:         CommitLog.log.info(this.getServiceName() + " service end");70:     }71: 72:     @Override73:     public String getServiceName() {74:         return FlushRealTimeService.class.getSimpleName();75:     }76: 77:     private void printFlushProgress() {78:         // CommitLog.log.info("how much disk fall behind memory, "79:         // + CommitLog.this.mappedFileQueue.howMuchFallBehind());80:     }81: 82:     @Override83:     @SuppressWarnings("SpellCheckingInspection")84:     public long getJointime() {85:         return 1000 * 60 * 5;86:     }87: }复制代码
  • 说明:实时 flush线程服务,调用 MappedFile#flush 相关逻辑。
  • 第 23 至 29 行 :每 flushPhysicQueueThoroughInterval 周期,执行一次 flush 。因为不是每次循环到都能满足 flushCommitLogLeastPages 大小,因此,需要一定周期进行一次强制 flush 。当然,不能每次循环都去执行强制 flush,这样性能较差。
  • 第 33 行 至 37 行 :根据 flushCommitLogTimed 参数,可以选择每次循环是固定周期还是等待唤醒。默认配置是后者,所以,每次插入消息完成,会去调用 commitLogService.wakeup()
  • 第 45 行 :调用 MappedFile 进行 flush
  • 第 61 至 65 行 :Broker 关闭时,强制 flush,避免有未刷盘的数据。

CommitRealTimeService

消息插入成功时,异步刷盘时使用。
FlushRealTimeService 类似,性能更好。

  1: class CommitRealTimeService extends FlushCommitLogService {2: 3:     /**4:      * 最后 commit 时间戳5:      */6:     private long lastCommitTimestamp = 0;7: 8:     @Override9:     public String getServiceName() {10:         return CommitRealTimeService.class.getSimpleName();11:     }12: 13:     @Override14:     public void run() {15:         CommitLog.log.info(this.getServiceName() + " service started");16:         while (!this.isStopped()) {17:             int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();18:             int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();19:             int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();20: 21:             // 当时间满足commitDataThoroughInterval时,即使写入的数量不足commitDataLeastPages,也进行flush22:             long begin = System.currentTimeMillis();23:             if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {24:                 this.lastCommitTimestamp = begin;25:                 commitDataLeastPages = 0;26:             }27: 28:             try {29:                 // commit30:                 boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);31:                 long end = System.currentTimeMillis();32:                 if (!result) { // TODO 疑问:未写入成功,为啥要唤醒flushCommitLogService33:                     this.lastCommitTimestamp = end; // result = false means some data committed.34:                     //now wake up flush thread.35:                     flushCommitLogService.wakeup();36:                 }37: 38:                 if (end - begin > 500) {39:                     log.info("Commit data to file costs {} ms", end - begin);40:                 }41: 42:                 // 等待执行43:                 this.waitForRunning(interval);44:             } catch (Throwable e) {45:                 CommitLog.log.error(this.getServiceName() + " service has exception. ", e);46:             }47:         }48: 49:         boolean result = false;50:         for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {51:             result = CommitLog.this.mappedFileQueue.commit(0);52:             CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));53:         }54:         CommitLog.log.info(this.getServiceName() + " service end");55:     }56: }复制代码

GroupCommitService

消息插入成功时,同步刷盘时使用。

  1: class GroupCommitService extends FlushCommitLogService {2:     /**3:      * 写入请求队列4:      */5:     private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<>();6:     /**7:      * 读取请求队列8:      */9:     private volatile List<GroupCommitRequest> requestsRead = new ArrayList<>();10: 11:     /**12:      * 添加写入请求13:      *14:      * @param request 写入请求15:      */16:     public synchronized void putRequest(final GroupCommitRequest request) {17:         // 添加写入请求18:         synchronized (this.requestsWrite) {19:             this.requestsWrite.add(request);20:         }21:         // 切换读写队列22:         if (hasNotified.compareAndSet(false, true)) {23:             waitPoint.countDown(); // notify24:         }25:     }26: 27:     /**28:      * 切换读写队列29:      */30:     private void swapRequests() {31:         List<GroupCommitRequest> tmp = this.requestsWrite;32:         this.requestsWrite = this.requestsRead;33:         this.requestsRead = tmp;34:     }35: 36:     private void doCommit() {37:         synchronized (this.requestsRead) {38:             if (!this.requestsRead.isEmpty()) {39:                 for (GroupCommitRequest req : this.requestsRead) {40:                     // There may be a message in the next file, so a maximum of41:                     // two times the flush (可能批量提交的messages,分布在两个MappedFile)42:                     boolean flushOK = false;43:                     for (int i = 0; i < 2 && !flushOK; i++) {44:                         // 是否满足需要flush条件,即请求的offset超过flush的offset45:                         flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();46:                         if (!flushOK) {47:                             CommitLog.this.mappedFileQueue.flush(0);48:                         }49:                     }50:                     // 唤醒等待请求51:                     req.wakeupCustomer(flushOK);52:                 }53: 54:                 long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();55:                 if (storeTimestamp > 0) {56:                     CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);57:                 }58: 59:                 // 清理读取队列60:                 this.requestsRead.clear();61:             } else {62:                 // Because of individual messages is set to not sync flush, it63:                 // will come to this process 不合法的请求,比如message上未设置isWaitStoreMsgOK。64:                 // 走到此处的逻辑,相当于发送一条消息,落盘一条消息,实际无批量提交的效果。65:                 CommitLog.this.mappedFileQueue.flush(0);66:             }67:         }68:     }69: 70:     public void run() {71:         CommitLog.log.info(this.getServiceName() + " service started");72: 73:         while (!this.isStopped()) {74:             try {75:                 this.waitForRunning(10);76:                 this.doCommit();77:             } catch (Exception e) {78:                 CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);79:             }80:         }81: 82:         // Under normal circumstances shutdown, wait for the arrival of the83:         // request, and then flush84:         try {85:             Thread.sleep(10);86:         } catch (InterruptedException e) {87:             CommitLog.log.warn("GroupCommitService Exception, ", e);88:         }89: 90:         synchronized (this) {91:             this.swapRequests();92:         }93: 94:         this.doCommit();95: 96:         CommitLog.log.info(this.getServiceName() + " service end");97:     }98: 99:     /**
100:      * 每次执行完,切换读写队列
101:      */
102:     @Override
103:     protected void onWaitEnd() {
104:         this.swapRequests();
105:     }
106:
107:     @Override
108:     public String getServiceName() {
109:         return GroupCommitService.class.getSimpleName();
110:     }
111:
112:     @Override
113:     public long getJointime() {
114:         return 1000 * 60 * 5;
115:     }
116: }复制代码
  • 说明:批量写入线程服务。
  • 第 16 至 25 行 :添加写入请求。方法设置了 sync 的原因:this.requestsWrite 会和 this.requestsRead 不断交换,无法保证稳定的同步。
  • 第 27 至 34 行 :读写队列交换。
  • 第 38 至 60 行 :循环写入队列,进行 flush
    • 第 43 行 :考虑到有可能每次循环的消息写入的消息,可能分布在两个 MappedFile(写第N个消息时,MappedFile 已满,创建了一个新的),所以需要有循环2次。
    • 第 51 行 :唤醒等待写入请求线程,通过 CountDownLatch 实现。
  • 第 61 至 66 行 :直接刷盘。此处是由于发送的消息的 isWaitStoreMsgOK 未设置成 TRUE ,导致未走批量提交。
  • 第 73 至 80 行 :每 10ms 执行一次批量提交。当然,如果 wakeup() 时,则会立即进行一次批量提交。当 Broker 设置成同步落盘 && 消息 isWaitStoreMsgOK=true,消息需要略大于 10ms 才能发送成功。当然,性能相对异步落盘较差,可靠性更高,需要我们在实际使用时去取舍。

结尾

写的第二篇与RocketMQ源码相关的博文,看到有阅读、点赞、收藏甚至订阅,很受鼓舞。

《Message存储》比起《Message发送&接收》从难度上说是更大的,当然也是更有趣的,如果存在理解错误或者表达不清晰,还请大家多多包含。如果可以的话,还请麻烦添加 QQ:7685413 进行指出,避免自己的理解错误,给大家造成困扰。

推荐《Kafka设计解析(六)- Kafka高性能架构之道》,作者站在的高度比我高的多的多,嗯,按照李小璐的说法:高一个喜马拉雅山。?认真啃读《Linux内核设计与实现(原书第3版)》,day day up。

再次感谢大家的阅读、点赞、收藏。

下一篇:《RocketMQ 源码分析 —— Message 拉取与消费》 起航!

RocketMQ源码解析:Message存储相关推荐

  1. RocketMQ源码解析之消息消费者(consume Message)

    原创不易,转载请注明出处 文章目录 前言 1.消息流程介绍 2.源码解析 2.1 并发消费 2.2 顺序消费 前言 我们在<RocketMQ源码解析之消息消费者(pullMessage)> ...

  2. RocketMQ源码解析之broker文件清理

    原创不易,转载请注明出处 文章目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2.源码解析 2.1 清理commitlog 2.2 Consu ...

  3. 6、RocketMQ 源码解析之 Broker 启动(上)

    上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...

  4. RocketMQ源码解析:Filtersrv

    ???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...

  5. rocketmq源码解析之name启动(一)

    2019独角兽企业重金招聘Python工程师标准>>> 说在前面 主要解析namrsrv启动部分,namesrv配置加载.netty server创建.注册出处理器. 正文 源码解析 ...

  6. 消息中间件RocketMQ源码解析-- --调试环境搭建

    1. 依赖工具 JDK :1.8+ Maven IntelliJ IDEA 2. 源码拉取 从官方仓库 [https://github.com/apache/rocketmq) Fork 出属于自己的 ...

  7. RocketMQ源码解析-Broker部分之Broker启动过程

    目录 broker启动流程 broker启动可配置参数 启动入口`BrokerStartup` 1.创建brokerController 2.`BrokerController`构造函数 3.Brok ...

  8. RocketMQ源码解析-事务消息的二阶段提交

    在生产者producer当中,通过sendMessageInTransaction()方法来发送事务消息,但是在一开始向Broker发送的事务消息的时候,具体的事务操作还并没有进行处理,而是相当于向B ...

  9. RocketMQ源码解析-Broker的HA实现

    以master异步复制为例子. 在rocketmq的slave broker机子当中,会在DefaultMessageStore的启动当中启动自己的HaService来进行自己的ha服务. publi ...

  10. RocketMQ源码解析-Consumer启动(2)

    接着上文的Pull消费者启动继续讲. public void start() throws MQClientException {switch (this.serviceState) {case CR ...

最新文章

  1. excel学习笔记之一
  2. Windows 2000/NT/XP管理员密码丢失解决方法
  3. 大话TreeMap的put,get过程
  4. iis 日志 post数据_云原生日志的趋势(1):logscape和logiq
  5. shell正则表达式去除注释行
  6. addWeighted()
  7. Tomcat下中文乱码的解决方法
  8. 如果让AI来续写这些小说……
  9. 点名软件—使用二项分布实现点名均匀分布
  10. Wireshark基础知识(一)
  11. “Defaulting to user installation because normal site-packages is not writeable“
  12. 【C/C++】关于计算机内存地址的理解
  13. 华为OD机试 - 九宫格按键输入(Python) | 机试算法备考思路
  14. L1-022 奇偶分家(c)
  15. 微信小程序开发之webview组件内网页实现微信原生支付
  16. ssh 使用新法:公网(合法 ip)用户访问内网(私有 ip)服务器(http,ftp,sshd,cvs...),
  17. 谷歌浏览器无法翻译此网页,最新解决方案(谷歌浏览器无法翻译成中文)
  18. IT审计实务沟通与实践讨论之三IT审计中IT与财务的角度转换
  19. (1)有一函数: 当x<1时,y=x 当1<=x<10时,y=2x-1 当x>=10时,y=3x-11 编写程序输入x,输出y的值
  20. Bacterial seed endophyte shapes disease resistance in rice种子内生菌与水稻抗病性的关系

热门文章

  1. lassAtitit事件代理机制原理 基于css class的事件代理的事件代理titi
  2. Atitit.git的存储结构and 追踪
  3. paip.函数式编程方法概述以及总结
  4. paip.环境配置整合 ibatis mybatis proxool
  5. Rust : 异步编程
  6. 海外资管业价格战有多疯狂?史上首个零费率基金横空出世
  7. Julia: 奇技淫巧......
  8. 【答题卡识别】基于matlab GUI hough变换答题卡判定与成绩统计(带面板)【含Matlab源码 1017期】
  9. 【优化算法】吉萨金字塔建造优化算法(GPC)【含Matlab源码 1438期】
  10. 【优化算法】混沌单纯形法算子布谷鸟搜索优化算法【含Matlab源码 1193期】