nsq 支持分布式事务
原意
对于微服务架构来说,当数据垂直拆分到各个服务之后,通常通过调用接口的方式(或者异步消息)来保证数据一致性,但是如果出现被调用服务暂时性宕机(或者非平滑发布),那么两边数据就不一致了。
ps : 我认为还有一种情况,由于互联网人员流动性比较高,许多非核心业务中转好几个产品经理手,最后在新业务与旧业务联动的时候,会有遗漏一些业务异常情况,我暂时称之为"数据联动性",目前我除了规范业务流程,没有更好的方案;而本文所要解决的场景,称之为"分布式事务一致性"
dt协议
对于A、B两个服务之间某个请求有分布式事务关联,使用如下方式,将分布式事务
转化成单表事务
其中 send preapred
和 A服务的单表sql
顺序是可以随意的
实现这里需要修改一下nsq的代码,每一个client
连接到nsqd
都需要做一个协商(IDENTIFY)协议的版本号,通过magic
字段来协商,目前nsq支持的是 V1
和V2
两个版本
所以我们需要添加一个文件protocol_dt.go
,里面的内容主要是照搬protocol_v2.go
,提交主要记录主要在 84dfb9
dtPrePQ
我们需要一些方式来存储 preapred
这种中间态的消息
对于nsq来说,当整个流程都在内存进行的时候(后续做镜像队列的时候再考虑落磁盘的问题),消息经过topic.go
会被复制到 多个消费者产生的channel.go
(c.put
),主要是在memoryMsgChan
通道处理的(对应处理逻辑在文件nsqd/protocol_dt.go 函数messagePump);
注意channel.go
给消费者投递消息之前,是需要将消息通过StartInFlightTimeout
塞到in_flight
队列(主要为了保证每条消息至少被消费一次语义);如果投递消费者的消息长时间没有得到ACK
,in_flight
会触发超时逻辑,拿出来重新塞到in_deffer
队列(为了做延迟重试);
为了快速实现preapred
队列,可以照搬一些优先队列in_flight
的逻辑,当然有几点不同:
- 将
dt
协议类型的message
赋予几种状态pre cmt cnl
- 插入时机是在
topic.go
复制消息 到channel.go
的时候,此时暂时不需要put
,临时保存消息即可 - 返回给生产者的内容,不能是再是简单的
ok
,需要将此时产生的分布式消息id返回,方便后续commit
、cancel
主要提交在da935cf
commit/cancel
前面的步骤完成了,commit
和cancel
就比较简单了,主要是利用 pre
返回的msg id
,发送channel
的时候,channel
检查对应的id
;如果是commit
就用put
,否则直接丢弃即可
调试
需要加一些状态来确认数据是否正常
// dt debug list, don't use api in production env because it affect performance.
router.Handle("GET", "/topic/list", http_api.Decorate(s.topicListHandler, log, http_api.DtJSON))
router.Handle("GET", "/channel/list", http_api.Decorate(s.channelListHandler, log, http_api.DtJSON))
router.Handle("GET", "/premsg/list", http_api.Decorate(s.preMsgListHandler, log, http_api.DtJSON))
nsq
中使用了router
回查
回查目的本身在于commit/cancel可能会失败
实际为避免出现永久性的prepared消息,有几种方式规避:
- mq回查生产者,查询A服务中的单条事务是否成功
- A服务对于每次修改都有一条伴生记录表,A服务定时器扫描伴生记录表,将未确定的事务状态重新告知mq
- B服务每次都会回调方式查询
对于事务型中间件,回查是MQ与生产者之前的调用,需要MQ回查生产者,nsq
修改难度在于
- nsq设计,生产者与MQ之间不需要保持长链接,按需要时候进行链接
- nsq设计,生产者的单条连接与MQ之间是一问一答模式,不提供类似异步回调的方式
对于个人开发成本太高,还不如新建长链接(心跳包存活)只为回查准备的
最后查询了一下nsq的代码,发现只要建立链接过后,nsq都会维持心跳包(不论生产者,还是消费者)
nsq 往 client 发送 _heartbeat_
,client 回复 NOP
对于 nsq producer的sdk来说,正常对于nsq发送过来的报文,只是识别FrameTypeResponse
格式报文(典型的tlv格式)
回查只需要在异步返回的报文,加入识别FrameTypeMessage
的报文即可
sdk 的提交记录 commit 60b9a7e
dtnsq的提交commit 02d675d
test
截止完成上述功能提交点 dtnsq commit c797454 go-dtnsq commit 60b9a7e
cat consumer.go
package main
import (
"fmt"
dtnsq "github.com/chainhelen/go-dtnsq"
)
func main() {
cfg := dtnsq.NewConfig()
// cfg.Set("agreement_version", "DT")
MQConsumer, err := dtnsq.NewConsumer("testdtnsq", "ch1", cfg)
if err != nil {
panic(fmt.Sprintf("new mq consumer fail, err: %v", err))
}
// 设置消息处理函数
MQConsumer.AddHandler(dtnsq.HandlerFunc(func(message *dtnsq.Message) error {
fmt.Printf("id:%d, body:%s\n", message.ID, message.Body)
return nil
}))
err = MQConsumer.ConnectToNSQD("127.0.0.1:4150")
if err != nil {
panic(fmt.Sprintf("conect mq consumer fail, err: %v", err))
}
<-MQConsumer.StopChan
}
cat producer.go
package main
import (
"fmt"
dtnsq "github.com/chainhelen/go-dtnsq"
"time"
)
func NewMQProducer() *dtnsq.Producer {
cfg := dtnsq.NewConfig()
cfg.Set("agreement_version", "DT")
MQProducer, err := dtnsq.NewProducer("127.0.0.1:4150", cfg)
if err != nil {
panic(fmt.Sprintf("new mq producer fail, err: %v", err))
}
return MQProducer
}
func main() {
producer := NewMQProducer()
// pre、wait 3s、cmt
func() {
if preres, err := producer.PublishDtPre("normal_topic", []byte("hello dtnsq one")); err != nil {
fmt.Printf("pre:%s\n", err.Error())
} else {
time.Sleep(time.Duration(3) * time.Second)
fmt.Printf("pre:%s\n", preres)
if cmtres, err := producer.PublishDtCmt("normal_topic", preres); err != nil {
fmt.Printf("cmt:%s\n", err.Error())
} else {
fmt.Printf("cmt:%s\n", cmtres)
}
}
}()
// pre、wait 3s、cnl
func() {
if preres, err := producer.PublishDtPre("normal_topic", []byte("hello dtnsq tow")); err != nil {
fmt.Printf("pre:%s\n", err.Error())
} else {
time.Sleep(time.Duration(3) * time.Second)
fmt.Printf("pre:%s\n", preres)
if cmtres, err := producer.PublishDtCnl("normal_topic", preres); err != nil {
fmt.Printf("cnl:%s\n", err.Error())
} else {
fmt.Printf("cnl:%s\n", cmtres)
}
}
}()
}
落盘
剩下的功能基本又要致敬别的项目了,待续...