使用golang理解mysql的两阶段提交

2020年4月5日 303点热度 0人点赞 0条评论
图片

文章源于一个问题:如果我们现在有两个mysql实例,在我们要尽量简单地完成分布式事务,怎么处理?

场景重现

比如我们现在有两个数据库,mysql3306和mysql3307。这里我们使用docker来创建这两个实例:

  1. # mysql3306创建命令

  2. docker run -d -p 3306:3306 -v /Users/yjf/Documents/workspace/mysql-docker/my3306.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf -v /Users/yjf/Documents/workspace/mysql-docker/data3306:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 --name mysql-3307 mysql:5.7


  3. # msyql3306的配置:

  4. [mysqld]

  5. pid-file = /var/run/mysqld/mysqld.pid

  6. socket = /var/run/mysqld/mysqld.sock

  7. datadir = /var/lib/mysql

  8. server-id = 1

  9. log_bin = mysql-bin

  10. binlog_format = ROW

  11. expire_logs_days = 30


  12. # mysql3307创建命令

  13. docker run -d -p 3307:3306 -v /Users/yjf/Documents/workspace/mysql-docker/my3307.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf -v /Users/yjf/Documents/workspace/mysql-docker/data3307:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 --name mysql-3307 mysql:5.7


  14. # msyql3307的配置:

  15. [mysqld]

  16. pid-file = /var/run/mysqld/mysqld.pid

  17. socket = /var/run/mysqld/mysqld.sock

  18. datadir = /var/lib/mysql

  19. server-id = 2

  20. log_bin = mysql-bin

  21. binlog_format = ROW

  22. expire_logs_days = 30

在mysql3306中
我们有一个user表

  1. create table user (

  2. id int,

  3. name varchar(10),

  4. score int

  5. );



  6. insert into user values(1, "foo", 10)

在mysql3307中,我们有一个wallet表。

  1. create table wallet (

  2. id int,

  3. money float

  4. );



  5. insert into wallet values(1, 10.1)

我们可以看到,id为1的用户初始分数(score)为10,而它的钱,在wallet中初始钱(money)为10.1。

现在假设我们有一个操作,需要对这个用户进行操作:每次操作增加分数2,并且增加钱数1.2。

这个操作需要很强的一致性。

思考

两阶段提交

这里是一个分布式事务的概念,我们可以使用2PC的方法进行保证事务

图片

2PC的概念如图所示,引入一个资源协调者的概念,由这个资源协调者进行事务协调。

第一阶段,由这个资源协调者对每个mysql实例调用prepare命令,让所有的mysql实例准备好,如果其中由mysql实例没有准备好,协调者就让所有实例调用rollback命令进行回滚。如果所有mysql都prepare完成,那么就进入第二阶段。

第二阶段,资源协调者让每个mysql实例都调用commit方法,进行提交。

mysql里面也提供了分布式事务的语句XA。

用单个实例的事务行不行

等等,这个两阶段提交和我们的事务感觉也差不多,都是进行一次开始,然后执行,最后commit,mysql为什么还要专门定义一个xa的命令呢?于是我陷入了思考...

思考不如实操,于是我用golang写了一个使用mysql的事务实现的“两阶段提交”:

  1. package main


  2. import (

  3. "database/sql"

  4. "fmt"


  5. _ "github.com/go-sql-driver/mysql"

  6. "github.com/pkg/errors"

  7. )


  8. func main() {

  9. var err error


  10. // db1的连接

  11. db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1")

  12. if err != nil {

  13. panic(err.Error())

  14. }

  15. defer db1.Close()


  16. // db2的连接

  17. db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2")

  18. if err != nil {

  19. panic(err.Error())

  20. }

  21. defer db2.Close()


  22. // 开始前显示

  23. var score int

  24. db1.QueryRow("select score from user where id = 1").Scan(&score)

  25. fmt.Println("user1 score:", score)

  26. var money float64

  27. db2.QueryRow("select money from wallet where id = 1").Scan(&money)

  28. fmt.Println("wallet1 money:", money)


  29. tx1, err := db1.Begin()

  30. if err != nil {

  31. panic(errors.WithStack(err))

  32. }

  33. tx2, err := db2.Begin()

  34. if err != nil {

  35. panic(errors.WithStack(err))

  36. }


  37. defer func() {

  38. if err := recover(); err != nil {

  39. fmt.Printf("%+v\n", err)

  40. fmt.Println("=== call rollback ====")

  41. tx1.Rollback()

  42. tx2.Rollback()

  43. }


  44. db1.QueryRow("select score from user where id = 1").Scan(&score)

  45. fmt.Println("user1 score:", score)

  46. db2.QueryRow("select money from wallet where id = 1").Scan(&money)

  47. fmt.Println("wallet1 money:", money)

  48. }()


  49. // DML操作

  50. if _, err = tx1.Exec("update user set score=score+2 where id =1"); err != nil {

  51. panic(errors.WithStack(err))

  52. }

  53. if _, err = tx2.Exec("update wallet set money=money+1.2 where id=1"); err != nil {

  54. panic(errors.WithStack(err))

  55. }


  56. // panic(errors.New("commit before error"))


  57. // commit

  58. fmt.Println("=== call commit ====")

  59. err = tx1.Commit()

  60. if err != nil {

  61. panic(errors.WithStack(err))

  62. }


  63. // panic(errors.New("commit db2 before error"))


  64. err = tx2.Commit()

  65. if err != nil {

  66. panic(errors.WithStack(err))

  67. }


  68. db1.QueryRow("select score from user where id = 1").Scan(&score)

  69. fmt.Println("user1 score:", score)

  70. db2.QueryRow("select money from wallet where id = 1").Scan(&money)

  71. fmt.Println("wallet1 money:", money)

  72. }

我这里已经非常小心地在defer中recover错误信息,并且执行了rollback命令。

如果我在commit命令之前的任意一个地方调用了 panic(errors.New("commit before error")) 那么命令就会进入到了rollback这里,就会把两个实例的事务都进行回滚。

图片

通过结果我们可以看到,分数和钱数都没有改变。这个是ok的。

但是如果我在db2的commit之前触发了panic,那么这个命令进入到了rollback中,但是db1已经commit了,db2还没有commit,这个时候会出现什么情况?

图片

非常可惜,我们看到了这里的score增长了,但是money没有增长,这个就说明无法做到事务一致性了。

回到mysql的xa

那么还要回归到2PC,mysql为2PC的实现增加了xa命令,那么使用这个命令我们能不能避免这个问题呢?

同样,我用golang写了一个使用xa命令的代码

  1. package main


  2. import (

  3. "database/sql"

  4. "fmt"

  5. "strconv"

  6. "time"


  7. _ "github.com/go-sql-driver/mysql"

  8. "github.com/pkg/errors"

  9. )


  10. func main() {

  11. var err error


  12. // db1的连接

  13. db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1")

  14. if err != nil {

  15. panic(err.Error())

  16. }

  17. defer db1.Close()


  18. // db2的连接

  19. db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2")

  20. if err != nil {

  21. panic(err.Error())

  22. }

  23. defer db2.Close()


  24. // 开始前显示

  25. var score int

  26. db1.QueryRow("select score from user where id = 1").Scan(&score)

  27. fmt.Println("user1 score:", score)

  28. var money float64

  29. db2.QueryRow("select money from wallet where id = 1").Scan(&money)

  30. fmt.Println("wallet1 money:", money)


  31. // 生成xid

  32. xid := strconv.FormatInt(time.Now().Unix(), 10)

  33. fmt.Println("=== xid:" + xid + " ====")

  34. defer func() {

  35. if err := recover(); err != nil {

  36. fmt.Printf("%+v\n", err)

  37. fmt.Println("=== call rollback ====")

  38. db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))

  39. db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))

  40. }


  41. db1.QueryRow("select score from user where id = 1").Scan(&score)

  42. fmt.Println("user1 score:", score)

  43. db2.QueryRow("select money from wallet where id = 1").Scan(&money)

  44. fmt.Println("wallet1 money:", money)

  45. }()


  46. // XA 启动

  47. fmt.Println("=== call start ====")

  48. if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {

  49. panic(errors.WithStack(err))

  50. }

  51. if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {

  52. panic(errors.WithStack(err))

  53. }


  54. // DML操作

  55. if _, err = db1.Exec("update user set score=score+2 where id =1"); err != nil {

  56. panic(errors.WithStack(err))

  57. }

  58. if _, err = db2.Exec("update wallet set money=money+1.2 where id=1"); err != nil {

  59. panic(errors.WithStack(err))

  60. }


  61. // XA end

  62. fmt.Println("=== call end ====")

  63. if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {

  64. panic(errors.WithStack(err))

  65. }

  66. if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {

  67. panic(errors.WithStack(err))

  68. }


  69. // prepare

  70. fmt.Println("=== call prepare ====")

  71. if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {

  72. panic(errors.WithStack(err))

  73. }

  74. // panic(errors.New("db2 prepare error"))

  75. if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {

  76. panic(errors.WithStack(err))

  77. }


  78. // commit

  79. fmt.Println("=== call commit ====")

  80. if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {

  81. panic(errors.WithStack(err))

  82. }

  83. // panic(errors.New("db2 commit error"))

  84. if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {

  85. panic(errors.WithStack(err))

  86. }


  87. db1.QueryRow("select score from user where id = 1").Scan(&score)

  88. fmt.Println("user1 score:", score)

  89. db2.QueryRow("select money from wallet where id = 1").Scan(&money)

  90. fmt.Println("wallet1 money:", money)

  91. }

首先看成功的情况:图片

一切完美。

如果我们在prepare阶段抛出panic,那么结果如下:

图片

证明在第一阶段出现异常是可以回滚的。

但是如果我们在commit阶段抛出panic:

图片

我们发现,这里的分数增加了,但是money却没有增加。

那么这个xa和单个事务有什么区别呢?我又陷入了深深的沉思...

xa的用法不对

经过在技术群(全栈神盾局)请教,讨论之后,发现这里对2pc的两个阶段理解还没到位,这里之所以分为两个阶段,是强调的是每个阶段都会持久化,就是第一个阶段完成了之后,每个mysql实例就把第一个阶段的请求实例化了,这个时候不管是mysql实例停止了还是其他问题,每次重启的时候都会重新回复这个commit。

我们把这个代码的rollback去掉,假设commit必须成功。

  1. package main


  2. import (

  3. "database/sql"

  4. "fmt"

  5. "strconv"

  6. "time"


  7. _ "github.com/go-sql-driver/mysql"

  8. "github.com/pkg/errors"

  9. )


  10. func main() {

  11. var err error


  12. // db1的连接

  13. db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1")

  14. if err != nil {

  15. panic(err.Error())

  16. }

  17. defer db1.Close()


  18. // db2的连接

  19. db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2")

  20. if err != nil {

  21. panic(err.Error())

  22. }

  23. defer db2.Close()


  24. // 开始前显示

  25. var score int

  26. db1.QueryRow("select score from user where id = 1").Scan(&score)

  27. fmt.Println("user1 score:", score)

  28. var money float64

  29. db2.QueryRow("select money from wallet where id = 1").Scan(&money)

  30. fmt.Println("wallet1 money:", money)


  31. // 生成xid

  32. xid := strconv.FormatInt(time.Now().Unix(), 10)

  33. fmt.Println("=== xid:" + xid + " ====")

  34. defer func() {

  35. if err := recover(); err != nil {

  36. fmt.Printf("%+v\n", err)

  37. fmt.Println("=== call rollback ====")

  38. // db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))

  39. // db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))

  40. }


  41. db1.QueryRow("select score from user where id = 1").Scan(&score)

  42. fmt.Println("user1 score:", score)

  43. db2.QueryRow("select money from wallet where id = 1").Scan(&money)

  44. fmt.Println("wallet1 money:", money)

  45. }()


  46. // XA 启动

  47. fmt.Println("=== call start ====")

  48. if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {

  49. panic(errors.WithStack(err))

  50. }

  51. if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {

  52. panic(errors.WithStack(err))

  53. }


  54. // DML操作

  55. if _, err = db1.Exec("update user set score=score+2 where id =1"); err != nil {

  56. panic(errors.WithStack(err))

  57. }

  58. if _, err = db2.Exec("update wallet set money=money+1.2 where id=1"); err != nil {

  59. panic(errors.WithStack(err))

  60. }


  61. // XA end

  62. fmt.Println("=== call end ====")

  63. if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {

  64. panic(errors.WithStack(err))

  65. }

  66. if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {

  67. panic(errors.WithStack(err))

  68. }


  69. // prepare

  70. fmt.Println("=== call prepare ====")

  71. if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {

  72. panic(errors.WithStack(err))

  73. }

  74. // panic(errors.New("db2 prepare error"))

  75. if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {

  76. panic(errors.WithStack(err))

  77. }


  78. // commit

  79. fmt.Println("=== call commit ====")

  80. if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {

  81. panic(errors.WithStack(err))

  82. }

  83. panic(errors.New("db2 commit error"))

  84. if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {

  85. panic(errors.WithStack(err))

  86. }


  87. db1.QueryRow("select score from user where id = 1").Scan(&score)

  88. fmt.Println("user1 score:", score)

  89. db2.QueryRow("select money from wallet where id = 1").Scan(&money)

  90. fmt.Println("wallet1 money:", money)

  91. }

图片

这个时候,我们停掉程序(停掉mysql的链接),使用 xa recover可以发现,db2的xa事务还留在db2中了。图片

我们在控制台直接调用 xa commit'1585644880' 还能继续把这个xa事务进行提交。

图片

这下money就进行了提交,又恢复了一致性。

所以呢,我琢磨了一下,我们写xa的代码应该如下:

  1. package main


  2. import (

  3. "database/sql"

  4. "fmt"

  5. "log"

  6. "strconv"

  7. "time"


  8. _ "github.com/go-sql-driver/mysql"

  9. "github.com/pkg/errors"

  10. )


  11. func main() {

  12. var err error


  13. // db1的连接

  14. db1, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3306)/hade1")

  15. if err != nil {

  16. panic(err.Error())

  17. }

  18. defer db1.Close()


  19. // db2的连接

  20. db2, err := sql.Open("mysql", "root:123456@tcp(127.0.0.1:3307)/hade2")

  21. if err != nil {

  22. panic(err.Error())

  23. }

  24. defer db2.Close()


  25. // 开始前显示

  26. var score int

  27. db1.QueryRow("select score from user where id = 1").Scan(&score)

  28. fmt.Println("user1 score:", score)

  29. var money float64

  30. db2.QueryRow("select money from wallet where id = 1").Scan(&money)

  31. fmt.Println("wallet1 money:", money)


  32. // 生成xid

  33. xid := strconv.FormatInt(time.Now().Unix(), 10)

  34. fmt.Println("=== xid:" + xid + " ====")

  35. defer func() {

  36. if err := recover(); err != nil {

  37. fmt.Printf("%+v\n", err)

  38. fmt.Println("=== call rollback ====")

  39. db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))

  40. db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))

  41. }


  42. db1.QueryRow("select score from user where id = 1").Scan(&score)

  43. fmt.Println("user1 score:", score)

  44. db2.QueryRow("select money from wallet where id = 1").Scan(&money)

  45. fmt.Println("wallet1 money:", money)

  46. }()


  47. // XA 启动

  48. fmt.Println("=== call start ====")

  49. if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {

  50. panic(errors.WithStack(err))

  51. }

  52. if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {

  53. panic(errors.WithStack(err))

  54. }


  55. // DML操作

  56. if _, err = db1.Exec("update user set score=score+2 where id =1"); err != nil {

  57. panic(errors.WithStack(err))

  58. }

  59. if _, err = db2.Exec("update wallet set money=money+1.2 where id=1"); err != nil {

  60. panic(errors.WithStack(err))

  61. }


  62. // XA end

  63. fmt.Println("=== call end ====")

  64. if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {

  65. panic(errors.WithStack(err))

  66. }

  67. if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {

  68. panic(errors.WithStack(err))

  69. }


  70. // prepare

  71. fmt.Println("=== call prepare ====")

  72. if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {

  73. panic(errors.WithStack(err))

  74. }

  75. // panic(errors.New("db2 prepare error"))

  76. if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {

  77. panic(errors.WithStack(err))

  78. }


  79. // commit

  80. fmt.Println("=== call commit ====")

  81. if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {

  82. // TODO: 尝试重新提交COMMIT

  83. // TODO: 如果还失败,记录xid,进入数据恢复逻辑,等待数据库恢复重新提交

  84. log.Println("xid:" + xid)

  85. }

  86. // panic(errors.New("db2 commit error"))

  87. if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {

  88. log.Println("xid:" + xid)

  89. }


  90. db1.QueryRow("select score from user where id = 1").Scan(&score)

  91. fmt.Println("user1 score:", score)

  92. db2.QueryRow("select money from wallet where id = 1").Scan(&money)

  93. fmt.Println("wallet1 money:", money)

  94. }

就是第二阶段的commit,我们必须设定它一定会“成功”,如果有不成功的情况,那么就需要记录下不成功的xid,有一个数据恢复逻辑,重新commit这个xid。来保证最终一致性。

binlog

其实我们使用binlog也能看出一些端倪

  1. # 这里的mysql-bin.0003替换成为你当前的log

  2. SHOW BINLOG EVENTS in 'mysql-bin.000003';

  1. ## XA的binlog

  2. | mysql-bin.000003 | 1967 | Anonymous_Gtid | 1 | 2032 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' |

  3. | mysql-bin.000003 | 2032 | Query | 1 | 2138 | XA START X'31353835363338363233',X'',1 |

  4. | mysql-bin.000003 | 2138 | Table_map | 1 | 2190 | table_id: 108 (hade1.user) |

  5. | mysql-bin.000003 | 2190 | Update_rows | 1 | 2252 | table_id: 108 flags: STMT_END_F |

  6. | mysql-bin.000003 | 2252 | Query | 1 | 2356 | XA END X'31353835363338363233',X'',1 |

  7. | mysql-bin.000003 | 2356 | XA_prepare | 1 | 2402 | XA PREPARE X'31353835363338363233',X'',1 |

  8. | mysql-bin.000003 | 2402 | Anonymous_Gtid | 1 | 2467 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' |

  9. | mysql-bin.000003 | 2467 | Query | 1 | 2574 | XA COMMIT X'31353835363338363233',X'',1




  10. ## 非xa的事务

  11. | mysql-bin.000003 | 2574 | Anonymous_Gtid | 1 | 2639 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS' |

  12. | mysql-bin.000003 | 2639 | Query | 1 | 2712 | BEGIN |

  13. | mysql-bin.000003 | 2712 | Table_map | 1 | 2764 | table_id: 108 (hade1.user) |

  14. | mysql-bin.000003 | 2764 | Update_rows | 1 | 2826 | table_id: 108 flags: STMT_END_F |

  15. | mysql-bin.000003 | 2826 | Xid | 1 | 2857 | COMMIT /* xid=67 */

我们很明显可以看到两阶段提交中是有两个GTID的,生成一个GTID就代表内部生成一个事务,所以第一个阶段prepare结束之后,第二个阶段commit的时候就持久化了第一个阶段的内容,并且生成了第二个事务。当commit失败的时候,最多就是第二个事务丢失,第一个事务实际上已经保存起来了了(只是还没commit)。

而非xa的事务,只有一个GTID,在commit之前任意一个阶段出现问题,整个事务就全部丢失,无法找回了。所以这就是mysql xa命令的机制。

总结

看了一些资料,原来mysql从5.7之后才真正实现了两阶段的xa。当然这个两阶段方式在真实的工程中的使用其实很少的,xa的第一定律是避免使用xa。工程中会有很多方式来避免这种分库的事务情况。

不过,不妨碍掌握了mysql的xa,在一些特定的场合,我们也能完美解决问题。

图片

图片

Hi,我是轩脉刃,一个名不见经传码农,体制内的小愤青,躁动的骚年,2020年想坚持写一些学习/工作/思考笔记,谓之倒逼学习。欢迎关注个人公众号:轩脉刃的刀光剑影。

图片

MORE | 更多原创文章

 gorm日志模块源码解析

② 测试用例是开发人员最后一块遮羞布

③ mariaDB vs mysql的区别

④ 一次composer错误使用引发的思考

⑤ colly源码学习

⑥ 使用chan的时候选择对象还是指针

69020使用golang理解mysql的两阶段提交

这个人很懒,什么都没留下

文章评论