1.下载安装;
https://developer.aliyun.com/article/1479156
RocketMQ 使用笔记 (Markdown 版本)
2. 简单测试(bin 目录)
2.1 发送消息(Producer)
1
| tools.cmd org.apache.rocketmq.example.quickstart.Producer
|
2.2 接收消息(Consumer)
1
| tools.cmd org.apache.rocketmq.example.quickstart.Consumer
|
3. QuickStart(普通 Maven 项目)
3.1 导入依赖
1 2 3 4 5
| <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>5.1.4</version> </dependency>
|
3.2 构建生产者
1 2 3 4 5 6
| DefaultMQProducer producer = new DefaultMQProducer("group1"); producer.setNamesrvAddr("localhost:9876"); producer.start(); Message message = new Message("topic1", "tag1", "hello xiangxiang".getBytes()); SendResult sendResult = producer.send(message); producer.shutdown();
|
3.3 构建消费者
1 2 3 4 5 6 7 8 9 10
| DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("topic1", "*"); consumer.registerMessageListener((msgs, ctx) -> { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start();
|
4. 多消费者组模式
- 同组:平均消费(负载均衡)
- 不同组:每组都能收到所有消息
- IDEA 允许多实例运行需在配置处勾选
4.1 消费模式
- 广播模式:
MessageModel.BROADCASTING(每个消费者都收到)
- 集群模式:
MessageModel.CLUSTERING(默认,负载均衡)
5. 消息类别
5.1 同步消息
发送后等待回执:producer.send(message)
5.2 异步消息
1 2 3 4
| producer.send(message, new SendCallback(){ public void onSuccess(SendResult r){} public void onException(Throwable t){} });
|
不能关闭生产者。
5.3 单向消息
1
| producer.sendOneway(message);
|
5.4 延时消息
1 2
| message.setDelayTimeLevel(3); producer.send(message);
|
5.5 批量消息
1 2
| List<Message> list = Arrays.asList(msg1, msg2); producer.send(list);
|
6. 消息过滤
6.1 Tag 过滤
1
| consumer.subscribe("topic1", "tag1 || tag2");
|
6.2 SQL 过滤
Broker broker.conf 启用:enablePropertyFilter=true
1 2
| message.putUserProperty("age", "18"); consumer.subscribe("topic2", MessageSelector.bySql("age > 16"));
|
7. Spring Boot 生产者
7.1 依赖
1 2 3 4 5
| <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.3.4</version> </dependency>
|
7.2 配置
1 2 3 4 5 6
| rocketmq: name-server: localhost:9876 producer: group: group1 server: port: 8989
|
7.3 使用
1
| rocketMQTemplate.convertAndSend("topic1", msg);
|
8. Spring Boot 消费者
1 2 3 4
| @RocketMQMessageListener(topic="topic1", selectorExpression="tag1", consumerGroup="group1") public class DemoConsumer implements RocketMQListener<String>{ public void onMessage(String s){ System.out.println(s); } }
|
9. Spring Boot 发送各种消息
- 同步:
syncSend
- 异步:
asyncSend
- 单向:
sendOneWay
- 延时:
syncSend(..., timeout, delayLevel)
- 批量:
syncSend(topic, msgList, timeout)
10. Spring Boot 消费者过滤与模式
- Tag 过滤:
selectorExpression="tag1"
- SQL 过滤:
selectorType = SQL92
- 广播模式:
messageModel = MessageModel.BROADCASTING
11. 消息顺序
(此处可补充:顺序消息需使用顺序生产者,发送到同一队列)
12. 事务消息
12.1 事务生产者
1 2 3 4 5 6 7 8 9 10 11
| TransactionMQProducer producer = new TransactionMQProducer("group1"); producer.setTransactionListener(new TransactionListener(){ public LocalTransactionState executeLocalTransaction(Message msg, Object arg){ return LocalTransactionState.COMMIT_MESSAGE; } public LocalTransactionState checkLocalTransaction(MessageExt msg){ return LocalTransactionState.COMMIT_MESSAGE; } }); producer.start(); TransactionSendResult result = producer.sendMessageInTransaction(message, null);
|
13. 主从架构