Spring Boot 集成 RocketMQ 最简使用示例
下面是 Spring Boot 集成 RocketMQ 的最简单使用示例,包含了生产者和消费者的基本配置和使用方法。
1. 添加依赖
在 pom.xml
中添加 RocketMQ Spring Boot Starter 依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.3</version> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency>
|
2. 配置 RocketMQ
在 application.yml
中添加 RocketMQ 配置:
1 2 3 4 5 6 7 8 9 10 11 12
| server: port: 8080
spring: application: name: rocketmq-demo
rocketmq: name-server: 127.0.0.1:9876 producer: group: my-producer-group send-message-timeout: 3000
|
3. 创建消息生产者
创建一个简单的生产者服务类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| package com.example.demo.service;
import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;
@Service public class ProducerService {
@Autowired private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String topic, String message) { rocketMQTemplate.convertAndSend(topic, message); System.out.println("消息发送成功,topic:" + topic + ",消息内容:" + message); }
public void sendSyncMessage(String topic, String message) { rocketMQTemplate.syncSend(topic, message); System.out.println("同步消息发送成功,topic:" + topic + ",消息内容:" + message); } }
|
4. 创建消息消费者
创建一个简单的消费者类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| package com.example.demo.consumer;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component;
@Component @RocketMQMessageListener( topic = "test-topic", // 监听的主题 consumerGroup = "my-consumer-group" // 消费者组名 ) public class MessageConsumer implements RocketMQListener<String> {
@Override public void onMessage(String message) { System.out.println("收到消息:" + message); } }
|
5. 创建测试控制器
创建一个控制器用于测试消息发送:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package com.example.demo.controller;
import com.example.demo.service.ProducerService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;
@RestController @RequestMapping("/mq") public class RocketMQController {
@Autowired private ProducerService producerService;
@GetMapping("/send") public String sendMessage(@RequestParam String message) { producerService.sendMessage("test-topic", message); return "消息发送成功:" + message; }
@GetMapping("/sendSync") public String sendSyncMessage(@RequestParam String message) { producerService.sendSyncMessage("test-topic", message); return "同步消息发送成功:" + message; } }
|
6. 应用启动类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| package com.example.demo;
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication public class RocketMQDemoApplication {
public static void main(String[] args) { SpringApplication.run(RocketMQDemoApplication.class, args); } }
|
7. 测试应用
- 启动 RocketMQ 服务(确保 NameServer 和 Broker 已经启动)
- 启动 Spring Boot 应用
- 访问
http://localhost:8080/mq/send?message=Hello RocketMQ
发送消息
- 观察控制台输出,应该能看到生产者发送消息和消费者接收消息的日志
扩展:使用标签(Tag)
如果需要使用 RocketMQ 的标签功能:
1 2 3 4 5 6 7 8 9
| rocketMQTemplate.convertAndSend("test-topic:tagA", "带标签的消息");
@RocketMQMessageListener( topic = "test-topic", consumerGroup = "my-consumer-group", selectorExpression = "tagA || tagB" // 订阅多个标签 )
|
日志配置
可以在 application.yml
中添加日志配置,查看更多 RocketMQ 运行信息:
1 2 3
| logging: level: org.apache.rocketmq: debug
|
以上就是 Spring Boot 集成 RocketMQ 最简单的使用示例,通过这个基本框架,你可以进一步扩展其他功能,如顺序消息、事务消息等。