Kratos 框架中如何优雅的使用 GORM 来完成 MySQL 事务 ?

2022年2月28日 302点热度 0人点赞 0条评论

准备工作

如果您还不了解 Kratos 框架[1]mysql 事务[2]GORM[3] 的话请先了解一下。

  • 创建新项目
kratos new helloworld

cd helloworld
# 拉取项目依赖
go mod download

# 项目中的 config 配置等请自行修改

应该在架构的那一层使用事务

kratos 框架中的目录分层主要有以下目录:

├── internal
│ ├── biz // 业务逻辑的组装层,但 repo 接口定义在这里
│ │ ├── biz.go
│ │ └── greeter.go
│ ├── conf
│ │ ├── conf.pb.go
│ │ └── conf.proto
│ ├── data // 业务数据访问,包含 cache、db 等封装,实现了 biz 的 repo 接口。
│ │ ├── data.go
│ │ └── greeter.go
│ ├── server // http和grpc实例的创建和配置
│ │ ├── grpc.go
│ │ ├── http.go
│ │ └── server.go
│ └── service // 实现了 api 定义的服务层,类似 DDD 的 application 层
│ ├── README.md
│ ├── greeter.go
│ └── service.go
  • data 层承载事务是否比较合适?

根据官方的目录介绍,其实最简单也最容易想到的方法就是在 data 层的具体操作数据库的方法中使用事务,比如,用户消费的业务,用户消费成功之后,修改用户积分表的数据(这里假设你没有消息队列之类的中间件),那么用户消费表和用户积分表两个表都要添加数据的时候,用户的消费记录更改与积分增加必须在一个事务里面。

但是如果加入到 data 层的具体每个方法的话,当上层(biz)有个业务是只消费不增加积分呢?这时候就用不到积分表了。这时你又要增加一个独立的只操作用户消费表的方法。随着业务增多,难道我们就一种逻辑写一个方法么,这样你会发现你写了大量的重复的方法,想想就可怕,

其次如果写到 data 层还有一个问题就是把业务逻辑下沉到了具体的 repo 中。

  • biz 层承载事务
    所以我们发现直接写到 data 层不太好,那么我们再往上看一层,去 Usecase biz 层处理,我们决定在 Usecase 来解决 data 层写了很多重复的方法这个问题。这时候我们在 biz 层提供一个事务接口,然后 usecase 进行调用, data 层的方法,只需要判断一下 DB 实例是不是事务实例,保证是事务执行的就执行事务,不是事务执行的就正常执行。

添加事务

集成 gorm 事务

  • 修改 internal/biz/biz.go

在这里定义 repo 事务接口,具体的业务逻辑是否会使用事务,使用依赖倒置的原则,

package biz

...

# 新增事务接口方法
type Transaction interface {
 ExecTx(context.Context, func(ctx context.Context) errorerror
}

  • 修改 internal/data/data.go

引入 gorm,实现 biz 的 repo 事务接口

package data

import (
    "context"
    "github.com/go-kratos/kratos/v2/log"
    "github.com/google/wire"
    "gorm.io/driver/mysql"
    "gorm.io/gorm"
    "helloworld/internal/biz"
    "helloworld/internal/conf"
)

// ProviderSet is data providers.
var ProviderSet = wire.NewSet(NewData, NewDB, NewTransaction, NewConsumeRepo, NewCredRepo)

// Data .
type Data struct {
    db  *gorm.DB
    log *log.Helper
}

// 用来承载事务的上下文
type contextTxKey struct{}

// NewTransaction .
func NewTransaction(d *Data) biz.Transaction {
    return d
}

// ExecTx gorm Transaction
func (d *Data) ExecTx(ctx context.Context, fn func(ctx context.Context) errorerror {
    return d.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
        ctx = context.WithValue(ctx, contextTxKey{}, tx)
        return fn(ctx)
    })
}

// DB 根据此方法来判断当前的 db 是不是使用 事务的 DB
func (d *Data) DB(ctx context.Context) *gorm.DB {
    tx, ok := ctx.Value(contextTxKey{}).(*gorm.DB)
    if ok {
        return tx
    }
    return d.db
}

// NewData .
func NewData(db *gorm.DB, logger log.Logger) (*Data, func()error) {
    l := log.NewHelper(log.With(logger, "module""transaction/data"))
    d := &Data{
        db:  db,
        log: l,
    }
    return d, func() {
    }, nil
}

// NewDB gorm Connecting to a Database
func NewDB(conf *conf.Data, logger log.Logger) *gorm.DB {
    log := log.NewHelper(log.With(logger, "module""order-service/data/gorm"))
    db, err := gorm.Open(mysql.Open(conf.Database.Source), &gorm.Config{})
    if err != nil {
        log.Fatalf("failed opening connection to mysql: %v", err)
    }
    if err := db.AutoMigrate(&Consume{}, &Cred{}); err != nil {
        log.Fatal(err)
    }
    return db
}

  • 修改 internal/service/greeter.go

刚才初始化项目之后,默认提供了一个 SayHello 方法,咱们就假设这个 GET 请求的方法是要用到事务的业务


...

// SayHello implements helloworld.GreeterServer
func (s *GreeterService) SayHello(ctx context.Context, in *v1.HelloRequest) (*v1.HelloReply, error) {
    // 这里手动指定数据信息
    consumeID, err := s.uc.Consume(ctx, &biz.Consume{
        UserID:     1,
        OrderID:    "202202251234567890",
        OrderPrice: 500,
    })

    if err != nil {
        return nil, err
    }

    return &v1.HelloReply{Message: "消费记录生成" + strconv.FormatInt(consumeID, 10)}, nil
}

  • 修改 internal/biz/greeter.go

编写业务方法 Consume,可以看到 Consume 方法使用了 repo 定义的 ExecTx 开启事务方法

package biz

import (
    "context"
    "github.com/go-kratos/kratos/v2/log"
)

type Consume struct {
    ID         int64
    UserID     int64
    OrderID    string
    OrderPrice int64
}

type Cred struct {
    ID       int64
    UserID   int64
    Source   int64
    Integral int64
}

type ConsumeRepo interface {
    CreateConsume(ctx context.Context, a *Consume) (int64, error)
}

type CredRepo interface {
    CreateCred(ctx context.Context, cred *Cred) (int64, error)
}

type GreeterUsecase struct {
    consumeRepo ConsumeRepo
    cardRepo    CredRepo
    tx          Transaction
    log         *log.Helper
}

func NewGreeterUsecase(repo ConsumeRepo, cardRepo CredRepo, tx Transaction, logger log.Logger) *GreeterUsecase {
    return &GreeterUsecase{
        consumeRepo: repo,
        cardRepo:    cardRepo,
        tx:          tx,
        log:         log.NewHelper(logger),
    }
}

func (uc *GreeterUsecase) Consume(ctx context.Context, c *Consume) (int64, error) {
    var (
        err error
        id  int64
    )
    // 调用事务实例
    err = uc.tx.ExecTx(ctx, func(ctx context.Context) error {
        id, err = uc.consumeRepo.CreateConsume(ctx, c)
        if err != nil {
            return err
        }

        _, err = uc.cardRepo.CreateCred(ctx, &Cred{
            UserID:   c.UserID,
            Source:   id,
            Integral: c.OrderPrice,
        })
        if err != nil {
            return err
        }
        return nil
    })
    if err != nil {
        return 0, err
    }
    return id, nil
}

  • 修改 internal/data/greeter.go

实现 repo 接口定义的方法

package data

import (
    "context"
    "github.com/go-kratos/kratos/v2/log"
    "helloworld/internal/biz"
)

type consumeRepo struct {
    data *Data
    log  *log.Helper
}

type credRepo struct {
    data *Data
    log  *log.Helper
}

type Consume struct {
    ID         int64
    UserID     int64
    OrderID    string
    OrderPrice int64
}

type Cred struct {
    ID       int64
    UserID   int64
    Source   int64
    Integral int64
}

// NewConsumeRepo  .
func NewConsumeRepo(data *Data, logger log.Logger) biz.ConsumeRepo {
    return &consumeRepo{
        data: data,
        log:  log.NewHelper(logger),
    }
}

func NewCredRepo(data *Data, logger log.Logger) biz.CredRepo {
    return &credRepo{
        data: data,
        log:  log.NewHelper(logger),
    }
}

func (c *consumeRepo) CreateConsume(ctx context.Context, a *biz.Consume) (int64, error) {
    consume := Consume{
        UserID:     a.UserID,
        OrderID:    a.OrderID,
        OrderPrice: a.OrderPrice,
    }
    result := c.data.DB(ctx).Create(&consume)
    return consume.ID, result.Error
}

func (c *credRepo) CreateCred(ctx context.Context, a *biz.Cred) (int64, error) {
    cred := Cred{
        UserID:   a.UserID,
        Source:   a.Source,
        Integral: a.Integral,
    }
    result := c.data.DB(ctx).Create(&cred)
    return cred.ID, result.Error
}

测试

  • 重新生成依赖
cd cmd/helloworld
# 执行 wire 命令
wire
# 回到主目录
cd ../../
# 启动服务
kratos run
  • 测试接口
curl 'http://127.0.0.1:8000/helloworld/kratos'

输出:
{
 "message""消费记录生成1"
}

结束

修改 data/greeter.go 中的 CreateConsume 或者 CreateCred 方法,return 一个错误,重启服务,再次访问地址,回到数据库查看,你会发现库中还是之前生成的一条记录,新的记录并未存储到库中,至此事务集成成功。

感谢您的耐心阅读,您准备动动手指点个赞还是关注呢。

参考

[1]

Kratos: https://go-kratos.dev/docs/

[2]

mysql 事务: https://www.runoob.com/mysql/mysql-transaction.html

[3]

GORM: https://gorm.io/zh_CN/docs/index.html

34030Kratos 框架中如何优雅的使用 GORM 来完成 MySQL 事务 ?

这个人很懒,什么都没留下

文章评论