前言
最近在项目上需要引用到mq消息队列,尤其是支持延时消息的队列,首先延迟消息的方法有很多,比如redis的键过期监听是我用过的,用户量小,所以到时美出现过什么问题,但是这个方法是不推荐的,因为redis也说明了这个不是稳定的,一定会推送的,而且这种方法不稳定,比如网络问题正好丢失了等等,所以研究了一下支持延时消息的消息队列,因为项目中也需要用到消息队列,所以对比了下几大mq中间件,最终选择了apache pulsar,简单,支持秒级别(分钟、小时、天)的延时消息,测试了一下效率都挺不多,毕竟大厂都在用,记录一下使用过程
介绍
Pulsar是一个用于服务端到服务端的消息中间件,具有多租户、高性能等优势。Pulsar最初由Yahoo开发,目前由Apache软件基金会管理。Pulsar采用发布-订阅的设计模式,Producer发布消息到Topic,Consumer订阅Topic、处理Topic中的消息。
特性
- Pulsar的单个实例原生支持集群。
- 极低的发布延迟和端到端延迟。
- 可无缝扩展到超过一百万个Topic。
- 简单易用的客户端API,支持Java、Go、Python和C++。
- 支持多种Topic订阅模式(独占订阅、共享订阅、故障转移订阅)。
- 通过Apache BookKeeper提供的持久化消息存储机制保证消息传递。
安装
docker安装非常方便,所以选择docker安装
1 2 3 4 5 6 7
| docker run -it -p 6650:6650 -p 8091:8080 \ --mount source=pulsardata,target=/data/pulsar/data \ --mount source=pulsarconf,target=/data/pulsar/conf \ -e advertisedAddress=pulsar-standalone \ --name pulsar-standalone \ --hostname pulsar-standalone \ apachepulsar/pulsar:2.8.0 \ sh -c "bin/apply-config-from-env.py conf/standalone.conf&&bin/pulsar standalone -nfw -nss"
|
接着安装ui可视化组建(非必须)
1 2 3 4
| docker run -d -it --name pulsar-manager\ -p 9527:9527 -p 7750:7750 \ -e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \ --link pulsar-standalone \ apachepulsar/pulsar-manager:v0.3.0
|
下载完成后运行pulsar-manager容器,从9527端口访问Web页面;
运行成功后,需要创建管理员账号,这里创建账号为admin:apachepulsar,使用curl命名请求接口创建用户
1 2 3 4 5 6 7
| CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)
curl \ -H "X-XSRF-TOKEN: $CSRF_TOKEN" \ -H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" \ -H 'Content-Type: application/json' \ -X PUT http://localhost:7750/pulsar-manager/users/superuser \ -d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}'
|
返回成功后,在页面输入刚创建的用户名和密码,进人可视化管理后台:
new environment
service url里输入 你的服务地址 http://localhost:8091,就可以了,后面可以在这里看,如果不需要可以不安装
springboot中使用pulsar
Pulsar结合SpringBoot使用也是非常简单的,我们可以使用Pulsar官方的Java SDK,也可以使用第三方的SpringBoot Starter。这里使用Starter,非常简单!
首先在pom.xml
中添加Pulsar相关依赖;
1 2 3 4 5 6
| <dependency> <groupId>io.github.majusko</groupId> <artifactId>pulsar-java-spring-boot-starter</artifactId> <version>1.1.2</version> </dependency>
|
然后在application.yml
中添加Pulsar的Service URL
配置
1 2
| pulsar: service-url: pulsar://121.xxx.212.xx:6650
|
再添加Pulsar的Java配置,声明两个Topic,并确定好发送的消息类型
1 2 3 4 5 6 7 8 9
| @Configuration public class PulsarConfig { @Bean public ProducerFactory producerFactory() { return new ProducerFactory() .addProducer("bootTopic", MessageDto.class) .addProducer("topic_test", String.class); } }
|
创建Pulsar生产者,往Topic中发送消息,Pulsar是支持直接发送消息对象的
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| private final PulsarTemplate pulsarTemplate;
pulsarTemplate.send("topic_test", "hello pulsar")
pulsarTemplate.sendAsync("topic_test", "hello pulsar");
pulsarTemplate.createMessage("topic_test", "hello pulsar").deliverAfter(10, TimeUnit.SECONDS).send();
pulsarTemplate.createMessage("topic_test", "hello pulsar").deliverAt(1682123456712L).send()
|
订阅消息
1 2 3 4 5 6 7 8 9 10 11 12
|
@PulsarConsumer(topic="topic_test", clazz= String.class, subscriptionType = SubscriptionType.Shared) public void consume(String message) { log.info("PulsarRealConsumer consume User: {}", message); }
|
注意
特别注意,如果需要发送延时消息,则订阅者必须设置subscriptionType,否则延时设置无效,均为立即发送
广播
如果要发送广播消息,即多个消费者订阅同一个topic,都想同时消费,需要进行以下配置,在消费者的注解里添加subscriptionName 参数,即订阅名称,设置不同的订阅名称,还有就是subscriptionType = SubscriptionType.Exclusive 消费类型需要设置成独占模式,如果消费者是集群模式,一个消费后另一个不能消费的话,那就是 subscriptionName 要相同,但是 subscriptionType 要用shared 模式