Go工程师体系课 013【学习笔记】

Table of Contents

订单事务

  • 先扣库存 后扣库存 都会对库存和订单都会有影响, 所以要使用分布式事务
  • 业务(下单不对付)业务问题
  • 支付成功再扣减(下单了,支付时没库存了)
  • 订单扣减,不支付(订单超时归还)【常用方式】

事务和分布式事务

1. 什么是事务?

事务(Transaction)是数据库管理系统中的一个重要概念,它是一组数据库操作的集合,这些操作要么全部成功执行,要么全部失败回滚。

1.1 事务的 ACID 特性

  • 原子性(Atomicity):事务中的所有操作要么全部成功,要么全部失败,不存在部分成功的情况
  • 一致性(Consistency):事务执行前后,数据库从一个一致状态转换到另一个一致状态
  • 隔离性(Isolation):并发执行的事务之间相互隔离,一个事务的执行不应影响其他事务
  • 持久性(Durability):事务一旦提交,其结果就永久保存在数据库中

1.2 事务的隔离级别

  1. 读未提交(Read Uncommitted):最低级别,可能读到脏数据
  2. 读已提交(Read Committed):只能读到已提交的数据
  3. 可重复读(Repeatable Read):同一事务中多次读取结果一致
  4. 串行化(Serializable):最高级别,完全串行执行

2. 什么是分布式事务?

分布式事务是指涉及多个数据库或服务的事务操作,需要保证跨多个节点的数据一致性。

2.1 分布式事务的挑战

  • 网络分区:网络故障导致节点间通信中断
  • 节点故障:某个节点宕机或重启
  • 时钟不同步:各节点时间不一致
  • 数据一致性:如何保证跨节点的数据一致性

2.2 CAP 理论

  • 一致性(Consistency):所有节点在同一时间看到相同的数据(更新返回客户端后 )
  • 可用性(Availability):系统持续可用,不会出现操作失败
  • 分区容错性(Partition Tolerance):系统能够容忍网络分区故障

CAP 定理:在分布式系统中,最多只能同时满足 CAP 中的两个特性。

2.3 BASE 理论(与 CAP 的工程化取舍)

  • Basically Available(基本可用):在发生故障时,允许系统降级提供有限功能(如响应变慢、部分功能不可用)
  • Soft state(软状态):系统状态允许在一段时间内存在中间态(未强一致)
  • Eventual consistency(最终一致):经过一段时间(或重试/补偿)后,数据达到一致

工程实践中:多数互联网业务选择 AP → 以 BASE 理论为指导,牺牲强一致,换取高可用与可扩展,通过“补偿、重试、去重、对账”实现最终一致。

3. 分布式事务解决方案

3.1 两阶段提交(2PC)

原理

  1. 准备阶段:协调者询问所有参与者是否可以提交
  2. 提交阶段:根据参与者响应决定提交或回滚

优点:强一致性
缺点:性能差、单点故障、阻塞问题

流程细化(示意)

  1. 协调者向参与者发送 prepare 请求,各参与者预留资源、写预提交日志,并返回 yes/no
  2. 协调者汇总:全部 yes → 下发 commit;任一 no/超时 → 下发 rollback
  3. 参与者根据指令提交或回滚,并回执协调者

常见问题:协调者单点、参与者阻塞(长时间持锁),网络分区时恢复复杂。

sequenceDiagram
  participant C as 协调者(Coordinator)
  participant P1 as 参与者1
  participant P2 as 参与者2

  C->>P1: prepare
  C->>P2: prepare
  P1-->>C: yes/预提交成功
  P2-->>C: yes/预提交成功
  alt 全部yes
    C->>P1: commit
    C->>P2: commit
    P1-->>C: ack
    P2-->>C: ack
  else 任一no/超时
    C->>P1: rollback
    C->>P2: rollback
  end

3.2 三阶段提交(3PC)

在 2PC 基础上增加预提交阶段,减少阻塞时间,但仍存在单点故障问题。

3.3 TCC(Try-Confirm-Cancel)

原理

  • Try:尝试执行业务,预留资源
  • Confirm:确认执行业务,提交资源
  • Cancel:取消执行业务,释放资源

优点:性能好、无阻塞
缺点:实现复杂、需要业务补偿

落地要点(以订单-库存-支付为例)

  • Try:创建订单预状态、预占库存(扣减可用库存、增加预占库存)、预下单支付
  • Confirm(支付成功回调或异步确认):订单变为已支付、库存从预占转正式扣减
  • Cancel(支付失败/超时):订单取消、释放预占库存

实现细节:接口幂等(去重表/唯一业务键)、空回滚/悬挂处理、事务日志记录与重试任务。

sequenceDiagram
  participant Order as 订单服务
  participant Inv as 库存服务
  participant Pay as 支付服务

  rect rgb(230,250,230)
  Note over Order,Inv: Try 阶段(预留资源)
  Order->>Inv: Try 预占库存
  Order->>Pay: Try 预下单/冻结
  end

  alt 支付成功
    rect rgb(230,230,255)
    Note over Order,Inv: Confirm 阶段
    Pay-->>Order: 支付成功回调
    Order->>Inv: Confirm 扣减库存
    Order->>Pay: Confirm 确认扣款
    end
  else 失败/超时
    rect rgb(255,230,230)
    Note over Order,Inv: Cancel 阶段
    Order->>Inv: Cancel 释放预占
    Order->>Pay: Cancel 解冻/撤销
    end
  end

3.4 基于消息的最终一致性

原理

  1. 本地事务执行
  2. 发送消息到消息队列
  3. 消费者处理消息,保证最终一致性

优点:性能好、实现相对简单
缺点:只能保证最终一致性

3.4.1 基于本地消息表(Outbox Pattern)

流程:同库同事务内写业务数据与 outbox 消息表 → 后台转发器轮询投递到 MQ → 消费者处理并落库 → 发送确认/对账。

要点:

  • 生产端强一致(业务+消息同库同事务)
  • 转发幂等(按消息 ID 投递、消费去重)
  • 失败重试与死信队列、人工对账修复
flowchart LR
  A[应用/业务服务] -->|同库同事务| B[(业务表 + Outbox表)]
  B -->|后台转发器扫描/拉取| MQ[消息队列]
  MQ --> C[下游服务]
  C --> D[(消费落库/去重表)]

  subgraph 重试与对账
    E[失败重投/死信队列]
    F[对账/人工修复]
  end
  MQ --> E
  E --> F
3.4.2 基于可靠消息的最终一致性(常用)

流程:

  1. 业务方向 MQ 申请“预消息/半消息”(prepare)
  2. 业务本地提交成功后调用 MQ 确认(commit),否则回滚(rollback)
  3. MQ 挂起未确认的半消息并回查(check)业务方最终状态,决定提交或丢弃

要点:

  • 依赖 MQ 的事务消息/回查能力(RocketMQ 等)
  • 生产与消费两端均需幂等处理
sequenceDiagram
  participant Biz as 业务服务
  participant MQ as MQ(事务消息)
  participant D as 下游服务

  Biz->>MQ: 发送半消息(Prepare)
  Biz->>Biz: 执行业务本地事务
  alt 成功
    Biz->>MQ: Commit 确认
  else 失败
    Biz->>MQ: Rollback 撤销
  end
  MQ->>D: 投递正式消息
  D-->>MQ: Ack/重试

  MQ->>Biz: 事务回查(Check) 未确认半消息
  Biz-->>MQ: 返回最终状态(提交/回滚)
3.4.3 最大努力通知

流程:事件发生后向下游发起通知(HTTP/MQ),失败则按策略重试若干次,超过阈值进入人工处理。

适用:对一致性要求相对宽松的场景(如短信、站内信、积分发放)。

flowchart LR
  A[事件源] --> B{通知}
  B -->|HTTP/MQ| C[下游]
  B --> R1[重试1]
  R1 --> R2[重试2]
  R2 --> R3[重试N]
  R3 --> DLQ[死信/人工补偿]
  C --> Idem[去重/幂等处理]

4. 订单系统中的事务处理

4.1 库存扣减问题

在订单系统中,库存扣减是关键操作:

// 库存扣减示例
func (s *InventoryServer) Sell(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {
    // 开启事务
    tx := global.DB.Begin()
    if tx.Error != nil {
        return nil, status.Error(codes.Internal, "开启事务失败")
    }

    // 遍历所有商品
    for _, goodsInfo := range req.GoodsInvInfo {
        var inv model.Inventory
        // 使用行锁查询
        result := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
            Where("goods_id = ?", goodsInfo.GoodsId).
            First(&inv)

        // 检查库存是否充足
        if inv.Stock < goodsInfo.Num {
            tx.Rollback()
            return nil, status.Error(codes.ResourceExhausted, "库存不足")
        }

        // 使用乐观锁更新库存
        updateResult := tx.Model(&model.Inventory{}).
            Where("goods_id = ? AND version = ?", goodsInfo.GoodsId, inv.Version).
            Updates(map[string]interface{}{
                "stock":   inv.Stock - goodsInfo.Num,
                "version": inv.Version + 1,
            })
    }

    // 提交事务
    if err := tx.Commit().Error; err != nil {
        return nil, status.Error(codes.Internal, "提交事务失败")
    }
    return &emptypb.Empty{}, nil
}

4.2 分布式锁解决并发问题

// 基于Redis的分布式锁
func (s *InventoryServer) SellWithDistributedLock(ctx context.Context, req *proto.SellInfo) (*emptypb.Empty, error) {
    // 获取分布式锁
    lockKey := fmt.Sprintf("inventory_lock_%d", req.GoodsInvInfo[0].GoodsId)
    lock := s.redisClient.NewMutex(lockKey, time.Second*10)

    if err := lock.Lock(); err != nil {
        return nil, status.Error(codes.Internal, "获取锁失败")
    }
    defer lock.Unlock()

    // 执行库存扣减逻辑
    return s.Sell(ctx, req)
}

5. 业务场景分析

5.1 下单不支付问题

问题:用户下单后不支付,导致库存被占用

解决方案

  1. 订单超时机制:设置订单超时时间,超时后自动取消
  2. 库存预占:下单时预占库存,支付成功后确认扣减
  3. 定时任务:定期清理超时订单,释放库存

5.2 支付时库存不足问题

问题:下单时库存充足,支付时库存不足

解决方案

  1. 库存预占:下单时预占库存,避免超卖
  2. 支付时再次检查:支付前再次验证库存
  3. 补偿机制:库存不足时提供替代方案

6. 最佳实践

  1. 合理使用事务:避免长事务,减少锁竞争
  2. 选择合适的隔离级别:根据业务需求选择
  3. 使用乐观锁:减少锁竞争,提高并发性能
  4. 实现重试机制:处理临时性失败
  5. 监控和告警:及时发现和处理问题

7. 总结

事务和分布式事务是保证数据一致性的重要机制。在微服务架构中,需要根据业务场景选择合适的分布式事务解决方案,平衡一致性、可用性和性能。订单系统作为典型的分布式事务场景,需要特别注意库存扣减、订单状态管理等关键操作的数据一致性。

总结与思考

TCC 分布式事务总结

总结一下,你要玩儿 TCC 分布式事务的话:

  1. 首先需要选择某种 TCC 分布式事务框架,各个服务里就会有这个 TCC 分布式事务框架在运行。
  2. 然后你原本的一个接口,要改造为 3 个逻辑:Try-Confirm-Cancel。

TCC 流程:

  • 先是服务调用链路依次执行 Try 逻辑
  • 如果都正常的话,TCC 分布式事务框架推进执行 Confirm 逻辑,完成整个事务
  • 如果某个服务的 Try 逻辑有问题,TCC 分布式事务框架感知到之后就会推进执行各个服务的 Cancel 逻辑,撤销之前执行的各种操作
  • 这就是所谓的 TCC 分布式事务

TCC 分布式事务的核心思想,说白了,就是当遇到下面这些情况时:

  1. 某个服务的数据库宕机了
  2. 某个服务自己挂了
  3. 那个服务的 redis、elasticsearch、MQ 等基础设施故障了
  4. 某些资源不足了,比如说库存不够这些

进一步解释:

  • 先来 Try 一下,不到 3:20 业务逻辑完成,先试试看,看各个服务能可能基本正常运转,能不能先冻结我需要的资源
  • 如果 Try 都 ok,也就是说,底层的数据库、redis、elasticsearch、MQ 都是可以写入数据的,并且你保留好了需要使用的一些资源(比如冻结了一部分库存)

基于本地消息表的最终一致性

本地消息表方案详解

方案来源:本地消息表方案最初由 eBay 提出,是一种经典的分布式事务最终一致性解决方案。

核心原理

  • 通过本地事务来保证数据业务操作和消息的一致性
  • 使用定时任务将消息发送到消息中间件(如 MQ)
  • 消息只有在成功投递给消费者后才被删除

实现流程

1. 基本流程

  1. 业务操作:在本地数据库中执行业务操作
  2. 消息记录:在同一个本地事务中,将消息记录到本地消息表
  3. 消息发送:定时任务扫描本地消息表,将未发送的消息发送到 MQ
  4. 消息确认:消费者处理完消息后,发送确认回执
  5. 消息清理:收到确认后,从本地消息表中删除该消息

2. 关键设计要点

本地消息表结构

CREATE TABLE local_message (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    business_id VARCHAR(64) NOT NULL,  -- 业务ID
    message_content TEXT NOT NULL,     -- 消息内容
    message_status TINYINT DEFAULT 0,  -- 0:待发送 1:已发送 2:已确认
    retry_count INT DEFAULT 0,         -- 重试次数
    created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_status_created (message_status, created_time)
);

幂等性保证

  • 消费者端需要实现幂等性处理
  • 使用业务唯一标识避免重复处理
  • 记录已处理消息的 ID 或业务键

重试机制

  • 消息发送失败时进行重试
  • 设置最大重试次数,超过后进入死信队列
  • 采用指数退避策略避免频繁重试

实际应用示例:注册送积分

业务场景:用户注册成功后,自动赠送积分

涉及服务

  • 用户服务:负责用户注册
  • 积分服务:负责积分管理

实现步骤

  1. 用户注册:用户服务接收注册请求
  2. 新增用户:在用户表中插入新用户记录
  3. 记录消息:在同一事务中,向本地消息表插入"赠送积分"消息
  4. 发送消息:定时任务将消息发送到 MQ
  5. 处理积分:积分服务消费消息,为用户增加积分
  6. 确认处理:积分服务处理完成后发送确认
  7. 清理消息:用户服务收到确认后删除本地消息

流程图解

sequenceDiagram
    participant User as 用户
    participant UserService as 用户服务
    participant LocalDB as 本地数据库
    participant MessageTable as 本地消息表
    participant Scheduler as 定时任务
    participant MQ as 消息队列
    participant PointsService as 积分服务

    User->>UserService: 1. 用户注册请求
    UserService->>LocalDB: 2. 开启本地事务
    UserService->>LocalDB: 3. 新增用户记录
    UserService->>MessageTable: 4. 插入消息记录
    UserService->>LocalDB: 5. 提交事务

    Note over Scheduler: 定时扫描未发送消息
    Scheduler->>MessageTable: 6. 查询待发送消息
    Scheduler->>MQ: 7. 发送消息到MQ
    Scheduler->>MessageTable: 8. 更新消息状态为已发送

    MQ->>PointsService: 9. 投递消息
    PointsService->>PointsService: 10. 处理积分逻辑
    PointsService->>MQ: 11. 发送确认ACK

    Note over Scheduler: 收到确认后清理
    Scheduler->>MessageTable: 12. 删除已确认消息

架构图

graph TB
    subgraph "用户服务"
        A[用户注册接口] --> B[本地数据库]
        B --> C[本地消息表]
    end

    subgraph "消息处理"
        D[定时任务扫描器] --> C
        D --> E[消息队列MQ]
    end

    subgraph "积分服务"
        E --> F[积分服务消费者]
        F --> G[积分数据库]
    end

    subgraph "监控与重试"
        H[重试机制] --> D
        I[死信队列] --> D
        J[监控告警] --> D
    end

    A --> D
    F --> E

优缺点分析

优点

  1. 强一致性:本地事务保证业务操作和消息记录的一致性
  2. 可靠性高:消息持久化存储,不会丢失
  3. 实现简单:不需要复杂的事务协调器
  4. 性能较好:异步处理,不阻塞主业务流程

缺点

  1. 最终一致性:只能保证最终一致性,不能保证强一致性
  2. 消息延迟:定时任务扫描存在延迟
  3. 资源消耗:需要额外的存储空间和定时任务
  4. 复杂度增加:需要处理消息重试、死信等场景

适用场景

  1. 对一致性要求不是特别严格的业务场景
  2. 可以接受最终一致性的分布式系统
  3. 消息处理可以异步进行的场景
  4. 需要保证消息不丢失的重要业务

最佳实践

  1. 消息表设计:合理设计消息表结构,添加必要的索引
  2. 批量处理:定时任务批量处理消息,提高效率
  3. 监控告警:监控消息积压、处理失败等情况
  4. 幂等设计:消费者端必须实现幂等性处理
  5. 异常处理:完善的异常处理和重试机制

基于可靠消息的最终一致性(事务消息)

RocketMQ 事务消息方案详解

方案特点:基于 RocketMQ 的事务消息机制,实现分布式事务的最终一致性。

核心原理

  • 通过 RocketMQ 的事务消息功能,保证消息发送和业务操作的一致性
  • 利用消息回查机制,确保事务的最终一致性
  • 支持半消息(Half Message)和事务回查(Transaction Check)

RocketMQ 事务消息流程

1. 基本流程

  1. 发送半消息:业务方发送半消息到 RocketMQ
  2. 执行业务:执行业务逻辑(本地事务)
  3. 提交/回滚:根据业务执行结果,提交或回滚事务消息
  4. 消息投递:RocketMQ 将已提交的消息投递给消费者
  5. 事务回查:对于未确认的半消息,RocketMQ 会回查业务方状态

2. 关键概念

半消息(Half Message)

  • 对消费者不可见的消息
  • 用于确保消息发送和业务操作的一致性
  • 只有事务提交后才会投递给消费者

事务回查(Transaction Check)

  • RocketMQ 主动回查业务方的事务状态
  • 用于处理网络异常、服务重启等场景
  • 确保事务消息的最终一致性

RocketMQ 事务消息流程图

sequenceDiagram
    participant Producer as 业务生产者
    participant RMQ as RocketMQ
    participant Consumer as 业务消费者
    participant LocalDB as 本地数据库

    Note over Producer,LocalDB: 第一阶段:发送半消息
    Producer->>RMQ: 1. 发送半消息(事务消息)
    RMQ-->>Producer: 2. 返回半消息发送结果

    Note over Producer,LocalDB: 第二阶段:执行业务逻辑
    Producer->>LocalDB: 3. 开启本地事务
    Producer->>LocalDB: 4. 执行业务操作
    Producer->>LocalDB: 5. 记录事务状态

    alt 业务执行成功
        Note over Producer,LocalDB: 第三阶段:提交事务
        Producer->>RMQ: 6a. 提交事务(commit)
        RMQ->>RMQ: 7a. 将半消息转为正式消息
        RMQ->>Consumer: 8a. 投递消息给消费者
        Consumer->>Consumer: 9a. 处理业务逻辑
        Consumer-->>RMQ: 10a. 返回消费确认
    else 业务执行失败
        Note over Producer,LocalDB: 第三阶段:回滚事务
        Producer->>RMQ: 6b. 回滚事务(rollback)
        RMQ->>RMQ: 7b. 丢弃半消息
    end

    Note over Producer,RMQ: 事务回查机制
    RMQ->>Producer: 11. 事务回查(未收到commit/rollback)
    Producer->>LocalDB: 12. 查询本地事务状态
    Producer-->>RMQ: 13. 返回事务状态(commit/rollback)

    alt 回查结果为commit
        RMQ->>RMQ: 14a. 将半消息转为正式消息
        RMQ->>Consumer: 15a. 投递消息
    else 回查结果为rollback
        RMQ->>RMQ: 14b. 丢弃半消息
    end

详细架构图

graph TB
    subgraph "业务生产者"
        A[业务应用] --> B[事务消息发送器]
        B --> C[本地事务执行器]
        C --> D[本地数据库]
        C --> E[事务状态记录]
    end

    subgraph "RocketMQ集群"
        F[NameServer] --> G[Broker Master]
        G --> H[Broker Slave]
        I[事务消息存储] --> G
        J[消息队列] --> G
    end

    subgraph "业务消费者"
        K[消息消费者] --> L[业务处理逻辑]
        L --> M[消费者数据库]
    end

    subgraph "事务回查"
        N[事务回查服务] --> E
        N --> G
    end

    B --> F
    G --> K
    G --> N

    style I fill:#e1f5fe
    style N fill:#fff3e0

实现示例代码

生产者端实现

// RocketMQ 事务消息生产者
type TransactionProducer struct {
    producer rocketmq.TransactionProducer
}

// 发送事务消息
func (tp *TransactionProducer) SendTransactionMessage(topic, message string) error {
    msg := &rocketmq.Message{
        Topic: topic,
        Body:  []byte(message),
    }

    // 发送事务消息
    result, err := tp.producer.SendMessageInTransaction(msg, func(msg *rocketmq.Message) rocketmq.LocalTransactionState {
        // 执行本地事务
        return tp.executeLocalTransaction(msg)
    })

    if err != nil {
        return err
    }

    log.Printf("事务消息发送结果: %s", result.String())
    return nil
}

// 执行本地事务
func (tp *TransactionProducer) executeLocalTransaction(msg *rocketmq.Message) rocketmq.LocalTransactionState {
    // 开启本地事务
    tx := global.DB.Begin()
    if tx.Error != nil {
        return rocketmq.RollbackMessage
    }

    // 执行业务逻辑
    if err := tp.doBusinessLogic(msg); err != nil {
        tx.Rollback()
        return rocketmq.RollbackMessage
    }

    // 记录事务状态
    if err := tp.recordTransactionState(msg, "COMMIT"); err != nil {
        tx.Rollback()
        return rocketmq.RollbackMessage
    }

    tx.Commit()
    return rocketmq.CommitMessage
}

// 事务回查
func (tp *TransactionProducer) checkLocalTransaction(msg *rocketmq.Message) rocketmq.LocalTransactionState {
    // 查询本地事务状态
    state, err := tp.getTransactionState(msg)
    if err != nil {
        return rocketmq.RollbackMessage
    }

    switch state {
    case "COMMIT":
        return rocketmq.CommitMessage
    case "ROLLBACK":
        return rocketmq.RollbackMessage
    default:
        return rocketmq.Unknow
    }
}

消费者端实现

// RocketMQ 消息消费者
type MessageConsumer struct {
    consumer rocketmq.PushConsumer
}

// 消费消息
func (mc *MessageConsumer) ConsumeMessage(msgs []*rocketmq.MessageExt) rocketmq.ConsumeResult {
    for _, msg := range msgs {
        // 幂等性检查
        if mc.isProcessed(msg) {
            continue
        }

        // 处理业务逻辑
        if err := mc.processBusinessLogic(msg); err != nil {
            // 记录处理失败,稍后重试
            mc.recordProcessFailure(msg, err)
            return rocketmq.ConsumeRetryLater
        }

        // 记录处理成功
        mc.recordProcessSuccess(msg)
    }

    return rocketmq.ConsumeSuccess
}

// 幂等性检查
func (mc *MessageConsumer) isProcessed(msg *rocketmq.MessageExt) bool {
    // 检查消息是否已处理
    var count int64
    global.DB.Model(&ProcessedMessage{}).
        Where("message_id = ?", msg.MsgId).
        Count(&count)

    return count > 0
}

关键配置

RocketMQ 配置

# RocketMQ 配置
rocketmq:
  name-server: '127.0.0.1:9876'
  producer:
    group: 'transaction-producer-group'
    send-message-timeout: 3000
    retry-times: 2
  consumer:
    group: 'business-consumer-group'
    consume-timeout: 30000
    max-reconsume-times: 3

优缺点分析

优点

  1. 强一致性保证:通过事务消息机制保证消息和业务的一致性
  2. 自动回查:RocketMQ 自动处理事务回查,减少业务复杂度
  3. 高性能:异步处理,不阻塞主业务流程
  4. 可靠性高:消息持久化,支持集群部署

缺点

  1. 依赖特定 MQ:需要支持事务消息的消息队列
  2. 实现复杂:需要处理事务回查逻辑
  3. 最终一致性:只能保证最终一致性,不能保证强一致性
  4. 资源消耗:需要额外的存储和处理资源

适用场景

  1. 对一致性要求较高的分布式事务场景
  2. 可以接受最终一致性的业务系统
  3. 使用 RocketMQ 作为消息中间件的系统
  4. 需要保证消息不丢失的重要业务

最佳实践

  1. 事务回查实现:必须实现可靠的事务回查逻辑
  2. 幂等性处理:消费者端必须实现幂等性处理
  3. 异常处理:完善的异常处理和重试机制
  4. 监控告警:监控事务消息的发送和消费情况
  5. 性能优化:合理设置消息队列参数,优化性能

最大努力通知

最大努力通知方案详解

方案特点:最大努力通知是一种相对宽松的分布式事务解决方案,适用于对一致性要求不是特别严格的场景。

核心原理

  • 业务方在完成本地事务后,尽力通知其他业务方
  • 如果通知失败,则按照策略进行重试
  • 超过最大重试次数后,进入人工处理流程

支付通知场景分析

业务场景

以电商支付为例:

  1. 用户支付:用户完成支付操作
  2. 支付成功:支付系统确认支付成功
  3. 通知订单:支付系统通知订单系统更新订单状态
  4. 通知库存:支付系统通知库存系统扣减库存
  5. 通知积分:支付系统通知积分系统增加用户积分

实现流程

sequenceDiagram
    participant User as 用户
    participant Payment as 支付系统
    participant Order as 订单系统
    participant Inventory as 库存系统
    participant Points as 积分系统
    participant Notification as 通知服务
    participant Retry as 重试服务
    participant Manual as 人工处理

    User->>Payment: 1. 发起支付
    Payment->>Payment: 2. 处理支付逻辑
    Payment->>Payment: 3. 更新支付状态

    Note over Payment,Notification: 异步通知阶段
    Payment->>Notification: 4. 发送通知任务

    par 并行通知多个系统
        Notification->>Order: 5a. 通知订单系统
        Order-->>Notification: 6a. 返回处理结果
    and
        Notification->>Inventory: 5b. 通知库存系统
        Inventory-->>Notification: 6b. 返回处理结果
    and
        Notification->>Points: 5c. 通知积分系统
        Points-->>Notification: 6c. 返回处理结果
    end

    alt 通知成功
        Notification->>Notification: 7a. 记录通知成功
    else 通知失败
        Notification->>Retry: 7b. 进入重试队列
        Retry->>Retry: 8. 按策略重试

        alt 重试成功
            Retry->>Notification: 9a. 标记通知成功
        else 超过最大重试次数
            Retry->>Manual: 9b. 进入人工处理
            Manual->>Manual: 10. 人工干预处理
        end
    end

详细架构设计

graph TB
    subgraph "支付系统"
        A[支付处理] --> B[支付状态更新]
        B --> C[通知任务创建]
    end

    subgraph "通知服务"
        D[通知调度器] --> E[通知执行器]
        E --> F[通知记录表]
        G[重试调度器] --> H[重试执行器]
        H --> I[重试记录表]
    end

    subgraph "下游系统"
        J[订单系统] --> K[订单状态更新]
        L[库存系统] --> M[库存扣减]
        N[积分系统] --> O[积分增加]
    end

    subgraph "监控告警"
        P[失败监控] --> Q[告警通知]
        R[人工处理] --> S[补偿操作]
    end

    C --> D
    E --> J
    E --> L
    E --> N
    F --> G
    I --> P
    P --> R

    style F fill:#e8f5e8
    style I fill:#fff3e0
    style R fill:#ffebee

核心实现代码

1. 通知任务模型

// 通知任务模型
type NotificationTask struct {
    ID           int64     `json:"id" gorm:"primaryKey"`
    BusinessID   string    `json:"business_id" gorm:"index"`   // 业务ID
    BusinessType string    `json:"business_type"`              // 业务类型
    TargetURL    string    `json:"target_url"`                 // 通知目标URL
    Payload      string    `json:"payload"`                    // 通知内容
    Status       int       `json:"status"`                     // 0:待通知 1:通知成功 2:通知失败
    RetryCount   int       `json:"retry_count"`                // 重试次数
    MaxRetry     int       `json:"max_retry"`                  // 最大重试次数
    NextRetryAt  time.Time `json:"next_retry_at"`              // 下次重试时间
    CreatedAt    time.Time `json:"created_at"`
    UpdatedAt    time.Time `json:"updated_at"`
}

// 通知记录模型
type NotificationLog struct {
    ID         int64     `json:"id" gorm:"primaryKey"`
    TaskID     int64     `json:"task_id" gorm:"index"`
    Status     int       `json:"status"`     // 0:成功 1:失败
    Response   string    `json:"response"`   // 响应内容
    ErrorMsg   string    `json:"error_msg"`  // 错误信息
    CreatedAt  time.Time `json:"created_at"`
}

2. 通知服务实现

// 通知服务
type NotificationService struct {
    db     *gorm.DB
    client *http.Client
    logger *log.Logger
}

// 创建通知任务
func (ns *NotificationService) CreateNotificationTask(businessID, businessType, targetURL, payload string) error {
    task := &NotificationTask{
        BusinessID:   businessID,
        BusinessType: businessType,
        TargetURL:    targetURL,
        Payload:      payload,
        Status:       0, // 待通知
        RetryCount:   0,
        MaxRetry:     5, // 默认最大重试5次
        NextRetryAt:  time.Now(),
    }

    return ns.db.Create(task).Error
}

// 执行通知
func (ns *NotificationService) ExecuteNotification(task *NotificationTask) error {
    // 构建HTTP请求
    req, err := http.NewRequest("POST", task.TargetURL, strings.NewReader(task.Payload))
    if err != nil {
        return err
    }

    req.Header.Set("Content-Type", "application/json")
    req.Header.Set("X-Business-ID", task.BusinessID)
    req.Header.Set("X-Business-Type", task.BusinessType)

    // 发送请求
    resp, err := ns.client.Do(req)
    if err != nil {
        ns.recordNotificationLog(task.ID, 1, "", err.Error())
        return ns.handleNotificationFailure(task)
    }
    defer resp.Body.Close()

    // 读取响应
    body, _ := io.ReadAll(resp.Body)

    if resp.StatusCode == 200 {
        // 通知成功
        ns.recordNotificationLog(task.ID, 0, string(body), "")
        return ns.handleNotificationSuccess(task)
    } else {
        // 通知失败
        ns.recordNotificationLog(task.ID, 1, string(body), fmt.Sprintf("HTTP %d", resp.StatusCode))
        return ns.handleNotificationFailure(task)
    }
}

// 处理通知成功
func (ns *NotificationService) handleNotificationSuccess(task *NotificationTask) error {
    return ns.db.Model(task).Updates(map[string]interface{}{
        "status": 1, // 通知成功
    }).Error
}

// 处理通知失败
func (ns *NotificationService) handleNotificationFailure(task *NotificationTask) error {
    task.RetryCount++

    if task.RetryCount >= task.MaxRetry {
        // 超过最大重试次数,标记为失败
        return ns.db.Model(task).Updates(map[string]interface{}{
            "status": 2, // 通知失败
        }).Error
    } else {
        // 计算下次重试时间(指数退避)
        nextRetryAt := time.Now().Add(time.Duration(math.Pow(2, float64(task.RetryCount))) * time.Minute)
        return ns.db.Model(task).Updates(map[string]interface{}{
            "retry_count":   task.RetryCount,
            "next_retry_at": nextRetryAt,
        }).Error
    }
}

// 记录通知日志
func (ns *NotificationService) recordNotificationLog(taskID int64, status int, response, errorMsg string) {
    log := &NotificationLog{
        TaskID:    taskID,
        Status:    status,
        Response:  response,
        ErrorMsg:  errorMsg,
        CreatedAt: time.Now(),
    }

    ns.db.Create(log)
}

3. 重试服务实现

// 重试服务
type RetryService struct {
    db                *gorm.DB
    notificationSvc   *NotificationService
    logger           *log.Logger
}

// 启动重试服务
func (rs *RetryService) Start() {
    ticker := time.NewTicker(30 * time.Second) // 每30秒检查一次
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            rs.processRetryTasks()
        }
    }
}

// 处理重试任务
func (rs *RetryService) processRetryTasks() {
    var tasks []NotificationTask

    // 查询需要重试的任务
    err := rs.db.Where("status = ? AND retry_count < max_retry AND next_retry_at <= ?",
        0, time.Now()).Find(&tasks).Error

    if err != nil {
        rs.logger.Printf("查询重试任务失败: %v", err)
        return
    }

    for _, task := range tasks {
        go rs.retryNotification(&task)
    }
}

// 重试通知
func (rs *RetryService) retryNotification(task *NotificationTask) {
    rs.logger.Printf("开始重试通知任务: %d, 重试次数: %d", task.ID, task.RetryCount+1)

    if err := rs.notificationSvc.ExecuteNotification(task); err != nil {
        rs.logger.Printf("重试通知失败: %v", err)
    }
}

4. 支付系统集成

// 支付服务
type PaymentService struct {
    db                *gorm.DB
    notificationSvc   *NotificationService
}

// 处理支付成功
func (ps *PaymentService) HandlePaymentSuccess(paymentID string, amount float64, userID int64) error {
    // 开启事务
    tx := ps.db.Begin()
    if tx.Error != nil {
        return tx.Error
    }

    // 更新支付状态
    if err := tx.Model(&Payment{}).Where("id = ?", paymentID).
        Updates(map[string]interface{}{
            "status": "SUCCESS",
            "paid_at": time.Now(),
        }).Error; err != nil {
        tx.Rollback()
        return err
    }

    // 提交事务
    if err := tx.Commit().Error; err != nil {
        return err
    }

    // 异步创建通知任务
    go ps.createNotificationTasks(paymentID, amount, userID)

    return nil
}

// 创建通知任务
func (ps *PaymentService) createNotificationTasks(paymentID string, amount float64, userID int64) {
    // 通知订单系统
    orderPayload := map[string]interface{}{
        "payment_id": paymentID,
        "status": "PAID",
        "amount": amount,
    }
    payload, _ := json.Marshal(orderPayload)
    ps.notificationSvc.CreateNotificationTask(
        paymentID, "PAYMENT_SUCCESS",
        "http://order-service/api/payment/notify",
        string(payload),
    )

    // 通知库存系统
    inventoryPayload := map[string]interface{}{
        "payment_id": paymentID,
        "user_id": userID,
        "action": "DEDUCT",
    }
    payload, _ = json.Marshal(inventoryPayload)
    ps.notificationSvc.CreateNotificationTask(
        paymentID, "PAYMENT_SUCCESS",
        "http://inventory-service/api/payment/notify",
        string(payload),
    )

    // 通知积分系统
    pointsPayload := map[string]interface{}{
        "user_id": userID,
        "points": int(amount), // 1元=1积分
        "source": "PAYMENT",
    }
    payload, _ = json.Marshal(pointsPayload)
    ps.notificationSvc.CreateNotificationTask(
        paymentID, "PAYMENT_SUCCESS",
        "http://points-service/api/points/add",
        string(payload),
    )
}

重试策略配置

# 通知配置
notification:
  max_retry: 5 # 最大重试次数
  retry_interval: 30s # 重试检查间隔
  exponential_backoff: true # 是否使用指数退避
  base_delay: 1m # 基础延迟时间
  max_delay: 30m # 最大延迟时间

  # 不同业务类型的重试策略
  business_strategies:
    PAYMENT_SUCCESS:
      max_retry: 5
      base_delay: 1m
    ORDER_UPDATE:
      max_retry: 3
      base_delay: 30s
    INVENTORY_DEDUCT:
      max_retry: 5
      base_delay: 2m

监控和告警

// 监控服务
type MonitorService struct {
    db *gorm.DB
}

// 检查失败通知
func (ms *MonitorService) CheckFailedNotifications() {
    var failedCount int64

    // 统计24小时内失败的通知
    ms.db.Model(&NotificationTask{}).
        Where("status = ? AND updated_at >= ?", 2, time.Now().Add(-24*time.Hour)).
        Count(&failedCount)

    if failedCount > 10 { // 失败数量超过阈值
        ms.sendAlert(fmt.Sprintf("通知失败数量过多: %d", failedCount))
    }
}

// 发送告警
func (ms *MonitorService) sendAlert(message string) {
    // 发送邮件、短信、钉钉等告警
    log.Printf("告警: %s", message)
}

优缺点分析

优点

  1. 实现简单:逻辑清晰,易于理解和实现
  2. 性能较好:异步处理,不阻塞主业务流程
  3. 灵活性高:可以针对不同业务设置不同的重试策略
  4. 容错性强:通过重试机制提高成功率

缺点

  1. 最终一致性:只能保证最终一致性,不能保证强一致性
  2. 数据可能不一致:在重试期间,数据可能处于不一致状态
  3. 需要人工干预:超过重试次数后需要人工处理
  4. 监控复杂:需要完善的监控和告警机制

适用场景

  1. 对一致性要求相对宽松的业务场景
  2. 可以接受最终一致性的分布式系统
  3. 通知类业务:如支付通知、状态变更通知等
  4. 非核心业务:如积分发放、消息推送等

最佳实践

  1. 合理设置重试策略:根据业务重要性设置不同的重试次数和间隔
  2. 实现幂等性:接收方必须实现幂等性处理
  3. 完善监控:监控通知成功率、失败率等关键指标
  4. 人工处理流程:建立完善的人工处理流程和工具
  5. 数据对账:定期进行数据对账,发现和处理不一致问题

主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://joyjs.cn/archives/4786

(0)
Walker的头像Walker
上一篇 2025年11月25日 13:00
下一篇 2025年11月25日 11:00

相关推荐

  • 测试专题

    个人 2025年2月7日
    1.2K00
  • 深入理解ES6 010【学习笔记】

    改进的数组功能 new Array()的怪异行为,当构造函数传入一个数值型的值,那么数组的length属性会被设为该值;如果传入多个值,此时无论这些值是不是数值型的,都会变为数组的元素。这个特性另人困惑,你不可能总是注意传入数据的类型,所以存在一定的风险。 Array.of() 无论传多少个参数,不存在单一数值的特例(一个参数且数值型),总是返回包含所有参数…

    个人 2025年3月8日
    1.0K00
  • Go工程师体系课 003【学习笔记】

    grpc grpc grpc-go grpc 无缝集成了 protobuf protobuf 习惯用 Json、XML 数据存储格式的你们,相信大多都没听过 Protocol Buffer。 Protocol Buffer 其实是 Google 出品的一种轻量 & 高效的结构化数据存储格式,性能比 Json、XML 真的强!太!多! protobuf…

    个人 2025年11月25日
    4800
  • Go工程师体系课 010【学习笔记】

    es 安装 elasticsearch(理解为库) kibana(理解为连接工具)es 和 kibana(5601) 的版本要保持一致 MySQL 对照学习 Elasticsearch(ES) 术语对照 MySQL Elasticsearch database index(索引) table type(7.x 起固定为 _doc,8.x 彻底移除多 type…

    个人 2025年11月25日
    4400
  • Go工程师体系课 017【学习笔记】

    限流、熔断与降级入门(含 Sentinel 实战) 结合课件第 3 章(3-1 ~ 3-9)的视频要点,整理一套面向初学者的服务保护指南,帮助理解“为什么需要限流、熔断和降级”,以及如何用 Sentinel 快速上手。 学习路线速览 3-1 理解服务雪崩与限流、熔断、降级的背景 3-2 Sentinel 与 Hystrix 对比,明确技术选型 3-3 Sen…

    个人 2025年11月25日
    5000

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信
欢迎🌹 Coding never stops, keep learning! 💡💻 光临🌹