MessageQueue
使用消息队列的优势在于:
- 应用解耦:消费者存活与否不影响生产者,且如果增加消费者,无需修改消费者端的代码(只需让mq多复制一份消息给新消费者)
- 异步提速:生产者一旦发送完消息就可以继续进行下一步业务逻辑
- 削峰填谷: 使用mq来限制消费者消费速度,这样依赖高峰期产生的数据会被积压在mq中,不会对消费者造成巨大压力,而在高峰期过后的一段时间,消费者消费消息的速度依然维持在一个适中的速度(相比平常),这就叫填谷
使用mq自然也带来了劣势:
- 系统可用性降低:一旦mq宕机则整个系统都会故障
- 系统复杂度提高
- 带来一致性问题:假如A系统处理完业务后,,通过mq给B、C、D三个系统发送消息数据,如果B系统处理成功,而C、D系统处理失败,这样就会带来数据一致性问题
- 消息顺序性
- 消息丢失
- 消息一致性
- 消息重复使用
Rocketmq原理
安装与测试
1 | win10下安装所需环境为: |
1、启动NAMESERVER
cmd命令框执行进入至‘MQ文件夹\bin’下(端口9876)
1 | start mqnamesrv.cmd |
2、启动BROKER
1 | start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true |
3、测试官方demo
(1)可以参考官方安装rockermq-dashboard
的文档 Quick Start
(2)打开cmd,在clone后的父目录下运行mvn spring-boot:run
(3)最后浏览器输入 localhost:8080
(端口可改),即可得到以下结果:
单生产者-单消费者
(1)编写生产者
1 | public class Producer { |
(2)编写消费者
1 | public class Consumer { |
多消费者模式
- 让生产者发送多条消息,然后在idea中配置允许多实例启动(Allow Multiple Instances )
1 | //发送20条消息 |
启动两个消费者后,在console可以发现两个消费者各接收到10条消息(消费者处于同一group时,默认处于近似负载均衡模式下),但这样做的前提是两个消费者针对同一个topic来接收消息,且两个消费者隶属的组名Group为同一个
如果设置两个消费者,但让两消费者隶属的组名Group不相等,则每个消费者都会收到20条消息
每个消费者可以设置接收信息的模式:
1 | public enum MessageModel { |
消息类型
同步消息:即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功),上述测试所使用的就是同步消息
异步消息:即时性较弱,但需要有回执的消息,例如订单中的某些信息(变相增加了吞吐量)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19//测试异步消息
for (int i= 0; i< 20; i++) {
String msg="--------------hello rocketmq-----------"+i; //String byte[] char[] 相互转换
Message message = new Message("topic2", "tag1", msg.getBytes());
producer.send(message, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
public void onException(Throwable throwable) {
System.out.println(throwable);
}
});
}
//一定不能关闭生产者(异步消息相当于开启一个新线程同主线程一起运行),如果主线程先关闭了生产者,那么就无法继续发送消息了!
//producer.shutdown();单向消息:生产者只管发消息,不需要回执,例如日志类消息
1
2
3
4public void setMessageModel(MessageModel messageModel) {
this.messageModel = messageModel;
}
//从源码中可看出单向消息不需要返回值延迟类消息:
1
2
3
4public void setDelayTimeLevel(int level) {
this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}
//根据源码可看出可以设置消息延迟时间的级别批量消息:
批量发送消息能显著提高传递小消息的性能,批量消息不能是延时消息发送批量消息如下:
1
2
3
4
5
6
7
8
9
10
11List<Message> msgList = new ArrayList<Message>();
Message msg1 = new Message("topic1", ("hello rocketmq1").getBytes("UTF-8"));
Message msg2 = new Message("topic1", ("hello rocketmq2").getBytes("UTF-8"));
Message msg3 = new Message("topic1", ("hello rocketmq3").getBytes("UTF-8"));
msgList.add(msg1);
msgList.add(msg2);
msgList.add(msg3);
SendResult result = producer.send(msgList);
sql过滤
首先修改broker.conf,
(1)在文件末尾添加:
enablPropertyFilter=true
(2)windows下还需要更新broker配置:直接cmd中输入
1
mqadmin.cmd updateBrokerConfig -blocalhost:10911 -kenablePropertyFilter -vtrue
重启broker后,新建两条消息,然后设置sql过滤规则为
age>22
1
2
3
4
5
6
7
8
9
10
11
12
13
14Product.java
String msg1="--------------hello rocketmq 1-----------";
Message message1 = new Message("topic5", "player",msg1.getBytes());
message1.putUserProperty("age","20");
message1.putUserProperty("name","haland");
String msg2="--------------hello rocketmq 2-----------";
Message message2 = new Message("topic5", "player",msg2.getBytes());
message2.putUserProperty("age","24");
message2.putUserProperty("name","mbappe");
Consumer.java
consumer.subscribe("topic5", MessageSelector.bySql("age>22"));
3.测试后发现,最后只有消息msg2会被消费者接收到
Springboot整合
首先导入依赖
1
2
3
4
5
6<!--引入rocketmq依赖-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>修改application.yml文件:
1 | rocketmq: |
- 编写生产者Controller:
1 |
|
在postman输入请求http://localhost:8081/mq/send
后,在网页输出了结果为”success“,以及若干User(username=haland, age=20)接收成功
- 编写消费者:
1 | //容器一启动就会生成对象 |
再次发送localhost:8081/mq/send
后,消费者会接收到消息,并在控制台输出User(username=haland, age=20)接收成功
消息有序处理
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析:在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的(每个队列都开启一个线程)