MessageQueue

使用消息队列的优势在于:

  1. 应用解耦:消费者存活与否不影响生产者,且如果增加消费者,无需修改消费者端的代码(只需让mq多复制一份消息给新消费者)
  2. 异步提速:生产者一旦发送完消息就可以继续进行下一步业务逻辑
  3. 削峰填谷: 使用mq来限制消费者消费速度,这样依赖高峰期产生的数据会被积压在mq中,不会对消费者造成巨大压力,而在高峰期过后的一段时间,消费者消费消息的速度依然维持在一个适中的速度(相比平常),这就叫填谷

使用mq自然也带来了劣势:

  1. 系统可用性降低:一旦mq宕机则整个系统都会故障
  2. 系统复杂度提高
  3. 带来一致性问题:假如A系统处理完业务后,,通过mq给B、C、D三个系统发送消息数据,如果B系统处理成功,而C、D系统处理失败,这样就会带来数据一致性问题
    1. 消息顺序性
    2. 消息丢失
    3. 消息一致性
    4. 消息重复使用

Rocketmq原理


安装与测试

1
2
3
4
win10下安装所需环境为:
jdk 1.8(版本过高可能会无法启动nameserv)
maven
rocketmq 4.4

1、启动NAMESERVER

cmd命令框执行进入至‘MQ文件夹\bin’下(端口9876)

1
start mqnamesrv.cmd

2、启动BROKER

1
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

3、测试官方demo

(1)可以参考官方安装rockermq-dashboard的文档 Quick Start

(2)打开cmd,在clone后的父目录下运行mvn spring-boot:run

(3)最后浏览器输入 localhost:8080(端口可改),即可得到以下结果:

单生产者-单消费者

(1)编写生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Producer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
//谁来发?
DefaultMQProducer producer=new DefaultMQProducer("group1");
//生产者只需要发给NameServer
producer.setNamesrvAddr("localhost:9876");
producer.start();
//发送消息
String msg="hello rocketmq-----------"; //String byte[]与char[] 相互转换
Message message = new Message("topic1", "tag1", msg.getBytes());//同时设置该消息属于哪个topic类
SendResult sendResult = producer.send(message);
//输出发送结果
System.out.println(sendResult);
//关闭生产者(shutdown)
producer.shutdown();

}
}

(2)编写消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Consumer {
public static void main(String[] args) throws MQClientException {
//new一个push模式下的consumer
DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("GROUP1");
consumer.setNamesrvAddr("localhost:9876");// 设置nameserver地址(从哪里收消息)
//设置监听哪一个消息队列
consumer.subscribe("topic1","*");
//*表示都监听,subexpression可以进行tag过滤:指定想要接收的topic下特定的tag类里的消息


//业务流程处理-注册监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//写业务逻辑
for (MessageExt messageExt : list) {
System.out.println(messageExt);
byte[] body = messageExt.getBody();
System.out.println(new String(body));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS ;
}
}); //接口不能被new(例如集合) 用匿名内部类
consumer.start();

//消费者不能关闭连接,因为要时刻监听
}

}

多消费者模式

  1. 让生产者发送多条消息,然后在idea中配置允许多实例启动(Allow Multiple Instances )
1
2
3
4
5
6
7
//发送20条消息
for (int i= 0; i< 20; i++) {
String msg="--------------hello rocketmq-----------"+i; //String byte[] char[] 相互转换
Message message = new Message("topic2", "tag1", msg.getBytes());
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
}
  1. 启动两个消费者后,在console可以发现两个消费者各接收到10条消息(消费者处于同一group时,默认处于近似负载均衡模式下),但这样做的前提是两个消费者针对同一个topic来接收消息,且两个消费者隶属的组名Group为同一个

  2. 如果设置两个消费者,但让两消费者隶属的组名Group不相等,则每个消费者都会收到20条消息

  3. 每个消费者可以设置接收信息的模式:

1
2
3
public enum MessageModel {
BROADCASTING("BROADCASTING"), //组内广播
CLUSTERING("CLUSTERING"); //组内负载均衡

消息类型

  1. 同步消息:即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功),上述测试所使用的就是同步消息

  2. 异步消息:即时性较弱,但需要有回执的消息,例如订单中的某些信息(变相增加了吞吐量)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    //测试异步消息
    for (int i= 0; i< 20; i++) {
    String msg="--------------hello rocketmq-----------"+i; //String byte[] char[] 相互转换
    Message message = new Message("topic2", "tag1", msg.getBytes());
    producer.send(message, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
    System.out.println(sendResult);
    }
    @Override
    public void onException(Throwable throwable) {
    System.out.println(throwable);
    }
    });


    }
    //一定不能关闭生产者(异步消息相当于开启一个新线程同主线程一起运行),如果主线程先关闭了生产者,那么就无法继续发送消息了!
    //producer.shutdown();
  3. 单向消息:生产者只管发消息,不需要回执,例如日志类消息

    1
    2
    3
    4
    public void setMessageModel(MessageModel messageModel) {
    this.messageModel = messageModel;
    }
    //从源码中可看出单向消息不需要返回值
  4. 延迟类消息:

    1
    2
    3
    4
    public void setDelayTimeLevel(int level) {
    this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }
    //根据源码可看出可以设置消息延迟时间的级别
  5. 批量消息:

    批量发送消息能显著提高传递小消息的性能,批量消息不能是延时消息发送批量消息如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    List<Message> msgList = new ArrayList<Message>();
    Message msg1 = new Message("topic1", ("hello rocketmq1").getBytes("UTF-8"));
    Message msg2 = new Message("topic1", ("hello rocketmq2").getBytes("UTF-8"));
    Message msg3 = new Message("topic1", ("hello rocketmq3").getBytes("UTF-8"));

    msgList.add(msg1);
    msgList.add(msg2);
    msgList.add(msg3);


    SendResult result = producer.send(msgList);

sql过滤

  1. 首先修改broker.conf,

    (1)在文件末尾添加:enablPropertyFilter=true

    (2)windows下还需要更新broker配置:直接cmd中输入

    1
    mqadmin.cmd updateBrokerConfig -blocalhost:10911 -kenablePropertyFilter -vtrue
  2. 重启broker后,新建两条消息,然后设置sql过滤规则为age>22

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    Product.java
    String msg1="--------------hello rocketmq 1-----------";
    Message message1 = new Message("topic5", "player",msg1.getBytes());
    message1.putUserProperty("age","20");
    message1.putUserProperty("name","haland");

    String msg2="--------------hello rocketmq 2-----------";
    Message message2 = new Message("topic5", "player",msg2.getBytes());
    message2.putUserProperty("age","24");
    message2.putUserProperty("name","mbappe");


    Consumer.java
    consumer.subscribe("topic5", MessageSelector.bySql("age>22"));

3.测试后发现,最后只有消息msg2会被消费者接收到

Springboot整合

  1. 首先导入依赖

    1
    2
    3
    4
    5
    6
    <!--引入rocketmq依赖-->
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
    </dependency>
  2. 修改application.yml文件:

1
2
3
4
5
6
rocketmq:
name-server: localhost:9876 #nameserver运行端口
producer:
group: group1
server:
port: 8081
  1. 编写生产者Controller:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@RestController
@RequestMapping("/mq")
public class SendController {
@Resource
RocketMQTemplate rocketMQTemplate;

@RequestMapping("/send")
public String send()
{
//模板类: redisTemplate、 restTemplate、jdbcTemplate
String msg="hello springboot +rocketmq";
rocketMQTemplate.convertAndSend("topic5",msg);
User user=new User("haland",20);
rocketMQTemplate.convertAndSend("topic5",user);
return "success";


//发送同步消息
SendResult sendResult = rocketMQTemplate.syncSend("topic5", user);

//发送异步消息
rocketMQTemplate.asyncSend("topic5", user, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}

@Override
public void onException(Throwable throwable) {
System.out.println(throwable);
}
},1000);

//发送单向消息
rocketMQTemplate.sendOneWay("topic5",user);
}
}

在postman输入请求http://localhost:8081/mq/send后,在网页输出了结果为”success“,以及若干User(username=haland, age=20)接收成功

  1. 编写消费者:
1
2
3
4
5
6
7
8
9
10
@Service  //容器一启动就会生成对象
@RocketMQMessageListener(topic = "topic5",consumerGroup = "group2",
selectorType = SelectorType.SQL92,selectorExpression = "age>18",
messageModel = MessageModel.BROADCASTING)
public class ConsumerService implements RocketMQListener<User> {
@Override
public void onMessage(User user) {
System.out.println(user+"接收成功"); // 接收到消息后进行的业务逻辑处理
}
}

再次发送localhost:8081/mq/send后,消费者会接收到消息,并在控制台输出User(username=haland, age=20)接收成功

消息有序处理

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

顺序消费的原理解析:在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的(每个队列都开启一个线程)