Spring Boot 集成 RocketMQ 最简使用示例

Spring Boot 集成 RocketMQ 最简使用示例

下面是 Spring Boot 集成 RocketMQ 的最简单使用示例,包含了生产者和消费者的基本配置和使用方法。

1. 添加依赖

pom.xml 中添加 RocketMQ Spring Boot Starter 依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

2. 配置 RocketMQ

application.yml 中添加 RocketMQ 配置:

1
2
3
4
5
6
7
8
9
10
11
12
server:
port: 8080

spring:
application:
name: rocketmq-demo

rocketmq:
name-server: 127.0.0.1:9876 # RocketMQ NameServer 地址
producer:
group: my-producer-group # 生产者组名
send-message-timeout: 3000 # 消息发送超时时间,单位:毫秒

3. 创建消息生产者

创建一个简单的生产者服务类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.example.demo.service;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
* 消息生产者服务
*/
@Service
public class ProducerService {

@Autowired
private RocketMQTemplate rocketMQTemplate;

/**
* 发送普通消息
* @param topic 主题
* @param message 消息内容
*/
public void sendMessage(String topic, String message) {
rocketMQTemplate.convertAndSend(topic, message);
System.out.println("消息发送成功,topic:" + topic + ",消息内容:" + message);
}

/**
* 发送同步消息
* @param topic 主题
* @param message 消息内容
*/
public void sendSyncMessage(String topic, String message) {
rocketMQTemplate.syncSend(topic, message);
System.out.println("同步消息发送成功,topic:" + topic + ",消息内容:" + message);
}
}

4. 创建消息消费者

创建一个简单的消费者类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.example.demo.consumer;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
* 消息消费者
*/
@Component
@RocketMQMessageListener(
topic = "test-topic", // 监听的主题
consumerGroup = "my-consumer-group" // 消费者组名
)
public class MessageConsumer implements RocketMQListener<String> {

/**
* 消息处理方法
* @param message 接收到的消息
*/
@Override
public void onMessage(String message) {
// 处理接收到的消息
System.out.println("收到消息:" + message);

// 在这里添加业务处理逻辑
}
}

5. 创建测试控制器

创建一个控制器用于测试消息发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.example.demo.controller;

import com.example.demo.service.ProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
* 消息发送测试控制器
*/
@RestController
@RequestMapping("/mq")
public class RocketMQController {

@Autowired
private ProducerService producerService;

/**
* 发送普通消息
* @param message 消息内容
* @return 结果
*/
@GetMapping("/send")
public String sendMessage(@RequestParam String message) {
producerService.sendMessage("test-topic", message);
return "消息发送成功:" + message;
}

/**
* 发送同步消息
* @param message 消息内容
* @return 结果
*/
@GetMapping("/sendSync")
public String sendSyncMessage(@RequestParam String message) {
producerService.sendSyncMessage("test-topic", message);
return "同步消息发送成功:" + message;
}
}

6. 应用启动类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
* RocketMQ 示例应用
*/
@SpringBootApplication
public class RocketMQDemoApplication {

public static void main(String[] args) {
SpringApplication.run(RocketMQDemoApplication.class, args);
}
}

7. 测试应用

  1. 启动 RocketMQ 服务(确保 NameServer 和 Broker 已经启动)
  2. 启动 Spring Boot 应用
  3. 访问 http://localhost:8080/mq/send?message=Hello RocketMQ 发送消息
  4. 观察控制台输出,应该能看到生产者发送消息和消费者接收消息的日志

扩展:使用标签(Tag)

如果需要使用 RocketMQ 的标签功能:

1
2
3
4
5
6
7
8
9
// 发送带标签的消息
rocketMQTemplate.convertAndSend("test-topic:tagA", "带标签的消息");

// 消费带标签的消息
@RocketMQMessageListener(
topic = "test-topic",
consumerGroup = "my-consumer-group",
selectorExpression = "tagA || tagB" // 订阅多个标签
)

日志配置

可以在 application.yml 中添加日志配置,查看更多 RocketMQ 运行信息:

1
2
3
logging:
level:
org.apache.rocketmq: debug

以上就是 Spring Boot 集成 RocketMQ 最简单的使用示例,通过这个基本框架,你可以进一步扩展其他功能,如顺序消息、事务消息等。