1.下载安装;
https://developer.aliyun.com/article/1479156

RocketMQ 使用笔记 (Markdown 版本)

2. 简单测试(bin 目录)

2.1 发送消息(Producer)

1
tools.cmd org.apache.rocketmq.example.quickstart.Producer

2.2 接收消息(Consumer)

1
tools.cmd org.apache.rocketmq.example.quickstart.Consumer

3. QuickStart(普通 Maven 项目)

3.1 导入依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.1.4</version>
</dependency>

3.2 构建生产者

1
2
3
4
5
6
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message message = new Message("topic1", "tag1", "hello xiangxiang".getBytes());
SendResult sendResult = producer.send(message);
producer.shutdown();

3.3 构建消费者

1
2
3
4
5
6
7
8
9
10
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic1", "*");
consumer.registerMessageListener((msgs, ctx) -> {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();

4. 多消费者组模式

  • 同组:平均消费(负载均衡)
  • 不同组:每组都能收到所有消息
  • IDEA 允许多实例运行需在配置处勾选

4.1 消费模式

  • 广播模式MessageModel.BROADCASTING(每个消费者都收到)
  • 集群模式MessageModel.CLUSTERING(默认,负载均衡)

5. 消息类别

5.1 同步消息

发送后等待回执:producer.send(message)

5.2 异步消息

1
2
3
4
producer.send(message, new SendCallback(){
public void onSuccess(SendResult r){}
public void onException(Throwable t){}
});

不能关闭生产者。

5.3 单向消息

1
producer.sendOneway(message);

5.4 延时消息

1
2
message.setDelayTimeLevel(3);
producer.send(message);

5.5 批量消息

1
2
List<Message> list = Arrays.asList(msg1, msg2);
producer.send(list);

6. 消息过滤

6.1 Tag 过滤

1
consumer.subscribe("topic1", "tag1 || tag2");

6.2 SQL 过滤

Broker broker.conf 启用:enablePropertyFilter=true

1
2
message.putUserProperty("age", "18");
consumer.subscribe("topic2", MessageSelector.bySql("age > 16"));

7. Spring Boot 生产者

7.1 依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.4</version>
</dependency>

7.2 配置

1
2
3
4
5
6
rocketmq:
name-server: localhost:9876
producer:
group: group1
server:
port: 8989

7.3 使用

1
rocketMQTemplate.convertAndSend("topic1", msg);

8. Spring Boot 消费者

1
2
3
4
@RocketMQMessageListener(topic="topic1", selectorExpression="tag1", consumerGroup="group1")
public class DemoConsumer implements RocketMQListener<String>{
public void onMessage(String s){ System.out.println(s); }
}

9. Spring Boot 发送各种消息

  • 同步syncSend
  • 异步asyncSend
  • 单向sendOneWay
  • 延时syncSend(..., timeout, delayLevel)
  • 批量syncSend(topic, msgList, timeout)

10. Spring Boot 消费者过滤与模式

  • Tag 过滤:selectorExpression="tag1"
  • SQL 过滤:selectorType = SQL92
  • 广播模式:messageModel = MessageModel.BROADCASTING

11. 消息顺序

(此处可补充:顺序消息需使用顺序生产者,发送到同一队列)


12. 事务消息

12.1 事务生产者

1
2
3
4
5
6
7
8
9
10
11
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setTransactionListener(new TransactionListener(){
public LocalTransactionState executeLocalTransaction(Message msg, Object arg){
return LocalTransactionState.COMMIT_MESSAGE;
}
public LocalTransactionState checkLocalTransaction(MessageExt msg){
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
TransactionSendResult result = producer.sendMessageInTransaction(message, null);

13. 主从架构

  • 主从物理机应保持距离,避免同时故障导致不可用。