修改dtpre
队列
在考虑落盘之前,先思考另一个问题,由于dtpre
本身跟消费者无关,只跟生产者有关,其实并不需要等到消息从topic
分发到各个channel
里之后再进行commit(cacel)
,浪费存储空间(如果落盘就浪费了磁盘空间)
所以我们将dtpre
的部分移至topic
中存储
落盘逻辑
对于落盘逻辑来说,不只是dtpre
的逻辑需要修改,如此对于一般的消息来说,都应该进行落盘
而channel
来说只需要落盘数据索引
即可
修改细节
实际这次提交修改了比较多的内容,最核心的还是移除了memoryMsgChan
(无论是 topic 还是 channel),添加了backendMsgChan
.
读写轮盘在于
1.nsqd/diskqueue_reader.go
采用了定时器pull
方式,而不是之前的push
方式
这个地方可能存在性能问题,因为每次是拉去tryReadOne
的方式
2. nsqd/diskqueue_writer.go
里面writeOne
函数,其实没有写入磁盘,而是写入了一个buffer
(注意对于rocketmq
、kafka
都是采用的mmap
的方式),buffer
设定了定时器参数会定时刷盘(还有一种策略是读写次数超过多少就进行刷盘,避免短时间内出现高峰读写消息过多没有落盘的风险)
负载均衡在于
1. 消费同一channel
的多个client
,保留原有的的负载均衡,同时竞争同一个channel
顺序消费
1. 顺序消费只需要保证inflight
至多只有一个就可以了,但是dtnsq
并没有实现
2. 本质上控制inflight
的个数就可以实现了,需要在tryReadOne
调用前check
一下当前状态
索引
1. 消息在文件中保存位置,需要记录4个信息 文件index
文件偏移量
虚拟偏移量
逻辑消息index
延迟落盘的问题
断电瞬间,是会返回数据不会丢,但是会造成数据不一致的,例如:writer index索引记下来,但是数据还没来得及落盘,正常来讲,应该需要做一些修复工作;类似mysql里面的redo
和undo
(但是我偷懒没有做)
事务
由于事务型消息需要有二次confirm
,如果超时没有confirm
,会有一次回查
这个回查过程像极了消费逻辑
,如果超时没有反馈重新投递,所以这里使用了隐藏的innner topic、inner channel
去实现回查
剩下就是confirmed
的逻辑,需要在内存保存pre
状态的消息.
因为confirmed
可能是无序的,所以如果confirmed
的刚好是连续的,那么直接1.移除等待超时队列 2.将消息从inner topic
移交到正常的topic
中;
否则在内存里碎片化,需要做一些合并的逻辑,这里面参考(抄?)了有赞的nsq改造
参考
https://www.bowdoin.edu/~ltoma/teaching/cs231/spring14/Lectures/10-augmentedTrees/augtrees.pdf