mysql数据一致性闲谈

外部连接

首先让mysql在远程服务器上,现在希望它能被外部ip访问 在/etc/my.cnf里面,找到bind-address修改为0.0.0.0,

//重启mysql
service mysql restart  
//查看mysql服务
netstat -anp | grep mysqld  

看到如下监听端口已经是0.0.0.0

mysql_0.0.0.0

但是发现外部ip还是访问不了远程的mysql
现在远程服务器上用mysql-client连上

//root 是用户名
mysql -u root -p 

//mysql本身的数据库
use mysql

//创建 create user 'mysqlremoteusername'@'0.0.0.0' identified by 'mysqlremoteuserpassword';
//YourUser 即将使用该用户管理数据库
//YourPassword 即将使用的密码
//%的地方可以写0.0.0.0
grant all privileges on *.* to 'YourUser'@'%' identified by 'YourPassword' with grant option;

//当前user和privilige表中的用户信息/权限设置从mysql库(MySQL数据库的内置库)中提取到内存里
//这样就可以不用重启数据库
flush privileges;

//使用select查看是否插入成功
select Host, User,Password from user;  
数据

新建一个名为test_concurrency数据库

//新建顾客表
CREATE TABLE `customer_table` (  
`id`  varchar(3) NOT NULL ,
`name`  varchar(255) NOT NULL ,
`money`  int(11) NOT NULL ,
PRIMARY KEY (`id`)  
);

//在顾客表里插入两条数据
INSERT INTO `customer_table` (`id`, `name`, `money`) VALUES ('001', 'customer1', '100')  
INSERT INTO `customer_table` (`id`, `name`, `money`) VALUES ('002', 'customer2', '400')

//新建商家表
CREATE TABLE `merchant_table` (  
`id`  varchar(3) NOT NULL ,
`money`  int(11) NOT NULL ,
 PRIMARY KEY (`id`)
);

//在商家表里插入两条数据
INSERT INTO `merchant_table` (`id`, `credit`, `money`) VALUES ('100', '1000', '1000')  

使用[此代码] (http://imgqi.hacking.pub/%E5%B9%B6%E5%8F%91%20mysql%E4%B8%8A%E9%94%81/mysql.go)查看数据,结果如下

test_customer2.png

测试场景

原始
顾客001,100金额
顾客002,400金额
商家100,100金额
过程
顾客001用50金额从商家那买了物品
商家给顾客002退款100金额
期望结果
顾客001,50金额
顾客002,500金额
商家100,50金额

使用正常代码执行,过程

func simulateConcurrencyCustomerCostMoney(CustomerId string, MerchantId string, costMoney int, db *sql.DB) {  
    var stmt *sql.Stmt
    var res sql.Result
    var err error

    //update customer
    stmt, err = db.Prepare("UPDATE `customer_table` SET `money` = `money` - ? where id = ? ")
    if nil != err {
        log.Printf("err %v", err)
        return
    }
    defer stmt.Close()

    res, err = stmt.Exec(costMoney, CustomerId)
    if nil != err {
        log.Printf("exec err %v", err)
        return
    }

    //update merchant
    stmt, err = db.Prepare("UPDATE `merchant_table` SET `money` = `money` + ? where id = ?")
    if nil != err {
        log.Printf("err %v", err)
        return
    }
    defer stmt.Close()

    res, err = stmt.Exec(costMoney, MerchantId)
    if nil != err {
        log.Printf("exec err %v", err)
        return
    }
}

可以看到数据是跟预想的一样

$ go run mysql.go 
2016/11/28 22:27:46 id : 001  
2016/11/28 22:27:46 name : customer1  
2016/11/28 22:27:46 money : 38  
2016/11/28 22:27:46 -----------------  
2016/11/28 22:27:46 id : 002  
2016/11/28 22:27:46 name : customer2  
2016/11/28 22:27:46 money : 400  
2016/11/28 22:27:46 -----------------  
2016/11/28 22:27:46 id : 100  
2016/11/28 22:27:46 money : 1062  
2016/11/28 22:27:46 -----------------  
事务

上述的做法,没有考虑到事务

事务(Transaction)是由一系列对系统中数据进行访问与更新的操作所组成的一个程序执行逻辑单元(Unit),狭义上的事务特指数据库事务。

数据库的事务有四个特性,ACID
A指 原子性(atomicity)。一个事务是一个不可分割的工作单位,事务中包括的诸操作要么都做,要么都不做。
C指 一致性(consistency)。事务必须是使数据库从一个一致性状态变到另一个一致性状态。一致性与原子性是密切相关的。
I指 隔离性(isolation)。一个事务的执行不能被其他事务干扰。即一个事务内部的操作及使用的数据对并发的其他事务是隔离的,并发执行的各个事务之间不能互相干扰。
D指 持久性(durability)。持久性也称永久性(permanence),指一个事务一旦提交,它对数据库中数据的改变就应该是永久性的。接下来的其他操作或故障不应该对其有任何影响。

个人理解,4个特性中最基本的特性是数据一致性,其他三个都是因为一致性而衍生的。
就原子性来说,单单满足原子性,是不足以满足数据一致性的,比方说
1. 事件一是customer1花费了200元
2. 事件二是customer2花费了200块

e.png

正常事件一流程应该走虚线,但是并发情况下,可能如图所示导致商家最终只增加了一次200

所以单纯保证原子性是不能保证数据一致性,需要引入隔离性 谈到隔离性,就得谈到锁的概念,悲观锁和乐观锁
个人理解的悲观锁,就是在事务开始的时候,把后面需要更新的数据进行提前加锁
乐观锁,就是在更新的时候,才把数据库中和更新的数据之间进行比对时间戳(或者其他标志)

悲观锁容易产生死锁,比方说支付和退款两个事件
支付上锁顺序是顾客、商家
退款上锁顺序是商家、顾客
两个过程就会产生死锁

隔离性分为四个级别:
1. 读未提交:(Read Uncommitted)
2. 读已提交(Read Committed) 大多数数据库默认的隔离级别
3. 可重复读(Repeatable-Read) mysql数据库所默认的级别
4. 序列化(serializable)

读未提交,容易造成“读脏”
读已提交,容易造成“不可重复读”、“幻读”
可重复读,也会造成问题,在参考2中,有明确的例子

以上三个级别,mysql可以都是通过数据库设置的
第四点,就需要我们手动锁机制,当然会影响性能

我用golang做的测试
1. golang语言里面tx在Exec方法select后,如果不使用Scan把结果拿出来,再次使用tx.Exec就会报错

rowOfCus = tx.QueryRow("select * from `customer_table`  
rowOfMer = tx.QueryRow("select * from `merchant_table` WHERE id = ? for update", MerchantId)  
err = rowOfMer.Scan(&mId, &mMoney)  

keng.png

  1. 上锁避免死锁,需要按一定的表顺序上锁,避免
    线程1抢占资源A,需要资源B;
    线程2抢占资源B,需要资源A;
其他

商家退款演示 simulateConcurrencyMerchantRefund

func simulateConcurrencyMerchantRefund(CustomerId string, MerchantId string, RefundMoney int, db *sql.DB) {  
    defer func() {
        if err := recover(); nil != err {
            log.Printf("simulateConcurrencyMerchantRefund get the error %v\n", err)
        }
    }()

    var err error
    //var res sql.Result
    //var affNum int64

    var tx *sql.Tx
    var res sql.Result
    //var rowsOfCus *sql.Rows
    //var rowsOfMer *sql.Rows

    var rowOfCus *sql.Row
    var rowOfMer *sql.Row

    func() {
        defer func() {
            if p := recover(); nil != p {
                switch p := p.(type) {
                case error:
                    err = p
                default:
                    err = fmt.Errorf("%s", p)
                }
            }
            //rollback
            if nil != err {
                if nil != tx {
                    tx.Rollback()
                    checkError(err, "tx.Rollback")
                }
                checkError(err, "tx not start")
            }

            //commit
            if nil != tx {
                log.Printf("the transcation commit\n")
                err = tx.Commit()
                checkError(err, "tx.Commit")
            }
        }()

        //begin transaction
        tx, err = db.Begin()
        checkError(err, "db.Begin")

        var cId string
        var cName string
        var cMoney int
        var mId string
        var mMoney int

        rowOfCus = tx.QueryRow("select * from `customer_table` WHERE id = ? for update", CustomerId)
        err = rowOfCus.Scan(&cId, &cName, &cMoney)
        checkError(err, "tx.QueryRow")
        log.Printf("get the data is %s %s %d\n", cId, cName, cMoney)

        rowOfMer = tx.QueryRow("select * from `merchant_table` WHERE id = ? for update", MerchantId)
        err = rowOfMer.Scan(&mId, &mMoney)
        checkError(err, "tx.QueryRow")
        log.Printf("get the data is %s %d\n", mId, mMoney)

        //not enought money
        if mMoney < RefundMoney {
            log.Printf("the merchant has no enough money\n")
            return
        }

        res, err = tx.Exec("UPDATE `customer_table` SET `money` = `money` + ? WHERE id = ?", RefundMoney, CustomerId)
        checkError(err, "db.Update")
        checkAffectNum(res, 1, "db.Update")

        time.Sleep(10 * time.Second)

        res, err = tx.Exec("UPDATE `merchant_table` SET `money` = `money` - ? WHERE id = ?", RefundMoney, MerchantId)
        checkError(err, "db.Update")
        checkAffectNum(res, 1, "db.Update")
    }()
}

代码
未经授权,禁止转载
by chainhelen

参考

1.分布式系列文章——从ACID到CAP/BASE
2.数据库事务原子性、一致性是怎样实现的?
3.脏读、不可重复读、幻读