nsq 支持分布式事务

原意

对于微服务架构来说,当数据垂直拆分到各个服务之后,通常通过调用接口的方式(或者异步消息)来保证数据一致性,但是如果出现被调用服务暂时性宕机(或者非平滑发布),那么两边数据就不一致了。

ps : 我认为还有一种情况,由于互联网人员流动性比较高,许多非核心业务中转好几个产品经理手,最后在新业务与旧业务联动的时候,会有遗漏一些业务异常情况,我暂时称之为"数据联动性",目前我除了规范业务流程,没有更好的方案;而本文所要解决的场景,称之为"分布式事务一致性"

dt协议

对于A、B两个服务之间某个请求有分布式事务关联,使用如下方式,将分布式事务转化成单表事务

svg

其中 send preapred 和 A服务的单表sql 顺序是可以随意的

实现这里需要修改一下nsq的代码,每一个client连接到nsqd都需要做一个协商(IDENTIFY)协议的版本号,通过magic字段来协商,目前nsq支持的是 V1V2两个版本

所以我们需要添加一个文件protocol_dt.go,里面的内容主要是照搬protocol_v2.go,提交主要记录主要在 84dfb9

dtPrePQ

我们需要一些方式来存储 preapred 这种中间态的消息

对于nsq来说,当整个流程都在内存进行的时候(后续做镜像队列的时候再考虑落磁盘的问题),消息经过topic.go 会被复制到 多个消费者产生的channel.goc.put),主要是在memoryMsgChan通道处理的(对应处理逻辑在文件nsqd/protocol_dt.go 函数messagePump);

注意channel.go给消费者投递消息之前,是需要将消息通过StartInFlightTimeout塞到in_flight队列(主要为了保证每条消息至少被消费一次语义);如果投递消费者的消息长时间没有得到ACKin_flight会触发超时逻辑,拿出来重新塞到in_deffer队列(为了做延迟重试);

为了快速实现preapred队列,可以照搬一些优先队列in_flight的逻辑,当然有几点不同:

  1. dt协议类型的message赋予几种状态pre cmt cnl
  2. 插入时机是在topic.go 复制消息 到 channel.go的时候,此时暂时不需要 put,临时保存消息即可
  3. 返回给生产者的内容,不能是再是简单的ok,需要将此时产生的分布式消息id返回,方便后续commitcancel

主要提交在da935cf

commit/cancel

前面的步骤完成了,commitcancel就比较简单了,主要是利用 pre 返回的msg id ,发送channel的时候,channel检查对应的id;如果是commit就用put,否则直接丢弃即可

调试

需要加一些状态来确认数据是否正常

// dt debug list, don't use api in production env because it affect performance.
 router.Handle("GET", "/topic/list", http_api.Decorate(s.topicListHandler, log, http_api.DtJSON))
router.Handle("GET", "/channel/list", http_api.Decorate(s.channelListHandler, log, http_api.DtJSON))  
router.Handle("GET", "/premsg/list", http_api.Decorate(s.preMsgListHandler, log, http_api.DtJSON))  

nsq中使用了router

回查

回查目的本身在于commit/cancel可能会失败
实际为避免出现永久性的prepared消息,有几种方式规避:

  1. mq回查生产者,查询A服务中的单条事务是否成功
  2. A服务对于每次修改都有一条伴生记录表,A服务定时器扫描伴生记录表,将未确定的事务状态重新告知mq
  3. B服务每次都会回调方式查询

对于事务型中间件,回查是MQ与生产者之前的调用,需要MQ回查生产者,nsq修改难度在于

  1. nsq设计,生产者与MQ之间不需要保持长链接,按需要时候进行链接
  2. nsq设计,生产者的单条连接与MQ之间是一问一答模式,不提供类似异步回调的方式

对于个人开发成本太高,还不如新建长链接(心跳包存活)只为回查准备的

最后查询了一下nsq的代码,发现只要建立链接过后,nsq都会维持心跳包(不论生产者,还是消费者)
nsq 往 client 发送 _heartbeat_,client 回复 NOP

对于 nsq producer的sdk来说,正常对于nsq发送过来的报文,只是识别FrameTypeResponse格式报文(典型的tlv格式)
回查只需要在异步返回的报文,加入识别FrameTypeMessage的报文即可
sdk 的提交记录 commit 60b9a7e
dtnsq的提交commit 02d675d

test

截止完成上述功能提交点 dtnsq commit c797454 go-dtnsq commit 60b9a7e

cat consumer.go  
package main

import (  
    "fmt"
    dtnsq "github.com/chainhelen/go-dtnsq"
)

func main() {  
    cfg := dtnsq.NewConfig()
    // cfg.Set("agreement_version", "DT")
    MQConsumer, err := dtnsq.NewConsumer("testdtnsq", "ch1", cfg)
    if err != nil {
        panic(fmt.Sprintf("new mq consumer fail, err: %v", err))
    }

    // 设置消息处理函数
    MQConsumer.AddHandler(dtnsq.HandlerFunc(func(message *dtnsq.Message) error {
        fmt.Printf("id:%d, body:%s\n", message.ID, message.Body)
        return nil
    }))

    err = MQConsumer.ConnectToNSQD("127.0.0.1:4150")
    if err != nil {
        panic(fmt.Sprintf("conect mq consumer fail, err: %v", err))
    }
    <-MQConsumer.StopChan
}
cat producer.go  
package main

import (  
    "fmt"
    dtnsq "github.com/chainhelen/go-dtnsq"
    "time"
)

func NewMQProducer() *dtnsq.Producer {  
    cfg := dtnsq.NewConfig()
    cfg.Set("agreement_version", "DT")
    MQProducer, err := dtnsq.NewProducer("127.0.0.1:4150", cfg)
    if err != nil {
        panic(fmt.Sprintf("new mq producer fail, err: %v", err))
    }
    return MQProducer
}
func main() {  
    producer := NewMQProducer()

    // pre、wait 3s、cmt
    func() {
        if preres, err := producer.PublishDtPre("normal_topic", []byte("hello dtnsq one")); err != nil {
            fmt.Printf("pre:%s\n", err.Error())
        } else {
            time.Sleep(time.Duration(3) * time.Second)
            fmt.Printf("pre:%s\n", preres)
            if cmtres, err := producer.PublishDtCmt("normal_topic", preres); err != nil {
                fmt.Printf("cmt:%s\n", err.Error())
            } else {
                fmt.Printf("cmt:%s\n", cmtres)
            }
        }
    }()

    // pre、wait 3s、cnl
    func() {
        if preres, err := producer.PublishDtPre("normal_topic", []byte("hello dtnsq tow")); err != nil {
            fmt.Printf("pre:%s\n", err.Error())
        } else {
            time.Sleep(time.Duration(3) * time.Second)
            fmt.Printf("pre:%s\n", preres)
            if cmtres, err := producer.PublishDtCnl("normal_topic", preres); err != nil {
                fmt.Printf("cnl:%s\n", err.Error())
            } else {
                fmt.Printf("cnl:%s\n", cmtres)
            }
        }
    }()
}
落盘

剩下的功能基本又要致敬别的项目了,待续...