Go工程师体系课 014【学习笔记】

rocketmq 快速入门

去我们的各种配置(podman)看是怎么安装的


概念介绍

RocketMQ 是阿里开源、Apache 顶级项目的分布式消息中间件,核心组件:

  • NameServer:服务发现与路由
  • Broker:消息存储、投递、拉取
  • Producer:消息生产者(发送消息)
  • Consumer:消息消费者(订阅并消费消息)
  • Topic/Tag:主题/标签,用于消息分组与过滤

生产与消费模型:Producer 将消息发送到某个 Topic;Broker 进行持久化并供 Consumer 拉取;Consumer 以集群或广播模式消费。

代码示例本章以 Go 为例(伪代码/示意),不同 SDK 方法名略有差异,请以实际版本为准。


按照发送的特点分

1. 同步发送

同步发送会等待 Broker 返回发送结果,适合对可靠性有要求的场景(如下单、创建订单事件)。

// 同步发送
msg := rocketmq.NewMessage("OrderTopic", []byte("order-created"))
res, err := producer.SendSync(context.Background(), msg)
if err != nil {
    // 失败处理/重试
}
log.Printf("SendOK: %v", res)

2. 异步发送

异步发送不会阻塞主线程,通过回调获取结果,适合链路较长或吞吐要求高的场景。

// 异步发送
msg := rocketmq.NewMessage("LogTopic", []byte("user-action"))
producer.SendAsync(context.Background(), msg, func(res *SendResult, err error) {
    if err != nil {
        // 记录失败,后续重试
        return
    }
    log.Printf("AsyncSendOK: %v", res)
})

3. 单向发送(OneWay)

单向发送只负责把消息“尽力而为”地发出,不关心结果,适用于日志收集、埋点等对可靠性要求低的场景。

// 单向发送
_ = producer.SendOneWay(context.Background(), rocketmq.NewMessage("TraceTopic", []byte("trace")))

按照使用功能特点分

1. 普通消息(订阅)

最常见的发布/订阅模型。消费者可采用集群模式(负载均衡)或广播模式(每个消费者都收到)。

// 消费者订阅普通消息
consumer.Subscribe("OrderTopic", rocketmq.FilterByTag("created"), func(msg *MessageExt) ConsumeResult {
    // 幂等处理
    // 业务逻辑...
    return ConsumeSuccess
})

要点:

  • 幂等性:用业务唯一键或去重表避免重复消费
  • 重试与死信:失败返回重试,超过阈值进入 DLQ

2. 顺序消息

顺序消息分为全局顺序和分区顺序。常见做法是按业务键(如订单号)将消息路由到同一个队列,保证“同一订单”的消息有序。

// 生产者按业务键选择队列(示意)
shardingKey := orderID
msg := rocketmq.NewMessage("OrderSeqTopic", []byte("status-changed"))
msg.WithShardingKey(shardingKey)
_, _ = producer.SendSync(ctx, msg)

注意:要保证同一业务键落在同一队列,消费者通常单线程或按队列串行处理。

3. 延时消息(定时/延迟)

用于在指定时间后再投递给消费者,例如“订单超时取消”“支付结果稍后检查”等。

// 发送 30s 后可见的延时消息(不同 SDK 可用 delayLevel 或 deliverTime)
msg := rocketmq.NewMessage("DelayTopic", []byte("close-order"))
msg.SetDelay(time.Second * 30)
_, _ = producer.SendSync(ctx, msg)

实践要点:

  • 合理的延迟等级/绝对投递时间
  • 消费端仍需幂等与补偿

4. 事务消息(分布式事务)

用于保证“本地事务 + 消息”最终一致。流程:发送半消息 → 执行本地事务 → 根据结果 Commit/Rollback;Broker 未收到确认会回查业务状态。

sequenceDiagram
  participant P as Producer
  participant MQ as RocketMQ
  participant DB as LocalDB
  P->>MQ: 发送半消息
  P->>DB: 执行本地事务
  alt 成功
    P->>MQ: Commit
    MQ->>C: 投递正式消息
  else 失败
    P->>MQ: Rollback
  end
  MQ->>P: 回查未确认事务

更多细节可参考本仓库 013.md 中“事务消息”与“TCC/本地消息表”等章节。


生产者与消费者快速示例

// Producer 初始化(示意)
producer, _ := rocketmq.NewProducer(rocketmq.ProducerConfig{
    NameServer: []string{"127.0.0.1:9876"},
    Group:      "demo-producer-group",
})
defer producer.Shutdown()

// Consumer 初始化(示意)
consumer, _ := rocketmq.NewPushConsumer(rocketmq.ConsumerConfig{
    NameServer: []string{"127.0.0.1:9876"},
    Group:      "demo-consumer-group",
    Model:      rocketmq.Clustering, // 或 Broadcasting
})
defer consumer.Shutdown()

分布式事务消息的优势

  • 解耦:上下游通过事件协作,降低强耦合
  • 弹性与可扩展:异步削峰,支持高并发
  • 可靠性:消息持久化,失败可重试/对账
  • 最终一致:在 AP 取舍下通过补偿与回查达到一致

适用场景:订单创建/支付、库存扣减、积分/优惠券发放、资金记账、状态同步等。


常见实践建议

  • 消费端幂等:唯一业务键、去重表、乐观锁
  • 失败重试与死信队列(DLQ)配置
  • 监控与告警:积压、失败率、耗时
  • 结合延时消息实现“超时关闭/回查”
  • 事务消息只在关键链路使用,其余用本地消息表或最大努力通知

主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://joyjs.cn/archives/4787

(0)
Walker的头像Walker
上一篇 2025年11月25日 14:00
下一篇 2025年11月25日 12:00

相关推荐

  • 深入理解ES6 008【学习笔记】

    迭代器(Iterator)和生成器(Generator) 这个新特性对于高效的数据处理而言是不可或缺的,你也会发现在语言的其他特性中也都有迭代器的身影:新的for-of循环、展开运算符(...)、甚至连异步编程都可以使用迭代器。 迭代器是一种特殊的对象,它具有一些专门为迭代过程设计的专有接口,所有的迭代器对象都有一个next()方法,每次调用都返回一个结果对…

    个人 2025年3月8日
    94000
  • 深入理解ES6 010【学习笔记】

    改进的数组功能 new Array()的怪异行为,当构造函数传入一个数值型的值,那么数组的length属性会被设为该值;如果传入多个值,此时无论这些值是不是数值型的,都会变为数组的元素。这个特性另人困惑,你不可能总是注意传入数据的类型,所以存在一定的风险。 Array.of() 无论传多少个参数,不存在单一数值的特例(一个参数且数值型),总是返回包含所有参数…

    个人 2025年3月8日
    1.0K00
  • Go工程师体系课 001【学习笔记】

    转型 想在短时间系统转到Go工程理由 提高CRUD,无自研框架经验 拔高技术深度,做专、做精需求的同学 进阶工程化,拥有良好开发规范和管理能力的 工程化的重要性 高级开的期望 良好的代码规范 深入底层原理 熟悉架构 熟悉k8s的基础架构 扩展知识广度,知识的深度,规范的开发体系 四个大的阶段 go语言基础 微服务开发的(电商项目实战) 自研微服务 自研然后重…

    个人 2025年11月25日
    5600
  • Go工程师体系课 009【学习笔记】

    其它一些功能 个人中心 收藏 管理收货地址(增删改查) 留言 拷贝inventory_srv--> userop_srv 查询替换所有的inventory Elasticsearch 深度解析文档 1. 什么是Elasticsearch Elasticsearch是一个基于Apache Lucene构建的分布式、RESTful搜索和分析引擎,能够快速地…

    个人 2025年11月25日
    6000
  • 深入理解ES6 001【学习笔记】

    块级作用域绑定 之前的变量声明var无论在哪里声明的都被认为是作域顶部声明的,由于函数是一等公民,所以顺序一般是function 函数名()、var 变量。 块级声明 块级声明用于声明在指定块的作用域之外无法访问的变量。块级作用域存在于: 函数内部 块中(字符和{和}之间的区域) 临时死区 javascript引擎在扫描代码发现变量声明时,要么将它们提升至作…

    个人 2025年3月8日
    1.5K00

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信
欢迎🌹 Coding never stops, keep learning! 💡💻 光临🌹