Apache Pulsar消息队列使用

前言

最近在项目上需要引用到mq消息队列,尤其是支持延时消息的队列,首先延迟消息的方法有很多,比如redis的键过期监听是我用过的,用户量小,所以到时美出现过什么问题,但是这个方法是不推荐的,因为redis也说明了这个不是稳定的,一定会推送的,而且这种方法不稳定,比如网络问题正好丢失了等等,所以研究了一下支持延时消息的消息队列,因为项目中也需要用到消息队列,所以对比了下几大mq中间件,最终选择了apache pulsar,简单,支持秒级别(分钟、小时、天)的延时消息,测试了一下效率都挺不多,毕竟大厂都在用,记录一下使用过程

介绍

Pulsar是一个用于服务端到服务端的消息中间件,具有多租户、高性能等优势。Pulsar最初由Yahoo开发,目前由Apache软件基金会管理。Pulsar采用发布-订阅的设计模式,Producer发布消息到Topic,Consumer订阅Topic、处理Topic中的消息。

特性
  • Pulsar的单个实例原生支持集群。
  • 极低的发布延迟和端到端延迟。
  • 可无缝扩展到超过一百万个Topic。
  • 简单易用的客户端API,支持Java、Go、Python和C++。
  • 支持多种Topic订阅模式(独占订阅、共享订阅、故障转移订阅)。
  • 通过Apache BookKeeper提供的持久化消息存储机制保证消息传递。
安装

docker安装非常方便,所以选择docker安装

1
2
3
4
5
6
7
docker run -it -p 6650:6650  -p 8091:8080  \ 
--mount source=pulsardata,target=/data/pulsar/data \
--mount source=pulsarconf,target=/data/pulsar/conf \
-e advertisedAddress=pulsar-standalone \
--name pulsar-standalone \
--hostname pulsar-standalone \ apachepulsar/pulsar:2.8.0 \
sh -c "bin/apply-config-from-env.py conf/standalone.conf&&bin/pulsar standalone -nfw -nss"

接着安装ui可视化组建(非必须)

1
2
3
4
docker run -d -it --name pulsar-manager\    
-p 9527:9527 -p 7750:7750 \
-e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \ --link pulsar-standalone \
apachepulsar/pulsar-manager:v0.3.0

下载完成后运行pulsar-manager容器,从9527端口访问Web页面;

运行成功后,需要创建管理员账号,这里创建账号为admin:apachepulsar,使用curl命名请求接口创建用户

1
2
3
4
5
6
7
CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token) 

curl \ -H "X-XSRF-TOKEN: $CSRF_TOKEN" \
-H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" \
-H 'Content-Type: application/json' \
-X PUT http://localhost:7750/pulsar-manager/users/superuser \
-d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}'

返回成功后,在页面输入刚创建的用户名和密码,进人可视化管理后台:

new environment service url里输入 你的服务地址 http://localhost:8091,就可以了,后面可以在这里看,如果不需要可以不安装

springboot中使用pulsar

Pulsar结合SpringBoot使用也是非常简单的,我们可以使用Pulsar官方的Java SDK,也可以使用第三方的SpringBoot Starter。这里使用Starter,非常简单!

首先在pom.xml中添加Pulsar相关依赖;

1
2
3
4
5
6
<!--SpringBoot整合Pulsar-->
<dependency>
<groupId>io.github.majusko</groupId>
<artifactId>pulsar-java-spring-boot-starter</artifactId>
<version>1.1.2</version>
</dependency>

然后在application.yml中添加Pulsar的Service URL配置

1
2
pulsar:
service-url: pulsar://121.xxx.212.xx:6650

再添加Pulsar的Java配置,声明两个Topic,并确定好发送的消息类型

1
2
3
4
5
6
7
8
9
@Configuration
public class PulsarConfig {
@Bean
public ProducerFactory producerFactory() {
return new ProducerFactory()
.addProducer("bootTopic", MessageDto.class)
.addProducer("topic_test", String.class);
}
}

创建Pulsar生产者,往Topic中发送消息,Pulsar是支持直接发送消息对象的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//引入PulsarTemplate模板工具类
private final PulsarTemplate pulsarTemplate;

//同步发送消息
pulsarTemplate.send("topic_test", "hello pulsar")

//异步发送消息
pulsarTemplate.sendAsync("topic_test", "hello pulsar");

//发送延时消息 deliverAfter 延时多久发送
pulsarTemplate.createMessage("topic_test", "hello pulsar").deliverAfter(10, TimeUnit.SECONDS).send();

//发送延时消息 deliverAt 在指定的时间戳发送
pulsarTemplate.createMessage("topic_test", "hello pulsar").deliverAt(1682123456712L).send()
订阅消息
1
2
3
4
5
6
7
8
9
10
11
12

//PulsarConsumer 订阅注解,topic指定要订阅的话题,clazz指定消息类型,subscriptionType是订阅类型,支持以下四种类型
/**
* Exclusive:独占模式,同一个 Topic 只能有一个消费者,如果多个消费者,就会出错。
* Failover:灾备模式,同一个 Topic 可以有多个消费者,但是只能有一个消费者消费,其他消费者作为故障转移备用,如果当前消费者出了故障,就从备用消费者中选择一个进行消费
* Shared:共享模式,同一个 Topic 可以由多个消费者订阅和消费。消息通过 round robin 轮询机制分发给不同的消费者,并且每个消息仅会被分发给一个消费者。当消费者断开,如果发送给它消息没有被消费,这些消息会被重新分发给其它存活的消费者
* Key_Shared:消息和消费者都会绑定一个key,消息只会发送给绑定同一个key的消费者。如果有新消费者建立连接或者有消费者断开连接,就需要更新一些消息的 key。跟 Shared 模式相比,Key_Shared 的好处是既可以让消费者并发地消费消息,又能保证同一Key下的消息顺序
*/
@PulsarConsumer(topic="topic_test", clazz= String.class, subscriptionType = SubscriptionType.Shared)
public void consume(String message) {
log.info("PulsarRealConsumer consume User: {}", message);
}
注意

特别注意,如果需要发送延时消息,则订阅者必须设置subscriptionType,否则延时设置无效,均为立即发送

广播

如果要发送广播消息,即多个消费者订阅同一个topic,都想同时消费,需要进行以下配置,在消费者的注解里添加subscriptionName 参数,即订阅名称,设置不同的订阅名称,还有就是subscriptionType = SubscriptionType.Exclusive 消费类型需要设置成独占模式,如果消费者是集群模式,一个消费后另一个不能消费的话,那就是 subscriptionName 要相同,但是 subscriptionType 要用shared 模式