原意
对于微服务架构来说,当数据垂直拆分到各个服务之后,通常通过调用接口的方式(或者异步消息)来保证数据一致性,但是如果出现被调用服务暂时性宕机(或者非平滑发布),那么两边数据就不一致了。
ps : 我认为还有一种情况,由于互联网人员流动性比较高,许多非核心业务中转好几个产品经理手,最后在新业务与旧业务联动的时候,会有遗漏一些业务异常情况,我暂时称之为"数据联动性",目前我除了规范业务流程,没有更好的方案;而本文所要解决的场景,称之为"分布式事务一致性"
dt协议
对于A、B两个服务之间某个请求有分布式事务关联,使用如下方式,将分布式事务转化成单表事务   
    
其中 send preapred 和 A服务的单表sql 顺序是可以随意的  
实现这里需要修改一下nsq的代码,每一个client连接到nsqd都需要做一个协商(IDENTIFY)协议的版本号,通过magic字段来协商,目前nsq支持的是 V1和V2两个版本  
所以我们需要添加一个文件protocol_dt.go,里面的内容主要是照搬protocol_v2.go,提交主要记录主要在 84dfb9  
dtPrePQ
我们需要一些方式来存储 preapred 这种中间态的消息  
对于nsq来说,当整个流程都在内存进行的时候(后续做镜像队列的时候再考虑落磁盘的问题),消息经过topic.go 会被复制到 多个消费者产生的channel.go(c.put),主要是在memoryMsgChan通道处理的(对应处理逻辑在文件nsqd/protocol_dt.go 函数messagePump);  
注意channel.go给消费者投递消息之前,是需要将消息通过StartInFlightTimeout塞到in_flight队列(主要为了保证每条消息至少被消费一次语义);如果投递消费者的消息长时间没有得到ACK,in_flight会触发超时逻辑,拿出来重新塞到in_deffer队列(为了做延迟重试);    
为了快速实现preapred队列,可以照搬一些优先队列in_flight的逻辑,当然有几点不同:  
- 将
dt协议类型的message赋予几种状态pre cmt cnl - 插入时机是在
topic.go复制消息 到channel.go的时候,此时暂时不需要put,临时保存消息即可 - 返回给生产者的内容,不能是再是简单的
ok,需要将此时产生的分布式消息id返回,方便后续commit、cancel 
主要提交在da935cf
commit/cancel
前面的步骤完成了,commit和cancel就比较简单了,主要是利用 pre 返回的msg id ,发送channel的时候,channel检查对应的id;如果是commit就用put,否则直接丢弃即可      
调试
需要加一些状态来确认数据是否正常
// dt debug list, don't use api in production env because it affect performance.
 router.Handle("GET", "/topic/list", http_api.Decorate(s.topicListHandler, log, http_api.DtJSON))
router.Handle("GET", "/channel/list", http_api.Decorate(s.channelListHandler, log, http_api.DtJSON))  
router.Handle("GET", "/premsg/list", http_api.Decorate(s.preMsgListHandler, log, http_api.DtJSON))  
nsq中使用了router  
回查
回查目的本身在于commit/cancel可能会失败 
实际为避免出现永久性的prepared消息,有几种方式规避:   
- mq回查生产者,查询A服务中的单条事务是否成功
 - A服务对于每次修改都有一条伴生记录表,A服务定时器扫描伴生记录表,将未确定的事务状态重新告知mq
 - B服务每次都会回调方式查询
 
对于事务型中间件,回查是MQ与生产者之前的调用,需要MQ回查生产者,nsq修改难度在于  
- nsq设计,生产者与MQ之间不需要保持长链接,按需要时候进行链接
 - nsq设计,生产者的单条连接与MQ之间是一问一答模式,不提供类似异步回调的方式
 
对于个人开发成本太高,还不如新建长链接(心跳包存活)只为回查准备的
最后查询了一下nsq的代码,发现只要建立链接过后,nsq都会维持心跳包(不论生产者,还是消费者) 
nsq 往 client 发送 _heartbeat_,client 回复 NOP  
对于 nsq producer的sdk来说,正常对于nsq发送过来的报文,只是识别FrameTypeResponse格式报文(典型的tlv格式) 
回查只需要在异步返回的报文,加入识别FrameTypeMessage的报文即可 
sdk 的提交记录 commit 60b9a7e 
dtnsq的提交commit 02d675d
test
截止完成上述功能提交点 dtnsq commit c797454 go-dtnsq commit 60b9a7e
cat consumer.go  
package main
import (  
    "fmt"
    dtnsq "github.com/chainhelen/go-dtnsq"
)
func main() {  
    cfg := dtnsq.NewConfig()
    // cfg.Set("agreement_version", "DT")
    MQConsumer, err := dtnsq.NewConsumer("testdtnsq", "ch1", cfg)
    if err != nil {
        panic(fmt.Sprintf("new mq consumer fail, err: %v", err))
    }
    // 设置消息处理函数
    MQConsumer.AddHandler(dtnsq.HandlerFunc(func(message *dtnsq.Message) error {
        fmt.Printf("id:%d, body:%s\n", message.ID, message.Body)
        return nil
    }))
    err = MQConsumer.ConnectToNSQD("127.0.0.1:4150")
    if err != nil {
        panic(fmt.Sprintf("conect mq consumer fail, err: %v", err))
    }
    <-MQConsumer.StopChan
}
cat producer.go  
package main
import (  
    "fmt"
    dtnsq "github.com/chainhelen/go-dtnsq"
    "time"
)
func NewMQProducer() *dtnsq.Producer {  
    cfg := dtnsq.NewConfig()
    cfg.Set("agreement_version", "DT")
    MQProducer, err := dtnsq.NewProducer("127.0.0.1:4150", cfg)
    if err != nil {
        panic(fmt.Sprintf("new mq producer fail, err: %v", err))
    }
    return MQProducer
}
func main() {  
    producer := NewMQProducer()
    // pre、wait 3s、cmt
    func() {
        if preres, err := producer.PublishDtPre("normal_topic", []byte("hello dtnsq one")); err != nil {
            fmt.Printf("pre:%s\n", err.Error())
        } else {
            time.Sleep(time.Duration(3) * time.Second)
            fmt.Printf("pre:%s\n", preres)
            if cmtres, err := producer.PublishDtCmt("normal_topic", preres); err != nil {
                fmt.Printf("cmt:%s\n", err.Error())
            } else {
                fmt.Printf("cmt:%s\n", cmtres)
            }
        }
    }()
    // pre、wait 3s、cnl
    func() {
        if preres, err := producer.PublishDtPre("normal_topic", []byte("hello dtnsq tow")); err != nil {
            fmt.Printf("pre:%s\n", err.Error())
        } else {
            time.Sleep(time.Duration(3) * time.Second)
            fmt.Printf("pre:%s\n", preres)
            if cmtres, err := producer.PublishDtCnl("normal_topic", preres); err != nil {
                fmt.Printf("cnl:%s\n", err.Error())
            } else {
                fmt.Printf("cnl:%s\n", cmtres)
            }
        }
    }()
}
落盘
剩下的功能基本又要致敬别的项目了,待续...