AI大模型教程
一起来学习

从 0 到 1 精通 SpringBoot 集成 RocketMQ

引言:

  • 本文总字数:约 8500 字
  • 预计阅读时间:35 分钟

为什么选择 RocketMQ?

在分布式系统架构中,消息队列扮演着至关重要的角色,它不仅能实现系统解耦、流量削峰,还能保障数据最终一致性。目前市面上主流的消息中间件有 RabbitMQ、Kafka 和 RocketMQ,它们各有侧重:

  • RabbitMQ:基于 AMQP 协议,社区活跃,插件丰富,但在超高吞吐量场景下性能略显不足
  • Kafka:擅长日志收集和大数据场景,吞吐量惊人,但消息可靠性和事务支持较弱
  • RocketMQ:阿里开源的分布式消息中间件,兼顾了高吞吐量、低延迟和高可靠性,完美支持事务消息

根据 Apache 官方数据,RocketMQ 在单节点下就能支持每秒数十万级的消息处理能力,并且提供了完善的消息追踪、重试机制和流量控制功能,这也是它能在阿里内部支撑双 11 等超高并发场景的核心原因。

本文将带你全面掌握 SpringBoot 与 RocketMQ 的集成方案,从环境搭建到高级特性,从代码实现到性能调优,让你既能理解底层原理,又能解决实际开发中的各种问题。

一、RocketMQ 核心概念与架构

1.1 核心概念解析

要学好 RocketMQ,首先需要理解其核心概念,这些概念是后续开发的基础:

  • Producer:消息生产者,负责发送消息到 Broker
  • Consumer:消息消费者,负责从 Broker 接收并处理消息
  • Broker:消息服务器,存储消息并转发消息
  • Topic:消息主题,用于消息分类,每个消息必须属于一个 Topic
  • Tag:消息标签,用于同一 Topic 下的消息进一步分类
  • Queue:消息队列,每个 Topic 包含多个 Queue,用于负载均衡
  • NameServer:命名服务,管理 Broker 的路由信息,提供轻量级的服务发现

RocketMQ 的消息模型采用了发布 – 订阅模式,生产者将消息发送到特定 Topic,消费者通过订阅 Topic 来获取消息。这种模型的优势在于生产者和消费者完全解耦,它们不需要知道对方的存在。

1.2 架构原理

RocketMQ 的整体架构如图所示:

RocketMQ 架构具有以下特点:

  1. NameServer 无状态:各个 NameServer 之间不进行通信,通过 Broker 主动上报路由信息
  2. Broker 主从架构:每个 Master 可以有多个 Slave,实现消息的高可用存储
  3. 负载均衡:通过多个 Queue 实现 Producer 和 Consumer 的负载均衡
  4. 水平扩展:所有组件都可以水平扩展,满足不同规模的业务需求

根据 RocketMQ 官方文档(https://rocketmq.apache.org/docs/quick-start/),这种架构设计使得 RocketMQ 能够支持万亿级别的消息堆积,并且在消息丢失率上可以做到接近零。

二、环境搭建

2.1 安装 RocketMQ

我们采用最新稳定版 RocketMQ 5.2.0 进行安装,步骤如下:

  1. 下载安装包:
wget https://archive.apache.org/dist/rocketmq/5.2.0/rocketmq-all-5.2.0-bin-release.zip
  1. 解压安装包:
unzip rocketmq-all-5.2.0-bin-release.zip
cd rocketmq-all-5.2.0-bin-release
  1. 调整 JVM 参数(根据服务器配置调整):
# 编辑runserver.sh
sed -i 's/-Xms4g -Xmx4g -Xmn2g/-Xms512m -Xmx512m -Xmn256m/g' bin/runserver.sh
# 编辑runbroker.sh
sed -i 's/-Xms8g -Xmx8g -Xmn4g/-Xms512m -Xmx512m -Xmn256m/g' bin/runbroker.sh
  1. 启动 NameServer:
nohup sh bin/mqnamesrv &
  1. 启动 Broker:
nohup sh bin/mqbroker -n localhost:9876 &
  1. 验证启动是否成功:
# 查看NameServer日志
tail -f ~/logs/rocketmqlogs/namesrv.log
# 查看Broker日志
tail -f ~/logs/rocketmqlogs/broker.log

如果日志中出现 “The Name Server boot success” 和 “The broker […:10911] boot success” 字样,说明启动成功。

2.2 安装控制台

为了方便管理和监控 RocketMQ,我们安装最新的控制台:

  1. 克隆代码仓库:
git clone https://github.com/apache/rocketmq-dashboard.git
cd rocketmq-dashboard
  1. 修改配置文件:
vi src/main/resources/application.properties
  1. 修改 NameServer 地址:
rocketmq.config.namesrvAddr=localhost:9876
  1. 打包并运行:
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar
  1. 访问控制台:
    打开浏览器访问http://localhost:8080,即可看到 RocketMQ 的管理界面。

三、SpringBoot 集成 RocketMQ 基础

3.1 创建项目并添加依赖

我们使用 SpringBoot 3.2.0(最新稳定版)来创建项目,首先在 pom.xml 中添加必要的依赖:

4.0.0org.springframework.bootspring-boot-starter-parent3.2.0com.jamspringboot-rocketmq-demo0.0.1-SNAPSHOTspringboot-rocketmq-demoSpringBoot集成RocketMQ示例项目172.2.31.18.303.14.03.5.58.2.02.1.0org.springframework.bootspring-boot-starter-weborg.apache.rocketmqrocketmq-spring-boot-starter${rocketmq-spring-boot.version}org.projectlomboklombok${lombok.version}providedorg.apache.commonscommons-lang3${commons-lang3.version}com.baomidoumybatis-plus-boot-starter${mybatis-plus.version}com.mysqlmysql-connector-j${mysql-connector.version}runtimeorg.springdocspringdoc-openapi-starter-webmvc-ui${springdoc.version}org.springframework.bootspring-boot-starter-testtestorg.springframework.bootspring-boot-maven-pluginorg.projectlomboklombok

3.2 配置 RocketMQ

在 application.yml 中添加 RocketMQ 的配置:

spring:
  application:
    name: springboot-rocketmq-demo
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/rocketmq_demo?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
    username: root
    password: root

rocketmq:
  name-server: localhost:9876
  producer:
    # 生产者组名,同一组的生产者具有相同的属性
    group: demo_producer_group
    # 发送消息超时时间,单位毫秒
    send-message-timeout: 3000
    # 消息最大长度,单位字节
    max-message-size: 4194304
    # 消息发送失败重试次数
    retry-times-when-send-failed: 2
    # 异步消息发送失败重试次数
    retry-times-when-send-async-failed: 2
    # 消息压缩阈值,超过此阈值则进行压缩,单位字节
    compress-message-body-threshold: 4096
    # 重试时选择另一个Broker的开关
    retry-next-server: true

mybatis-plus:
  mapper-locations: classpath:mapper/*.xml
  type-aliases-package: com.jam.entity
  configuration:
    map-underscore-to-camel-case: true
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

springdoc:
  api-docs:
    path: /api-docs
  swagger-ui:
    path: /swagger-ui.html
    operationsSorter: method

server:
  port: 8081

3.3 创建消息实体类

创建一个通用的消息实体类,用于封装发送的消息内容:

package com.jam.entity;

import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * 消息实体类
 * 用于封装发送到RocketMQ的消息内容
 *
 * @author 果酱
 */
@Data
public class MessageEntity implements Serializable {
    /**
     * 消息ID
     */
    private String messageId;
    
    /**
     * 消息内容
     */
    private String content;
    
    /**
     * 业务类型
     */
    private String businessType;
    
    /**
     * 创建时间
     */
    private LocalDateTime createTime;
    
    /**
     * 扩展字段,用于存储额外信息
     */
    private String extra;
}

3.4 创建消息生产者

使用 RocketMQ 提供的 RocketMQTemplate 来发送消息,创建一个消息生产者服务:

package com.jam.service;

import com.jam.entity.MessageEntity;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.UUID;

/**
 * 消息生产者服务
 * 负责向RocketMQ发送各种类型的消息
 *
 * @author 果酱
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageProducerService {
    /**
     * RocketMQ模板类,提供发送消息的各种方法
     */
    private final RocketMQTemplate rocketMQTemplate;
    
    /**
     * 发送同步消息
     * 同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式
     *
     * @param topic 消息主题
     * @param tag 消息标签
     * @param content 消息内容
     * @param businessType 业务类型
     * @param extra 额外信息
     * @return 发送结果
     */
    public SendResult sendSyncMessage(String topic, String tag, String content, 
                                     String businessType, String extra) {
        // 参数校验
        StringUtils.hasText(topic, "消息主题不能为空");
        StringUtils.hasText(content, "消息内容不能为空");
        
        // 构建完整的主题标签字符串
        String destination = buildDestination(topic, tag);
        
        // 创建消息实体
        MessageEntity messageEntity = new MessageEntity();
        messageEntity.setMessageId(UUID.randomUUID().toString());
        messageEntity.setContent(content);
        messageEntity.setBusinessType(businessType);
        messageEntity.setCreateTime(LocalDateTime.now());
        messageEntity.setExtra(extra);
        
        // 构建Spring的Message对象
        Message message = MessageBuilder
                .withPayload(messageEntity)
                .build();
        
        log.info("发送同步消息,主题:{},消息ID:{}", destination, messageEntity.getMessageId());
        
        // 发送同步消息
        SendResult sendResult = rocketMQTemplate.syncSend(destination, message);
        
        log.info("同步消息发送完成,结果:{}", sendResult);
        return sendResult;
    }
    
    /**
     * 发送异步消息
     * 异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式
     * 需要通过回调函数处理响应结果
     *
     * @param topic 消息主题
     * @param tag 消息标签
     * @param content 消息内容
     * @param businessType 业务类型
     * @param extra 额外信息
     */
    public void sendAsyncMessage(String topic, String tag, String content, 
                                String businessType, String extra) {
        // 参数校验
        StringUtils.hasText(topic, "消息主题不能为空");
        StringUtils.hasText(content, "消息内容不能为空");
        
        // 构建完整的主题标签字符串
        String destination = buildDestination(topic, tag);
        
        // 创建消息实体
        MessageEntity messageEntity = new MessageEntity();
        messageEntity.setMessageId(UUID.randomUUID().toString());
        messageEntity.setContent(content);
        messageEntity.setBusinessType(businessType);
        messageEntity.setCreateTime(LocalDateTime.now());
        messageEntity.setExtra(extra);
        
        // 构建Spring的Message对象
        Message message = MessageBuilder
                .withPayload(messageEntity)
                .build();
        
        log.info("发送异步消息,主题:{},消息ID:{}", destination, messageEntity.getMessageId());
        
        // 发送异步消息,设置回调函数
        rocketMQTemplate.asyncSend(destination, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("异步消息发送成功,消息ID:{},结果:{}", 
                        messageEntity.getMessageId(), sendResult);
            }
            
            @Override
            public void onException(Throwable e) {
                log.error("异步消息发送失败,消息ID:{}", messageEntity.getMessageId(), e);
                // 可以在这里添加消息发送失败的处理逻辑,如重试、保存到数据库等
            }
        });
    }
    
    /**
     * 发送单向消息
     * 单向发送是指发送方只负责发送消息,不等待服务器回应,且没有回调函数触发
     * 适用于某些耗时非常短,但对可靠性要求并不高的场景
     *
     * @param topic 消息主题
     * @param tag 消息标签
     * @param content 消息内容
     * @param businessType 业务类型
     * @param extra 额外信息
     */
    public void sendOneWayMessage(String topic, String tag, String content, 
                                 String businessType, String extra) {
        // 参数校验
        StringUtils.hasText(topic, "消息主题不能为空");
        StringUtils.hasText(content, "消息内容不能为空");
        
        // 构建完整的主题标签字符串
        String destination = buildDestination(topic, tag);
        
        // 创建消息实体
        MessageEntity messageEntity = new MessageEntity();
        messageEntity.setMessageId(UUID.randomUUID().toString());
        messageEntity.setContent(content);
        messageEntity.setBusinessType(businessType);
        messageEntity.setCreateTime(LocalDateTime.now());
        messageEntity.setExtra(extra);
        
        log.info("发送单向消息,主题:{},消息ID:{}", destination, messageEntity.getMessageId());
        
        // 发送单向消息
        rocketMQTemplate.sendOneWay(destination, messageEntity);
        
        log.info("单向消息发送完成,消息ID:{}", messageEntity.getMessageId());
    }
    
    /**
     * 构建消息目的地字符串
     * 格式为"topic:tag",如果tag为空则只返回topic
     *
     * @param topic 消息主题
     * @param tag 消息标签
     * @return 构建后的目的地字符串
     */
    private String buildDestination(String topic, String tag) {
        if (StringUtils.hasText(tag)) {
            return topic + ":" + tag;
        }
        return topic;
    }
}

3.5 创建消息消费者

使用 @RocketMQMessageListener 注解来创建消息消费者:

package com.jam.service;

import com.jam.entity.MessageEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * 消息消费者服务
 * 负责从RocketMQ接收并处理消息
 *
 * @author 果酱
 */
@Slf4j
@Service
@RocketMQMessageListener(
        consumerGroup = "demo_consumer_group", // 消费者组名
        topic = "demo_topic", // 订阅的主题
        selectorExpression = "*" // 消息选择表达式,*表示所有标签
)
public class MessageConsumerService implements RocketMQListener {
    
    /**
     * 处理接收到的消息
     * 当有消息到达时,RocketMQ会自动调用此方法
     *
     * @param message 接收到的消息
     */
    @Override
    public void onMessage(MessageEntity message) {
        // 打印接收到的消息
        log.info("接收到消息,消息ID:{},内容:{},业务类型:{}",
                message.getMessageId(),
                message.getContent(),
                message.getBusinessType());
        
        try {
            // 处理消息的业务逻辑
            processMessage(message);
            
            // 如果没有抛出异常,RocketMQ会认为消息消费成功
            log.info("消息处理成功,消息ID:{}", message.getMessageId());
        } catch (Exception e) {
            // 如果抛出异常,RocketMQ会根据重试策略进行消息重试
            log.error("消息处理失败,消息ID:{}", message.getMessageId(), e);
            // 注意:不要在这里捕获异常后不重新抛出,否则RocketMQ会认为消息已成功消费
            throw new RuntimeException("消息处理失败", e);
        }
    }
    
    /**
     * 处理消息的业务逻辑
     *
     * @param message 要处理的消息
     */
    private void processMessage(MessageEntity message) {
        // 根据业务类型处理不同的消息
        String businessType = message.getBusinessType();
        if ("ORDER_CREATE".equals(businessType)) {
            // 处理订单创建消息
            processOrderCreateMessage(message);
        } else if ("USER_REGISTER".equals(businessType)) {
            // 处理用户注册消息
            processUserRegisterMessage(message);
        } else {
            // 处理未知类型消息
            log.warn("收到未知类型的消息,消息ID:{},业务类型:{}",
                    message.getMessageId(), businessType);
        }
    }
    
    /**
     * 处理订单创建消息
     *
     * @param message 订单创建消息
     */
    private void processOrderCreateMessage(MessageEntity message) {
        log.info("处理订单创建消息,消息ID:{},订单信息:{}",
                message.getMessageId(), message.getContent());
        // 实际业务处理逻辑...
    }
    
    /**
     * 处理用户注册消息
     *
     * @param message 用户注册消息
     */
    private void processUserRegisterMessage(MessageEntity message) {
        log.info("处理用户注册消息,消息ID:{},用户信息:{}",
                message.getMessageId(), message.getContent());
        // 实际业务处理逻辑...
    }
}

3.6 创建控制器

创建一个控制器,用于测试消息发送功能:

package com.jam.controller;

import com.jam.service.MessageProducerService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * 消息测试控制器
 * 提供API接口用于测试消息发送功能
 *
 * @author 果酱
 */
@Slf4j
@RestController
@RequestMapping("/api/message")
@RequiredArgsConstructor
@Tag(name = "消息测试接口", description = "用于测试RocketMQ消息发送的API接口")
public class MessageController {
    /**
     * 消息生产者服务
     */
    private final MessageProducerService messageProducerService;
    
    /**
     * 发送同步消息
     *
     * @param topic 消息主题
     * @param tag 消息标签
     * @param content 消息内容
     * @param businessType 业务类型
     * @param extra 额外信息
     * @return 发送结果
     */
    @PostMapping("/sync")
    @Operation(summary = "发送同步消息", description = "发送同步消息并返回发送结果")
    public ResponseEntity sendSyncMessage(
            @Parameter(description = "消息主题", required = true)
            @RequestParam String topic,
            
            @Parameter(description = "消息标签")
            @RequestParam(required = false) String tag,
            
            @Parameter(description = "消息内容", required = true)
            @RequestParam String content,
            
            @Parameter(description = "业务类型")
            @RequestParam(required = false) String businessType,
            
            @Parameter(description = "额外信息")
            @RequestParam(required = false) String extra) {
        
        log.info("接收到发送同步消息请求,主题:{},标签:{}", topic, tag);
        SendResult sendResult = messageProducerService.sendSyncMessage(
                topic, tag, content, businessType, extra);
        return ResponseEntity.ok(sendResult);
    }
    
    /**
     * 发送异步消息
     *
     * @param topic 消息主题
     * @param tag 消息标签
     * @param content 消息内容
     * @param businessType 业务类型
     * @param extra 额外信息
     * @return 响应信息
     */
    @PostMapping("/async")
    @Operation(summary = "发送异步消息", description = "发送异步消息,结果通过日志输出")
    public ResponseEntity sendAsyncMessage(
            @Parameter(description = "消息主题", required = true)
            @RequestParam String topic,
            
            @Parameter(description = "消息标签")
            @RequestParam(required = false) String tag,
            
            @Parameter(description = "消息内容", required = true)
            @RequestParam String content,
            
            @Parameter(description = "业务类型")
            @RequestParam(required = false) String businessType,
            
            @Parameter(description = "额外信息")
            @RequestParam(required = false) String extra) {
        
        log.info("接收到发送异步消息请求,主题:{},标签:{}", topic, tag);
        messageProducerService.sendAsyncMessage(topic, tag, content, businessType, extra);
        return ResponseEntity.ok("异步消息发送请求已受理,结果请查看日志");
    }
    
    /**
     * 发送单向消息
     *
     * @param topic 消息主题
     * @param tag 消息标签
     * @param content 消息内容
     * @param businessType 业务类型
     * @param extra 额外信息
     * @return 响应信息
     */
    @PostMapping("/oneWay")
    @Operation(summary = "发送单向消息", description = "发送单向消息,不等待响应结果")
    public ResponseEntity sendOneWayMessage(
            @Parameter(description = "消息主题", required = true)
            @RequestParam String topic,
            
            @Parameter(description = "消息标签")
            @RequestParam(required = false) String tag,
            
            @Parameter(description = "消息内容", required = true)
            @RequestParam String content,
            
            @Parameter(description = "业务类型")
            @RequestParam(required = false) String businessType,
            
            @Parameter(description = "额外信息")
            @RequestParam(required = false) String extra) {
        
        log.info("接收到发送单向消息请求,主题:{},标签:{}", topic, tag);
        messageProducerService.sendOneWayMessage(topic, tag, content, businessType, extra);
        return ResponseEntity.ok("单向消息已发送");
    }
}

3.7 创建启动类

package com.jam;

import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * SpringBoot应用启动类
 *
 * @author 果酱
 */
@SpringBootApplication
@MapperScan("com.jam.mapper")
@OpenAPIDefinition(
        info = @Info(
                title = "SpringBoot集成RocketMQ示例项目",
                version = "1.0",
                description = "SpringBoot集成RocketMQ的示例项目,包含各种消息发送和消费的示例"
        )
)
public class SpringbootRocketmqDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringbootRocketmqDemoApplication.class, args);
    }
}

3.8 测试消息发送与消费

启动应用程序后,可以通过以下方式测试消息发送与消费:

  1. 使用 Swagger UI 测试:访问http://localhost:8081/swagger-ui.html,通过界面调用消息发送接口
  2. 使用 curl 命令测试:
# 发送同步消息
curl -X POST "http://localhost:8081/api/message/sync?topic=demo_topic&tag=test&content=Hello RocketMQ&businessType=TEST"

# 发送异步消息
curl -X POST "http://localhost:8081/api/message/async?topic=demo_topic&tag=async&content=Async Message&businessType=TEST"

# 发送单向消息
curl -X POST "http://localhost:8081/api/message/oneWay?topic=demo_topic&tag=oneway&content=OneWay Message&businessType=TEST"

发送消息后,可以在控制台看到生产者和消费者的日志输出,证明消息已经成功发送和消费。

四、RocketMQ 高级特性

4.1 消息过滤

RocketMQ 提供了两种消息过滤方式:Tag 过滤和 SQL92 过滤。

4.1.1 Tag 过滤

Tag 过滤是最简单也是最常用的过滤方式,我们在前面的例子中已经使用过。消费者可以通过 selectorExpression 指定要消费的 Tag:

@RocketMQMessageListener(
        consumerGroup = "tag_consumer_group",
        topic = "demo_topic",
        selectorExpression = "ORDER_CREATE || USER_REGISTER" // 只消费这两个Tag的消息
)
public class TagFilterConsumer implements RocketMQListener {
    // 实现代码...
}
4.1.2 SQL92 过滤

SQL92 过滤提供了更强大的过滤能力,可以基于消息的属性进行过滤。首先需要在 Broker 配置中开启 SQL 过滤支持:

# 在broker.conf中添加
enablePropertyFilter=true

然后在发送消息时设置消息属性:

/**
 * 发送带有属性的消息,支持SQL过滤
 */
public SendResult sendMessageWithProperties(String topic, String content, Map properties) {
    StringUtils.hasText(topic, "消息主题不能为空");
    StringUtils.hasText(content, "消息内容不能为空");
    Objects.requireNonNull(properties, "消息属性不能为空");
    
    MessageEntity messageEntity = new MessageEntity();
    messageEntity.setMessageId(UUID.randomUUID().toString());
    messageEntity.setContent(content);
    messageEntity.setCreateTime(LocalDateTime.now());
    
    // 构建消息并设置属性
    Message message = MessageBuilder
            .withPayload(messageEntity)
            .build();
    
    // 设置消息属性
    properties.forEach(message::setHeader);
    
    log.info("发送带属性的消息,主题:{},消息ID:{},属性:{}", 
            topic, messageEntity.getMessageId(), properties);
    
    return rocketMQTemplate.syncSend(topic, message);
}

最后创建支持 SQL 过滤的消费者:

@Slf4j
@Service
@RocketMQMessageListener(
        consumerGroup = "sql_consumer_group",
        topic = "demo_topic",
        selectorType = SelectorType.SQL92, // 指定使用SQL92过滤
        selectorExpression = "age > 18 AND level = 'VIP'" // SQL过滤表达式
)
public class SqlFilterConsumer implements RocketMQListener {
    @Override
    public void onMessage(MessageEntity message) {
        log.info("SQL过滤后的消息,消息ID:{},内容:{}", 
                message.getMessageId(), message.getContent());
    }
}

根据 RocketMQ 官方文档,SQL 过滤表达式支持以下语法:

  • 数值比较:>、>=、
  • 字符比较:=、、IN
  • 逻辑运算:AND、OR、NOT
  • 空值判断:IS NULL、IS NOT NULL

4.2 事务消息

RocketMQ 提供了可靠的事务消息支持,解决分布式事务问题。事务消息的执行流程如下:

实现事务消息需要以下步骤:

4.2.1 创建事务消息生产者
package com.jam.service;

import com.jam.entity.MessageEntity;
import com.jam.entity.Order;
import com.jam.mapper.OrderMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.UUID;

/**
 * 事务消息服务
 * 处理订单创建的事务消息
 *
 * @author 果酱
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class TransactionMessageService implements RocketMQLocalTransactionListener {
    private final RocketMQTemplate rocketMQTemplate;
    private final OrderMapper orderMapper;
    
    /**
     * 发送事务消息
     *
     * @param order 订单信息
     */
    public void sendTransactionMessage(Order order) {
        Objects.requireNonNull(order, "订单信息不能为空");
        StringUtils.hasText(order.getOrderNo(), "订单号不能为空");
        
        // 创建消息实体
        MessageEntity messageEntity = new MessageEntity();
        messageEntity.setMessageId(UUID.randomUUID().toString());
        messageEntity.setContent("订单创建:" + order.getOrderNo());
        messageEntity.setBusinessType("ORDER_CREATE");
        messageEntity.setCreateTime(LocalDateTime.now());
        messageEntity.setExtra(order.getOrderNo());
        
        // 构建消息,将订单号存入消息头,用于回查
        Message message = MessageBuilder
                .withPayload(messageEntity)
                .setHeader("orderNo", order.getOrderNo())
                .build();
        
        log.info("发送事务消息,订单号:{},消息ID:{}", 
                order.getOrderNo(), messageEntity.getMessageId());
        
        // 发送事务消息
        rocketMQTemplate.sendMessageInTransaction(
                "order_topic:create", // 主题:标签
                message,
                order // 额外参数,会传递到executeLocalTransaction方法
        );
    }
    
    /**
     * 执行本地事务
     *
     * @param message 消息对象
     * @param arg 额外参数
     * @return 事务状态
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public RocketMQLocalTransactionState executeLocalTransaction(Message> message, Object arg) {
        Order order = (Order) arg;
        String orderNo = order.getOrderNo();
        
        try {
            log.info("开始执行本地事务,订单号:{}", orderNo);
            
            // 保存订单信息到数据库
            orderMapper.insert(order);
            
            log.info("本地事务执行成功,订单号:{}", orderNo);
            // 本地事务执行成功,提交消息
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("本地事务执行失败,订单号:{}", orderNo, e);
            // 本地事务执行失败,回滚消息
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    
    /**
     * 本地事务回查
     * 当Broker没有收到事务状态时,会调用此方法查询事务状态
     *
     * @param message 消息对象
     * @return 事务状态
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message> message) {
        // 从消息头中获取订单号
        String orderNo = message.getHeaders().get("orderNo", String.class);
        StringUtils.hasText(orderNo, "订单号不能为空");
        
        try {
            log.info("开始回查本地事务,订单号:{}", orderNo);
            
            // 查询数据库,检查订单是否已创建
            Order order = orderMapper.selectByOrderNo(orderNo);
            
            if (order != null) {
                // 订单已创建,说明本地事务执行成功,提交消息
                log.info("回查发现本地事务已成功,订单号:{}", orderNo);
                return RocketMQLocalTransactionState.COMMIT;
            } else {
                // 订单不存在,说明本地事务执行失败,回滚消息
                log.info("回查发现本地事务未执行成功,订单号:{}", orderNo);
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        } catch (Exception e) {
            log.error("本地事务回查失败,订单号:{}", orderNo, e);
            // 回查过程中发生异常,返回UNKNOWN,等待下一次回查
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}
4.2.2 创建订单实体和 Mapper

订单实体类:

package com.jam.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDateTime;

/**
 * 订单实体类
 *
 * @author 果酱
 */
@Data
@TableName("t_order")
public class Order {
    /**
     * 主键ID
     */
    @TableId(type = IdType.AUTO)
    private Long id;
    
    /**
     * 订单号
     */
    private String orderNo;
    
    /**
     * 用户ID
     */
    private Long userId;
    
    /**
     * 订单金额
     */
    private BigDecimal amount;
    
    /**
     * 订单状态:0-待支付,1-已支付,2-已取消
     */
    private Integer status;
    
    /**
     * 创建时间
     */
    private LocalDateTime createTime;
    
    /**
     * 更新时间
     */
    private LocalDateTime updateTime;
}

OrderMapper 接口:

package com.jam.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.entity.Order;
import org.apache.ibatis.annotations.Param;

/**
 * 订单Mapper
 *
 * @author 果酱
 */
public interface OrderMapper extends BaseMapper {
    /**
     * 根据订单号查询订单
     *
     * @param orderNo 订单号
     * @return 订单信息
     */
    Order selectByOrderNo(@Param("orderNo") String orderNo);
}

对应的 Mapper XML 文件(resources/mapper/OrderMapper.xml):


        SELECT * FROM t_order WHERE order_no = #{orderNo}
    

创建订单表的 SQL:

CREATE TABLE `t_order` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `order_no` varchar(64) NOT NULL COMMENT '订单号',
  `user_id` bigint NOT NULL COMMENT '用户ID',
  `amount` decimal(10,2) NOT NULL COMMENT '订单金额',
  `status` tinyint NOT NULL DEFAULT '0' COMMENT '订单状态:0-待支付,1-已支付,2-已取消',
  `create_time` datetime NOT NULL COMMENT '创建时间',
  `update_time` datetime NOT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_order_no` (`order_no`),
  KEY `idx_user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单表';
4.2.3 创建事务消息消费者
package com.jam.service;

import com.jam.entity.MessageEntity;
import com.jam.entity.Order;
import com.jam.mapper.OrderMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * 订单消息消费者
 * 处理订单创建的消息
 *
 * @author 果酱
 */
@Slf4j
@Service
@RequiredArgsConstructor
@RocketMQMessageListener(
        consumerGroup = "order_consumer_group",
        topic = "order_topic",
        selectorExpression = "create"
)
public class OrderMessageConsumer implements RocketMQListener {
    private final OrderMapper orderMapper;
    
    @Override
    public void onMessage(MessageEntity message) {
        log.info("接收到订单消息,消息ID:{},内容:{}", 
                message.getMessageId(), message.getContent());
        
        try {
            // 获取订单号
            String orderNo = message.getExtra();
            // 查询订单信息
            Order order = orderMapper.selectByOrderNo(orderNo);
            
            if (order != null) {
                log.info("订单信息存在,开始处理后续业务,订单号:{}", orderNo);
                // 处理订单后续业务,如通知用户、库存扣减等
                processOrderAfterCreate(order);
                log.info("订单后续业务处理完成,订单号:{}", orderNo);
            } else {
                log.error("订单信息不存在,无法处理,订单号:{}", orderNo);
                throw new RuntimeException("订单信息不存在");
            }
        } catch (Exception e) {
            log.error("订单消息处理失败,消息ID:{}", message.getMessageId(), e);
            throw new RuntimeException("订单消息处理失败", e);
        }
    }
    
    /**
     * 处理订单创建后的后续业务
     *
     * @param order 订单信息
     */
    private void processOrderAfterCreate(Order order) {
        // 实际业务处理逻辑
        log.info("处理订单创建后的业务,订单号:{},用户ID:{},金额:{}",
                order.getOrderNo(), order.getUserId(), order.getAmount());
        // 例如:发送短信通知、扣减库存、积分计算等
    }
}
4.2.4 创建测试接口

在控制器中添加测试事务消息的接口:

/**
 * 创建订单并发送事务消息
 */
@PostMapping("/order/create")
@Operation(summary = "创建订单", description = "创建订单并发送事务消息")
public ResponseEntity createOrder(
        @Parameter(description = "用户ID", required = true)
        @RequestParam Long userId,
        
        @Parameter(description = "订单金额", required = true)
        @RequestParam BigDecimal amount) {
    
    // 生成订单号
    String orderNo = "ORDER_" + System.currentTimeMillis() + 
            RandomStringUtils.randomNumeric(6);
    
    // 创建订单对象
    Order order = new Order();
    order.setOrderNo(orderNo);
    order.setUserId(userId);
    order.setAmount(amount);
    order.setStatus(0); // 待支付
    order.setCreateTime(LocalDateTime.now());
    order.setUpdateTime(LocalDateTime.now());
    
    // 发送事务消息
    transactionMessageService.sendTransactionMessage(order);
    
    return ResponseEntity.ok("订单创建请求已提交,订单号:" + orderNo);
}

4.3 延时消息

RocketMQ 支持发送延时消息,即消息发送后,并不立即被消费,而是延迟一定时间后才被消费。这在很多场景下非常有用,如订单超时未支付自动取消、定时任务等。

RocketMQ 默认支持 18 个级别的延时消息:

1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h

发送延时消息的代码如下:

/**
 * 发送延时消息
 *
 * @param topic 消息主题
 * @param tag 消息标签
 * @param content 消息内容
 * @param delayLevel 延时级别,1-18
 * @return 发送结果
 */
public SendResult sendDelayMessage(String topic, String tag, String content, int delayLevel) {
    // 参数校验
    StringUtils.hasText(topic, "消息主题不能为空");
    StringUtils.hasText(content, "消息内容不能为空");
    if (delayLevel  18) {
        throw new IllegalArgumentException("延时级别必须在1-18之间");
    }
    
    // 构建完整的主题标签字符串
    String destination = buildDestination(topic, tag);
    
    // 创建消息实体
    MessageEntity messageEntity = new MessageEntity();
    messageEntity.setMessageId(UUID.randomUUID().toString());
    messageEntity.setContent(content);
    messageEntity.setBusinessType("DELAY_TASK");
    messageEntity.setCreateTime(LocalDateTime.now());
    
    // 构建消息并设置延时级别
    Message message = MessageBuilder
            .withPayload(messageEntity)
            .build();
    
    log.info("发送延时消息,主题:{},消息ID:{},延时级别:{}",
            destination, messageEntity.getMessageId(), delayLevel);
    
    // 发送延时消息,指定延时级别
    SendResult sendResult = rocketMQTemplate.syncSend(
            destination, 
            message,
            3000, // 超时时间
            delayLevel // 延时级别
    );
    
    log.info("延时消息发送完成,结果:{}", sendResult);
    return sendResult;
}

对应的消费者不需要特殊处理,和普通消费者一样:

@Slf4j
@Service
@RocketMQMessageListener(
        consumerGroup = "delay_consumer_group",
        topic = "delay_topic"
)
public class DelayMessageConsumer implements RocketMQListener {
    @Override
    public void onMessage(MessageEntity message) {
        log.info("接收到延时消息,当前时间:{},消息ID:{},内容:{},发送时间:{}",
                LocalDateTime.now(),
                message.getMessageId(),
                message.getContent(),
                message.getCreateTime());
    }
}

4.4 消息重试

当消息消费失败时,RocketMQ 会自动进行消息重试。消息重试的机制如下:

  1. 对于集群消费模式,消息消费失败后,会被发送到重试队列
  2. 重试队列的名称为 “% RETRY%+ 消费者组名”
  3. 重试次数默认 16 次,每次重试的间隔时间逐渐增加

可以在消费者注解中配置重试策略:

@RocketMQMessageListener(
        consumerGroup = "retry_consumer_group",
        topic = "retry_topic",
        // 最大重试次数
        maxReconsumeTimes = 3,
        // 消息模式:CLUSTERING(集群模式)或BROADCASTING(广播模式)
        messageModel = MessageModel.CLUSTERING
)
public class RetryMessageConsumer implements RocketMQListener {
    @Override
    public void onMessage(MessageEntity message) {
        log.info("接收到需要重试的消息,消息ID:{}", message.getMessageId());
        
        // 模拟消息处理失败
        throw new RuntimeException("消息处理失败,需要重试");
    }
}

当消息重试达到最大次数后,仍然消费失败,消息会被发送到死信队列。死信队列的名称为 “% DLQ%+ 消费者组名”。

我们可以创建一个死信队列的消费者来处理这些无法正常消费的消息:

@Slf4j
@Service
@RocketMQMessageListener(
        consumerGroup = "dlq_consumer_group",
        topic = "%DLQ%retry_consumer_group" // 死信队列的名称
)
public class DlqMessageConsumer implements RocketMQListener {
    @Override
    public void onMessage(MessageEntity message) {
        log.error("接收到死信消息,消息ID:{},内容:{}",
                message.getMessageId(), message.getContent());
        
        // 处理死信消息的逻辑,如人工干预、记录到数据库等
    }
}

五、RocketMQ 与数据库交互

在实际项目中,消息队列经常需要与数据库交互,例如:

  • 消息消费完成后更新数据库状态
  • 消息发送前将消息持久化到数据库,保证消息不丢失
  • 处理死信消息时记录到数据库

下面我们以消息轨迹跟踪为例,展示 RocketMQ 与数据库的交互。

5.1 创建消息轨迹表

CREATE TABLE `t_message_trace` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `message_id` varchar(64) NOT NULL COMMENT '消息ID',
  `topic` varchar(128) NOT NULL COMMENT '消息主题',
  `tag` varchar(64) DEFAULT NULL COMMENT '消息标签',
  `business_type` varchar(64) DEFAULT NULL COMMENT '业务类型',
  `content` text COMMENT '消息内容',
  `send_time` datetime DEFAULT NULL COMMENT '发送时间',
  `send_status` tinyint DEFAULT NULL COMMENT '发送状态:0-待发送,1-发送成功,2-发送失败',
  `send_result` text COMMENT '发送结果',
  `consume_time` datetime DEFAULT NULL COMMENT '消费时间',
  `consume_status` tinyint DEFAULT NULL COMMENT '消费状态:0-待消费,1-消费成功,2-消费失败',
  `consume_result` text COMMENT '消费结果',
  `reconsume_times` int DEFAULT '0' COMMENT '重试次数',
  `create_time` datetime NOT NULL COMMENT '创建时间',
  `update_time` datetime NOT NULL COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_message_id` (`message_id`),
  KEY `idx_topic` (`topic`),
  KEY `idx_business_type` (`business_type`),
  KEY `idx_send_status` (`send_status`),
  KEY `idx_consume_status` (`consume_status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息轨迹表';

5.2 创建消息轨迹实体和 Mapper

实体类:

package com.jam.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;

/**
 * 消息轨迹实体类
 * 记录消息的发送和消费情况
 *
 * @author 果酱
 */
@Data
@TableName("t_message_trace")
public class MessageTrace {
    /**
     * 主键ID
     */
    @TableId(type = IdType.AUTO)
    private Long id;
    
    /**
     * 消息ID
     */
    private String messageId;
    
    /**
     * 消息主题
     */
    private String topic;
    
    /**
     * 消息标签
     */
    private String tag;
    
    /**
     * 业务类型
     */
    private String businessType;
    
    /**
     * 消息内容
     */
    private String content;
    
    /**
     * 发送时间
     */
    private LocalDateTime sendTime;
    
    /**
     * 发送状态:0-待发送,1-发送成功,2-发送失败
     */
    private Integer sendStatus;
    
    /**
     * 发送结果
     */
    private String sendResult;
    
    /**
     * 消费时间
     */
    private LocalDateTime consumeTime;
    
    /**
     * 消费状态:0-待消费,1-消费成功,2-消费失败
     */
    private Integer consumeStatus;
    
    /**
     * 消费结果
     */
    private String consumeResult;
    
    /**
     * 重试次数
     */
    private Integer reconsumeTimes;
    
    /**
     * 创建时间
     */
    private LocalDateTime createTime;
    
    /**
     * 更新时间
     */
    private LocalDateTime updateTime;
}

Mapper 接口:

package com.jam.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.entity.MessageTrace;
import org.apache.ibatis.annotations.Param;

/**
 * 消息轨迹Mapper
 *
 * @author 果酱
 */
public interface MessageTraceMapper extends BaseMapper {
    /**
     * 根据消息ID查询消息轨迹
     *
     * @param messageId 消息ID
     * @return 消息轨迹信息
     */
    MessageTrace selectByMessageId(@Param("messageId") String messageId);
}

Mapper XML 文件(resources/mapper/MessageTraceMapper.xml):


        SELECT * FROM t_message_trace WHERE message_id = #{messageId}
    

5.3 创建消息轨迹服务

package com.jam.service;

import com.jam.entity.MessageEntity;
import com.jam.entity.MessageTrace;
import com.jam.mapper.MessageTraceMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;

/**
 * 消息轨迹服务
 * 记录消息的发送和消费轨迹
 *
 * @author 果酱
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageTraceService {
    private final MessageTraceMapper messageTraceMapper;
    
    /**
     * 记录消息发送前的轨迹
     *
     * @param message 消息实体
     * @param topic 消息主题
     * @param tag 消息标签
     * @return 消息轨迹ID
     */
    @Transactional(rollbackFor = Exception.class)
    public Long recordBeforeSend(MessageEntity message, String topic, String tag) {
        Objects.requireNonNull(message, "消息实体不能为空");
        StringUtils.hasText(message.getMessageId(), "消息ID不能为空");
        StringUtils.hasText(topic, "消息主题不能为空");
        
        MessageTrace trace = new MessageTrace();
        trace.setMessageId(message.getMessageId());
        trace.setTopic(topic);
        trace.setTag(tag);
        trace.setBusinessType(message.getBusinessType());
        trace.setContent(message.getContent());
        trace.setSendStatus(0); // 待发送
        trace.setReconsumeTimes(0);
        trace.setCreateTime(LocalDateTime.now());
        trace.setUpdateTime(LocalDateTime.now());
        
        messageTraceMapper.insert(trace);
        log.info("记录消息发送前轨迹,消息ID:{},轨迹ID:{}", message.getMessageId(), trace.getId());
        return trace.getId();
    }
    
    /**
     * 记录消息发送成功的轨迹
     *
     * @param messageId 消息ID
     * @param sendResult 发送结果
     */
    @Transactional(rollbackFor = Exception.class)
    public void recordSendSuccess(String messageId, SendResult sendResult) {
        StringUtils.hasText(messageId, "消息ID不能为空");
        Objects.requireNonNull(sendResult, "发送结果不能为空");
        
        MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);
        if (trace == null) {
            log.warn("未找到消息轨迹,消息ID:{}", messageId);
            return;
        }
        
        trace.setSendTime(LocalDateTime.now());
        trace.setSendStatus(1); // 发送成功
        trace.setSendResult(sendResult.toString());
        trace.setUpdateTime(LocalDateTime.now());
        
        messageTraceMapper.updateById(trace);
        log.info("记录消息发送成功轨迹,消息ID:{}", messageId);
    }
    
    /**
     * 记录消息发送失败的轨迹
     *
     * @param messageId 消息ID
     * @param e 异常信息
     */
    @Transactional(rollbackFor = Exception.class)
    public void recordSendFailure(String messageId, Exception e) {
        StringUtils.hasText(messageId, "消息ID不能为空");
        Objects.requireNonNull(e, "异常信息不能为空");
        
        MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);
        if (trace == null) {
            log.warn("未找到消息轨迹,消息ID:{}", messageId);
            return;
        }
        
        trace.setSendTime(LocalDateTime.now());
        trace.setSendStatus(2); // 发送失败
        trace.setSendResult(e.getMessage());
        trace.setUpdateTime(LocalDateTime.now());
        
        messageTraceMapper.updateById(trace);
        log.info("记录消息发送失败轨迹,消息ID:{}", messageId);
    }
    
    /**
     * 记录消息消费成功的轨迹
     *
     * @param message 消息实体
     * @param reconsumeTimes 重试次数
     */
    @Transactional(rollbackFor = Exception.class)
    public void recordConsumeSuccess(MessageEntity message, int reconsumeTimes) {
        Objects.requireNonNull(message, "消息实体不能为空");
        StringUtils.hasText(message.getMessageId(), "消息ID不能为空");
        
        MessageTrace trace = messageTraceMapper.selectByMessageId(message.getMessageId());
        if (trace == null) {
            log.warn("未找到消息轨迹,消息ID:{}", message.getMessageId());
            return;
        }
        
        trace.setConsumeTime(LocalDateTime.now());
        trace.setConsumeStatus(1); // 消费成功
        trace.setReconsumeTimes(reconsumeTimes);
        trace.setUpdateTime(LocalDateTime.now());
        
        messageTraceMapper.updateById(trace);
        log.info("记录消息消费成功轨迹,消息ID:{},重试次数:{}", 
                message.getMessageId(), reconsumeTimes);
    }
    
    /**
     * 记录消息消费失败的轨迹
     *
     * @param message 消息实体
     * @param e 异常信息
     * @param reconsumeTimes 重试次数
     */
    @Transactional(rollbackFor = Exception.class)
    public void recordConsumeFailure(MessageEntity message, Exception e, int reconsumeTimes) {
        Objects.requireNonNull(message, "消息实体不能为空");
        StringUtils.hasText(message.getMessageId(), "消息ID不能为空");
        Objects.requireNonNull(e, "异常信息不能为空");
        
        MessageTrace trace = messageTraceMapper.selectByMessageId(message.getMessageId());
        if (trace == null) {
            log.warn("未找到消息轨迹,消息ID:{}", message.getMessageId());
            return;
        }
        
        trace.setConsumeTime(LocalDateTime.now());
        trace.setConsumeStatus(2); // 消费失败
        trace.setConsumeResult(e.getMessage());
        trace.setReconsumeTimes(reconsumeTimes);
        trace.setUpdateTime(LocalDateTime.now());
        
        messageTraceMapper.updateById(trace);
        log.info("记录消息消费失败轨迹,消息ID:{},重试次数:{}", 
                message.getMessageId(), reconsumeTimes);
    }
}

5.4 在生产者和消费者中使用消息轨迹服务

修改消息生产者,添加消息轨迹记录:

/**
 * 发送同步消息(带轨迹记录)
 */
public SendResult sendSyncMessageWithTrace(String topic, String tag, String content, 
                                         String businessType, String extra) {
    // 参数校验
    StringUtils.hasText(topic, "消息主题不能为空");
    StringUtils.hasText(content, "消息内容不能为空");
    
    // 构建完整的主题标签字符串
    String destination = buildDestination(topic, tag);
    
    // 创建消息实体
    MessageEntity messageEntity = new MessageEntity();
    messageEntity.setMessageId(UUID.randomUUID().toString());
    messageEntity.setContent(content);
    messageEntity.setBusinessType(businessType);
    messageEntity.setCreateTime(LocalDateTime.now());
    messageEntity.setExtra(extra);
    
    // 记录消息发送前的轨迹
    messageTraceService.recordBeforeSend(messageEntity, topic, tag);
    
    // 构建Spring的Message对象
    Message message = MessageBuilder
            .withPayload(messageEntity)
            .build();
    
    log.info("发送同步消息,主题:{},消息ID:{}", destination, messageEntity.getMessageId());
    
    try {
        // 发送同步消息
        SendResult sendResult = rocketMQTemplate.syncSend(destination, message);
        // 记录发送成功轨迹
        messageTraceService.recordSendSuccess(messageEntity.getMessageId(), sendResult);
        log.info("同步消息发送完成,结果:{}", sendResult);
        return sendResult;
    } catch (Exception e) {
        // 记录发送失败轨迹
        messageTraceService.recordSendFailure(messageEntity.getMessageId(), e);
        log.error("同步消息发送失败,消息ID:{}", messageEntity.getMessageId(), e);
        throw e;
    }
}

修改消息消费者,添加消息轨迹记录:

@Override
public void onMessage(MessageEntity message) {
    // 获取重试次数
    MessageHeaders headers = MessageHeaderAccessor.getAccessor(message, MessageHeaders.class);
    int reconsumeTimes = headers.get("RECONSUME_TIMES", Integer.class) == null ? 
            0 : headers.get("RECONSUME_TIMES", Integer.class);
    
    // 打印接收到的消息
    log.info("接收到消息,消息ID:{},内容:{},业务类型:{},重试次数:{}",
            message.getMessageId(),
            message.getContent(),
            message.getBusinessType(),
            reconsumeTimes);
    
    try {
        // 处理消息的业务逻辑
        processMessage(message);
        
        // 记录消费成功轨迹
        messageTraceService.recordConsumeSuccess(message, reconsumeTimes);
        
        // 如果没有抛出异常,RocketMQ会认为消息消费成功
        log.info("消息处理成功,消息ID:{}", message.getMessageId());
    } catch (Exception e) {
        // 记录消费失败轨迹
        messageTraceService.recordConsumeFailure(message, e, reconsumeTimes);
        
        // 如果抛出异常,RocketMQ会根据重试策略进行消息重试
        log.error("消息处理失败,消息ID:{}", message.getMessageId(), e);
        throw new RuntimeException("消息处理失败", e);
    }
}

六、RocketMQ 性能调优

为了让 RocketMQ 在生产环境中发挥最佳性能,我们需要进行合理的调优。以下是一些关键的调优方向:

6.1 Broker 调优

  1. 内存配置
    根据服务器内存大小合理配置 Broker 的 JVM 参数,避免内存不足或浪费

    plaintext

    # 推荐配置(8G内存服务器)
    -Xms4g -Xmx4g -Xmn2g
    
  2. 刷盘策略
    根据业务对消息可靠性的要求选择合适的刷盘策略

    properties

    # 同步刷盘(可靠性高,性能较低)
    flushDiskType = SYNC_FLUSH
    # 异步刷盘(性能高,可能丢失少量消息)
    flushDiskType = ASYNC_FLUSH
    
  3. 文件存储

    • 尽量将 Broker 的数据存储在 SSD 上,提高 IO 性能
    • 配置合适的映射文件大小和数量

    properties

    # 单个映射文件大小,默认1G
    mapedFileSizeCommitLog=1073741824
    # ConsumeQueue每个文件包含的条目数
    mapedFileSizeConsumeQueue=300000
    
  4. 消息清理
    配置合理的消息保留时间,避免磁盘空间耗尽

    properties

    # 消息保留时间,单位小时,默认72小时
    fileReservedTime=72
    # 磁盘空间阈值,低于此值将删除过期消息
    diskMaxUsedSpaceRatio=88
    

6.2 生产者调优

  1. 批量发送
    对于大量小消息,采用批量发送可以显著提高吞吐量

    /**
     * 批量发送消息
     */
    public SendResult sendBatchMessages(String topic, List messages) {
        StringUtils.hasText(topic, "消息主题不能为空");
        if (CollectionUtils.isEmpty(messages)) {
            throw new IllegalArgumentException("消息列表不能为空");
        }
        
        log.info("批量发送消息,主题:{},消息数量:{}", topic, messages.size());
        
        // 构建批量消息
        List> messageList = messages.stream()
                .map(msg -> MessageBuilder.withPayload(msg).build())
                .collect(Collectors.toList());
        
        // 发送批量消息
        SendResult sendResult = rocketMQTemplate.syncSend(topic, messageList);
        
        log.info("批量消息发送完成,结果:{}", sendResult);
        return sendResult;
    }
    
  2. 压缩消息
    对于大消息,启用压缩可以减少网络传输和存储开销

    properties

    # 在application.yml中配置
    rocketmq:
      producer:
        # 消息压缩阈值,超过此值则进行压缩,单位字节
        compress-message-body-threshold: 4096
    
  3. 异步发送
    非关键路径的消息采用异步发送,避免阻塞主线程

6.3 消费者调优

  1. 消费线程池
    根据业务处理能力配置合理的消费线程池大小

    @RocketMQMessageListener(
            consumerGroup = "optimize_consumer_group",
            topic = "optimize_topic",
            // 消费线程池核心线程数
            consumeThreadMin = 20,
            // 消费线程池最大线程数
            consumeThreadMax = 64
    )
    public class OptimizeConsumer implements RocketMQListener {
        // 实现代码...
    }
    
  2. 批量消费
    开启批量消费可以提高消费效率

    @RocketMQMessageListener(
            consumerGroup = "batch_consumer_group",
            topic = "batch_topic",
            // 开启批量消费
            consumeMode = ConsumeMode.BATCH,
            // 批量消费的最大消息数
            batchMaxSize = 32
    )
    public class BatchConsumer implements RocketMQListener> {
        @Override
        public void onMessage(List messages) {
            log.info("接收到批量消息,数量:{}", messages.size());
            // 批量处理消息
            for (MessageEntity message : messages) {
                // 处理单条消息
            }
        }
    }
    
  3. 消费模式
    根据业务需求选择合适的消费模式

    @RocketMQMessageListener(
            consumerGroup = "consume_mode_consumer_group",
            topic = "mode_topic",
            // 消费模式:ORDERLY(有序消费)或CONCURRENTLY(并发消费)
            consumeMode = ConsumeMode.CONCURRENTLY
    )
    public class ConsumeModeConsumer implements RocketMQListener {
        // 实现代码...
    }
    

     

    • 并发消费:吞吐量高,但消息可能乱序
    • 有序消费:保证消息顺序,但吞吐量较低

七、常见问题与解决方案

7.1 消息丢失问题

消息丢失可能发生在三个阶段:生产阶段、存储阶段和消费阶段。

  1. 生产阶段丢失

    • 解决方案:使用同步发送,并处理发送失败的情况
    // 发送消息并处理失败情况
    try {
        SendResult result = rocketMQTemplate.syncSend(topic, message);
        // 检查发送状态
        if (result.getSendStatus() != SendStatus.SEND_OK) {
            // 发送状态异常,进行处理
            log.error("消息发送状态异常:{}", result);
            // 可以记录到数据库进行重试
        }
    } catch (Exception e) {
        log.error("消息发送异常", e);
        // 记录到数据库进行重试
    }
    
  2. 存储阶段丢失

    • 解决方案:配置 Broker 为同步刷盘和主从同步

    properties

    # 同步刷盘
    flushDiskType=SYNC_FLUSH
    # 主从同步
    brokerRole=SYNC_MASTER
    
  3. 消费阶段丢失

    • 解决方案:确保消息处理完成后再提交 offset
    @Override
    public void onMessage(MessageEntity message) {
        try {
            // 处理消息
            processMessage(message);
            // 处理完成后,不抛出异常,RocketMQ会自动提交offset
        } catch (Exception e) {
            // 处理失败,抛出异常,RocketMQ不会提交offset
            log.error("消息处理失败", e);
            throw e;
        }
    }
    

7.2 消息重复消费

消息重复消费是分布式系统中不可避免的问题,我们需要保证消息消费的幂等性。

  1. 基于数据库唯一索引

    /**
     * 处理消息(幂等性保证)
     */
    @Transactional(rollbackFor = Exception.class)
    public void processMessageWithIdempotency(MessageEntity message) {
        String messageId = message.getMessageId();
        
        // 检查消息是否已经处理过
        MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);
        if (trace != null && trace.getConsumeStatus() == 1) {
            log.info("消息已经处理过,消息ID:{}", messageId);
            return;
        }
        
        // 处理消息业务逻辑
        // ...
        
        // 记录消息处理状态(通过数据库唯一索引保证幂等)
        if (trace == null) {
            // 新增记录
            // ...
        } else {
            // 更新记录
            // ...
        }
    }
    
  2. 基于 Redis 的分布式锁

    /**
     * 使用Redis分布式锁保证幂等性
     */
    public void processMessageWithRedisLock(MessageEntity message) {
        String messageId = message.getMessageId();
        String lockKey = "message:process:" + messageId;
        
        // 获取分布式锁
        Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 5, TimeUnit.MINUTES);
        
        if (Boolean.TRUE.equals(locked)) {
            try {
                // 检查消息是否已经处理过
                // ...
                
                // 处理消息业务逻辑
                // ...
                
            } finally {
                // 释放锁
                redisTemplate.delete(lockKey);
            }
        } else {
            log.info("消息正在处理中,消息ID:{}", messageId);
        }
    }
    

7.3 消息堆积问题

消息堆积通常是因为消费速度跟不上生产速度,解决方案如下:

  1. 优化消费逻辑

    • 减少单次消息处理时间
    • 异步处理非关键流程
  2. 增加消费者实例

    • 水平扩展消费者集群
    • 确保消费者数量不超过 Topic 的 Queue 数量
  3. 使用批量消费

    • 开启批量消费,提高消费效率
  4. 临时扩容

    • 对于突发流量,可以临时启动更多的消费者实例
  5. 监控告警

    • 配置消息堆积监控和告警,及时发现问题
    /**
     * 消息堆积监控
     */
    @Scheduled(fixedRate = 60000) // 每分钟检查一次
    public void monitorMessage堆积() {
        // 获取消息队列的offset信息
        // ...
        
        // 计算消息堆积量
        long堆积Count = 总消息数 - 已消费消息数;
        
        // 如果堆积量超过阈值,发送告警
        if (堆积Count > 10000) {
            log.warn("消息堆积严重,堆积数量:{}", 堆积Count);
            // 发送告警通知(邮件、短信等)
            alertService.sendAlert("消息堆积告警", "堆积数量:" + 堆积Count);
        }
    }
    

八、总结

本文详细介绍了 SpringBoot 集成 RocketMQ 的全过程,从基础概念到高级特性,从代码实现到性能调优,涵盖了实际开发中可能遇到的各种场景。

    九、参考资料

    1. RocketMQ 核心概念与架构:参考 Apache RocketMQ 官方文档(https://rocketmq.apache.org/docs/concepts/
    2. SpringBoot 集成 RocketMQ:参考 RocketMQ Spring Boot Starter 官方文档(GitHub – apache/rocketmq-spring: Apache RocketMQ Spring Integration
    3. 事务消息实现原理:参考《RocketMQ 技术内幕》一书及官方文档(https://rocketmq.apache.org/docs/transaction-example/
    4. 消息重试机制:参考 RocketMQ 官方文档(https://rocketmq.apache.org/docs/retry-mechanism/
    5. 性能调优参数:参考 RocketMQ 官方文档(https://rocketmq.apache.org/docs/performance-tuning/
    6. 消息丢失与重复消费解决方案:参考阿里中间件团队技术博客及《分布式系统原理与实践》一书

    文章来源于互联网:从 0 到 1 精通 SpringBoot 集成 RocketMQ

    相关推荐: AI人工智能领域,文心一言的技术架构

    文心一言的技术架构解析 关键词:文心一言、大语言模型、Transformer架构、知识增强、ERNIE、深度学习、自然语言处理 摘要:本文深入解析百度文心一言的技术架构,从其底层的大语言模型基础到顶层的应用实现。我们将详细探讨文心一言的核心技术原理,包括其基于…

    赞(0)
    未经允许不得转载:5bei.cn大模型教程网 » 从 0 到 1 精通 SpringBoot 集成 RocketMQ
    分享到: 更多 (0)

    AI大模型,我们的未来

    小欢软考联系我们